From c1088a9afc3115e7c524c15c5294d8eda3956fea Mon Sep 17 00:00:00 2001 From: miguelc Date: Tue, 9 Jun 2026 17:09:53 -0400 Subject: [PATCH 1/2] feat: harden ExecuteTransaction verification (#304) - ExecutePendingTransactions now schedules children with bounded concurrency (pLimit) + Promise.allSettled instead of a serial await loop, so one slow tx no longer blocks the rest. Matches the SupplierStatus pattern. - A START_TO_CLOSE verify timeout is now classified as inconclusive (own verifyTimedOut flag + distinct log line) rather than an unexpected verification error, keeping it distinguishable from a genuine not-found for triage. - getTransactionFromBlock clamps the scan window to the chain head (skips future/never-produced heights) and fetches candidate blocks in parallel with a bounded worker pool (BLOCK_SCAN_CONCURRENCY) to cap the per-scan RPC burst. --- .../workflows/ExecutePendingTransactions.ts | 52 ++++++--- .../src/workflows/ExecuteTransaction.ts | 16 ++- packages/pocket/src/getTransaction.test.ts | 61 ++++++++++ packages/pocket/src/index.ts | 107 +++++++++++++----- 4 files changed, 192 insertions(+), 44 deletions(-) diff --git a/apps/middleman-workflows/src/workflows/ExecutePendingTransactions.ts b/apps/middleman-workflows/src/workflows/ExecutePendingTransactions.ts index 45f4aa31..b90b0e63 100644 --- a/apps/middleman-workflows/src/workflows/ExecutePendingTransactions.ts +++ b/apps/middleman-workflows/src/workflows/ExecutePendingTransactions.ts @@ -1,9 +1,15 @@ import {proxyActivities, WorkflowIdReusePolicy} from "@temporalio/workflow"; import { delegatorActivities } from '@/activities'; -import {executeChild} from "@temporalio/workflow"; +import {executeChild, log} from "@temporalio/workflow"; + +// @ts-expect-error p-limit is ESM-only; its default export has no CJS types under this build's module resolution +import pLimit from 'p-limit' export interface ExecutePendingTransactionsArgs {} + +const MAX_CONCURRENT_TRANSACTIONS = 10 + export async function 
ExecutePendingTransactions(args: ExecutePendingTransactionsArgs) { const { listTransactions } = proxyActivities>({ @@ -15,21 +21,33 @@ export async function ExecutePendingTransactions(args: ExecutePendingTransaction const txs = await listTransactions(); - for (const {id, createdAt} of txs) { - const workflowId = `ExecuteTransaction-${id}-${createdAt}`; - await executeChild("ExecuteTransaction", { - workflowId, - args: [{ transactionId: id }], - workflowIdReusePolicy: WorkflowIdReusePolicy.ALLOW_DUPLICATE_FAILED_ONLY, - retry: { - maximumAttempts: 5, - }, - }).catch((err) => { - if (err.name === "WorkflowExecutionAlreadyStartedError") { - console.log(`Workflow with ID=${workflowId} is already running, skipping.`); - } else { - throw err; - } - }); + const limit = pLimit(MAX_CONCURRENT_TRANSACTIONS); + + const childPromises = txs.map(({ id, createdAt }) => + limit(() => { + const workflowId = `ExecuteTransaction-${id}-${createdAt}`; + return executeChild("ExecuteTransaction", { + workflowId, + args: [{ transactionId: id }], + workflowIdReusePolicy: WorkflowIdReusePolicy.ALLOW_DUPLICATE_FAILED_ONLY, + retry: { + maximumAttempts: 5, + }, + }).catch((err) => { + if (err.name === "WorkflowExecutionAlreadyStartedError") { + log.info(`Workflow with ID=${workflowId} is already running, skipping.`); + } else { + throw err; + } + }); + }) + ); + +const results = await Promise.allSettled(childPromises); + + for (const r of results) { + if (r.status === "rejected") { + log.warn("ExecutePendingTransactions: child workflow failed", { reason: String(r.reason) }); + } } } diff --git a/apps/middleman-workflows/src/workflows/ExecuteTransaction.ts b/apps/middleman-workflows/src/workflows/ExecuteTransaction.ts index 950fba29..f18a52d2 100644 --- a/apps/middleman-workflows/src/workflows/ExecuteTransaction.ts +++ b/apps/middleman-workflows/src/workflows/ExecuteTransaction.ts @@ -2,6 +2,7 @@ import { ActivityFailure, ApplicationFailure, proxyActivities, + TimeoutFailure, WorkflowError, } 
from '@temporalio/workflow' import { delegatorActivities } from "@/activities"; @@ -29,6 +30,13 @@ function isTxNotFoundFailure(err: unknown): boolean { return false } +function isStartToCloseTimeout(err: unknown): boolean { + if (err instanceof ActivityFailure && err.cause instanceof TimeoutFailure) { + return err.cause.timeoutType === 'START_TO_CLOSE' + } + return false +} + /** * Extracts the owner + operator addresses from a Stake transaction's unsigned payload. * Used for the Tier 4 fallback (supplier state check) when TX lookup fails — both are @@ -182,6 +190,7 @@ export async function ExecuteTransaction(args: TransactionArgs) { let txFoundOnChain = false; let supplierFallbackHit = false; let verifyErroredUnexpectedly = false; + let verifyTimedOut = false; try { // Retries are driven by Temporal's activity retry policy — one attempt per block @@ -198,7 +207,10 @@ export async function ExecuteTransaction(args: TransactionArgs) { // we never rethrow into an indefinite Pending loop) but record *why* so a real // verification error stays triageable instead of being mislabeled a clean // "not found". - verifyErroredUnexpectedly = !isTxNotFoundFailure(err); + // A clean not-found and a start-to-close timeout are both inconclusive (no + // proof the tx failed); only anything else counts as an unexpected error. 
+ verifyTimedOut = isStartToCloseTimeout(err); + verifyErroredUnexpectedly = !isTxNotFoundFailure(err) && !verifyTimedOut; // Need both addresses to validate ownership — without the expected owner the // supplier fallback can't prove the on-chain supplier is ours, so we skip it @@ -231,6 +243,8 @@ export async function ExecuteTransaction(args: TransactionArgs) { verificationLog = 'verified via supplier state fallback (tx hash not found)'; } else if (verifyErroredUnexpectedly) { verificationLog = `verification errored (not a clean not-found) after ${TX_EXPIRATION_BLOCKS} retries; marked failure for triage (baseHeight=${baseHeight})`; + } else if (verifyTimedOut) { + verificationLog = `verify timed out (inconclusive, treated as not-found) after ${TX_EXPIRATION_BLOCKS} retries (baseHeight=${baseHeight})`; } else { verificationLog = `tx not found on-chain after ${TX_EXPIRATION_BLOCKS} retries (baseHeight=${baseHeight})`; } diff --git a/packages/pocket/src/getTransaction.test.ts b/packages/pocket/src/getTransaction.test.ts index a54e3ef6..d73c628b 100644 --- a/packages/pocket/src/getTransaction.test.ts +++ b/packages/pocket/src/getTransaction.test.ts @@ -6,6 +6,7 @@ import { toHex } from '@cosmjs/encoding' // --------------------------------------------------------------------------- const mockGetTx = jest.fn() +const mockGetHeight = jest.fn() const mockBlock = jest.fn() const mockBlockResults = jest.fn() const mockDisconnect = jest.fn() @@ -18,6 +19,7 @@ jest.mock('@cosmjs/stargate', () => { StargateClient: { create: jest.fn().mockResolvedValue({ getTx: mockGetTx, + getHeight: mockGetHeight, disconnect: mockDisconnect, }), }, @@ -71,6 +73,9 @@ const txHash = toHex(txHashBytes).toUpperCase() describe('PocketBlockchain.getTransaction', () => { beforeEach(() => { jest.clearAllMocks() + // Default: chain head far ahead so the Tier-3 block scan walks its full + // window unless a test overrides it to exercise the head-cap. 
+ mockGetHeight.mockResolvedValue(10_000_000) }) // 1. Tier 1 success @@ -269,4 +274,60 @@ describe('PocketBlockchain.getTransaction', () => { code: 0, }) }) + + // 8. Block scan caps at chain head: never fetch future heights + it('does not scan past the current chain head during block scan', async () => { + mockGetTx.mockResolvedValue(null) + mockFetch.mockResolvedValue({ ok: false }) + mockBlock.mockResolvedValue({ block: { txs: [] } }) // tx never present + + const startHeight = 1000 + // Head is only 2 blocks past the start: window must be 1000..1002 inclusive. + mockGetHeight.mockResolvedValue(startHeight + 2) + + const bc = await createInstance('http://api.example.com') + const result = await bc.getTransaction(txHash, startHeight) + + expect(result).toBeNull() + expect(mockBlock).toHaveBeenCalledWith(startHeight) + expect(mockBlock).toHaveBeenCalledWith(startHeight + 1) + expect(mockBlock).toHaveBeenCalledWith(startHeight + 2) + // The default maxBlocks is 30, but the head cap must stop the scan at +2. + expect(mockBlock).not.toHaveBeenCalledWith(startHeight + 3) + expect(mockBlock).toHaveBeenCalledTimes(3) + }) + + // 9. Tx height ahead of chain head: scan is skipped entirely + it('skips the block scan when the tx height is ahead of the chain head', async () => { + mockGetTx.mockResolvedValue(null) + mockFetch.mockResolvedValue({ ok: false }) + + const startHeight = 5000 + mockGetHeight.mockResolvedValue(startHeight - 1) // head below tx height + + const bc = await createInstance('http://api.example.com') + const result = await bc.getTransaction(txHash, startHeight) + + expect(result).toBeNull() + expect(mockBlock).not.toHaveBeenCalled() + }) + + // 10. 
getHeight failure: fall back to scanning the full maxBlocks window + it('scans the full window when reading the chain head fails', async () => { + mockGetTx.mockResolvedValue(null) + mockFetch.mockResolvedValue({ ok: false }) + mockBlock.mockResolvedValue({ block: { txs: [] } }) // tx never present + mockGetHeight.mockRejectedValue(new Error('rpc unreachable')) // head unknown + + const startHeight = 2000 + const bc = await createInstance('http://api.example.com') + const result = await bc.getTransaction(txHash, startHeight) + + expect(result).toBeNull() + // Fallback window is the full default maxBlocks (30): startHeight .. startHeight+29. + expect(mockBlock).toHaveBeenCalledWith(startHeight) + expect(mockBlock).toHaveBeenCalledWith(startHeight + 29) + expect(mockBlock).not.toHaveBeenCalledWith(startHeight + 30) + expect(mockBlock).toHaveBeenCalledTimes(30) + }) }) diff --git a/packages/pocket/src/index.ts b/packages/pocket/src/index.ts index 890176f5..21bae41d 100644 --- a/packages/pocket/src/index.ts +++ b/packages/pocket/src/index.ts @@ -59,6 +59,36 @@ export function isSequenceMismatchError(errorMessage: string): boolean { return errorMessage.includes('account sequence mismatch') } +/** + * Maps `items` through `fn` with at most `limit` calls in flight at once, preserving + * input order in the returned array. Used to bound the Tier-3 block scan's parallel + * `comet.block()` fan-out: firing all ~30 heights at once (and up to MAX_CONCURRENT + * ExecuteTransactions doing the same) could burst hundreds of concurrent RPCs at a + * single node and trip its rate limits. A small pool keeps most of the latency win + * of parallelism without the unbounded burst (issue #304). 
+ */ +async function mapWithConcurrency( + items: T[], + limit: number, + fn: (item: T) => Promise, +): Promise { + const results = new Array(items.length) + let next = 0 + async function worker(): Promise { + while (true) { + const i = next++ + if (i >= items.length) return + results[i] = await fn(items[i]!) + } + } + const workerCount = Math.min(limit, items.length) + await Promise.all(Array.from({ length: workerCount }, () => worker())) + return results +} + +/** Max concurrent `comet.block()` fetches per Tier-3 block scan (issue #304). */ +const BLOCK_SCAN_CONCURRENCY = 8 + /** * Creates a Protobuf-based RPC client for querying a blockchain using a QueryClient. * @@ -356,35 +386,60 @@ export class PocketBlockchain { const comet = await this.getCometClient() const normalizedHash = txHash.toUpperCase() - for (let h = startHeight; h < startHeight + maxBlocks; h++) { + + let latestHeight: number + try { + latestHeight = await this.getHeight() + } catch (error) { + this.logger.warn({ txHash, startHeight, error }, 'Block scan: failed to read chain head, scanning full window') + latestHeight = startHeight + maxBlocks - 1 + } + + const endHeight = Math.min(startHeight + maxBlocks - 1, latestHeight) + if (endHeight < startHeight) { + // The tx height is ahead of the current head; nothing to scan yet. 
+ return null + } + + const heights: number[] = [] + for (let h = startHeight; h <= endHeight; h++) heights.push(h) + + // Fetch candidate blocks in parallel, but bounded + const blocks = await mapWithConcurrency(heights, BLOCK_SCAN_CONCURRENCY, async (h) => { try { - const block = await comet.block(h) - const txs = block.block.txs - for (let i = 0; i < txs.length; i++) { - const txBytes = txs[i] - if (!txBytes) continue - const hash = toHex(sha256(txBytes)).toUpperCase() - if (hash === normalizedHash) { - const results = await comet.blockResults(h) - const txData = results.results[i] - if (!txData) { - this.logger.warn({ txHash, height: h, index: i }, 'Block results missing entry for matched TX') - return null - } - return { - hash: txHash, - height: h, - index: i, - gasUsed: txData.gasUsed, - gasWanted: txData.gasWanted, - success: txData.code === 0, - code: txData.code, - } - } - } + return { h, block: await comet.block(h) } } catch (error) { this.logger.warn({ txHash, height: h, error }, 'Block scan error at height') - continue + return { h, block: null } + } + }) + + // Iterate in ascending height order so we deterministically return the first + // (lowest-height) match, preserving the previous sequential behavior. 
+ for (const { h, block } of blocks) { + if (!block) continue + const txs = block.block.txs + for (let i = 0; i < txs.length; i++) { + const txBytes = txs[i] + if (!txBytes) continue + const hash = toHex(sha256(txBytes)).toUpperCase() + if (hash === normalizedHash) { + const results = await comet.blockResults(h) + const txData = results.results[i] + if (!txData) { + this.logger.warn({ txHash, height: h, index: i }, 'Block results missing entry for matched TX') + return null + } + return { + hash: txHash, + height: h, + index: i, + gasUsed: txData.gasUsed, + gasWanted: txData.gasWanted, + success: txData.code === 0, + code: txData.code, + } + } } } return null From a3b36524464c67a4a60eac5822dbf3c943c5e0af Mon Sep 17 00:00:00 2001 From: Jorge Cuesta Date: Wed, 10 Jun 2026 13:38:24 -0400 Subject: [PATCH 2/2] fix: fail ExecutePendingTransactions when all child workflows fail Align with the SupplierStatus/SupplierRemediation pattern: tolerate a partial failure (one bad tx must not block the rest) but surface a WorkflowError when every scheduled child failed, so a systemic problem (RPC/DB down) no longer completes green. Guard the empty-batch case where every() over [] would vacuously throw. Also fix the stray unindented results line. 
--- .../src/workflows/ExecutePendingTransactions.ts | 15 +++++++++++++-- 1 file changed, 13 insertions(+), 2 deletions(-) diff --git a/apps/middleman-workflows/src/workflows/ExecutePendingTransactions.ts b/apps/middleman-workflows/src/workflows/ExecutePendingTransactions.ts index b90b0e63..81bcbfce 100644 --- a/apps/middleman-workflows/src/workflows/ExecutePendingTransactions.ts +++ b/apps/middleman-workflows/src/workflows/ExecutePendingTransactions.ts @@ -1,6 +1,6 @@ import {proxyActivities, WorkflowIdReusePolicy} from "@temporalio/workflow"; import { delegatorActivities } from '@/activities'; -import {executeChild, log} from "@temporalio/workflow"; +import {executeChild, log, WorkflowError} from "@temporalio/workflow"; // @ts-expect-error p-limit is ESM-only; its default export has no CJS types under this build's module resolution import pLimit from 'p-limit' @@ -43,11 +43,22 @@ export async function ExecutePendingTransactions(args: ExecutePendingTransaction }) ); -const results = await Promise.allSettled(childPromises); + const results = await Promise.allSettled(childPromises); for (const r of results) { if (r.status === "rejected") { log.warn("ExecutePendingTransactions: child workflow failed", { reason: String(r.reason) }); } } + + // Match the SupplierStatus pattern: a partial failure is tolerated (one bad tx + // shouldn't block the rest), but if every scheduled child failed the run is a + // systemic problem (e.g. RPC/DB down) and must surface as a workflow failure + // rather than completing green. Guard against the empty-batch case where + // `every` would vacuously return true. + const allFailed = + results.length > 0 && results.every((r) => r.status === "rejected"); + if (allFailed) { + throw new WorkflowError("ExecutePendingTransactions: all child workflows failed"); + } }