diff --git a/backend/docs/health.md b/backend/docs/health.md new file mode 100644 index 00000000..a9d2df7b --- /dev/null +++ b/backend/docs/health.md @@ -0,0 +1,132 @@ +# Health, Liveness, and Readiness Probes + +The backend exposes two distinct kinds of health signal. They answer different +questions and orchestrators (Kubernetes, ECS, Nomad, …) act on them +differently. Conflating them — as a single flat `/health` that always returns +`ok` — causes traffic to be routed to instances that are up but unable to serve. + +All probes are mounted at the **root** of the app (not under `/api/v1`) and are +**unauthenticated**, because orchestrators probe them without credentials. + +| Endpoint | Kind | Cost | Checks dependencies | Healthy | Unhealthy | +| ---------- | --------- | ----- | ------------------- | ------- | --------- | +| `/health` | Liveness | cheap | no | `200` | — | +| `/livez` | Liveness | cheap | no | `200` | — | +| `/readyz` | Readiness | real | yes | `200` | `503` | + +## Liveness — `/health`, `/livez` + +> "Is the process up and able to serve an HTTP request at all?" + +Liveness is cheap and **dependency-free**. It returns `200` whenever the event +loop can service a request. A failing liveness probe instructs the orchestrator +to **restart** the container, so it must never consult downstream dependencies — +a transient database blip should not trigger a restart loop. + +`/health` is retained for backward compatibility; `/livez` is the conventional +alias. They are identical. + +```json +{ "status": "ok", "timestamp": "2026-06-02T12:00:00.000Z" } +``` + +## Readiness — `/readyz` + +> "Should this instance receive traffic right now?" + +Readiness probes real dependencies. A failing readiness probe pulls the instance +out of the load-balancer rotation **without restarting it**, so it can recover +and rejoin once its dependencies are healthy again. + +It returns: + +- `200` with `status: "ready"` when the instance can serve traffic. 
+- `503` with `status: "not_ready"` when a hard dependency is unavailable. +- `503` with `status: "maintenance"` when maintenance mode is enabled. + +```json +{ + "status": "ready", + "database": "ok", + "ingest": "ok", + "webhookQueue": "ok", + "timestamp": "2026-06-02T12:00:00.000Z" +} +``` + +### Sub-status semantics + +Each dependency reports a coarse `SubStatus`, the same pattern used by +`/api/v1/admin/monitoring`: + +- `ok` — healthy. +- `degraded` — serving but impaired. **Does not** fail readiness. +- `unavailable` — could not be reached / unusable. **Fails** readiness (`503`). + +| Sub-status | Probe | `degraded` when | `unavailable` when | +| -------------- | ------------------------------ | ------------------------------------------ | ------------------------------------------- | +| `database` | `pingDatabase()` (`SELECT 1`) | — | connection cannot open or execute | +| `ingest` | `lagMonitor.getLagStatus()` | lag ≥ warn threshold, < critical threshold | lag ≥ critical threshold, or probe throws | +| `webhookQueue` | `webhookQueueService.getStats()` | queue is saturated (`size ≥ capacity`) | the queue's backing store is unreachable | + +Ingest lag thresholds are governed by `LagMonitor` and configurable via +`LAG_WARN_THRESHOLD` / `LAG_CRITICAL_THRESHOLD`. See [reliability.md](./reliability.md). + +The instance is **not ready** (`503`) if *any* sub-status is `unavailable`. +`degraded` sub-statuses are surfaced for observability but keep the instance in +rotation: a slightly stale index or a back-pressured queue is still serviceable. + +### Maintenance mode + +When `statusService.isMaintenanceEnabled()` is true, `/readyz` short-circuits +**before** probing any dependency and returns `503` with `status: "maintenance"`. +The instance is intentionally not serving and should be pulled from rotation. +Liveness is unaffected — the process is healthy, just drained. 
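The readiness rules above (maintenance short-circuits first, any `unavailable` sub-status fails readiness, `degraded` does not) can be sketched as a small pure function. This is a minimal illustration, not the route handler itself: `aggregateReadiness` and the `Readiness` shape are hypothetical names introduced here, while the `SubStatus` and `ReadyStatus` unions mirror the ones described in this document.

```typescript
type SubStatus = "ok" | "degraded" | "unavailable";
type ReadyStatus = "ready" | "not_ready" | "maintenance";

// Hypothetical result shape, used only for this sketch.
interface Readiness {
  httpStatus: 200 | 503;
  status: ReadyStatus;
}

// Hypothetical helper: folds per-dependency sub-statuses into one verdict.
function aggregateReadiness(
  subStatuses: Record<string, SubStatus>,
  maintenance: boolean = false
): Readiness {
  // Maintenance mode wins before any dependency is considered.
  if (maintenance) {
    return { httpStatus: 503, status: "maintenance" };
  }

  // Only "unavailable" fails readiness; "degraded" keeps the instance in rotation.
  const anyUnavailable = Object.keys(subStatuses).some(
    (name) => subStatuses[name] === "unavailable"
  );

  return anyUnavailable
    ? { httpStatus: 503, status: "not_ready" }
    : { httpStatus: 200, status: "ready" };
}
```

Under these assumptions, `{ database: "ok", ingest: "degraded", webhookQueue: "ok" }` stays `ready` with `200`, while replacing any one value with `"unavailable"` yields `not_ready` with `503`.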
+ +## Edge-case behaviour + +| Scenario | `/health`, `/livez` | `/readyz` | +| ---------------------------- | ------------------- | ----------------------------------------------- | +| All healthy | `200 ok` | `200 ready` | +| Database down | `200 ok` | `503 not_ready`, `database: unavailable` | +| Warn-level lag | `200 ok` | `200 ready`, `ingest: degraded` | +| Critical lag | `200 ok` | `503 not_ready`, `ingest: unavailable` | +| Lag probe throws | `200 ok` | `503 not_ready`, `ingest: unavailable` | +| Queue store unreachable | `200 ok` | `503 not_ready`, `webhookQueue: unavailable` | +| Queue saturated | `200 ok` | `200 ready`, `webhookQueue: degraded` | +| Maintenance mode | `200 ok` | `503 maintenance` | +| Partial failure (one dep) | `200 ok` | `503 not_ready` (failing dep `unavailable`, rest `ok`) | + +## Security + +These probes are unauthenticated, so their responses are deliberately minimal. +They expose only the coarse status enums above and a timestamp. They do **not** +leak: + +- internal hostnames or connection strings, +- application or dependency versions, +- absolute ledger numbers or ingest-lag values, +- queue depths or capacities, +- underlying exception messages (dependency errors are caught and collapsed to + `unavailable`). + +Richer, sensitive diagnostics (queue depths, invariant counters, cursor +positions, versions) remain behind API-key auth at +[`/api/v1/admin/monitoring`](./admin-monitoring.md). 
+ +## Orchestrator configuration (Kubernetes example) + +```yaml +livenessProbe: + httpGet: + path: /livez + port: 3000 + initialDelaySeconds: 5 + periodSeconds: 10 +readinessProbe: + httpGet: + path: /readyz + port: 3000 + initialDelaySeconds: 5 + periodSeconds: 5 +``` diff --git a/backend/docs/observability.md b/backend/docs/observability.md new file mode 100644 index 00000000..e7d651bf --- /dev/null +++ b/backend/docs/observability.md @@ -0,0 +1,177 @@ +# Observability — Ingest Lag Alerting + +This document describes how the QuickLendX backend turns indexer-lag threshold +breaches into **alerts** and how **degraded mode auto-recovery** works. It +complements [reliability.md](./reliability.md) (which covers how degraded mode +gates writes) and [logging.md](./logging.md) (log redaction policy). + +The alerting logic lives in +[`src/services/lagMonitor.ts`](../src/services/lagMonitor.ts). + +--- + +## Overview + +`LagMonitor` computes indexer lag (in ledgers) as +`current_ledger - last_indexed_ledger`. Two thresholds classify the lag into a +**level**: + +| Level | Condition | Effect | +| ---------- | ---------------------------------- | -------------------------------------- | +| `none` | `lag < warnThreshold` | Healthy. All endpoints available. | +| `warn` | `lag >= warnThreshold` | Degraded. Write endpoints gated (503). | +| `critical` | `lag >= criticalThreshold` | Critically degraded. All writes blocked. | + +The level is consumed by: + +- **`GET /api/v1/status`** — surfaces the current level to clients. +- **`degradedGuard`** middleware — gates write/sensitive endpoints. + +Prior to this feature, threshold breaches were silent (no operator signal) and +recovery was implicit (a single good reading immediately re-opened the write +guard, allowing it to flap). This feature adds **alerts on transitions** and +**hysteresis-backed auto-recovery**. + +--- + +## Thresholds & configuration + +All four parameters are configurable via environment variables. 
Defaults are +chosen for a ~5s ledger cadence. + +| Env var | Default | Meaning | +| ------------------------- | ------- | -------------------------------------------------------------------- | +| `LAG_WARN_THRESHOLD` | `10` | Lag (ledgers) at which the system becomes degraded (`warn`). | +| `LAG_CRITICAL_THRESHOLD` | `50` | Lag (ledgers) at which the system becomes critically degraded. | +| `LAG_HYSTERESIS_MARGIN` | `3` | Ledgers **below** a threshold the lag must fall to before recovering.| +| `LAG_RECOVERY_POLLS` | `3` | Consecutive recovered polls required before a degraded level clears. | + +Non-numeric or empty values fall back to the defaults. A `recoveryPolls` below +`1` also falls back to its default, and `hysteresisMargin` is clamped to `>= 0`. + +Thresholds can also be set at runtime in tests/bootstrap via +`setThresholds(warn, critical)` and `setHysteresis(margin, polls)`. + +--- + +## Hysteresis & auto-recovery + +To stop the monitor flapping when lag hovers around a threshold, the monitor +tracks an **effective level** separately from the **instantaneous level** +computed from the raw lag: + +- **Escalation is immediate.** As soon as the raw lag reaches a higher level + (e.g. `lag >= criticalThreshold`), the effective level jumps there. This is + the fail-safe direction — a breach gates writes without delay. +- **De-escalation is sustained.** To clear a level, the raw lag must fall to + the **recovery threshold** (`threshold - hysteresisMargin`) and stay there + for `recoveryPolls` consecutive polls. A single breach anywhere in that + window resets the streak. Recovery steps down **one level at a time** + (`critical → warn → none`) so the `warn` write-guard window is never skipped. + +Recovery thresholds with the defaults: + +- Recover out of `critical` → `warn` when `lag <= 50 - 3 = 47` for 3 polls. +- Recover out of `warn` → `none` when `lag <= 10 - 3 = 7` for 3 polls. + +`getLagStatus()` (used by `/status` and `degradedGuard`) reports the +**instantaneous** level.
It is read-only: it neither advances the state machine +nor emits alerts. Only the scheduled `poll()` path escalates or de-escalates the +effective level, which `getEffectiveStatus()` exposes without side effects. + +--- + +## Alert events + +Alerts are emitted **only on transitions** of the effective level — never on +every poll. Each transition: + +1. Logs a single structured JSON line (`type: "LAG_ALERT"`), at `WARN` for + escalations and `INFO` for recoveries. +2. Increments in-process counters (see [Metrics](#metrics)). +3. Notifies any subscribers registered via `onAlert(listener)`. + +### Alert payload + +```jsonc +{ + "from": "warn", // level moved away from + "to": "critical", // level moved to + "direction": "escalation", // or "recovery" + "lag": 62, // raw lag at transition (ledgers) + "warnThreshold": 10, + "criticalThreshold": 50, + "at": "2026-06-02T12:00:00.000Z" +} +``` + +> **Security:** Alert payloads carry **only operational fields** — lag, +> thresholds, level, timestamp. They never include request bodies, wallet +> data, auth tokens, or any other secrets. The logged line uses the same +> fixed shape, so no caller-supplied data can leak into log sinks. + +### Subscribing + +```ts +import { lagMonitor } from "../services/lagMonitor"; + +const unsubscribe = lagMonitor.onAlert((event) => { + // forward to PagerDuty / Slack / metrics exporter, etc. +}); +``` + +A throwing subscriber is isolated and never breaks the monitor. + +--- + +## Polling + +`poll()` reads a fresh lag value, advances the hysteresis state machine, and +emits any resulting transition alert. Schedule it on a fixed cadence (mirroring +the invariant scheduler pattern): + +```ts +import { lagMonitor } from "../services/lagMonitor"; + +setInterval(() => { + void lagMonitor.poll(); +}, 5000); +``` + +Do **not** call `poll()` per request — request paths should call the read-only +`getLagStatus()`.
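To make the state machine that each poll advances more concrete, the hysteresis rules described earlier can be sketched as a pure step function. This is a simplified model under stated assumptions, not the `LagMonitor` class: `step`, `run`, `Config`, `State`, and `DEFAULTS` are hypothetical names, and alert emission and metrics are left out.

```typescript
type Level = "none" | "warn" | "critical";

interface Config {
  warn: number;     // warn threshold (ledgers)
  critical: number; // critical threshold (ledgers)
  margin: number;   // hysteresis margin (ledgers)
  polls: number;    // consecutive recovered polls required to step down
}

interface State {
  level: Level;   // effective level
  streak: number; // consecutive polls at or below the recovery threshold
}

const RANK: Record<Level, number> = { none: 0, warn: 1, critical: 2 };

// Documented defaults: warn 10, critical 50, margin 3, recovery polls 3.
const DEFAULTS: Config = { warn: 10, critical: 50, margin: 3, polls: 3 };

function instantLevel(lag: number, cfg: Config): Level {
  if (lag >= cfg.critical) return "critical";
  if (lag >= cfg.warn) return "warn";
  return "none";
}

// One observation; returns the next state without mutating the input.
function step(state: State, lag: number, cfg: Config): State {
  const instant = instantLevel(lag, cfg);

  // Escalation is immediate and clears any recovery streak.
  if (RANK[instant] > RANK[state.level]) return { level: instant, streak: 0 };
  if (state.level === "none") return { level: "none", streak: 0 };

  // Recovery threshold for the current level, clamped at 0.
  const threshold = state.level === "critical" ? cfg.critical : cfg.warn;
  const recoveryThreshold = Math.max(0, threshold - cfg.margin);

  // Any reading above the recovery threshold resets the streak.
  if (lag > recoveryThreshold) return { level: state.level, streak: 0 };

  const streak = state.streak + 1;
  if (streak < cfg.polls) return { level: state.level, streak };

  // Sustained recovery: step down exactly one level.
  return { level: state.level === "critical" ? "warn" : "none", streak: 0 };
}

// Feed a series of lag readings through the machine, starting healthy.
function run(lags: number[], cfg: Config): Level[] {
  let state: State = { level: "none", streak: 0 };
  return lags.map((lag) => {
    state = step(state, lag, cfg);
    return state.level;
  });
}
```

With the defaults, `run([60, 5, 5, 5, 5, 5, 5], DEFAULTS)` escalates to `critical` on the first reading, holds it for two more polls, steps down to `warn` on the third recovered poll, and reaches `none` only after three further recovered polls.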
+ +### Missing / corrupt current-ledger reads + +If the current-ledger read throws, or yields a non-finite or negative lag, the +monitor **fails safe to `critical`** (it returns `lag = criticalThreshold`). +An unknown reading must never silently clear a degraded state or open the write +guard. + +--- + +## Metrics + +`getAlertMetrics()` returns a defensive copy of the in-process counters, +suitable for exposure via the monitoring endpoint or a scraper: + +| Field | Meaning | +| -------------------------- | -------------------------------------------------------- | +| `escalations` | Total escalation transitions observed. | +| `recoveries` | Total recovery transitions observed. | +| `transitionsTo` | Transition count by destination level (`none`/`warn`/`critical`). | +| `currentLevel` | Current effective level. | +| `consecutiveRecoveryPolls` | Consecutive polls the lag has been within recovery range. | + +--- + +## Edge cases (covered by tests) + +See [`src/tests/lagMonitor.alerts.test.ts`](../src/tests/lagMonitor.alerts.test.ts): + +- **Flapping** around a threshold produces no spurious transitions; a single + good poll never clears a degraded state. +- **Sustained breach** holds the level with no duplicate alerts. +- **Rapid recovery** still drains one level per recovery window, preserving the + `warn` guard window. +- **Missing current-ledger read** fails safe to `critical`. 
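The fail-safe rule for missing or corrupt readings can be sketched in isolation. This is a simplified, synchronous model: `readLagFailSafe` and `LagReader` are hypothetical names, and the reader callback stands in for the asynchronous status read that the monitor actually performs.

```typescript
// Hypothetical stand-in for the status read; assumed synchronous for brevity.
type LagReader = () => number;

// Returns the raw lag, or the critical threshold when the reading is unusable.
function readLagFailSafe(read: LagReader, criticalThreshold: number): number {
  try {
    const lag = read();
    // Negative, NaN, or infinite readings are treated as unknown.
    if (!Number.isFinite(lag) || lag < 0) {
      return criticalThreshold;
    }
    return lag;
  } catch {
    // The read itself failed: report critical lag rather than healthy.
    return criticalThreshold;
  }
}
```

Returning the critical threshold itself means the instantaneous level computes to `critical`, so an unknown reading can never look healthier than a known breach.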
diff --git a/backend/src/app.ts b/backend/src/app.ts index 08350047..99d3fb48 100644 --- a/backend/src/app.ts +++ b/backend/src/app.ts @@ -9,6 +9,7 @@ import { csrfMiddleware } from "./middleware/csrf"; import { corsOptionsDelegate, webhookCorsOptions } from "./config/cors"; import v1Routes from "./routes/v1"; import webhookRoutes from "./routes/webhooks"; +import healthRoutes from "./routes/health"; import { requestLogger } from "./middleware/request-logger"; const app = express(); @@ -47,14 +48,9 @@ app.use("/api/webhooks", cors(webhookCorsOptions), webhookRoutes); app.use(csrfMiddleware); app.use("/api/v1", v1Routes); -// Health check (root level as well if needed) -app.get("/health", (req, res) => { - res.json({ - status: "ok", - version: "1.0.0", - timestamp: new Date().toISOString(), - }); -}); +// Liveness (/health, /livez) and readiness (/readyz) probes. +// Mounted at the root and left unauthenticated so orchestrators can probe them. +app.use(healthRoutes); // 404 handler app.use((req, res) => { diff --git a/backend/src/lib/database.ts b/backend/src/lib/database.ts index 979cb367..6e444921 100644 --- a/backend/src/lib/database.ts +++ b/backend/src/lib/database.ts @@ -77,6 +77,27 @@ export function getStatementCacheStats() { }; } +/** + * Probe database connectivity with a trivial round-trip query. + * + * Used by the readiness endpoint to verify the SQLite connection can both + * open and execute. Returns true on success, false on any failure (a locked, + * corrupt, or unopenable database). Never throws so callers can branch on the + * boolean without their own try/catch. + * + * The query (`SELECT 1`) is constant and parameter-free, so it carries no + * user input and leaks no schema details. + */ +export function pingDatabase(): boolean { + try { + const db = getDatabase(); + const row = db.prepare("SELECT 1 AS ok").get(); + return row?.ok === 1; + } catch { + return false; + } +} + /** * Close the database connection and clear the statement cache. 
* Ensures clean shutdown and prevents memory leaks. diff --git a/backend/src/routes/health.ts b/backend/src/routes/health.ts new file mode 100644 index 00000000..a1da1199 --- /dev/null +++ b/backend/src/routes/health.ts @@ -0,0 +1,131 @@ +/** + * Liveness and readiness probes. + * + * These endpoints are intentionally mounted at the root of the app (not under + * /api/v1) and are unauthenticated, because container orchestrators (Kubernetes, + * ECS, Nomad, …) probe them without credentials. + * + * Two distinct concerns: + * + * GET /health, GET /livez — Liveness. "Is the process up and able to serve + * an HTTP request at all?" Cheap and dependency-free. A failing liveness + * probe tells the orchestrator to restart the container, so it must NOT + * consult downstream dependencies — a transient DB blip should not trigger + * a restart loop. + * + * GET /readyz — Readiness. "Should this instance receive traffic right now?" + * Probes real dependencies (DB connectivity, ingest lag, webhook queue) and + * returns 503 when any hard dependency is unavailable or when maintenance + * mode is enabled. A failing readiness probe pulls the instance out of the + * load-balancer rotation without restarting it. + * + * Security: responses expose only coarse status enums per sub-system. They do + * not leak internal hostnames, versions, queue depths, ledger numbers, or error + * messages to unauthenticated callers. The richer, authenticated diagnostics + * remain under /api/v1/admin/monitoring. + */ + +import { Router, Request, Response } from "express"; +import { pingDatabase } from "../lib/database"; +import { statusService } from "../services/statusService"; +import { lagMonitor } from "../services/lagMonitor"; +import { webhookQueueService } from "../services/webhookQueueService"; + +const router = Router(); + +/** + * Coarse per-dependency status, mirroring the SubStatus pattern used by + * /api/v1/admin/monitoring. 
"degraded" means serving but impaired (does not + * fail readiness); "unavailable" means the dependency could not be reached + * (fails readiness). + */ +type SubStatus = "ok" | "degraded" | "unavailable"; + +type ReadyStatus = "ready" | "not_ready" | "maintenance"; + +// --------------------------------------------------------------------------- +// Liveness +// --------------------------------------------------------------------------- + +function liveness(_req: Request, res: Response): void { + res.json({ + status: "ok", + timestamp: new Date().toISOString(), + }); +} + +// Keep the historical /health path as a liveness check, and add the +// conventional /livez alias. +router.get("/health", liveness); +router.get("/livez", liveness); + +// --------------------------------------------------------------------------- +// Readiness +// --------------------------------------------------------------------------- + +router.get("/readyz", async (_req: Request, res: Response) => { + // Maintenance mode short-circuits readiness: the instance is intentionally + // not serving, so it should be pulled from rotation regardless of deps. + if (statusService.isMaintenanceEnabled()) { + res.status(503).json({ + status: "maintenance" as ReadyStatus, + database: "ok" as SubStatus, + ingest: "ok" as SubStatus, + webhookQueue: "ok" as SubStatus, + timestamp: new Date().toISOString(), + }); + return; + } + + // --- Database connectivity (hard dependency) --------------------------- + let database: SubStatus = "ok"; + if (!pingDatabase()) { + database = "unavailable"; + } + + // --- Ingest lag -------------------------------------------------------- + // Reuse the LagMonitor degradation logic. "warn" lag is degraded but still + // serviceable; "critical" lag means the indexed view is too stale to trust, + // so we treat it as unavailable for readiness. 
+ let ingest: SubStatus = "ok"; + try { + const lag = await lagMonitor.getLagStatus(); + if (lag.isCritical) { + ingest = "unavailable"; + } else if (lag.isDegraded) { + ingest = "degraded"; + } + } catch { + ingest = "unavailable"; + } + + // --- Webhook queue health (hard dependency on its backing store) ------- + // A throw here means the queue's store is unreachable. Saturation (queue at + // capacity) is back-pressure, not unreadiness, so it is reported as degraded. + let webhookQueue: SubStatus = "ok"; + try { + const stats = webhookQueueService.getStats(); + if (stats.capacity > 0 && stats.size >= stats.capacity) { + webhookQueue = "degraded"; + } + } catch { + webhookQueue = "unavailable"; + } + + const unavailable = + database === "unavailable" || + ingest === "unavailable" || + webhookQueue === "unavailable"; + + const status: ReadyStatus = unavailable ? "not_ready" : "ready"; + + res.status(unavailable ? 503 : 200).json({ + status, + database, + ingest, + webhookQueue, + timestamp: new Date().toISOString(), + }); +}); + +export default router; diff --git a/backend/src/services/lagMonitor.ts b/backend/src/services/lagMonitor.ts index d6744146..3170c67f 100644 --- a/backend/src/services/lagMonitor.ts +++ b/backend/src/services/lagMonitor.ts @@ -13,9 +13,34 @@ * critically degraded. All mutating * endpoints are blocked. * + * Hysteresis & auto-recovery + * -------------------------- + * To stop the monitor flapping between levels when the lag hovers around a + * threshold, an *effective* level is tracked separately from the + * *instantaneous* level computed from the raw lag: + * + * - To ESCALATE (none→warn, warn→critical) the raw lag must reach the + * upper threshold. + * - To DE-ESCALATE the raw lag must fall below the threshold minus the + * hysteresis margin (the "recovery threshold"), AND it must stay there + * for `recoveryPolls` consecutive polls before the level is cleared. 
+ * + * This means a single good poll never clears a degraded state — recovery is + * explicit and sustained, while escalation is immediate. + * + * Alert events + * ------------ + * Alerts are emitted only on *transitions* of the effective level (via + * `poll()`), never on every poll. Each transition is logged as a single + * structured JSON line and increments an in-process counter. Subscribers + * can register via `onAlert()`. Alert payloads contain only operational + * metrics (lag, thresholds, level) — never request data or secrets. + * * The thresholds can be overridden via environment variables: * LAG_WARN_THRESHOLD (integer, ledgers) * LAG_CRITICAL_THRESHOLD (integer, ledgers) + * LAG_HYSTERESIS_MARGIN (integer, ledgers; default 3) + * LAG_RECOVERY_POLLS (integer, polls; default 3) */ import { statusService } from "./statusService"; @@ -26,6 +51,10 @@ import { statusService } from "./statusService"; export const DEFAULT_WARN_THRESHOLD = 10; export const DEFAULT_CRITICAL_THRESHOLD = 50; +/** Ledgers below a threshold the lag must fall to before de-escalating. */ +export const DEFAULT_HYSTERESIS_MARGIN = 3; +/** Consecutive sub-recovery polls required before a degraded level clears. */ +export const DEFAULT_RECOVERY_POLLS = 3; // --------------------------------------------------------------------------- // Types @@ -40,7 +69,7 @@ export interface LagStatus { warnThreshold: number; /** Threshold at which critical-level degradation begins. */ criticalThreshold: number; - /** Degradation level derived from the current lag. */ + /** Degradation level (hysteresis-aware effective level). */ level: DegradedLevel; /** True when level is "warn" or "critical". */ isDegraded: boolean; @@ -50,6 +79,58 @@ export interface LagStatus { checkedAt: string; } +/** Payload emitted on an effective-level transition. Operational data only. */ +export interface LagAlertEvent { + /** The level the monitor moved away from. 
*/ + from: DegradedLevel; + /** The level the monitor moved to. */ + to: DegradedLevel; + /** "escalation" when severity increased, "recovery" when it decreased. */ + direction: "escalation" | "recovery"; + /** Raw lag (ledgers) at the moment of transition. */ + lag: number; + warnThreshold: number; + criticalThreshold: number; + /** ISO-8601 timestamp of the transition. */ + at: string; +} + +export type LagAlertListener = (event: LagAlertEvent) => void; + +/** In-process counters, surfaced to the monitoring endpoint / scrapers. */ +export interface LagAlertMetrics { + /** Total transitions observed, by direction. */ + escalations: number; + recoveries: number; + /** Transitions broken down by destination level. */ + transitionsTo: Record<DegradedLevel, number>; + /** Current effective level. */ + currentLevel: DegradedLevel; + /** + * Consecutive polls the raw lag has been at/below the recovery threshold + * for the current effective level. Resets on any breach. + */ + consecutiveRecoveryPolls: number; +} + +// --------------------------------------------------------------------------- +// Helpers +// --------------------------------------------------------------------------- + +/** Severity rank used to decide escalation vs. recovery. */ +const LEVEL_RANK: Record<DegradedLevel, number> = { + none: 0, + warn: 1, + critical: 2, +}; + +function parseEnvInt(name: string): number | undefined { + const raw = process.env[name]; + if (raw === undefined || raw === "") return undefined; + const parsed = parseInt(raw, 10); + return Number.isNaN(parsed) ?
undefined : parsed; +} + // --------------------------------------------------------------------------- // LagMonitor // --------------------------------------------------------------------------- @@ -59,17 +140,55 @@ export class LagMonitor { private _warnThreshold: number; private _criticalThreshold: number; + private _hysteresisMargin: number; + private _recoveryPolls: number; - constructor(warnThreshold?: number, criticalThreshold?: number) { + /** + * Hysteresis-aware effective level. Escalates immediately on breach; + * de-escalates only after sustained recovery. This is the level reported + * by poll() and getEffectiveStatus(), not by getLagStatus(). + */ + private _effectiveLevel: DegradedLevel = "none"; + /** Consecutive polls the raw lag has been within recovery range. */ + private _recoveryStreak = 0; + + private readonly _listeners = new Set<LagAlertListener>(); + private readonly _metrics: LagAlertMetrics = { + escalations: 0, + recoveries: 0, + transitionsTo: { none: 0, warn: 0, critical: 0 }, + currentLevel: "none", + consecutiveRecoveryPolls: 0, + }; + + constructor( + warnThreshold?: number, + criticalThreshold?: number, + hysteresisMargin?: number, + recoveryPolls?: number + ) { this._warnThreshold = warnThreshold !== undefined ? warnThreshold - : (parseInt(process.env["LAG_WARN_THRESHOLD"] ?? "", 10) || DEFAULT_WARN_THRESHOLD); + : parseEnvInt("LAG_WARN_THRESHOLD") ?? DEFAULT_WARN_THRESHOLD; this._criticalThreshold = criticalThreshold !== undefined ? criticalThreshold - : (parseInt(process.env["LAG_CRITICAL_THRESHOLD"] ?? "", 10) || DEFAULT_CRITICAL_THRESHOLD); + : parseEnvInt("LAG_CRITICAL_THRESHOLD") ?? DEFAULT_CRITICAL_THRESHOLD; + + this._hysteresisMargin = + hysteresisMargin !== undefined + ? hysteresisMargin + : parseEnvInt("LAG_HYSTERESIS_MARGIN") ?? DEFAULT_HYSTERESIS_MARGIN; + + this._recoveryPolls = + recoveryPolls !== undefined + ? recoveryPolls + : parseEnvInt("LAG_RECOVERY_POLLS") ??
DEFAULT_RECOVERY_POLLS; + + if (this._recoveryPolls < 1) this._recoveryPolls = DEFAULT_RECOVERY_POLLS; + if (this._hysteresisMargin < 0) this._hysteresisMargin = 0; } // ------------------------------------------------------------------------- @@ -95,6 +214,19 @@ export class LagMonitor { return this._criticalThreshold; } + get hysteresisMargin(): number { + return this._hysteresisMargin; + } + + get recoveryPolls(): number { + return this._recoveryPolls; + } + + /** Current hysteresis-aware effective level. */ + get effectiveLevel(): DegradedLevel { + return this._effectiveLevel; + } + setThresholds(warn: number, critical: number): void { if (warn <= 0 || critical <= 0) { throw new RangeError("Thresholds must be positive integers"); @@ -108,13 +240,58 @@ export class LagMonitor { this._criticalThreshold = critical; } + /** + * Configure hysteresis behaviour. + * @param margin Ledgers below a threshold the lag must drop to recover. + * @param polls Consecutive recovered polls required before clearing. + */ + setHysteresis(margin: number, polls: number): void { + if (margin < 0) { + throw new RangeError("hysteresisMargin must be >= 0"); + } + if (!Number.isInteger(polls) || polls < 1) { + throw new RangeError("recoveryPolls must be a positive integer"); + } + this._hysteresisMargin = margin; + this._recoveryPolls = polls; + } + + // ------------------------------------------------------------------------- + // Alert subscription / metrics + // ------------------------------------------------------------------------- + + /** Subscribe to transition alerts. Returns an unsubscribe function. */ + onAlert(listener: LagAlertListener): () => void { + this._listeners.add(listener); + return () => this._listeners.delete(listener); + } + + /** Snapshot of in-process alert counters. */ + getAlertMetrics(): LagAlertMetrics { + return { + ...this._metrics, + transitionsTo: { ...this._metrics.transitionsTo }, + }; + } + + /** Reset the state machine and counters. 
Intended for tests/bootstrap. */ + reset(): void { + this._effectiveLevel = "none"; + this._recoveryStreak = 0; + this._metrics.escalations = 0; + this._metrics.recoveries = 0; + this._metrics.transitionsTo = { none: 0, warn: 0, critical: 0 }; + this._metrics.currentLevel = "none"; + this._metrics.consecutiveRecoveryPolls = 0; + } + // ------------------------------------------------------------------------- // Core computation // ------------------------------------------------------------------------- /** - * Computes the current lag level from a raw lag value. - * Pure function — no I/O. + * Computes the instantaneous lag level from a raw lag value. + * Pure function — no I/O, no hysteresis, no side effects. */ computeLevel(lag: number): DegradedLevel { if (lag >= this._criticalThreshold) return "critical"; @@ -122,14 +299,220 @@ export class LagMonitor { return "none"; } + /** + * The lag value at/below which the system is considered recovered *out of* + * the given level. Recovering from "critical" returns to "warn" territory + * (critical threshold minus margin); recovering from "warn" returns to + * healthy (warn threshold minus margin). Clamped at 0. + */ + private recoveryThresholdFor(level: DegradedLevel): number { + if (level === "critical") { + return Math.max(0, this._criticalThreshold - this._hysteresisMargin); + } + // warn (or none, unused) + return Math.max(0, this._warnThreshold - this._hysteresisMargin); + } + + /** + * Advance the hysteresis state machine by one observation and return the + * resulting effective level. Pure with respect to I/O — it only mutates + * internal state and (on a transition) emits an alert. Safe to call from + * a scheduled poller. + * + * Escalation is immediate: as soon as the raw lag reaches a higher + * instantaneous level, the effective level jumps there and the recovery + * streak resets. 
+ * + * De-escalation requires the raw lag to sit at/below the recovery + * threshold for `recoveryPolls` consecutive observations. A single breach + * anywhere in the window resets the streak. + */ + observe(lag: number, at: string = new Date().toISOString()): DegradedLevel { + const instant = this.computeLevel(lag); + const prev = this._effectiveLevel; + + if (LEVEL_RANK[instant] > LEVEL_RANK[prev]) { + // Escalation — immediate, no dwell required. + this._recoveryStreak = 0; + this._setEffectiveLevel(instant, lag, at); + return this._effectiveLevel; + } + + if (this._effectiveLevel === "none") { + // Healthy and staying healthy (instant is none too). + this._recoveryStreak = 0; + this._metrics.consecutiveRecoveryPolls = 0; + return this._effectiveLevel; + } + + // Currently degraded and lag is not escalating. Check for sustained + // recovery toward the next-lower level. + const recoveryThreshold = this.recoveryThresholdFor(this._effectiveLevel); + + if (lag <= recoveryThreshold) { + this._recoveryStreak += 1; + this._metrics.consecutiveRecoveryPolls = this._recoveryStreak; + if (this._recoveryStreak >= this._recoveryPolls) { + // Step down exactly one level so a deep recovery from critical still + // passes through warn rather than skipping the warn guard window. + const next: DegradedLevel = + this._effectiveLevel === "critical" ? "warn" : "none"; + this._recoveryStreak = 0; + this._metrics.consecutiveRecoveryPolls = 0; + this._setEffectiveLevel(next, lag, at); + // If, after stepping down, the lag is already below the next level's + // recovery threshold, the following poll(s) will continue draining it + // down one level at a time — keeping each transition observable. + } + } else { + // Lag bounced back above the recovery threshold; reset the streak. + this._recoveryStreak = 0; + this._metrics.consecutiveRecoveryPolls = 0; + } + + return this._effectiveLevel; + } + + /** + * Apply a new effective level and emit a transition alert. 
Caller must have + * already determined `next !== current`. + */ + private _setEffectiveLevel( + next: DegradedLevel, + lag: number, + at: string + ): void { + const from = this._effectiveLevel; + if (next === from) return; + + this._effectiveLevel = next; + this._metrics.currentLevel = next; + + const direction: "escalation" | "recovery" = + LEVEL_RANK[next] > LEVEL_RANK[from] ? "escalation" : "recovery"; + + if (direction === "escalation") this._metrics.escalations += 1; + else this._metrics.recoveries += 1; + this._metrics.transitionsTo[next] += 1; + + const event: LagAlertEvent = { + from, + to: next, + direction, + lag, + warnThreshold: this._warnThreshold, + criticalThreshold: this._criticalThreshold, + at, + }; + + this._emitAlert(event); + } + + /** Log the transition (structured) and notify subscribers. */ + private _emitAlert(event: LagAlertEvent): void { + // Single structured line per transition — operational fields only, + // never request bodies, auth material, or other secrets. + if (process.env["NODE_ENV"] !== "test") { + const line = JSON.stringify({ + level: event.direction === "escalation" ? "WARN" : "INFO", + type: "LAG_ALERT", + event: event.direction, + from: event.from, + to: event.to, + lag: event.lag, + warn_threshold: event.warnThreshold, + critical_threshold: event.criticalThreshold, + timestamp: event.at, + }); + if (event.direction === "escalation") { + console.warn(line); + } else { + console.info(line); + } + } + + for (const listener of this._listeners) { + try { + listener(event); + } catch { + // A misbehaving subscriber must never break the monitor. + } + } + } + + // ------------------------------------------------------------------------- + // Status / polling + // ------------------------------------------------------------------------- + /** * Fetches the current system status and returns a full LagStatus snapshot. 
+ * + * The reported `level` is the **instantaneous** level derived directly from + * the current lag (no hysteresis, no side effects). This preserves the + * historical contract relied on by `/status`, the readiness probe, and + * `degradedGuard` — the snapshot always reflects the lag *right now*, and + * the call neither mutates the state machine nor emits alerts. Hysteresis, + * alerting, and auto-recovery are driven separately by `poll()`. */ async getLagStatus(): Promise<LagStatus> { - const status = await statusService.getStatus(); - const lag = status.index_lag; + const lag = await this.readLag(); const level = this.computeLevel(lag); + return this.snapshot(lag, level); + } + + /** + * Advance the hysteresis state machine using a fresh lag reading and emit + * any resulting transition alert. Returns a snapshot whose `level` is the + * hysteresis-aware **effective** level. Call this on a fixed interval (e.g. + * from a scheduler), NOT per request. + * + * This is where threshold breaches become alerts and where degraded-mode + * auto-recovery (sustained-over-N-polls) happens. `getLagStatus()` remains + * instantaneous so per-request consumers are unaffected. + */ + async poll(): Promise<LagStatus> { + const lag = await this.readLag(); + const level = this.observe(lag); + return this.snapshot(lag, level); + } + + /** + * Snapshot using the hysteresis-aware effective level without advancing the + * state machine. Useful for a guard that wants auto-recovery semantics + * (degraded stays closed until `poll()` clears it) rather than instantaneous + * lag. Read-only: no alerts, no state change. + */ + async getEffectiveStatus(): Promise<LagStatus> { + const lag = await this.readLag(); + return this.snapshot(lag, this._effectiveLevel); + } + + /** + * Read the current lag from statusService.
+ * + * If the current-ledger read fails or yields a nonsensical (negative / + * non-finite) lag, we treat the indexer as critically lagging rather than + * healthy: a missing reading must never silently clear a degraded state or + * open the write guard. The raw error is swallowed here (it is logged at + * the call site / global handler) so the monitor degrades safely. + */ + private async readLag(): Promise<number> { + try { + const status = await statusService.getStatus(); + const lag = status.index_lag; + if (!Number.isFinite(lag) || lag < 0) { + // Unknown / corrupt reading → fail safe to critical. + return this._criticalThreshold; + } + return lag; + } catch { + // Cannot determine lag → fail safe to critical (block writes). + return this._criticalThreshold; + } + } + /** Build a LagStatus from a raw lag and an already-determined level. */ + private snapshot(lag: number, level: DegradedLevel): LagStatus { return { lag, warnThreshold: this._warnThreshold, @@ -189,4 +572,4 @@ export class LagMonitor { } } -export const lagMonitor = LagMonitor.getInstance(); \ No newline at end of file +export const lagMonitor = LagMonitor.getInstance(); diff --git a/backend/src/tests/lagMonitor.alerts.test.ts b/backend/src/tests/lagMonitor.alerts.test.ts new file mode 100644 index 00000000..af5825dc --- /dev/null +++ b/backend/src/tests/lagMonitor.alerts.test.ts @@ -0,0 +1,526 @@ +/** + * Unit tests for LagMonitor alerting, hysteresis, and degraded-mode + * auto-recovery. Complements lagMonitor.test.ts (which covers the pure + * computeLevel / threshold / singleton behaviour).
+ * + * Tests cover: + * - Alerts fire only on effective-level transitions, not on every poll + * - Escalation is immediate; de-escalation requires sustained recovery + * - Hysteresis margin prevents flapping around a threshold + * - Sustained breach holds the degraded level + * - Rapid recovery still drains one level at a time (warn guard window kept) + * - Missing / corrupt current-ledger reads fail safe to critical + * - Alert payloads carry only operational fields (no secrets) + * - getLagStatus() is instantaneous and side-effect free (no alerts, no state change) + */ + +import { + LagMonitor, + LagAlertEvent, + DEFAULT_HYSTERESIS_MARGIN, + DEFAULT_RECOVERY_POLLS, +} from "../services/lagMonitor"; +import { statusService } from "../services/statusService"; + +// --------------------------------------------------------------------------- +// Helpers +// --------------------------------------------------------------------------- + +/** warn=10, critical=50, margin=3, recoveryPolls=3 unless overridden. */ +function makeMonitor( + warn = 10, + critical = 50, + margin = 3, + recoveryPolls = 3 +): LagMonitor { + return new LagMonitor(warn, critical, margin, recoveryPolls); +} + +/** Set the mocked lag by adjusting the mock current ledger.
*/ +function setLag(lag: number): void { + statusService.updateLastIndexedLedger(100000); + statusService.setMockCurrentLedger(100000 + lag); +} + +function collectAlerts(m: LagMonitor): LagAlertEvent[] { + const events: LagAlertEvent[] = []; + m.onAlert((e) => events.push(e)); + return events; +} + +beforeEach(() => { + statusService.setMaintenanceMode(false); + statusService.updateLastIndexedLedger(100000); +}); + +afterEach(() => { + statusService.setMockCurrentLedger(null); +}); + +// --------------------------------------------------------------------------- +// Config / defaults +// --------------------------------------------------------------------------- + +describe("LagMonitor hysteresis config", () => { + it("exposes default hysteresis margin and recovery polls", () => { + const m = makeMonitor(); + expect(DEFAULT_HYSTERESIS_MARGIN).toBe(3); + expect(DEFAULT_RECOVERY_POLLS).toBe(3); + expect(m.hysteresisMargin).toBe(3); + expect(m.recoveryPolls).toBe(3); + }); + + it("reads hysteresis config from env vars", () => { + process.env["LAG_HYSTERESIS_MARGIN"] = "5"; + process.env["LAG_RECOVERY_POLLS"] = "4"; + const m = new LagMonitor(); + expect(m.hysteresisMargin).toBe(5); + expect(m.recoveryPolls).toBe(4); + delete process.env["LAG_HYSTERESIS_MARGIN"]; + delete process.env["LAG_RECOVERY_POLLS"]; + }); + + it("setHysteresis validates inputs", () => { + const m = makeMonitor(); + expect(() => m.setHysteresis(-1, 3)).toThrow(RangeError); + expect(() => m.setHysteresis(3, 0)).toThrow(RangeError); + expect(() => m.setHysteresis(3, 1.5)).toThrow(RangeError); + m.setHysteresis(4, 2); + expect(m.hysteresisMargin).toBe(4); + expect(m.recoveryPolls).toBe(2); + }); + + it("clamps invalid constructor values to safe defaults", () => { + const m = new LagMonitor(10, 50, -5, 0); + expect(m.hysteresisMargin).toBe(0); + expect(m.recoveryPolls).toBe(DEFAULT_RECOVERY_POLLS); + }); +}); + +// --------------------------------------------------------------------------- +// 
Escalation (immediate) +// --------------------------------------------------------------------------- + +describe("LagMonitor escalation", () => { + it("escalates none→warn immediately on first breaching poll", async () => { + const m = makeMonitor(); + const alerts = collectAlerts(m); + + setLag(15); + const status = await m.poll(); + + expect(status.level).toBe("warn"); + expect(status.isDegraded).toBe(true); + expect(alerts).toHaveLength(1); + expect(alerts[0]).toMatchObject({ + from: "none", + to: "warn", + direction: "escalation", + lag: 15, + }); + }); + + it("escalates warn→critical immediately", async () => { + const m = makeMonitor(); + const alerts = collectAlerts(m); + + setLag(15); + await m.poll(); // none -> warn + setLag(60); + const status = await m.poll(); // warn -> critical + + expect(status.level).toBe("critical"); + expect(status.isCritical).toBe(true); + expect(alerts.map((a) => a.to)).toEqual(["warn", "critical"]); + }); + + it("can jump none→critical in a single poll", async () => { + const m = makeMonitor(); + const alerts = collectAlerts(m); + + setLag(80); + const status = await m.poll(); + + expect(status.level).toBe("critical"); + expect(alerts).toHaveLength(1); + expect(alerts[0]).toMatchObject({ from: "none", to: "critical" }); + }); +}); + +// --------------------------------------------------------------------------- +// Alerts fire only on transitions +// --------------------------------------------------------------------------- + +describe("LagMonitor emits alerts only on transitions", () => { + it("does not emit on repeated polls at the same level", async () => { + const m = makeMonitor(); + const alerts = collectAlerts(m); + + setLag(20); + await m.poll(); // none -> warn (1 alert) + await m.poll(); // still warn + await m.poll(); // still warn + await m.poll(); // still warn + + expect(alerts).toHaveLength(1); + expect(m.getAlertMetrics().escalations).toBe(1); + }); + + it("does not emit while healthy across many polls", 
async () => { + const m = makeMonitor(); + const alerts = collectAlerts(m); + + setLag(2); + await m.poll(); + await m.poll(); + await m.poll(); + + expect(alerts).toHaveLength(0); + expect(m.effectiveLevel).toBe("none"); + }); +}); + +// --------------------------------------------------------------------------- +// De-escalation requires sustained recovery +// --------------------------------------------------------------------------- + +describe("LagMonitor auto-recovery (sustained)", () => { + it("does not clear warn until lag stays below recovery threshold for N polls", async () => { + const m = makeMonitor(10, 50, 3, 3); // recovery threshold for warn = 10-3 = 7 + const alerts = collectAlerts(m); + + setLag(20); + await m.poll(); // -> warn + expect(m.effectiveLevel).toBe("warn"); + + setLag(5); // below recovery threshold (7) + await m.poll(); // streak 1 — still warn + expect(m.effectiveLevel).toBe("warn"); + await m.poll(); // streak 2 — still warn + expect(m.effectiveLevel).toBe("warn"); + + const status = await m.poll(); // streak 3 — clears + expect(status.level).toBe("none"); + expect(m.effectiveLevel).toBe("none"); + + const recovery = alerts.filter((a) => a.direction === "recovery"); + expect(recovery).toHaveLength(1); + expect(recovery[0]).toMatchObject({ from: "warn", to: "none" }); + }); + + it("requires lag below the warn recovery threshold, not merely below warn", async () => { + const m = makeMonitor(10, 50, 3, 2); // recovery threshold = 7 + const alerts = collectAlerts(m); + + setLag(20); + await m.poll(); // -> warn + + // lag = 8 is below warn (10) but above recovery threshold (7): no progress + setLag(8); + await m.poll(); + await m.poll(); + expect(m.effectiveLevel).toBe("warn"); + expect(alerts.filter((a) => a.direction === "recovery")).toHaveLength(0); + + // drop to recovery range and dwell + setLag(7); + await m.poll(); // streak 1 + await m.poll(); // streak 2 -> clears + expect(m.effectiveLevel).toBe("none"); + }); + + it("steps 
critical→warn→none one level per recovery window", async () => { + const m = makeMonitor(10, 50, 3, 2); + const alerts = collectAlerts(m); + + setLag(80); + await m.poll(); // -> critical + + // recovery threshold for critical = 50-3 = 47 + setLag(10); + await m.poll(); // streak 1 (critical) + await m.poll(); // streak 2 -> step down to warn + expect(m.effectiveLevel).toBe("warn"); + + // recovery threshold for warn = 10-3 = 7; lag 10 is NOT in range yet + await m.poll(); + await m.poll(); + expect(m.effectiveLevel).toBe("warn"); + + setLag(5); + await m.poll(); // streak 1 (warn) + await m.poll(); // streak 2 -> none + expect(m.effectiveLevel).toBe("none"); + + expect(alerts.map((a) => `${a.from}->${a.to}`)).toEqual([ + "none->critical", + "critical->warn", + "warn->none", + ]); + }); +}); + +// --------------------------------------------------------------------------- +// Flapping around a threshold +// --------------------------------------------------------------------------- + +describe("LagMonitor flapping resistance", () => { + it("a single good poll does not clear a degraded state (anti-flap)", async () => { + const m = makeMonitor(10, 50, 3, 3); + const alerts = collectAlerts(m); + + setLag(20); + await m.poll(); // -> warn + + // oscillate around the recovery threshold + setLag(5); // streak 1 + await m.poll(); + setLag(20); // breach again -> reset streak, still warn (no new alert) + await m.poll(); + setLag(5); // streak 1 again + await m.poll(); + setLag(11); // above recovery -> reset + await m.poll(); + + expect(m.effectiveLevel).toBe("warn"); + // Only the original escalation alert; flapping produced no transitions. 
+ expect(alerts).toHaveLength(1); + expect(alerts[0].direction).toBe("escalation"); + }); + + it("resets recovery streak the moment lag bounces above recovery threshold", async () => { + const m = makeMonitor(10, 50, 3, 3); + setLag(20); + await m.poll(); // -> warn + setLag(5); + await m.poll(); // streak 1 + await m.poll(); // streak 2 + expect(m.getAlertMetrics().consecutiveRecoveryPolls).toBe(2); + + setLag(15); // bounce above recovery + await m.poll(); + expect(m.getAlertMetrics().consecutiveRecoveryPolls).toBe(0); + expect(m.effectiveLevel).toBe("warn"); + }); +}); + +// --------------------------------------------------------------------------- +// Sustained breach +// --------------------------------------------------------------------------- + +describe("LagMonitor sustained breach", () => { + it("holds critical across a long breach with no duplicate alerts", async () => { + const m = makeMonitor(); + const alerts = collectAlerts(m); + + setLag(120); + for (let i = 0; i < 10; i++) await m.poll(); + + expect(m.effectiveLevel).toBe("critical"); + expect(alerts).toHaveLength(1); + expect(alerts[0].to).toBe("critical"); + }); +}); + +// --------------------------------------------------------------------------- +// Missing / corrupt current-ledger read +// --------------------------------------------------------------------------- + +describe("LagMonitor missing current-ledger read", () => { + it("fails safe to critical when statusService throws", async () => { + const m = makeMonitor(); + const spy = jest + .spyOn(statusService, "getStatus") + .mockRejectedValueOnce(new Error("rpc unavailable")); + + const status = await m.poll(); + expect(status.level).toBe("critical"); + expect(status.isCritical).toBe(true); + expect(status.lag).toBe(m.criticalThreshold); + + spy.mockRestore(); + }); + + it("fails safe to critical on a negative / nonsensical lag", async () => { + const m = makeMonitor(); + // current ledger behind last-indexed → negative lag + 
statusService.updateLastIndexedLedger(100000); + statusService.setMockCurrentLedger(99990); + + const status = await m.poll(); + expect(status.level).toBe("critical"); + expect(status.lag).toBe(m.criticalThreshold); + }); + + it("a failed read does not silently clear an existing degraded state", async () => { + const m = makeMonitor(); + setLag(20); + await m.poll(); // -> warn + + const spy = jest + .spyOn(statusService, "getStatus") + .mockRejectedValueOnce(new Error("rpc unavailable")); + const status = await m.poll(); + // read failed → treated as critical, which is an escalation, never a clear + expect(status.level).toBe("critical"); + spy.mockRestore(); + }); +}); + +// --------------------------------------------------------------------------- +// getLagStatus contract (instantaneous, side-effect free) +// --------------------------------------------------------------------------- + +describe("LagMonitor.getLagStatus contract", () => { + it("reflects the current lag instantaneously (no poll required)", async () => { + const m = makeMonitor(); + setLag(60); + const status = await m.getLagStatus(); + expect(status.level).toBe("critical"); + expect(status.isCritical).toBe(true); + }); + + it("is side-effect free: never advances the state machine or emits alerts", async () => { + const m = makeMonitor(); + const alerts = collectAlerts(m); + setLag(60); + await m.getLagStatus(); + await m.getLagStatus(); + await m.getLagStatus(); + expect(alerts).toHaveLength(0); + expect(m.effectiveLevel).toBe("none"); + }); + + it("tracks lag down again immediately (instantaneous, unlike effective level)", async () => { + const m = makeMonitor(); + setLag(60); + expect((await m.getLagStatus()).level).toBe("critical"); + setLag(0); + expect((await m.getLagStatus()).level).toBe("none"); + }); + + it("getEffectiveStatus reflects hysteresis without advancing it", async () => { + const m = makeMonitor(); + setLag(60); + await m.poll(); // effective -> critical + setLag(0); // lag 
healthy again, but no sustained recovery yet + const eff = await m.getEffectiveStatus(); + expect(eff.level).toBe("critical"); + expect(eff.lag).toBe(0); + // getEffectiveStatus did not advance the machine + expect(m.effectiveLevel).toBe("critical"); + }); + + it("preserves the LagStatus shape", async () => { + const m = makeMonitor(); + setLag(5); + const status = await m.getLagStatus(); + expect(Object.keys(status).sort()).toEqual( + [ + "checkedAt", + "criticalThreshold", + "isCritical", + "isDegraded", + "lag", + "level", + "warnThreshold", + ].sort() + ); + expect(status.checkedAt).toMatch(/^\d{4}-\d{2}-\d{2}T/); + }); +}); + +// --------------------------------------------------------------------------- +// Alert payload safety (no secrets) +// --------------------------------------------------------------------------- + +describe("LagMonitor alert payload safety", () => { + it("alert payload contains only operational fields", async () => { + const m = makeMonitor(); + const alerts = collectAlerts(m); + setLag(60); + await m.poll(); + + expect(alerts).toHaveLength(1); + expect(Object.keys(alerts[0]).sort()).toEqual( + [ + "at", + "criticalThreshold", + "direction", + "from", + "lag", + "to", + "warnThreshold", + ].sort() + ); + // No string field should resemble a secret/token/wallet/auth value. 
+ const serialized = JSON.stringify(alerts[0]); + expect(serialized).not.toMatch(/secret|token|password|signature|apikey|authorization/i); + }); +}); + +// --------------------------------------------------------------------------- +// Metrics & subscription lifecycle +// --------------------------------------------------------------------------- + +describe("LagMonitor metrics & subscriptions", () => { + it("tracks escalation/recovery counts and transitionsTo", async () => { + const m = makeMonitor(10, 50, 3, 1); // recover after a single good poll + setLag(60); + await m.poll(); // none->critical + setLag(0); + await m.poll(); // critical->warn + await m.poll(); // warn->none + + const metrics = m.getAlertMetrics(); + expect(metrics.escalations).toBe(1); + expect(metrics.recoveries).toBe(2); + expect(metrics.transitionsTo.critical).toBe(1); + expect(metrics.transitionsTo.warn).toBe(1); + expect(metrics.transitionsTo.none).toBe(1); + expect(metrics.currentLevel).toBe("none"); + }); + + it("getAlertMetrics returns a defensive copy", async () => { + const m = makeMonitor(); + const snap = m.getAlertMetrics(); + snap.escalations = 999; + snap.transitionsTo.warn = 999; + expect(m.getAlertMetrics().escalations).toBe(0); + expect(m.getAlertMetrics().transitionsTo.warn).toBe(0); + }); + + it("unsubscribe stops further alert delivery", async () => { + const m = makeMonitor(); + const events: LagAlertEvent[] = []; + const unsub = m.onAlert((e) => events.push(e)); + + setLag(20); + await m.poll(); // -> warn (delivered) + unsub(); + setLag(60); + await m.poll(); // -> critical (not delivered) + + expect(events).toHaveLength(1); + expect(events[0].to).toBe("warn"); + }); + + it("a throwing listener never breaks the monitor", async () => { + const m = makeMonitor(); + m.onAlert(() => { + throw new Error("boom"); + }); + setLag(60); + await expect(m.poll()).resolves.toMatchObject({ level: "critical" }); + }); + + it("reset() clears state and counters", async () => { + const m 
= makeMonitor(); + setLag(60); + await m.poll(); + m.reset(); + expect(m.effectiveLevel).toBe("none"); + expect(m.getAlertMetrics().escalations).toBe(0); + expect(m.getAlertMetrics().currentLevel).toBe("none"); + }); +}); diff --git a/backend/src/tests/readiness.test.ts b/backend/src/tests/readiness.test.ts new file mode 100644 index 00000000..68dec63f --- /dev/null +++ b/backend/src/tests/readiness.test.ts @@ -0,0 +1,257 @@ +/** + * Liveness and readiness probe tests. + * + * Covers: + * - Liveness (/health, /livez) is cheap, always 200, dependency-free. + * - Readiness (/readyz) probes DB connectivity, ingest lag, and the webhook + * queue, and honours maintenance mode. + * - Edge cases: DB down, high (critical) lag, maintenance mode, partial + * dependency failure, queue saturation. + * - Security: probes do not leak internal hostnames, versions, or error + * details to unauthenticated callers. + */ + +import express from "express"; +import supertest from "supertest"; +import healthRoutes from "../routes/health"; +import { statusService } from "../services/statusService"; +import { lagMonitor } from "../services/lagMonitor"; +import { webhookQueueService } from "../services/webhookQueueService"; +import * as database from "../lib/database"; + +// Mount the health router the same way app.ts does: at the root, with no auth. +// Probes are unauthenticated, so no X-API-Key header is sent anywhere here. +// (We mount the router in isolation rather than importing the full app so the +// probe behaviour is exercised independently of the rest of the route graph.) +const app = express(); +app.use(express.json()); +app.use(healthRoutes); + +const HEALTHY_QUEUE_STATS = { + depth: 0, + size: 0, + capacity: 5000, + overflowCount: 0, + pendingCount: 0, + successCount: 0, + failureCount: 0, + oldestTimestamp: null, +}; + +beforeEach(() => { + // Healthy baseline: maintenance off, lag well under the warn threshold. 
+ statusService.setMaintenanceMode(false); + statusService.updateLastIndexedLedger(100000); + statusService.setMockCurrentLedger(100002); // lag = 2 + + // The test database has no webhook_queue schema, so stub the queue stats to + // a healthy value by default. Individual tests override this to exercise the + // saturated / unavailable paths. This keeps the suite focused on probe logic + // rather than queue persistence (covered by webhookQueue.persist.test.ts). + jest + .spyOn(webhookQueueService, "getStats") + .mockReturnValue(HEALTHY_QUEUE_STATS as any); +}); + +afterEach(() => { + statusService.setMaintenanceMode(false); + statusService.setMockCurrentLedger(null); + jest.restoreAllMocks(); +}); + +// --------------------------------------------------------------------------- +// Liveness +// --------------------------------------------------------------------------- + +describe("Liveness probe", () => { + it("GET /health returns 200 with status ok", async () => { + const res = await supertest(app).get("/health"); + expect(res.status).toBe(200); + expect(res.body.status).toBe("ok"); + expect(res.body.timestamp).toMatch(/^\d{4}-\d{2}-\d{2}T/); + }); + + it("GET /livez returns 200 with status ok", async () => { + const res = await supertest(app).get("/livez"); + expect(res.status).toBe(200); + expect(res.body.status).toBe("ok"); + }); + + it("liveness stays up even when a dependency is down", async () => { + jest.spyOn(database, "pingDatabase").mockReturnValue(false); + const res = await supertest(app).get("/health"); + expect(res.status).toBe(200); + expect(res.body.status).toBe("ok"); + }); + + it("liveness is dependency-free (does not call pingDatabase)", async () => { + const spy = jest.spyOn(database, "pingDatabase"); + await supertest(app).get("/livez"); + expect(spy).not.toHaveBeenCalled(); + }); +}); + +// --------------------------------------------------------------------------- +// Readiness — happy path +// 
--------------------------------------------------------------------------- + +describe("Readiness probe — ready", () => { + it("GET /readyz returns 200 when all dependencies are healthy", async () => { + const res = await supertest(app).get("/readyz"); + expect(res.status).toBe(200); + expect(res.body.status).toBe("ready"); + expect(res.body.database).toBe("ok"); + expect(res.body.ingest).toBe("ok"); + expect(res.body.webhookQueue).toBe("ok"); + expect(res.body.timestamp).toMatch(/^\d{4}-\d{2}-\d{2}T/); + }); + + it("stays ready with warn-level (degraded) lag", async () => { + statusService.setMockCurrentLedger(100020); // lag = 20, >= warn(10), < critical(50) + const res = await supertest(app).get("/readyz"); + expect(res.status).toBe(200); + expect(res.body.status).toBe("ready"); + expect(res.body.ingest).toBe("degraded"); + }); +}); + +// --------------------------------------------------------------------------- +// Readiness — maintenance mode +// --------------------------------------------------------------------------- + +describe("Readiness probe — maintenance mode", () => { + it("returns 503 with maintenance status when maintenance is enabled", async () => { + statusService.setMaintenanceMode(true); + const res = await supertest(app).get("/readyz"); + expect(res.status).toBe(503); + expect(res.body.status).toBe("maintenance"); + }); + + it("short-circuits before probing dependencies", async () => { + statusService.setMaintenanceMode(true); + const spy = jest.spyOn(database, "pingDatabase"); + await supertest(app).get("/readyz"); + expect(spy).not.toHaveBeenCalled(); + }); +}); + +// --------------------------------------------------------------------------- +// Readiness — dependency failures +// --------------------------------------------------------------------------- + +describe("Readiness probe — DB down", () => { + it("returns 503 not_ready when the database is unreachable", async () => { + jest.spyOn(database, 
"pingDatabase").mockReturnValue(false); + const res = await supertest(app).get("/readyz"); + expect(res.status).toBe(503); + expect(res.body.status).toBe("not_ready"); + expect(res.body.database).toBe("unavailable"); + }); +}); + +describe("Readiness probe — high lag", () => { + it("returns 503 not_ready when ingest lag is critical", async () => { + statusService.setMockCurrentLedger(100100); // lag = 100, >= critical(50) + const res = await supertest(app).get("/readyz"); + expect(res.status).toBe(503); + expect(res.body.status).toBe("not_ready"); + expect(res.body.ingest).toBe("unavailable"); + }); + + it("returns 503 not_ready when the lag probe throws", async () => { + jest.spyOn(lagMonitor, "getLagStatus").mockRejectedValue(new Error("rpc down")); + const res = await supertest(app).get("/readyz"); + expect(res.status).toBe(503); + expect(res.body.ingest).toBe("unavailable"); + }); +}); + +describe("Readiness probe — webhook queue", () => { + it("returns 503 not_ready when the queue store is unreachable", async () => { + jest.spyOn(webhookQueueService, "getStats").mockImplementation(() => { + throw new Error("queue store unavailable"); + }); + const res = await supertest(app).get("/readyz"); + expect(res.status).toBe(503); + expect(res.body.status).toBe("not_ready"); + expect(res.body.webhookQueue).toBe("unavailable"); + }); + + it("stays ready (degraded) when the queue is saturated", async () => { + jest.spyOn(webhookQueueService, "getStats").mockReturnValue({ + depth: 5000, + size: 5000, + capacity: 5000, + overflowCount: 3, + pendingCount: 5000, + successCount: 0, + failureCount: 0, + oldestTimestamp: null, + } as any); + const res = await supertest(app).get("/readyz"); + expect(res.status).toBe(200); + expect(res.body.status).toBe("ready"); + expect(res.body.webhookQueue).toBe("degraded"); + }); +}); + +describe("Readiness probe — partial dependency failure", () => { + it("a single unavailable dependency fails readiness while others stay ok", async () => { + 
jest.spyOn(database, "pingDatabase").mockReturnValue(false); + statusService.setMockCurrentLedger(100002); // lag = 2, healthy + const res = await supertest(app).get("/readyz"); + expect(res.status).toBe(503); + expect(res.body.status).toBe("not_ready"); + expect(res.body.database).toBe("unavailable"); + expect(res.body.ingest).toBe("ok"); + expect(res.body.webhookQueue).toBe("ok"); + }); + + it("degraded lag plus DB down still reports both sub-statuses", async () => { + jest.spyOn(database, "pingDatabase").mockReturnValue(false); + statusService.setMockCurrentLedger(100020); // lag = 20, degraded + const res = await supertest(app).get("/readyz"); + expect(res.status).toBe(503); + expect(res.body.database).toBe("unavailable"); + expect(res.body.ingest).toBe("degraded"); + }); +}); + +// --------------------------------------------------------------------------- +// Security — no information leakage to unauthenticated callers +// --------------------------------------------------------------------------- + +describe("Readiness probe — does not leak internal details", () => { + const sub = ["ok", "degraded", "unavailable"]; + + it("readiness response contains only coarse status fields", async () => { + const res = await supertest(app).get("/readyz"); + expect(Object.keys(res.body).sort()).toEqual( + ["database", "ingest", "status", "timestamp", "webhookQueue"].sort() + ); + // No version, hostname, ledger numbers, queue depths, or error strings. 
+ expect(res.body).not.toHaveProperty("version"); + expect(res.body).not.toHaveProperty("host"); + expect(res.body).not.toHaveProperty("lag"); + expect(res.body).not.toHaveProperty("error"); + expect(sub).toContain(res.body.database); + expect(sub).toContain(res.body.ingest); + expect(sub).toContain(res.body.webhookQueue); + }); + + it("does not surface the underlying error message when a dependency throws", async () => { + jest + .spyOn(lagMonitor, "getLagStatus") + .mockRejectedValue(new Error("postgres://secret-host:5432 refused")); + const res = await supertest(app).get("/readyz"); + const serialized = JSON.stringify(res.body); + expect(serialized).not.toContain("secret-host"); + expect(serialized).not.toContain("postgres"); + }); + + it("liveness response contains only status and timestamp", async () => { + const res = await supertest(app).get("/health"); + expect(Object.keys(res.body).sort()).toEqual(["status", "timestamp"].sort()); + expect(res.body).not.toHaveProperty("version"); + }); +});
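
Reviewer note: the `/readyz` route handler itself is not part of this excerpt, so the aggregation rule the readiness tests assert can only be inferred from the docs and the test expectations. The sketch below is a minimal, hypothetical reconstruction of that rule. The names `ReadinessInput`, `ReadinessResult`, and `aggregateReadiness` are illustrative assumptions and do not come from `routes/health.ts`; only the `SubStatus` values and the status strings are taken from the diff.

```typescript
// Hypothetical sketch of the readiness aggregation rule described in health.md.
// Only the SubStatus values and status strings come from the diff; every other
// name here is an assumption made for illustration.
type SubStatus = "ok" | "degraded" | "unavailable";

interface ReadinessInput {
  maintenance: boolean;
  database: SubStatus;
  ingest: SubStatus;
  webhookQueue: SubStatus;
}

interface ReadinessResult {
  httpStatus: 200 | 503;
  status: "ready" | "not_ready" | "maintenance";
}

function aggregateReadiness(input: ReadinessInput): ReadinessResult {
  // Maintenance mode wins: the docs say it short-circuits before any probe.
  if (input.maintenance) {
    return { httpStatus: 503, status: "maintenance" };
  }

  // Any single "unavailable" sub-status fails readiness; "degraded" does not.
  const subs: SubStatus[] = [input.database, input.ingest, input.webhookQueue];
  const anyUnavailable = subs.some((s) => s === "unavailable");

  return anyUnavailable
    ? { httpStatus: 503, status: "not_ready" }
    : { httpStatus: 200, status: "ready" };
}
```

Keeping this rule as a pure function, if the real handler does something similar, would let the "partial dependency failure" cases be unit-tested without mounting an Express app.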