feat(messaging): expose pending (gathered-but-unpooled) messages via handle#575
Draft
kariy wants to merge 3 commits into
Draft
feat(messaging): expose pending (gathered-but-unpooled) messages via handle#575kariy wants to merge 3 commits into
kariy wants to merge 3 commits into
Conversation
Track messages gathered from the settlement chain but not yet accepted by the tx pool in a volatile in-memory buffer, queryable through MessagingServiceHandle::pending_messages(). Co-Authored-By: Claude Opus 4.7 <noreply@anthropic.com>
56de2f4 to
416a0d6
Compare
Drop a redundant clone on a Copy `Address`, factor a complex test-helper return type into a `StreamHarness` alias, and inline `format!` args. These targets aren't linted by the `-p katana` CI clippy, so they had accumulated unnoticed. Co-Authored-By: Claude Opus 4.7 <noreply@anthropic.com>
Extract the per-batch gather/insert/commit logic from the spawned task in `start()` into a `process_batch` async fn (behavior-preserving), so the buffer wiring is unit-testable without a real settlement chain. Add tests for the three paths the buffer cares about: pool accepts all (pending clears), pool rejects (messages stay pending), and partial failure (only the un-pooled tail stays pending). Co-Authored-By: Claude Opus 4.7 <noreply@anthropic.com>
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment
Add this suggestion to a batch that can be applied as a single commit.This suggestion is invalid because no changes were made to the code.Suggestions cannot be applied while the pull request is closed.Suggestions cannot be applied while viewing a subset of changes.Only one suggestion per line can be applied in a batch.Add this suggestion to a batch that can be applied as a single commit.Applying suggestions on deleted lines is not supported.You must change the existing code in this line in order to create a valid suggestion.Outdated suggestions cannot be applied.This suggestion has been applied or marked resolved.Suggestions cannot be applied from pending reviews.Suggestions cannot be applied on multi-line comments.Suggestions cannot be applied while the pull request is queued to merge.Suggestion cannot be applied right now. Please check back later.
Adds a way to observe messages that have been picked up from the settlement layer but not yet placed into the transaction pool — the in-flight window the service previously held only as a transient local variable, with no way to query it.
The buffer is in-memory and volatile rather than DB-backed because this state is genuinely transient: a message is "pending" only between being gathered and being accepted by the pool, and the existing L1→L2 index already records the durable post-insert outcome. Persisting it would conflate in-flight state with history. It is created fresh on each
start()and discarded on shutdown, so a query reflects only the currently running drain loop, never the past.Entries are keyed by their
(block, tx_index)position to keep the retry path correct: when a pool insert fails the batch breaks and the unprocessed tail is re-gathered on the next tick, and keying by position means those re-gathered messages overwrite their prior entry instead of duplicating. A message enters the buffer the moment its batch is gathered (it has been "picked up") and leaves the instant the pool accepts it, which is exactly the window in question.The buffer is guarded by a
std::sync::Mutexrather than an async lock because it is only ever held for brief synchronous insert/remove/snapshot operations, never across an.await. It is surfaced throughMessagingServiceHandle::pending_messages()as a snapshot ordered by position; an RPC endpoint is deliberately left out of this change so the core primitive can land first.