Skip to content

feat(messaging): JSON-RPC namespace for live checkpoint management#573

Open
kariy wants to merge 12 commits into
mainfrom
feat/messaging-checkpoint-rpc
Open

feat(messaging): JSON-RPC namespace for live checkpoint management#573
kariy wants to merge 12 commits into
mainfrom
feat/messaging-checkpoint-rpc

Conversation

@kariy
Copy link
Copy Markdown
Member

@kariy kariy commented May 22, 2026

Why this exists

The messaging service persists a (block, tx_index) checkpoint after every successful L1Handler pool insert, and on restart resume_cursor reads that checkpoint and resumes one position past it. Because commit_message only ever advances the checkpoint monotonically and the MessageStream's in-memory cursor only ever moves forward, an operator who discovers that messages from earlier blocks were missed — settlement-chain RPC bug, contract upgrade that briefly changed the event schema, misconfigured starting block, anything — has no recovery mechanism short of dropping the database. This PR closes that hole by adding a JSON-RPC messaging namespace (getCheckpoint, setCheckpoint, resetCheckpoint) that reads and mutates the persisted checkpoint, and crucially also reaches into the running drain task to rewind its in-memory cursor live, so the recovery doesn't require a node restart.

Design choices, and why

The first question was how aggressively the RPC should affect a running messenger. A DB-only mutation that takes effect on the next restart would have been the simplest implementation, but it would have made every recovery a planned-downtime event. The opposite extreme — stopping and respawning the messaging task from inside the RPC handler — couples the RPC layer to messaging lifecycle and risks dropping in-flight gather results. The chosen middle path is a mpsc::Sender owned by a MessagingController (which the RPC handler holds a clone of) that signals the drain task to rewind. The task picks up the signal as a third tokio::select! arm alongside messenger.next() and shutdown_rx, and dispatches into a new Messenger::rewind(block, tx_index) method that sets the cursor fields on MessageStream and resets phase to Idle. Resetting the phase is what makes mid-gather rewind correct: any in-flight gather future is dropped and re-issued on the next tick from the new cursor, which avoids a race where the abandoned gather's result.to_block + 1 would have silently overwritten the rewind on completion. The alternative of tracking a rewind_pending flag and suppressing the cursor advance was rejected as adding state-machine complexity for the modest gain of preserving one HTTP round trip's worth of work that we're about to re-do anyway.

The control channel is mpsc::channel(1) with send().await. Rewinds don't burst in practice and natural back-pressure is preferable to a deep queue whose oldest entries become irrelevant the moment a newer rewind lands. When the receiver has been dropped — meaning the drain task is gone — the controller logs a warning and returns Ok. Failing the RPC would be misleading because the DB write is the source of truth: if the task is down, the next start will pick up the new checkpoint through resume_cursor anyway.

getCheckpoint returns the DB value, not the live in-memory cursor. This matches resume_cursor's contract and is the value an operator actually wants to reason about ("what's the last message we successfully processed?"). Exposing the in-memory cursor would surface a distinct concept ("what's the last block we've inspected, even if it had no messages?") that's less actionable and would diverge from the persisted state in ways that are confusing during recovery. setCheckpoint(100, 5) follows the existing "last processed" convention, so the live cursor is advanced to (100, 6) to match what a restart would do. resetCheckpoint semantically means "no prior checkpoint", which is distinct from setCheckpoint(0, 0) (that would resume at (0, 1), skipping tx 0 of block 0). To distinguish those two states correctly the provider trait gains a delete_messaging_checkpoint method backed by a real row deletion on the MessagingCheckpoints table.

The Messenger trait previously had a blanket impl for any Stream<Item = MessagingOutcome> + Send + Unpin. That impl had to come off so we could add rewind and have a single explicit implementation for MessageStream<C, T>. The blanket only had one in-crate consumer (Box<dyn Messenger> in service.rs), so removing it is safe; the MockCollector test path still satisfies the trait through the explicit impl.

MessagingService::start stays &self so it composes the same way as the rest of the messaging surface. The single-take semantic for the rewind receiver is preserved by storing it in a std::sync::Mutex<Option<Receiver>> and .lock().take()-ing on the first call; a second call sees None and returns an error. Mutex contention is a non-issue — start() runs once at boot, and the controller never touches the receiver side. Cloning a MessagingService produces an instance with an empty receiver mutex; the sender is still shared via channel cloning, so the clone can hand out controllers, but it can never itself be started — which matches the intent (you start one service, you can mint many controllers).

MessagingService is generic over the transaction pool type — MessagingService<P, Pl = TxPool>. That genericity landed on main independently while this branch was in review, so the branch now rides on it rather than introducing it; the TxPool default keeps the node wiring unchanged. What this branch contributes is a reason to exercise it: the lifecycle tests build a Pool<ExecutableTxWithHash, NoopValidator<_>, FiFo<_>> (a no-op-validator pool from katana-pool) so they can construct a MessagingService in isolation — the real TxPool needs a TxValidator with a state provider, block env, chain spec and class cache, none reachable from katana-messaging's dep graph — and assert the start-twice / clone-cannot-start / sender-dropped invariants directly.

set_checkpoint uses tx_index.saturating_add(1) rather than + 1 to avoid a debug-build panic / release-build wrap when an operator (theoretically) passes u64::MAX. It's a defensive one-liner; u64::MAX tx indices are not a real scenario, but the alternative is a panic in a production RPC handler.

The new namespace is messaging, registered behind a RpcModuleKind::Messaging variant. Per the implement-rpc-api skill the codebase already follows, one namespace per logical domain is the convention — extending katana (the existing user-facing namespace) would have mixed admin and user methods, and extending dev would have implied the methods are dev-only, when in fact they're a production operator tool.

MessagingApiError deviates from the #[repr(i32)] discriminant pattern that the existing katana.rs error enum uses, because variants here carry String payloads and Rust's enum-with-data + repr-discriminant combination requires explicit as i32 casts on every match arm. An equivalent code() method preserves the same wire codes (200/201) without that boilerplate. Similarly, the From<provider_api::MessagingCheckpoint> conversion lives in rpc-server rather than rpc-types, because provider-api → rpc-types would have closed a dependency cycle. The wire type itself still lives in rpc-types where the skill expects it.

Test coverage spans unit and integration layers. Unit tests cover the controller, the provider's delete_messaging_checkpoint, the stream's rewind in every phase (Idle, mid-CheckingBlock, mid-Gathering via a pending-future helper on MockCollector), and the service lifecycle invariants (start()-twice errors, clones cannot be started, a dropped rewind sender doesn't kill the drain task). Integration coverage is split to match the layer under test: the RPC-contract tests (getCheckpoint/setCheckpoint/resetCheckpoint round-trips plus the two namespace-registration wiring checks) live in crates/rpc/rpc-server/tests/messaging.rs against a TestNode, while the Anvil-backed live-rewind behaviors (re-gather after setCheckpoint, resume-from-from_block after resetCheckpoint, and pool dedup absorbing the re-gather without duplicating L2 txs) live in crates/messaging/tests/checkpoint.rs, driven through MessagingController against an isolated MessagingService — matching the messaging-integration-test consolidation introduced by #574.

🤖 Generated with Claude Code

@kariy kariy force-pushed the feat/messaging-checkpoint-rpc branch from cff52c2 to bca94f8 Compare May 22, 2026 18:02
@github-actions
Copy link
Copy Markdown

github-actions Bot commented May 22, 2026

Codec benchmark diff vs main

Benchmark Baseline (ns) Current (ns) Δ
CompiledClass(fixture)/compress 2697939 2696749 -0.04%
CompiledClass(fixture)/decompress 2937212 2918554 -0.64%
ExecutionCheckpoint/compress 34 34 +0.00%
ExecutionCheckpoint/decompress 25 25 +0.00%
PruningCheckpoint/compress 34 34 +0.00%
PruningCheckpoint/decompress 25 25 +0.00%
VersionedHeader/compress 648 645 -0.46%
VersionedHeader/decompress 823 819 -0.49%
StoredBlockBodyIndices/compress 76 76 +0.00%
StoredBlockBodyIndices/decompress 36 36 +0.00%
StorageEntry/compress 166 160 -3.61%
StorageEntry/decompress 150 139 -7.33%
ContractNonceChange/compress 152 161 +5.92%
ContractNonceChange/decompress 245 233 -4.90%
ContractClassChange/compress 216 200 -7.41%
ContractClassChange/decompress 258 263 +1.94%
ContractStorageEntry/compress 158 152 -3.80%
ContractStorageEntry/decompress 320 322 +0.63%
GenericContractInfo/compress 139 141 +1.44%
GenericContractInfo/decompress 104 104 +0.00%
Felt/compress 86 81 -5.81%
Felt/decompress 57 57 +0.00%
BlockHash/compress 81 82 +1.23%
BlockHash/decompress 57 57 +0.00%
TxHash/compress 81 81 +0.00%
TxHash/decompress 57 56 -1.75%
ClassHash/compress 82 80 -2.44%
ClassHash/decompress 57 56 -1.75%
CompiledClassHash/compress 81 81 +0.00%
CompiledClassHash/decompress 57 57 +0.00%
BlockNumber/compress 47 47 +0.00%
BlockNumber/decompress 25 25 +0.00%
TxNumber/compress 47 47 +0.00%
TxNumber/decompress 25 25 +0.00%
FinalityStatus/compress 1 1 +0.00%
FinalityStatus/decompress 12 12 +0.00%
TypedTransactionExecutionInfo/compress 15511 17873 +15.23%
TypedTransactionExecutionInfo/decompress 3532 3542 +0.28%
VersionedContractClass/compress 375 372 -0.80%
VersionedContractClass/decompress 788 803 +1.90%
MigratedCompiledClassHash/compress 149 145 -2.68%
MigratedCompiledClassHash/decompress 151 141 -6.62%
ContractInfoChangeList/compress 1695 1791 +5.66%
ContractInfoChangeList/decompress 2231 2231 +0.00%
BlockChangeList/compress 712 757 +6.32%
BlockChangeList/decompress 897 916 +2.12%
ReceiptEnvelope/compress 29944 29758 -0.62%
ReceiptEnvelope/decompress 5986 6313 +5.46%
TrieDatabaseValue/compress 164 164 +0.00%
TrieDatabaseValue/decompress 229 217 -5.24%
TrieHistoryEntry/compress 307 294 -4.23%
TrieHistoryEntry/decompress 251 248 -1.20%

@github-actions
Copy link
Copy Markdown

github-actions Bot commented May 22, 2026

Runner: AMD EPYC 7763 64-Core Processor (4 cores) · 15Gi RAM

@codecov
Copy link
Copy Markdown

codecov Bot commented May 23, 2026

Codecov Report

❌ Patch coverage is 95.30917% with 22 lines in your changes missing coverage. Please review.
✅ Project coverage is 68.93%. Comparing base (9bde0ae) to head (0b7d3f9).
⚠️ Report is 424 commits behind head on main.

Files with missing lines Patch % Lines
crates/rpc/rpc-api/src/error/messaging.rs 0.00% 8 Missing ⚠️
...torage/provider/provider/src/providers/fork/mod.rs 0.00% 7 Missing ⚠️
crates/messaging/src/service.rs 95.29% 4 Missing ⚠️
crates/messaging/src/stream/mod.rs 98.63% 2 Missing ⚠️
crates/node/sequencer/src/lib.rs 94.11% 1 Missing ⚠️
Additional details and impacted files
@@            Coverage Diff             @@
##             main     #573      +/-   ##
==========================================
- Coverage   73.32%   68.93%   -4.40%     
==========================================
  Files         209      324     +115     
  Lines       23132    45685   +22553     
==========================================
+ Hits        16961    31491   +14530     
- Misses       6171    14194    +8023     

☔ View full report in Codecov by Sentry.
📢 Have feedback on the report? Share it here.

🚀 New features to boost your workflow:
  • ❄️ Test Analytics: Detect flaky tests, report on failures, and find test suite problems.
  • 📦 JS Bundle Analysis: Save yourself from yourself by tracking and limiting bundle sizes in JS merges.

kariy and others added 12 commits May 25, 2026 22:11
Required for the upcoming `messaging_resetCheckpoint` RPC: setting the
checkpoint to (0, 0) doesn't truly reset since `resume_cursor` returns
`(0, 1)` for it. A row delete is the only way to make subsequent reads
fall back to the configured `default_from_block`.
Adds `MessagingController` that encapsulates DB checkpoint operations
and an mpsc-backed signal to live-rewind the running drain task's
in-memory cursor. The `Messenger` trait grows a `rewind` method
implemented on `MessageStream`; rewinding resets `phase` to `Idle`,
abandoning any in-flight gather (the next tick re-fetches from the new
cursor and pool dedup absorbs duplicates).

`MessagingServer::start` is now `&mut self` to .take() the receiver;
clones get `rewind_rx: None` so only the original can be started.
Three methods — `messaging_getCheckpoint`, `messaging_setCheckpoint`,
`messaging_resetCheckpoint` — let operators read and rewind the
persisted messaging checkpoint without restarting the node. The
handler delegates to a `MessagingController` cloned from the running
`MessagingServer`, so writes hit the DB and a signal triggers the
drain task to live-rewind its in-memory cursor.

`MessagingCheckpoint` is mirrored in `katana-rpc-types` rather than
re-exported from `katana-provider-api` to keep the RPC wire format
independent of provider internals (also avoids a dep cycle).
Adds `RpcModuleKind::Messaging` (included in `RpcModulesList::all()`)
and registers `MessagingApiHandler` against the running messaging
server's controller when the namespace is enabled. The messaging
server is now constructed earlier in `build_with_provider` so its
controller can back the API; `launch` takes `&mut self` to pass a
mutable reference into `start()`.

Integration tests cover get/set/reset via the live RPC client against
a TestNode with messaging enabled — the drain task's gather calls fail
against a dummy URL but the controller paths exercised by the RPC
methods are unaffected.
Adds:
- Controller: overrides, u64::MAX saturation, reset-on-fresh idempotency,
  default_from_block snapshot semantics, committed-value reads.
- Stream: rewind during CheckingBlock / mid-Gathering future abandonment,
  fast-forward, same-cursor, and multiple-rewinds last-write-wins paths.

The MockCollector grows oneshot-backed `push_*_pending` variants so tests
can hold futures across a rewind. Also folds in a saturating_add fix to
`set_checkpoint` to prevent debug-build panic / release-build wrap when
tx_index is u64::MAX.
…g` namespace

Adds an Anvil-backed `checkpoint::live` mod that exercises the full
live-rewind path: setCheckpoint to a prior block forces a re-gather,
resetCheckpoint resumes from the configured from_block, and the L1->L2
DupSort index stays single-entry through round-trips (pool dedup contract).

Also adds a `checkpoint::wiring` mod verifying the `messaging` RPC
namespace registers only when both `config.messaging.is_some()` AND
`RpcModuleKind::Messaging` is present in `apis`.
Provider: multi-id delete preserves sibling rows; set→delete→set round-trip
leaves the table usable. Both guard against subtle MDBX cache or key-handling
bugs that a single-id test would miss.

RPC: a second `messaging_setCheckpoint` overwrites the first — last write wins.
…l type

Replaces the hard-coded `pool: TxPool` field with a generic `Pool` type
parameter bounded on `TransactionPool<Transaction = ExecutableTxWithHash>`,
unblocking lifecycle tests that previously couldn't construct a `TxValidator`
from `katana-messaging`'s dep graph. The sequencer continues to instantiate
`MessagingServer<P, TxPool>` so the running binary's behavior is unchanged.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
Adds the three lifecycle tests that were skipped earlier because the
old hard-coded `pool: TxPool` field made `MessagingServer` impossible
to construct from `katana-messaging`'s own dep graph (a `TxValidator`
needs a state provider, block env, etc.). With the generic-pool refactor
in the prior commit, a `NoopValidator`-backed pool is enough to satisfy
the trait bounds and exercise:

- start() rejects a second call on the same instance
- a clone cannot be started (rewind_rx was scrubbed on clone)
- the drain task survives a fully-closed rewind channel

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
… impl

`MessagingCheckpointProvider` no longer takes an `id: &str` on any of its
three methods — there's exactly one messaging checkpoint per node, and
exposing the underlying table key was leaking a storage implementation
detail through the provider API. The DB impl now holds a private
`MESSAGING_CHECKPOINT_KEY` constant; the trait surface is just
`messaging_checkpoint`, `set_messaging_checkpoint`, and
`delete_messaging_checkpoint`. Callers in `katana-messaging` drop their
own `CHECKPOINT_ID` constant accordingly. The `preserves_other_ids`
provider test is removed since the multi-tenant key concept no longer
exists at this layer.
…arness

Move the three Anvil-backed checkpoint tests out of the rpc-server messaging
suite (which no longer carries the alloy/sol dev-deps) into the isolated
katana-messaging harness, re-targeting them at MessagingController instead of
the JSON-RPC client.

Also fixes a rebase artifact in tests/common/mod.rs where
messaging_checkpoint() was still called with the old "messaging" id arg.

Co-Authored-By: Claude Opus 4.7 <noreply@anthropic.com>
@kariy kariy force-pushed the feat/messaging-checkpoint-rpc branch from 75d8f8b to 0b7d3f9 Compare May 26, 2026 04:14
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

1 participant