Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -80,3 +80,6 @@ igniter-landing.html
.claude/worktrees/
SESSION-SUMMARY.md
*.png

# local files/research/issues/logs
.local
28 changes: 28 additions & 0 deletions apps/middleman-workflows/jest.config.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,28 @@
import type { Config } from 'jest'
const config: Config = {
preset: 'ts-jest',
testEnvironment: 'node',
testMatch: ['<rootDir>/src/**/*.test.ts'],
moduleNameMapper: {
'^@/(.*)$': '<rootDir>/src/$1',
'^@igniter/pocket/proto/generated/(.*)$': '<rootDir>/../../packages/pocket/src/proto/generated/$1',
'^@igniter/pocket/proto/(.*)$': '<rootDir>/../../packages/pocket/src/proto/generated/$1',
'^@igniter/pocket/(.*)$': '<rootDir>/../../packages/pocket/src/$1',
'^@igniter/pocket$': '<rootDir>/../../packages/pocket/src/index.ts',
'^@igniter/tx-verify$': '<rootDir>/../../packages/tx-verify/src/index.ts',
'^@igniter/domain/(.*)$': '<rootDir>/../../packages/domain/src/$1',
'^@igniter/db/(.*)$': '<rootDir>/../../packages/db/src/$1',
'^@igniter/logger$': '<rootDir>/../../packages/logger/src/index.ts',
'^@pocket/(.*)$': '<rootDir>/../../packages/pocket/src/$1',
},
transform: {
'^.+\\.tsx?$': [
'ts-jest',
{
tsconfig: '<rootDir>/tsconfig.json',
diagnostics: false,
},
],
},
}
export default config
7 changes: 6 additions & 1 deletion apps/middleman-workflows/package.json
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,8 @@
"dev": "nodemon --signal SIGHUP",
"build": "tsc --project ./tsconfig.json && tsc-alias -p ./tsconfig.json",
"start": "node ./dist/src/worker.js",
"check-types": "tsc --noEmit"
"check-types": "tsc --noEmit",
"test": "jest"
},
"dependencies": {
"@igniter/commons": "workspace:*",
Expand All @@ -26,6 +27,7 @@
"@temporalio/worker": "1.11.7",
"@temporalio/workflow": "1.11.7",
"@igniter/pocket": "workspace:*",
"@igniter/tx-verify": "workspace:*",
"@igniter/logger": "workspace:*",
"@igniter/db": "workspace:*",
"@igniter/domain": "workspace:*",
Expand All @@ -42,10 +44,13 @@
"devDependencies": {
"@igniter/eslint-config": "workspace:*",
"@igniter/typescript-config": "workspace:*",
"@types/jest": "^30.0.0",
"@types/pg": "^8.11.11",
"@types/url-join": "^4.0.1",
"jest": "^30.1.3",
"nodemon": "^3.1.9",
"npm-run-all": "4.1.5",
"ts-jest": "^29.4.4",
"tsc-alias": "^1.8.15"
},
"exports": {
Expand Down
200 changes: 195 additions & 5 deletions apps/middleman-workflows/src/activities/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -15,16 +15,22 @@ import {
import {
NodeStatus,
TransactionStatus,
TransactionType,
SupplierChangeType,
} from '@igniter/db/middleman/enums'
import { createHash } from 'node:crypto'
import { detectSupplierChanges, DetectedSupplierChange } from '@igniter/domain/middleman/utils/supplierChanges'
import { extractTransactionStakingSuppliers, extractTransactionUnstakingSuppliers } from '@/workflows/utils'
import { ProviderService } from '@/lib/provider'
import DAL from '@/lib/dal/DAL'
import type { PocketBlockchain, SupplierServiceConfig, SupplierEndpoint, ServiceRevenueShare } from '@igniter/pocket'
import type { PocketBlockchain, SupplierServiceConfig, SupplierEndpoint, ServiceRevenueShare, VerifyOutcome, SupplierEffect } from '@igniter/pocket'
import type { VerificationDecision, SupplierPathOutcome } from '@igniter/tx-verify'
import { TX_EXPIRATION_BLOCKS } from '@igniter/tx-verify'
import { STAKE_TYPE_URL, UNSTAKE_TYPE_URL } from '@/lib/constants'
import { ServiceConfigUpdate } from '@igniter/pocket/proto/pocket/shared/supplier'
import { NodesMinMax } from '@/lib/dal/nodes'
import { verifyStakeGoalState } from './verifyStakeGoalStateHelper'
import { parseSignerAndSequence } from './parseSignerAndSequence'

export type Height = number

Expand Down Expand Up @@ -147,7 +153,20 @@ export const governanceActivities = (dal: DAL) => ({
},
})

export const delegatorActivities = (dal: DAL, pocketRpcClient: PocketBlockchain, providerService: ProviderService) => ({
/** Number of consecutive unavailable checks between critical alerts for a chronically-unverifiable tx. */
const VERIFY_UNAVAILABLE_ALERT_THRESHOLD = Number(process.env.VERIFY_UNAVAILABLE_ALERT_THRESHOLD ?? 50)

/** Per-sweep hash-scan window, matching the on-chain mempool expiration window. */
// TX_EXPIRATION_BLOCKS imported from @igniter/tx-verify above

/**
* Parses a transaction's unsigned payload into the expected on-chain supplier effect.
* Returns null when the tx has no supplier-state path (send / OperationalFunds), so the
* verifier knows to skip the supplier verification path for it.
*/

export const delegatorActivities = (dal: DAL, pocketRpcClient: PocketBlockchain, providerService: ProviderService) => {
const activities = {
/**
* Returns the latest block height from the blockchain.
* @returns GetLatestBlockResult
Expand Down Expand Up @@ -393,6 +412,18 @@ export const delegatorActivities = (dal: DAL, pocketRpcClient: PocketBlockchain,
async updateProviders(providers: Provider[]) {
await dal.provider.updateProviders(providers)
},
/**
* Reads the timeoutHeight embedded in the signed payload for a transaction.
* Reuses parseSignerAndSequence's TxBody decode path. Returns null when the
* payload is absent, unparseable, or has no embedded timeout (external-wallet txs).
*/
async getTxTimeoutHeight(transactionId: number): Promise<number | null> {
const txn = await dal.transaction.getTransaction(transactionId)
if (!txn?.signedPayload) return null
const { timeoutHeight } = parseSignerAndSequence(txn.signedPayload)
return timeoutHeight
},

/**
* Updates a transaction with the given payload.
*
Expand Down Expand Up @@ -1107,7 +1138,7 @@ export const delegatorActivities = (dal: DAL, pocketRpcClient: PocketBlockchain,
return { statusResult }
},

async notifyProviderOfFailedStakes(transactionId: number) {
async notifyProviderOfFailedStakes(transactionId: number, onlyAddresses?: string[]) {
try {
const transaction = await dal.transaction.getTransaction(transactionId)

Expand All @@ -1129,7 +1160,9 @@ export const delegatorActivities = (dal: DAL, pocketRpcClient: PocketBlockchain,

const suppliers = extractTransactionStakingSuppliers(transaction)

const addresses = suppliers.map(({ address }) => address)
const addresses = suppliers
.map(({ address }) => address)
.filter((addr) => !onlyAddresses || onlyAddresses.includes(addr))

if (addresses.length === 0) {
return {
Expand All @@ -1154,4 +1187,161 @@ export const delegatorActivities = (dal: DAL, pocketRpcClient: PocketBlockchain,
}
}
},
})

/**
* The verifier's queue: pending transactions that have been broadcast (have a hash).
*/
async listPendingWithHash() {
const txs = await dal.transaction.listPendingWithHash()
// @ts-ignore (todo: align serialized shape with the workflow)
return txs.map(({ id, executionHeight }) => ({ id, executionHeight }))
},

/**
* Verifies a broadcast tx by hash, scanning from one block past its last covered
* height (or its execution height on the first sweep). Maps the pocket tri-state
* result down to the minimal shape the pure decision logic consumes.
*/
async verifyTxHash(transactionId: number): Promise<VerifyOutcome<{ success: boolean; code: number; gasUsed: string }>> {
const txn = await dal.transaction.getTransaction(transactionId)
if (!txn?.hash) {
throw new Error('verifyTxHash: tx missing hash')
}
const startHeight = txn.lastCoveredHeight != null ? txn.lastCoveredHeight + 1 : (txn.executionHeight ?? 0)
const out = await pocketRpcClient.verifyTransaction(txn.hash, startHeight, TX_EXPIRATION_BLOCKS)
if (out.status !== 'confirmed') return out
// gasUsed is serialized as a string: a bigint cannot cross the Temporal
// activity boundary (the default payload converter cannot encode BigInt).
return {
status: 'confirmed',
data: { success: out.data.success, code: out.data.code, gasUsed: out.data.gasUsed.toString() },
}
},

/**
* Verifies a broadcast tx by its expected on-chain supplier goal-state. Returns null for
* tx types with no supplier-state path (send / OperationalFunds) so the decision logic
* treats the supplier path as inapplicable. Uses goal-state semantics: the tx is
* confirmed if every operator's on-chain state matches intent, regardless of which tx
* produced it.
*/
async verifySupplierEffect(transactionId: number): Promise<SupplierPathOutcome | null> {
const txn = await dal.transaction.getTransaction(transactionId)
if (!txn) throw new Error('verifySupplierEffect: tx not found')

if (txn.type === TransactionType.Stake || txn.type === TransactionType.Upstake) {
return verifyStakeGoalState(txn, (addr) => pocketRpcClient.getSupplier(addr))
}

if (txn.type === TransactionType.Unstake) {
const [unstake] = extractTransactionUnstakingSuppliers(txn)
if (!unstake) return null
const effect = { kind: 'unstake' as const, minSessionEndHeight: txn.executionHeight ?? 0 }
const out = await pocketRpcClient.verifySupplierEffect(unstake.operatorAddress, effect)
if (out.status === 'confirmed') return { status: 'confirmed' }
if (out.status === 'absent') return { status: 'absent', absentOperators: [unstake.operatorAddress] }
return { status: 'unavailable' }
}

return null
},

/**
* Extracts tx validity evidence (timeoutHeight, sequence) from the signed payload.
* Used to drive the hash-absent path in decideVerification toward expired/sequence-consumed
* verdicts without waiting for the full expiration window.
*/
async checkTxValidityEvidence(transactionId: number): Promise<{
txTimeoutHeight: number | null
sequence: { consumed: boolean; observedAtHeight: number } | null
}> {
const txn = await dal.transaction.getTransaction(transactionId)
if (!txn) return { txTimeoutHeight: null, sequence: null }

if (!txn.signedPayload) return { txTimeoutHeight: null, sequence: null }

const parsed = parseSignerAndSequence(txn.signedPayload)

if (parsed.timeoutHeight) return { txTimeoutHeight: parsed.timeoutHeight, sequence: null }

if (parsed.sequence == null) return { txTimeoutHeight: null, sequence: null }

let signer: string | null = null
try {
const { body } = JSON.parse(txn.unsignedPayload)
signer = body.messages[0]?.value?.signer ?? null
} catch { /* ignore */ }

if (!signer) return { txTimeoutHeight: null, sequence: null }

try {
const sequenceEvidence = await pocketRpcClient.isSequenceConsumed(signer, parsed.sequence)
return { txTimeoutHeight: null, sequence: sequenceEvidence }
} catch {
return { txTimeoutHeight: null, sequence: null }
}
},

/**
* Applies a verification decision computed by the pure `decideVerification` (v2 shape).
* Pending → record progress (coverage/unavailable) + maybe alert on chronic unavailability.
* Terminal → run downstream effects BEFORE the CAS so idempotent re-runs on retry are safe;
* only the caller that wins the CAS runs effects (zero-rows → already terminal, skip).
*/
async applyVerificationDecision(transactionId: number, decision: VerificationDecision): Promise<void> {
if (decision.tx === 'pending') {
await dal.transaction.recordVerificationProgress(transactionId, {
lastCoveredHeight: decision.newLastCoveredHeight,
incUnavailable: decision.incUnavailable,
})
if (decision.incUnavailable) await maybeAlertUnavailable(transactionId)
return
}

const txn = await dal.transaction.getTransaction(transactionId)
if (!txn) return

// Effects keyed off GOAL-STATE, not tx outcome: a tx that failed on-chain while a
// sibling achieved the goal must NOT release staked suppliers. Effects run before
// the CAS (idempotent; a partial run is re-swept).
if (decision.effects === 'apply-success' && txn.type === TransactionType.Stake) {
await activities.createNewNodesFromTransaction(txn.id)
await activities.notifyProviderOfStakedAddresses(txn.id)
} else if (decision.effects === 'apply-failure' && txn.type === TransactionType.Stake) {
await activities.notifyProviderOfFailedStakes(txn.id, decision.failedOperators)
// Upstake has no creation site today; apply-failure for it is intentionally effects-free.
} else if (decision.effects === 'apply-success' && txn.type === TransactionType.Unstake) {
await activities.updateUnstakingNodesFromTransaction(txn.id)
await activities.notifyProviderOfUntakingAddresses(txn.id)
}

const status = decision.tx === 'success' ? TransactionStatus.Success : TransactionStatus.Failure
const verificationHeight = await pocketRpcClient.getHeight().catch(() => undefined)
const fields: { code?: number; consumedFee?: number; verificationHeight?: number; log?: string } = {
verificationHeight,
log: decision.tx === 'success' ? 'verified'
: decision.effects === 'apply-success' ? 'tx failed on-chain; goal met by sibling tx'
: 'verification negative (validity bound covered, no effect)',
}
if (decision.code !== undefined) fields.code = decision.code
if (decision.gasUsed !== undefined) fields.consumedFee = Number(decision.gasUsed)
await dal.transaction.claimTerminalTransition(transactionId, status, fields)
},
}

/**
* Reads the tx and emits a critical log when its unavailable-check counter crosses a
* multiple of the alert threshold. No status change — this is operator-attention only.
*/
async function maybeAlertUnavailable(transactionId: number): Promise<void> {
const txn = await dal.transaction.getTransaction(transactionId)
if (txn && txn.unavailableChecks > 0 && txn.unavailableChecks % VERIFY_UNAVAILABLE_ALERT_THRESHOLD === 0) {
log.error(
'TX unverifiable: RPC repeatedly unavailable — operator attention needed',
{ transactionId, unavailableChecks: txn.unavailableChecks },
)
}
}

return activities
}
25 changes: 25 additions & 0 deletions apps/middleman-workflows/src/activities/parseSignerAndSequence.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,25 @@
import { TxRaw, AuthInfo, TxBody } from '@igniter/pocket/proto/cosmos/tx/v1beta1/tx'

/**
* Parses signer sequence and timeoutHeight from a base64-encoded signed TxRaw payload.
* Returns null values on parse failure (activity caller treats as no evidence → pending).
*/
export function parseSignerAndSequence(signedPayload: string): {
sequence: number | null
timeoutHeight: number | null
} {
try {
const txBytes = Buffer.from(signedPayload, 'base64')
const txRaw = TxRaw.decode(txBytes)
const authInfo = AuthInfo.decode(txRaw.authInfoBytes)
const sequence = authInfo.signerInfos[0]?.sequence ?? null
const body = TxBody.decode(txRaw.bodyBytes)
const timeoutHeight = body.timeoutHeight || null
return {
sequence: sequence !== null ? Number(sequence) : null,
timeoutHeight: timeoutHeight ? Number(timeoutHeight) : null,
}
} catch {
return { sequence: null, timeoutHeight: null }
}
}
Loading
Loading