Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -1,9 +1,15 @@
import {proxyActivities, WorkflowIdReusePolicy} from "@temporalio/workflow";
import { delegatorActivities } from '@/activities';
import {executeChild} from "@temporalio/workflow";
import {executeChild, log, WorkflowError} from "@temporalio/workflow";

// @ts-expect-error p-limit is ESM-only; its default export has no CJS types under this build's module resolution
import pLimit from 'p-limit'

export interface ExecutePendingTransactionsArgs {}


const MAX_CONCURRENT_TRANSACTIONS = 10

export async function ExecutePendingTransactions(args: ExecutePendingTransactionsArgs) {
const { listTransactions } =
proxyActivities<ReturnType<typeof delegatorActivities>>({
Expand All @@ -15,21 +21,44 @@ export async function ExecutePendingTransactions(args: ExecutePendingTransaction

const txs = await listTransactions();

for (const {id, createdAt} of txs) {
const workflowId = `ExecuteTransaction-${id}-${createdAt}`;
await executeChild("ExecuteTransaction", {
workflowId,
args: [{ transactionId: id }],
workflowIdReusePolicy: WorkflowIdReusePolicy.ALLOW_DUPLICATE_FAILED_ONLY,
retry: {
maximumAttempts: 5,
},
}).catch((err) => {
if (err.name === "WorkflowExecutionAlreadyStartedError") {
console.log(`Workflow with ID=${workflowId} is already running, skipping.`);
} else {
throw err;
}
});
const limit = pLimit(MAX_CONCURRENT_TRANSACTIONS);

const childPromises = txs.map(({ id, createdAt }) =>
limit(() => {
const workflowId = `ExecuteTransaction-${id}-${createdAt}`;
return executeChild("ExecuteTransaction", {
workflowId,
args: [{ transactionId: id }],
workflowIdReusePolicy: WorkflowIdReusePolicy.ALLOW_DUPLICATE_FAILED_ONLY,
retry: {
maximumAttempts: 5,
},
}).catch((err) => {
if (err.name === "WorkflowExecutionAlreadyStartedError") {
log.info(`Workflow with ID=${workflowId} is already running, skipping.`);
} else {
throw err;
}
});
})
);

const results = await Promise.allSettled(childPromises);

for (const r of results) {
if (r.status === "rejected") {
log.warn("ExecutePendingTransactions: child workflow failed", { reason: String(r.reason) });
}
}

// Match the SupplierStatus pattern: a partial failure is tolerated (one bad tx
// shouldn't block the rest), but if every scheduled child failed the run is a
// systemic problem (e.g. RPC/DB down) and must surface as a workflow failure
// rather than completing green. Guard against the empty-batch case where
// `every` would vacuously return true.
const allFailed =
results.length > 0 && results.every((r) => r.status === "rejected");
if (allFailed) {
throw new WorkflowError("ExecutePendingTransactions: all child workflows failed");
}
}
16 changes: 15 additions & 1 deletion apps/middleman-workflows/src/workflows/ExecuteTransaction.ts
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ import {
ActivityFailure,
ApplicationFailure,
proxyActivities,
TimeoutFailure,
WorkflowError,
} from '@temporalio/workflow'
import { delegatorActivities } from "@/activities";
Expand Down Expand Up @@ -29,6 +30,13 @@ function isTxNotFoundFailure(err: unknown): boolean {
return false
}

function isStartToCloseTimeout(err: unknown): boolean {
if (err instanceof ActivityFailure && err.cause instanceof TimeoutFailure) {
return err.cause.timeoutType === 'START_TO_CLOSE'
}
return false
}

/**
* Extracts the owner + operator addresses from a Stake transaction's unsigned payload.
* Used for the Tier 4 fallback (supplier state check) when TX lookup fails — both are
Expand Down Expand Up @@ -182,6 +190,7 @@ export async function ExecuteTransaction(args: TransactionArgs) {
let txFoundOnChain = false;
let supplierFallbackHit = false;
let verifyErroredUnexpectedly = false;
let verifyTimedOut = false;

try {
// Retries are driven by Temporal's activity retry policy — one attempt per block
Expand All @@ -198,7 +207,10 @@ export async function ExecuteTransaction(args: TransactionArgs) {
// we never rethrow into an indefinite Pending loop) but record *why* so a real
// verification error stays triageable instead of being mislabeled a clean
// "not found".
verifyErroredUnexpectedly = !isTxNotFoundFailure(err);
// A clean not-found and a start-to-close timeout are both inconclusive (no
// proof the tx failed); only anything else counts as an unexpected error.
verifyTimedOut = isStartToCloseTimeout(err);
verifyErroredUnexpectedly = !isTxNotFoundFailure(err) && !verifyTimedOut;

// Need both addresses to validate ownership — without the expected owner the
// supplier fallback can't prove the on-chain supplier is ours, so we skip it
Expand Down Expand Up @@ -231,6 +243,8 @@ export async function ExecuteTransaction(args: TransactionArgs) {
verificationLog = 'verified via supplier state fallback (tx hash not found)';
} else if (verifyErroredUnexpectedly) {
verificationLog = `verification errored (not a clean not-found) after ${TX_EXPIRATION_BLOCKS} retries; marked failure for triage (baseHeight=${baseHeight})`;
} else if (verifyTimedOut) {
verificationLog = `verify timed out (inconclusive, treated as not-found) after ${TX_EXPIRATION_BLOCKS} retries (baseHeight=${baseHeight})`;
} else {
verificationLog = `tx not found on-chain after ${TX_EXPIRATION_BLOCKS} retries (baseHeight=${baseHeight})`;
}
Expand Down
61 changes: 61 additions & 0 deletions packages/pocket/src/getTransaction.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ import { toHex } from '@cosmjs/encoding'
// ---------------------------------------------------------------------------

const mockGetTx = jest.fn()
const mockGetHeight = jest.fn()
const mockBlock = jest.fn()
const mockBlockResults = jest.fn()
const mockDisconnect = jest.fn()
Expand All @@ -18,6 +19,7 @@ jest.mock('@cosmjs/stargate', () => {
StargateClient: {
create: jest.fn().mockResolvedValue({
getTx: mockGetTx,
getHeight: mockGetHeight,
disconnect: mockDisconnect,
}),
},
Expand Down Expand Up @@ -71,6 +73,9 @@ const txHash = toHex(txHashBytes).toUpperCase()
describe('PocketBlockchain.getTransaction', () => {
beforeEach(() => {
jest.clearAllMocks()
// Default: chain head far ahead so the Tier-3 block scan walks its full
// window unless a test overrides it to exercise the head-cap.
mockGetHeight.mockResolvedValue(10_000_000)
})

// 1. Tier 1 success
Expand Down Expand Up @@ -269,4 +274,60 @@ describe('PocketBlockchain.getTransaction', () => {
code: 0,
})
})

// 8. Block scan caps at chain head: never fetch future heights
it('does not scan past the current chain head during block scan', async () => {
mockGetTx.mockResolvedValue(null)
mockFetch.mockResolvedValue({ ok: false })
mockBlock.mockResolvedValue({ block: { txs: [] } }) // tx never present

const startHeight = 1000
// Head is only 2 blocks past the start: window must be 1000..1002 inclusive.
mockGetHeight.mockResolvedValue(startHeight + 2)

const bc = await createInstance('http://api.example.com')
const result = await bc.getTransaction(txHash, startHeight)

expect(result).toBeNull()
expect(mockBlock).toHaveBeenCalledWith(startHeight)
expect(mockBlock).toHaveBeenCalledWith(startHeight + 1)
expect(mockBlock).toHaveBeenCalledWith(startHeight + 2)
// The default maxBlocks is 30, but the head cap must stop the scan at +2.
expect(mockBlock).not.toHaveBeenCalledWith(startHeight + 3)
expect(mockBlock).toHaveBeenCalledTimes(3)
})

// 9. Tx height ahead of chain head: scan is skipped entirely
it('skips the block scan when the tx height is ahead of the chain head', async () => {
mockGetTx.mockResolvedValue(null)
mockFetch.mockResolvedValue({ ok: false })

const startHeight = 5000
mockGetHeight.mockResolvedValue(startHeight - 1) // head below tx height

const bc = await createInstance('http://api.example.com')
const result = await bc.getTransaction(txHash, startHeight)

expect(result).toBeNull()
expect(mockBlock).not.toHaveBeenCalled()
})

// 10. getHeight failure: fall back to scanning the full maxBlocks window
it('scans the full window when reading the chain head fails', async () => {
mockGetTx.mockResolvedValue(null)
mockFetch.mockResolvedValue({ ok: false })
mockBlock.mockResolvedValue({ block: { txs: [] } }) // tx never present
mockGetHeight.mockRejectedValue(new Error('rpc unreachable')) // head unknown

const startHeight = 2000
const bc = await createInstance('http://api.example.com')
const result = await bc.getTransaction(txHash, startHeight)

expect(result).toBeNull()
// Fallback window is the full default maxBlocks (30): startHeight .. startHeight+29.
expect(mockBlock).toHaveBeenCalledWith(startHeight)
expect(mockBlock).toHaveBeenCalledWith(startHeight + 29)
expect(mockBlock).not.toHaveBeenCalledWith(startHeight + 30)
expect(mockBlock).toHaveBeenCalledTimes(30)
})
})
107 changes: 81 additions & 26 deletions packages/pocket/src/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,36 @@ export function isSequenceMismatchError(errorMessage: string): boolean {
return errorMessage.includes('account sequence mismatch')
}

/**
* Maps `items` through `fn` with at most `limit` calls in flight at once, preserving
* input order in the returned array. Used to bound the Tier-3 block scan's parallel
* `comet.block()` fan-out: firing all ~30 heights at once (and up to MAX_CONCURRENT
* ExecuteTransactions doing the same) could burst hundreds of concurrent RPCs at a
* single node and trip its rate limits. A small pool keeps most of the latency win
* of parallelism without the unbounded burst (issue #304).
*/
async function mapWithConcurrency<T, R>(
items: T[],
limit: number,
fn: (item: T) => Promise<R>,
): Promise<R[]> {
const results = new Array<R>(items.length)
let next = 0
async function worker(): Promise<void> {
while (true) {
const i = next++
if (i >= items.length) return
results[i] = await fn(items[i]!)
}
}
const workerCount = Math.min(limit, items.length)
await Promise.all(Array.from({ length: workerCount }, () => worker()))
return results
}

/** Max concurrent `comet.block()` fetches per Tier-3 block scan (issue #304). */
const BLOCK_SCAN_CONCURRENCY = 8

/**
* Creates a Protobuf-based RPC client for querying a blockchain using a QueryClient.
*
Expand Down Expand Up @@ -356,35 +386,60 @@ export class PocketBlockchain {
const comet = await this.getCometClient()
const normalizedHash = txHash.toUpperCase()

for (let h = startHeight; h < startHeight + maxBlocks; h++) {

let latestHeight: number
try {
latestHeight = await this.getHeight()
} catch (error) {
this.logger.warn({ txHash, startHeight, error }, 'Block scan: failed to read chain head, scanning full window')
latestHeight = startHeight + maxBlocks - 1
}

const endHeight = Math.min(startHeight + maxBlocks - 1, latestHeight)
if (endHeight < startHeight) {
// The tx height is ahead of the current head; nothing to scan yet.
return null
}

const heights: number[] = []
for (let h = startHeight; h <= endHeight; h++) heights.push(h)

// Fetch candidate blocks in parallel, but bounded
const blocks = await mapWithConcurrency(heights, BLOCK_SCAN_CONCURRENCY, async (h) => {
try {
const block = await comet.block(h)
const txs = block.block.txs
for (let i = 0; i < txs.length; i++) {
const txBytes = txs[i]
if (!txBytes) continue
const hash = toHex(sha256(txBytes)).toUpperCase()
if (hash === normalizedHash) {
const results = await comet.blockResults(h)
const txData = results.results[i]
if (!txData) {
this.logger.warn({ txHash, height: h, index: i }, 'Block results missing entry for matched TX')
return null
}
return {
hash: txHash,
height: h,
index: i,
gasUsed: txData.gasUsed,
gasWanted: txData.gasWanted,
success: txData.code === 0,
code: txData.code,
}
}
}
return { h, block: await comet.block(h) }
} catch (error) {
this.logger.warn({ txHash, height: h, error }, 'Block scan error at height')
continue
return { h, block: null }
}
})

// Iterate in ascending height order so we deterministically return the first
// (lowest-height) match, preserving the previous sequential behavior.
for (const { h, block } of blocks) {
if (!block) continue
const txs = block.block.txs
for (let i = 0; i < txs.length; i++) {
const txBytes = txs[i]
if (!txBytes) continue
const hash = toHex(sha256(txBytes)).toUpperCase()
if (hash === normalizedHash) {
const results = await comet.blockResults(h)
const txData = results.results[i]
if (!txData) {
this.logger.warn({ txHash, height: h, index: i }, 'Block results missing entry for matched TX')
return null
}
return {
hash: txHash,
height: h,
index: i,
gasUsed: txData.gasUsed,
gasWanted: txData.gasWanted,
success: txData.code === 0,
code: txData.code,
}
}
}
}
return null
Expand Down
Loading