feat(indexer): exactly-once pipeline, backpressure, WebSocket fanout, gap detection #604
Merged
devJaja merged 7 commits on Jun 23, 2026
Migration 006 introduces the exactly-once backbone:
- raw_events (PK ledger_sequence, event_index) for idempotent staging
- indexer_state.processed_cursor advanced only after the domain commit
…gap detection
- pipeline.ts: stage raw_events + project to domain tables + advance cursor in a single serialisable transaction; ON CONFLICT DO NOTHING for idempotency
- bus.ts: in-process EventEmitter pub/sub with typed + wildcard channels
- ratelimit.ts: token-bucket limiter (default 10 req/s, env-configurable)
- poller.ts: adaptive poll interval (double when idle, halve on full page)
- gap.ts: mid-stream ledger gap detection
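The all-or-nothing behaviour described for pipeline.ts can be modelled without a database. The sketch below is an in-memory stand-in, not the code from this PR: a snapshot plays the role of the serialisable transaction, a Map keyed on (ledger_sequence, event_index) plays the role of the raw_events primary key, and every name (Store, ingestBatch, DomainProcessor) is hypothetical. It assumes the cursor holds the highest ledger sequence processed so far.

```typescript
// In-memory model of "stage + project + advance cursor" as one unit.
// All names are hypothetical; the real pipeline uses a Postgres transaction.
interface RawEvent {
  ledgerSequence: number;
  eventIndex: number;
  data: unknown;
}

interface Store {
  rawEvents: Map<string, RawEvent>; // keyed like PRIMARY KEY (ledger_sequence, event_index)
  domainRows: string[];             // stand-in for the projected domain tables
  processedCursor: number;          // stand-in for indexer_state.processed_cursor
}

type DomainProcessor = (event: RawEvent, store: Store) => void;

function createStore(): Store {
  return { rawEvents: new Map(), domainRows: [], processedCursor: 0 };
}

function ingestBatch(store: Store, batch: RawEvent[], project: DomainProcessor): void {
  // Taking the snapshot models BEGIN; restoring it models ROLLBACK.
  const snapshot = {
    rawEvents: new Map(store.rawEvents),
    domainRows: [...store.domainRows],
    processedCursor: store.processedCursor,
  };
  try {
    for (const event of batch) {
      const key = `${event.ledgerSequence}:${event.eventIndex}`;
      if (store.rawEvents.has(key)) continue; // models ON CONFLICT DO NOTHING
      store.rawEvents.set(key, event);
      project(event, store); // domain write happens inside the same unit
    }
    // The cursor moves last, so it can never get ahead of the domain rows.
    for (const event of batch) {
      store.processedCursor = Math.max(store.processedCursor, event.ledgerSequence);
    }
  } catch (err) {
    store.rawEvents = snapshot.rawEvents;
    store.domainRows = snapshot.domainRows;
    store.processedCursor = snapshot.processedCursor;
    throw err;
  }
}
```

Skipping projection for an already-staged event is only safe because staging and projection commit together: a staged row implies its domain rows already exist.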
Rewrite streamEvents with backpressure (token bucket + adaptive interval), 429 exponential backoff that never drops events, and gap detection that emits a structured gap_detected log and backfills the missing range before continuing. Parses per-ledger event_index from the RPC event id.
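The backpressure and gap-detection rules above reduce to a few small functions. The following is a minimal sketch under stated assumptions, not the contents of ratelimit.ts, poller.ts, or gap.ts: all names are hypothetical, the interval bounds and backoff base/cap are invented defaults, and the clock is passed in explicitly so the logic stays deterministic.

```typescript
// Token bucket: refills continuously at ratePerSec, holds at most ratePerSec tokens.
class TokenBucket {
  private readonly ratePerSec: number;
  private tokens: number;
  private lastRefillMs: number;

  constructor(ratePerSec: number, nowMs: number) {
    this.ratePerSec = ratePerSec;
    this.tokens = ratePerSec; // start full (assumption)
    this.lastRefillMs = nowMs;
  }

  // Spends one token and returns true if a request may be sent now.
  tryTake(nowMs: number): boolean {
    const elapsedSec = (nowMs - this.lastRefillMs) / 1000;
    this.tokens = Math.min(this.ratePerSec, this.tokens + elapsedSec * this.ratePerSec);
    this.lastRefillMs = nowMs;
    if (this.tokens < 1) return false;
    this.tokens -= 1;
    return true;
  }
}

// Adaptive interval: double when the poll was empty, halve when the page was full.
function nextPollInterval(
  currentMs: number,
  eventsReturned: number,
  pageSize: number,
  minMs = 250,  // assumed lower bound
  maxMs = 8000, // assumed upper bound
): number {
  if (eventsReturned === 0) return Math.min(currentMs * 2, maxMs);
  if (eventsReturned >= pageSize) return Math.max(Math.floor(currentMs / 2), minMs);
  return currentMs;
}

// Exponential backoff for 429 responses; the caller retries the same window.
function backoffDelayMs(attempt: number, baseMs = 500, capMs = 30000): number {
  return Math.min(baseMs * 2 ** attempt, capMs);
}

interface LedgerGap {
  from: number; // first missing ledger (inclusive)
  to: number;   // last missing ledger (inclusive)
}

// A batch is contiguous when its first ledger is cursor + 1; anything later is a gap.
function detectGap(cursor: number, firstLedgerInBatch: number): LedgerGap | null {
  const expected = cursor + 1;
  if (firstLedgerInBatch <= expected) return null;
  return { from: expected, to: firstLedgerInBatch - 1 };
}
```

For example, with the cursor at 100 and a batch starting at ledger 105, detectGap returns the range 101 to 104, which is what would be backfilled before continuing.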
- ws.ts: bridge the event bus to /ws clients as { type, payload } frames,
with type subscriptions, 15s ping/pong heartbeat, and clean disconnect
- index.ts: wire stream -> IngestPipeline -> bus -> WebSocket, resume from the
committed cursor, and serve /health + /ws over an HTTP server
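The bus-to-client bridge in the list above can be illustrated with a dependency-free sketch. It swaps in a hand-rolled Map-based bus for Node's EventEmitter and a plain Client interface for a real WebSocket, so every name here is hypothetical. It also assumes that a client with no type subscriptions receives every event, which the PR text does not state.

```typescript
// Minimal typed + wildcard pub/sub, bridged to a client as { type, payload } frames.
type Handler = (type: string, payload: unknown) => void;

class EventBus {
  private handlers = new Map<string, Set<Handler>>();

  // Subscribe to one event type, or to "*" for all types. Returns an unsubscribe function.
  subscribe(channel: string, handler: Handler): () => void {
    const set = this.handlers.get(channel) ?? new Set<Handler>();
    this.handlers.set(channel, set);
    set.add(handler);
    return () => {
      set.delete(handler);
    };
  }

  // Deliver to the typed channel first, then to wildcard subscribers.
  publish(type: string, payload: unknown): void {
    for (const channel of [type, "*"]) {
      this.handlers.get(channel)?.forEach((handler) => handler(type, payload));
    }
  }
}

interface Client {
  subscriptions: Set<string>; // empty set = receive every type (assumption)
  send(frame: string): void;  // stand-in for a WebSocket send
}

// Attach a client to the bus; call the returned function on disconnect.
function attachClient(bus: EventBus, client: Client): () => void {
  return bus.subscribe("*", (type, payload) => {
    if (client.subscriptions.size > 0 && !client.subscriptions.has(type)) return;
    client.send(JSON.stringify({ type, payload }));
  });
}
```

Returning the unsubscribe function from attachClient keeps disconnect handling in one place: the connection's close handler only has to call it.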
- pipeline: crash between raw ingest and domain write -> no duplicate rows on restart; idempotent replay; bus publish only after commit
- stream: 429 -> exponential backoff with no dropped events
- ws: events reach clients within 200ms, including under a synthetic burst
- unit tests for bus, token bucket, adaptive poller, and gap detection
- docs/indexer/WEBSOCKET_API.md: full /ws message schema
- README + .env.example: exactly-once pipeline, backpressure, fanout, gap detection, and new env vars
- add ws + @types/ws dependencies
Solid implementation @distributed-nerd. Please resolve the conflicts and fix the failing CI check.
devJaja
approved these changes
Jun 23, 2026
Nice implementation @distributed-nerd, LGTM.
Closes #533
Summary
Reworks the indexer's event stream into a production-grade, exactly-once pipeline with backpressure, in-process fanout, and gap detection. Replaces the fixed-interval getEvents poll in services/indexer/src/stream.ts.
What changed
Exactly-once ingestion
- raw_events staging table (migration 006_raw_events.sql): (id BIGSERIAL, ledger_sequence BIGINT, event_index INT, contract_id TEXT, topic TEXT[], data JSONB, processed_at TIMESTAMPTZ, PRIMARY KEY(ledger_sequence, event_index)).
- Single serialisable transaction: stage into raw_events (ON CONFLICT DO NOTHING) → project into domain tables via the same tx client → advance indexer_state.processed_cursor as the last statement before COMMIT. A crash rolls everything back together, so replay yields no duplicate domain rows. (src/pipeline.ts)
Backpressure & adaptive polling
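- Token-bucket rate limiter (default 10 req/s), configurable via RPC_RATE_LIMIT_PER_SEC. (src/ratelimit.ts)
- Adaptive poll interval: doubles when idle, halves on a full page. (src/poller.ts)
- 429 responses trigger exponential backoff with retries; the window is re-fetched so no events are dropped.
Internal pub/sub fanout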
- EventBus (Node EventEmitter) with typed per-event-type channels plus a wildcard. (src/bus.ts)
- /ws WebSocket handler subscribes to the bus and pushes { type, payload } JSON frames, with type subscriptions, 15s ping/pong heartbeat, and clean disconnection. (src/ws.ts)
- Full /ws message schema documented in docs/indexer/WEBSOCKET_API.md.
Gap detection
- Checks that batch[0].ledger_sequence == cursor + 1. On a gap (e.g. RPC node failover mid-sequence) it emits a structured gap_detected log and backfills the missing range before continuing. (src/gap.ts)
Tests
- 429 → verified exponential backoff and no dropped events.
- npm test → 95 passed (13 suites), including all pre-existing indexer handler tests.
- tsc typechecks clean.
Acceptance criteria
Notes
- domainProcessor is an injectable seam: the exactly-once mechanism is fully implemented/tested, but wiring the existing typed handlers requires Soroban XDR decoding, which is out of scope for this issue (the previous index.ts also only logged events). A clear marker shows where decoded handlers plug in.
- Dockerfile runs npm ci --omit=dev before the tsc build (which needs dev @types/typescript); this is a pre-existing issue left untouched here.