[DB-1912] Persistent subscriptions: Fix wrong checkpoint when using pinned strategy#5497

Merged
shaan1337 merged 4 commits into master from shaan1337/fix-wrong-ps-checkpoint
Apr 14, 2026

Conversation

@shaan1337
Member

Persistent subscription consumer strategies that can skip events can produce wrong checkpoints when events are skipped. The pinned consumer strategy is currently the only strategy that can skip events.

This PR fixes the following issues:

  • The logic for assigning sequence numbers / previous event positions to OutstandingMessage did not take skipped events into consideration - this could result in wrongly assigned sequence numbers / previous event positions, and subsequently in wrong checkpoints.
  • The checkpointing mechanism only considers messages in the retry list, but skipped events were left in the buffer - this could also result in wrong checkpoints.
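To make the failure mode concrete, here is a small Python model of the checkpointing rule described above (illustrative only — not KurrentDB's C# implementation; all names are invented): the checkpoint is held back by the lowest sequence number that is in flight or waiting to be retried, so an event skipped in the buffer without a sequence number is invisible to it.

```python
# Toy model of persistent-subscription checkpointing (illustrative only,
# not KurrentDB's actual implementation).

def lowest_outstanding(in_flight, retry):
    """Checkpoint blocker: lowest sequence number started but not yet acked."""
    outstanding = in_flight | retry
    return min(outstanding) if outstanding else None

# Events 0..3 are read; event 1 is skipped by the consumer strategy and,
# before this PR, stays in the buffer with NO sequence number.
in_flight = {0, 2, 3}   # pushed to clients
retry = set()
buffer_skipped = {1}    # invisible to the checkpointer

# Clients ack events 0, 2 and 3.
for acked in (0, 2, 3):
    in_flight.discard(acked)

# All *tracked* work is done, so the checkpoint advances to the highest
# acked position even though event 1 was never delivered.
checkpoint = 3 if lowest_outstanding(in_flight, retry) is None else None
print(checkpoint)  # 3 -- wrong: event 1 could be lost on restart

# After the fix: the skipped event gets a sequence number and moves to
# the retry list, so it holds the checkpoint back.
retry = {1}
print(lowest_outstanding(in_flight, retry))  # 1 -- checkpoint cannot pass it
```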

@shaan1337 shaan1337 force-pushed the shaan1337/fix-wrong-ps-checkpoint branch 5 times, most recently from 84ef7ea to a0ce7ec Compare February 10, 2026 06:05
@shaan1337 shaan1337 changed the title from "Persistent subscriptions: Fix wrong checkpoint when using pinned strategy" to "[DB-1912] Persistent subscriptions: Fix wrong checkpoint when using pinned strategy" Feb 10, 2026

@shaan1337 shaan1337 marked this pull request as ready for review February 10, 2026 06:08
@shaan1337 shaan1337 requested a review from a team as a code owner February 10, 2026 06:08
@timothycoleman
Contributor

/review

@qodo-code-review
Contributor

PR Reviewer Guide 🔍

Here are some key observations to aid the review process:

⏱️ Estimated effort to review: 3 🔵🔵🔵⚪⚪
🧪 PR contains tests
🔒 No security concerns identified
⚡ Recommended focus areas for review

Retry Semantics

When the consumer strategy returns Skipped, the code now calls MarkSent() and re-enqueues the message via AddRetry() without incrementing retry count. This changes delivery semantics: verify it cannot lead to duplicate in-flight tracking, reordering, starvation (e.g., repeatedly skipped messages cycling), or an ever-growing retry queue under sustained skipping.

private void TryPushingMessagesToClients() {
	lock (_lock) {
		if (!TryGetStreamBuffer(out var streamBuffer))
			return;

		foreach (StreamBuffer.OutstandingMessagePointer messagePointer in streamBuffer.Scan()) {
			(OutstandingMessage message, bool newSequenceNumberAssigned) =
				OutstandingMessage.ForPushedEvent(messagePointer.Message, _nextSequenceNumber, _lastKnownMessage);

			if (newSequenceNumberAssigned) {
				_lastKnownSequenceNumber = _nextSequenceNumber++;
				_lastKnownMessage = message.EventPosition;
			}

			ConsumerPushResult result =
				_pushClients.PushMessageToClient(message);
			if (result == ConsumerPushResult.Sent) {
				messagePointer.MarkSent();
				MarkBeginProcessing(message);
			} else if (result == ConsumerPushResult.Skipped) {
				// The consumer strategy skipped the message - add it back to the stream buffer as a retry.
				// we don't increment the retry count as it's an internal retry.
				messagePointer.MarkSent();
				streamBuffer.AddRetry(message);
			} else if (result == ConsumerPushResult.NoMoreCapacity) {
				return;
			}
		}
	}
}

Position Tracking

The update of last-known sequence/position is now based on message.EventPosition when a new sequence is assigned, whereas previous logic derived the position via EventSource.GetStreamPositionFor(message.ResolvedEvent). Confirm these are equivalent for all event sources (e.g., link events vs resolved events, category streams, multi-stream sources) and won’t regress checkpoint correctness.

	lock (_lock) {
		if (!TryGetStreamBuffer(out var streamBuffer))
			yield break;

		foreach (var messagePointer in streamBuffer.Scan().Take(count)) {
			messagePointer.MarkSent();
			(OutstandingMessage message, bool newSequenceNumberAssigned) =
				OutstandingMessage.ForPushedEvent(messagePointer.Message, _nextSequenceNumber, _lastKnownMessage);

			if (newSequenceNumberAssigned) {
				_lastKnownSequenceNumber = _nextSequenceNumber++;
				_lastKnownMessage = message.EventPosition;
			}

			MarkBeginProcessing(message);
			yield return (messagePointer.Message.ResolvedEvent, messagePointer.Message.RetryCount);
		}
	}
}

private void MarkBeginProcessing(OutstandingMessage message) {
	_statistics.IncrementProcessed();

	StartMessage(message,
		_settings.MessageTimeout == TimeSpan.MaxValue
			? DateTime.MaxValue
			: DateTime.UtcNow + _settings.MessageTimeout);
}

Test Robustness

The new test relies on hashing/partitioning to route events to specific clients and on precise buffering behavior. Consider validating/locking down the routing assumption (e.g., assert which client each event is intended for) to avoid flakiness if the hashing implementation/seed or pinned strategy behavior changes.
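One way to lock down the routing assumption the reviewer mentions is to compute the pin target explicitly in the test. The sketch below is a Python model of hash-based pinning, not KurrentDB's strategy: the hash function and bucketing are illustrative stand-ins (the real pinned strategy uses XXHash over its own partitioning).

```python
import zlib

def pinned_client(source_stream: str, client_count: int) -> int:
    """Model of a pinned consumer strategy: events resolved from the same
    source stream always land on the same client bucket. zlib.crc32 is an
    illustrative stand-in for the real strategy's XXHash-based bucketing."""
    return zlib.crc32(source_stream.encode()) % client_count

# A test can assert its routing assumption up front instead of relying
# on incidental hash behaviour:
target = pinned_client("streamName-1", client_count=2)
assert target == pinned_client("streamName-1", client_count=2)  # stable
assert 0 <= target < 2
```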

public class CheckpointingWithSkippedEvents {
	[Test]
	public void checkpointing_works_when_events_are_skipped_by_the_consumer_strategy() {
		var categoryVersion = 0;
		var stream1Version = 0;
		var stream2Version = 0;

		IPersistentSubscriptionStreamPosition checkpoint = null;
		var client1Envelope = new FakeEnvelope();
		var client2Envelope = new FakeEnvelope();
		var reader = new FakeCheckpointReader();
		const string subscriptionStream = "$ce-streamName";
		var sub = new KurrentDB.Core.Services.PersistentSubscription.PersistentSubscription(
			PersistentSubscriptionToStreamParamsBuilder.CreateFor(subscriptionStream, "groupName")
				.WithEventLoader(new FakeStreamReader())
				.WithCheckpointReader(reader)
				.WithCheckpointWriter(new FakeCheckpointWriter(x => checkpoint = x))
				.WithMessageParker(new FakeMessageParker())
				.MinimumToCheckPoint(1)
				.MaximumToCheckPoint(1)
				.CustomConsumerStrategy(new PinnedPersistentSubscriptionConsumerStrategy(new XXHashUnsafe()))
				.StartFromCurrent());
		reader.Load(null);

		var correlationId = Guid.NewGuid();
		sub.AddClient(Guid.NewGuid(), Guid.NewGuid(), "connection-1", client1Envelope, maxInFlight: 1, "foo", "bar");
		sub.AddClient(Guid.NewGuid(), Guid.NewGuid(), "connection-2", client2Envelope, maxInFlight: 1000, "foo", "bar");

		// write three messages intended for client 1
		var message_1_1 = WriteEventForStream1(); // pushed immediately
		var message_1_2 = WriteEventForStream1(); // buffered - it's skipped by the consumer strategy as `maxInFlight` = 1
		var message_1_3 = WriteEventForStream1(); // buffered - it's skipped by the consumer strategy as `maxInFlight` = 1

		// write three messages intended for client 2
		var message_2_1 = WriteEventForStream2(); // pushed immediately
		var message_2_2 = WriteEventForStream2(); // pushed immediately
		var message_2_3 = WriteEventForStream2(); // pushed immediately

		Assert.That(client1Envelope.Replies.Count, Is.EqualTo(1));
		Assert.That(client2Envelope.Replies.Count, Is.EqualTo(3));
		Assert.True(sub.TryGetStreamBuffer(out var streamBuffer));
		Assert.That(streamBuffer.BufferCount, Is.EqualTo(2));
		Assert.AreEqual(null, checkpoint);

		// checkpoint should be null - no messages have been acknowledged yet
		Assert.IsNull(checkpoint);

		// acknowledge message_1_1 for client 1
		sub.AcknowledgeMessagesProcessed(correlationId, [
			message_1_1,
		]);

		// checkpoint should now be at #0
		Assert.AreEqual(new PersistentSubscriptionSingleStreamPosition(0), checkpoint);

		// acknowledge all the messages for client 2
		sub.AcknowledgeMessagesProcessed(correlationId, [
			message_2_1,
			message_2_2,
			message_2_3,
		]);

		// checkpoint should still be at #0, as message_1_2 has not been acknowledged yet
		Assert.AreEqual(new PersistentSubscriptionSingleStreamPosition(0), checkpoint);

		// acknowledge message_1_2.
		sub.AcknowledgeMessagesProcessed(correlationId, [
			message_1_2,
		]);

		// checkpoint should now be at #1, as message_1_3 has not been acknowledged yet
		Assert.AreEqual(new PersistentSubscriptionSingleStreamPosition(1), checkpoint);

		// acknowledge message_1_3.
		sub.AcknowledgeMessagesProcessed(correlationId, [
			message_1_3,
		]);

		// checkpoint should now be at #5, as all messages have been acknowledged
		Assert.AreEqual(new PersistentSubscriptionSingleStreamPosition(5), checkpoint);

		Guid WriteEventForStream1() {
			var id = Guid.NewGuid();
			sub.NotifyLiveSubscriptionMessage(Helper.BuildLinkEvent(id, subscriptionStream, categoryVersion++,
				Helper.BuildFakeEvent(Guid.NewGuid(), "type", "streamName-1", stream1Version++), false));
			return id;
		}

		Guid WriteEventForStream2() {
			var id = Guid.NewGuid();
			sub.NotifyLiveSubscriptionMessage(Helper.BuildLinkEvent(id, subscriptionStream, categoryVersion++,
				Helper.BuildFakeEvent(Guid.NewGuid(), "type", "streamName-2", stream2Version++), false));
			return id;
		}
	}
}

shaan1337 and others added 4 commits April 8, 2026

* Add failing test

Co-Authored-By: Timothy Coleman <timothy.coleman@gmail.com>

* Fixed: Wrong persistent subscription checkpoints can be produced when the consumer strategy skips events

When a consumer strategy skips an event, don't keep it in the buffer but add it to the retry list instead.
The checkpointing mechanism considers the lowest message in `_retry` - not the lowest message in `_buffer`.
Thus leaving it in the buffer may result in a wrong checkpoint being produced.

Always update the last known event when a new sequence number has been assigned.
A new sequence number being assigned means that we are seeing an event that was not processed before.
In other words, it's the last event we've read from the source stream,
so we update the last known sequence number / last known event position accordingly.

* fix: only update the _lastKnown vars when event leaves the buffer

The newSequenceNumberAssigned check isn't enough, because in the NoMoreCapacity case we discard the message that the _lastKnown vars are based on.
Then next time we call OutstandingMessage.ForPushedEvent the _lastKnownMessage we pass in will be wrong, potentially leading to an incorrect checkpoint (see test).

* safer and more efficient

- safer because Scan() says we shouldn't add to each linked list while scanning it. Before this commit we could add to _retry while scanning through it; now we only add to _retry while scanning _buffer.
- more efficient because before this commit adding to _retry involved searching through it; now we only add to _retry when skipping an event in _buffer, in which case we know the event needs to go to the back of retry.
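The "safer" point is the classic mutate-while-iterating hazard. A minimal Python illustration of the shape the commit settles on (plain lists rather than KurrentDB's linked lists; all names are illustrative): scan one collection, append only to the other.

```python
# Illustrative model of why the commit only appends to `retry` while
# scanning `buffer`: mutating a collection while scanning that same
# collection is unsafe, but appending to a *different* one is fine.

retry = ["r1"]
buffer = ["b1", "b2", "b3"]

def push(event):
    """Pretend consumer strategy: the client pinned to 'b2' is busy."""
    return "skipped" if event == "b2" else "sent"

sent = []

# Conceptually the stream of pushable events is retry ++ buffer.
for event in retry:
    sent.append(event)          # retried events are always eligible
retry.clear()

for event in buffer:            # scanning `buffer`...
    if push(event) == "skipped":
        retry.append(event)     # ...so appending to `retry` is safe
    else:
        sent.append(event)

print(sent)   # ['r1', 'b1', 'b3']
print(retry)  # ['b2'] -- the skipped event goes to the back of retry
```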
@timothycoleman timothycoleman force-pushed the shaan1337/fix-wrong-ps-checkpoint branch from a0ce7ec to ad80b7e Compare April 8, 2026 15:00
@timothycoleman
Contributor

Originally (before pinned strategies)

  • Events were leaving the _buffer in order, and were assigned sequence numbers as they did so.
  • Checkpointing worked by looking at the lowest sequence number that is in flight or waiting to be retried. i.e. the lowest one that we've started but not finished.

With the addition of pinned strategies

  • Introduced 'Skipped' consumer push result, where we leave an event in the _buffer (with no sequence number) if its consumer is too busy.
  • Events can therefore leave the buffer out of order and receive sequence numbers out of order.
  • Checkpointing unchanged, does not consider events in _buffer that were skipped, only in-flight and retries.

Problems (these only affect pinned strategies):

  1. Skipped events, when pushed to the client later, are assigned a sequence number out of order, which is not suitable for checkpointing.
  2. Skipped events are not visible to the checkpointing mechanism.

This PR

To solve this, we

  1. ensure that the sequence numbers are assigned in order even when an event is skipped
  2. ensure that skipped events are visible to the checkpoint mechanism

The easiest way to achieve both these things is, when we skip an event we give it a sequence number and move it to the _retry list immediately. This is natural because conceptually the StreamBuffer of events ready to push is _retry ++ _buffer.
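The chosen approach can be sketched as a compact Python model (illustrative names, not the C# code): sequence numbers are assigned in read order whether or not an event is skipped, and skipped events move to the retry list where the checkpointer can see them.

```python
# Model: assign sequence numbers in read order even when an event is
# skipped, and move skipped events to `retry` so checkpointing sees them.

next_seq = 0
in_flight = {}   # seq -> event
retry = {}       # seq -> event

def consumer_push(event):
    """Pretend pinned strategy: the client pinned to 'e1' is busy."""
    return "skipped" if event == "e1" else "sent"

for event in ["e0", "e1", "e2"]:
    seq = next_seq
    next_seq += 1            # sequence assigned in read order, skip or not
    if consumer_push(event) == "sent":
        in_flight[seq] = event
    else:
        retry[seq] = event   # visible to the checkpoint mechanism

# Checkpoint blocker = lowest sequence number still outstanding anywhere.
outstanding = in_flight.keys() | retry.keys()
print(min(outstanding))  # 0
del in_flight[0]         # ack e0
outstanding = in_flight.keys() | retry.keys()
print(min(outstanding))  # 1 -- the skipped event holds the checkpoint back
```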

Alternatively, when skipping an event we could give it a sequence number and leave it in the _buffer, and give the checkpointer a way to check the buffer (it would only check the initial section that have sequence numbers, not the whole buffer), but this seems unnecessarily complicated.

Note that:

  1. Backpressure is maintained because StreamBuffer.CanAccept considers the count of _buffer and _retry
  2. Stats will show a higher number of events in retry. This is reasonable enough because, although we didn't send them to the client, we did try to.

@shaan1337 shaan1337 merged commit 451e890 into master Apr 14, 2026
86 checks passed
@shaan1337 shaan1337 deleted the shaan1337/fix-wrong-ps-checkpoint branch April 14, 2026 06:36
Contributor

@github-actions github-actions Bot left a comment


🚨 @shaan1337 Failed to create cherry Pick PR due to error:

RequestError [HttpError]: Merge conflict
   at /home/runner/work/_actions/kurrent-io/Automations/master/cherry-pick-pr-for-label/node_modules/@octokit/request/dist-node/index.js:66:23
   at process.processTicksAndRejections (node:internal/process/task_queues:95:5) {
 status: '409',
 headers: {
   'access-control-allow-origin': '*',
   'access-control-expose-headers': 'ETag, Link, Location, Retry-After, X-GitHub-OTP, X-RateLimit-Limit, X-RateLimit-Remaining, X-RateLimit-Used, X-RateLimit-Resource, X-RateLimit-Reset, X-OAuth-Scopes, X-Accepted-OAuth-Scopes, X-Poll-Interval, X-GitHub-Media-Type, X-GitHub-SSO, X-GitHub-Request-Id, Deprecation, Sunset',
   'content-length': '127',
   'content-security-policy': "default-src 'none'",
   'content-type': 'application/json; charset=utf-8',
   date: 'Tue, 14 Apr 2026 06:37:10 GMT',
   'referrer-policy': 'origin-when-cross-origin, strict-origin-when-cross-origin',
   server: 'github.com',
   'strict-transport-security': 'max-age=31536000; includeSubdomains; preload',
   vary: 'Accept-Encoding, Accept, X-Requested-With',
   'x-accepted-github-permissions': 'contents=write',
   'x-content-type-options': 'nosniff',
   'x-frame-options': 'deny',
   'x-github-api-version-selected': '2022-11-28',
   'x-github-media-type': 'github.v3; format=json',
   'x-github-request-id': 'F401:230BC3:2136B40:88AA66D:69DDE095',
   'x-ratelimit-limit': '5000',
   'x-ratelimit-remaining': '4990',
   'x-ratelimit-reset': '1776152226',
   'x-ratelimit-resource': 'core',
   'x-ratelimit-used': '10',
   'x-xss-protection': '0'
 },
 request: {
   method: 'POST',
   url: 'https://api.github.com/repos/kurrent-io/KurrentDB/merges',
   headers: {
     accept: 'application/vnd.github.v3+json',
     'user-agent': 'octokit-core.js/3.3.2 Node.js/20.20.1 (linux; x64)',
     authorization: 'token [REDACTED]',
     'content-type': 'application/json; charset=utf-8'
   },
   body: '{"base":"cherry-pick-cherry-pick/5497/shaan1337/fix-wrong-ps-checkpoint-release/v24.10-01e7b37d-b44d-41d0-89ef-e650209d5b0a","commit_message":"Merge c07492e966e97aff3cf4fff0f27e51c960d21509 into cherry-pick-cherry-pick/5497/shaan1337/fix-wrong-ps-checkpoint-release/v24.10-01e7b37d-b44d-41d0-89ef-e650209d5b0a [skip ci]\\n\\n\\nskip-checks: true\\n","head":"c07492e966e97aff3cf4fff0f27e51c960d21509"}',
   request: { agent: [Agent], hook: [Function: bound bound register] }
 },
 documentation_url: 'https://docs.github.com/rest/branches/branches#merge-a-branch'
}

🚨👉 Check https://github.com/kurrent-io/KurrentDB/actions/runs/24384744335

Contributor

@github-actions github-actions Bot left a comment


@shaan1337 👉 Created pull request targeting release/v26.0: #5578

alexeyzimarev added a commit that referenced this pull request Apr 16, 2026
* [DB-1964]: Added migration engine for secondary index tables (#5561)

* Initial version of the index DB migration engine

* Fixed regression

* Row must by passed by ref because its instance method mutates the struct fields

* Create schema from script only when fresh setup

* Removed version setup from the script file

* Fixed regression

* Removed redundant migration action

* Fixed occasional NRE

* Align guide with the source code

* Fixed typo

* Review feedback

* Fixed indentation

* Fixed identation

* [DB-1951] Add MSA idempotency tests that correspond to the existing behaviour (#5565)

* Add MSA idempotency tests that correspond to the existing behaviour

* chore: restructure CLAUDE.md with progressive disclosure and add architect review agent (#5572)

Restructure AI documentation for better context efficiency:
- CLAUDE.md reduced from 812 to 150 lines, keeping only rules that affect every coding decision
  (threading model, C# conventions, log levels, parameter design, naming)
- Extracted reference docs to .claude/docs/ with clear "when to fetch" pointers:
  architecture.md, api-v2-patterns.md, testing.md, protocol-v2.md, patterns-and-conventions.md
- New content from Tim Coleman's projections-v2 review (PR #5562):
  - TcsEnvelope<> vs CallbackEnvelope threading rule (was showing anti-pattern as correct)
  - Non-optional parameters / no-fallbacks convention
  - ISystemClient preference over raw IPublisher
  - ClaimsPrincipal threading-through requirement
  - Projections V2 engine architecture and threading model
- Add architect-review agent (.claude/agents/architect-review.md) encoding
  Tim Coleman's review methodology for future agentic code reviews

Co-authored-by: Claude Opus 4.6 (1M context) <noreply@anthropic.com>

* Treat writes as idempotent if all events were previously written (#5547)

An idempotent multi-stream write retry should succeed even if a
check-only stream's expected version no longer matches. The events are
already written and the check-only stream's state may have changed
since, ultimately we resend the same response to the client as they
should have received the first time they sent the request.

Previously, a check-only stream (failing or not) caused the entire retry
to fail with WrongExpectedVersion.

* Fix incorrect result stream name for root partition in V2 projections

The double-dash `$projections-{name}--result` was produced when the
partition key was empty. Now uses `ProjectionNamesBuilder.MakeResultStreamName`
— consistent with V1's `MakePartitionResultStreamName`.

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>

* [DB-1912] Persistent subscriptions: Fix wrong checkpoint when using pinned strategy (#5497)

* Add failing test

Co-Authored-By: Timothy Coleman <timothy.coleman@gmail.com>

* Fixed: Wrong persistent subscription checkpoints can be produced when the consumer strategy skips events

* When a consumer strategy skips an event, don't keep it in the buffer but add it to the retry list instead.
The checkpointing mechanism considers the lowest message in `_retry` - not the lowest message in `_buffer`.
Thus leaving it in the buffer may result in a wrong checkpoint being produced.

* Always update the last known event when a new sequence number has been assigned
A new sequence number being assigned means that we are seeing an event that was not processed before.
In other words, it's the last event we've read from the source stream,
so we update the last known sequence number / last known event position accordingly

* fix: only update the _lastKnown vars when event leaves the buffer

the newSequenceNumberAssigned check isn't enough, because in the NoMoreCapacity case we discard the message that the _lastKnown vars are based on.
then next time we call OutstandingMessage.ForPushedEvent the _lastKnownMessage we pass in will be wrong, potentially leading to an incorrect checkpoint (see test)

* safer and more efficient

- safer because Scan() says we shouldn't add to each linked list while scanning it. before this commit we could add to _retry while scanning through it. now we only add to _retry while scanning _buffer
- more efficient because before this commit adding to _retry involved searching through it, now we only add to _retry when skipping an event in _buffer in which case we know the event needs to go to the back of retry.

---------

Co-authored-by: Timothy Coleman <timothy.coleman@gmail.com>

* Use $ProjectionState event type for V2 state checkpoints instead of Result

because what we are doing here is checkpointing not outputting results.

"Result" event type can be used when we implement outputState() which would emit the state after every event processed (at least, as long as the state changed)

* [DB-1872] Improve pinned persistent subscription performance under burst load (#5576)

* Improve persistent subscription performance under burst load

When events arrive in a burst, each NotifyLiveSubscriptionMessage call
was scanning the entire buffer to push events to clients. Client acks
are queued behind the event flood in the same FIFO queue, so clients
appear full and every scan is fruitless. With N events, this results
in O(N²) total work.

Instead of pushing synchronously on event arrival, we now schedule a
push message on the PS queue. This lets events accumulate in the
buffer cheaply, gives acks a chance to interleave, and batches the
buffer scan.

All PushToClients call sites use the deferred path for consistency,
since the buffer scan cost applies equally to acks, nacks, timeouts,
and client changes.

* update package ref for cve CVE-2026-26171

* [DB-2006]: Migration to major .NEXT version (#5581)

* Migration to major .NEXT version

* Fixed dependency

* Fixed regression

* Fixed regression

* Removed item index

* review: fix regression

* restore previous order

---------

Co-authored-by: Timothy Coleman <timothy.coleman@gmail.com>

* Write V2 state checkpoints to -state streams instead of -result streams

Result streams are for user-visible output (outputState()), not internal
state persistence. V2 was writing $ProjectionState events to -result
streams, conflating checkpointing with output.

State now goes to dedicated -state streams:
- $projections-{name}-state (root partition)
- $projections-{name}-{partition}-state (per partition)

This keeps -result streams clean for future outputState() support and
avoids collisions with the -checkpoint stream used for position.

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>

* Rename V2 state event type to $ProjectionState.V2

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>

* Rename V2 checkpoint event type to $ProjectionCheckpoint.V2

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>

* Remove V1-to-V2 migration test that was silently passing

V2 can't parse V1's checkpoint events (V1 stores state in data,
position in metadata; V2 expects position in data), so it falls
back to starting from the beginning. The test passed because V2
reprocessed everything from scratch, not because migration worked.

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>

* Rename ProjectionsStateStreamSuffix to ProjectionsResultStreamSuffix

The constant was confusingly named — it holds "-result", not "-state".

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>

* better name...

---------

Co-authored-by: Roman Sakno <roman.sakno@kurrent.io>
Co-authored-by: Alexey Zimarev <alex@zimarev.com>
Co-authored-by: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
Co-authored-by: Shaan Nobee <sniper111@gmail.com>