diff --git a/apps/backend/src/data-retention/data-retention.service.spec.ts b/apps/backend/src/data-retention/data-retention.service.spec.ts index 87ced66a..2d74dc1a 100644 --- a/apps/backend/src/data-retention/data-retention.service.spec.ts +++ b/apps/backend/src/data-retention/data-retention.service.spec.ts @@ -10,7 +10,7 @@ import { buildCheckMetricLabels } from "../metrics-prometheus/check-metric-label import type { PDPSubgraphService } from "../pdp-subgraph/pdp-subgraph.service.js"; import type { ProviderDataSetResponse } from "../pdp-subgraph/types.js"; import type { WalletSdkService } from "../wallet-sdk/wallet-sdk.service.js"; -import { DataRetentionService } from "./data-retention.service.js"; +import { DataRetentionDependencyError, DataRetentionService } from "./data-retention.service.js"; const PROVIDER_A = "0xd8da6bf26964af9d7eed9e03e53415d37aa96045" as const; const PROVIDER_B = "0xab5801a7d398351b8be11c439e05c5b3259aec9b" as const; @@ -155,13 +155,12 @@ describe("DataRetentionService", () => { ); }); - it("returns early when pdpSubgraphEndpoint is empty", async () => { + it("throws early when pdpSubgraphEndpoint is empty", async () => { (configServiceMock.get as ReturnType).mockReturnValue({ pdpSubgraphEndpoint: "", }); - await service.pollDataRetention(); - + await expect(service.pollDataRetention()).rejects.toBeInstanceOf(DataRetentionDependencyError); expect(pdpSubgraphServiceMock.fetchSubgraphMeta).not.toHaveBeenCalled(); expect(pdpSubgraphServiceMock.fetchProvidersWithDatasets).not.toHaveBeenCalled(); }); @@ -369,10 +368,61 @@ describe("DataRetentionService", () => { }); }); - it("catches and logs errors without rethrowing", async () => { + it("fails the job when the subgraph query is unavailable", async () => { pdpSubgraphServiceMock.fetchProvidersWithDatasets.mockRejectedValueOnce(new Error("subgraph down")); - // Should not throw + // A dependency outage must surface as a job failure, not a silent success. 
+ await expect(service.pollDataRetention()).rejects.toBeInstanceOf(DataRetentionDependencyError); + }); + + it("fails the job when fetching subgraph meta fails", async () => { + pdpSubgraphServiceMock.fetchSubgraphMeta.mockRejectedValueOnce(new Error("subgraph meta down")); + + await expect(service.pollDataRetention()).rejects.toBeInstanceOf(DataRetentionDependencyError); + expect(pdpSubgraphServiceMock.fetchProvidersWithDatasets).not.toHaveBeenCalled(); + }); + + it("preserves the original error type for unexpected (non-dependency) failures", async () => { + // A logic/programming error must not be mislabeled as a dependency outage. + const bug = new TypeError("unexpected bug"); + walletSdkServiceMock.getTestingProviders.mockImplementationOnce(() => { + throw bug; + }); + + await expect(service.pollDataRetention()).rejects.toBe(bug); + }); + + it("fails the job when baselines cannot be loaded from the database", async () => { + mockBaselineRepository.find.mockRejectedValueOnce(new Error("DB connection failed")); + + await expect(service.pollDataRetention()).rejects.toBeInstanceOf(DataRetentionDependencyError); + // Aborts before touching the subgraph. 
+ expect(pdpSubgraphServiceMock.fetchSubgraphMeta).not.toHaveBeenCalled(); + }); + + it("fails the job when the subgraph endpoint is missing in production", async () => { + (configServiceMock.get as ReturnType).mockImplementation((key: string) => { + if (key === "blockchain") return { pdpSubgraphEndpoint: "" }; + if (key === "app") return { env: "production" }; + if (key === "spBlocklists") return { ids: new Set(), addresses: new Set() }; + return undefined; + }); + + await expect(service.pollDataRetention()).rejects.toBeInstanceOf(DataRetentionDependencyError); + }); + + it("stays a success when the provider set is empty but healthy", async () => { + walletSdkServiceMock.getTestingProviders.mockReturnValueOnce([]); + + await expect(service.pollDataRetention()).resolves.toBeUndefined(); + }); + + it("stays a success when a single provider fails to process (transient per-provider failure)", async () => { + // Provider returned from subgraph but absent from the local cache rejects in processing, + // which is a transient per-provider failure and must not fail the whole job. 
+ const PROVIDER_C = "0x1234567890123456789012345678901234567890"; + pdpSubgraphServiceMock.fetchProvidersWithDatasets.mockResolvedValueOnce([makeProvider({ address: PROVIDER_C })]); + await expect(service.pollDataRetention()).resolves.toBeUndefined(); }); @@ -489,7 +539,7 @@ describe("DataRetentionService", () => { expect(pdpSubgraphServiceMock.fetchProvidersWithDatasets.mock.calls[1][0].addresses).toHaveLength(25); }); - it("continues processing next batch if one batch fails", async () => { + it("processes all remaining batches before failing when one batch fails", async () => { const manyProviders = Array.from({ length: 75 }, (_, i) => ({ id: i + 1, serviceProvider: `0x${i.toString().padStart(40, "0")}`, @@ -503,7 +553,9 @@ describe("DataRetentionService", () => { .mockRejectedValueOnce(new Error("Subgraph timeout")) .mockResolvedValueOnce([]); - await service.pollDataRetention(); + // The poll attempts every batch so healthy data is recorded, then fails the job + // because the subgraph dependency errored. 
+ await expect(service.pollDataRetention()).rejects.toBeInstanceOf(DataRetentionDependencyError); // Both batches should be attempted expect(pdpSubgraphServiceMock.fetchProvidersWithDatasets).toHaveBeenCalledTimes(2); @@ -823,10 +875,10 @@ describe("DataRetentionService", () => { { id: 2, serviceProvider: PROVIDER_B, name: "Provider B", isApproved: false }, ]); - // Simulate processing error + // Simulate a subgraph query failure pdpSubgraphServiceMock.fetchProvidersWithDatasets.mockRejectedValueOnce(new Error("Processing failed")); - await service.pollDataRetention(); + await expect(service.pollDataRetention()).rejects.toBeInstanceOf(DataRetentionDependencyError); // Should NOT attempt cleanup due to processing errors expect(mockSPRepository.find).not.toHaveBeenCalled(); @@ -1005,8 +1057,8 @@ describe("DataRetentionService", () => { pdpSubgraphServiceMock.fetchProvidersWithDatasets.mockResolvedValue([makeProvider()]); - // First poll: DB load fails, poll bails out to avoid emitting bloated values - await service.pollDataRetention(); + // First poll: DB load fails, poll fails the job to avoid emitting bloated values + await expect(service.pollDataRetention()).rejects.toBeInstanceOf(DataRetentionDependencyError); expect(mockBaselineRepository.find).toHaveBeenCalledTimes(1); expect(pdpSubgraphServiceMock.fetchSubgraphMeta).not.toHaveBeenCalled(); expect(counterMock.labels).not.toHaveBeenCalled(); diff --git a/apps/backend/src/data-retention/data-retention.service.ts b/apps/backend/src/data-retention/data-retention.service.ts index c6ece7b5..97cc0555 100644 --- a/apps/backend/src/data-retention/data-retention.service.ts +++ b/apps/backend/src/data-retention/data-retention.service.ts @@ -12,10 +12,22 @@ import { DataRetentionBaseline } from "../database/entities/data-retention-basel import { StorageProvider } from "../database/entities/storage-provider.entity.js"; import { buildCheckMetricLabels, CheckMetricLabels } from "../metrics-prometheus/check-metric-labels.js"; 
import { PDPSubgraphService } from "../pdp-subgraph/pdp-subgraph.service.js"; -import { type ProviderDataSetResponse } from "../pdp-subgraph/types.js"; +import { type ProviderDataSetResponse, type SubgraphMeta } from "../pdp-subgraph/types.js"; import { WalletSdkService } from "../wallet-sdk/wallet-sdk.service.js"; import { type PDPProviderEx } from "../wallet-sdk/wallet-sdk.types.js"; +/** + * Thrown when the data-retention check cannot run because one of its dependencies + * (the PDP subgraph or the persisted baselines) is unavailable. Transient per-provider + * failures do NOT raise this — they leave the job a success with partial results. + */ +export class DataRetentionDependencyError extends Error { + constructor(message: string, options?: { cause?: unknown }) { + super(message, options); + this.name = "DataRetentionDependencyError"; + } +} + type ProviderBaseline = { faultedPeriods: bigint; successPeriods: bigint; @@ -61,26 +73,37 @@ export class DataRetentionService { async pollDataRetention(): Promise { const pdpSubgraphEndpoint = this.configService.get("blockchain").pdpSubgraphEndpoint; if (!pdpSubgraphEndpoint) { - this.logger.warn({ + this.logger.error({ event: "pdp_subgraph_endpoint_not_configured", message: "No PDP subgraph endpoint configured", }); - return; + throw new DataRetentionDependencyError("PDP subgraph endpoint not configured"); } const baselines = await this.loadBaselinesFromDb(); if (baselines === null) { - // Cannot safely compute deltas without persisted baselines. - return; + // Cannot safely compute deltas without persisted baselines. DB dependency is unavailable. + throw new DataRetentionDependencyError("Failed to load data retention baselines from database"); } + // A subgraph query failure means the check could not run against its dependency, so it is a job failure.
+ let subgraphQueryFailed = false; + try { - const subgraphMeta = await this.pdpSubgraphService.fetchSubgraphMeta(); + let subgraphMeta: SubgraphMeta; + try { + subgraphMeta = await this.pdpSubgraphService.fetchSubgraphMeta(); + } catch (error) { + // The subgraph is a hard dependency for the poll; label this precisely so the + // outer catch (which now preserves error type) rethrows it as a dependency failure. + throw new DataRetentionDependencyError("Failed to fetch PDP subgraph meta", { cause: error }); + } const allProviderInfos = this.walletSdkService.getTestingProviders(); const spBlocklists = this.configService.get("spBlocklists"); const providerInfos = allProviderInfos?.filter((p) => !isSpBlocked(spBlocklists, p.serviceProvider, p.id)); if (!providerInfos || providerInfos.length === 0) { + // An empty-but-healthy provider set is a successful no-op poll, not a failure. this.logger.warn({ event: "no_testing_providers_configured", message: "No testing providers configured", @@ -170,6 +193,7 @@ export class DataRetentionService { ); } catch (error) { hasProcessingErrors = true; + subgraphQueryFailed = true; this.logger.error({ event: "provider_batch_fetch_failed", message: "Failed to fetch batch", @@ -195,6 +219,15 @@ export class DataRetentionService { message: "Failed to poll data retention", error: toStructuredError(error), }); + // Preserve the original failure type: genuine dependency failures are already + // DataRetentionDependencyError, while anything else (e.g. a logic error) must not be + // mislabeled as a dependency outage. Wrap only non-Error throwables. + throw error instanceof Error ? error : new Error(String(error)); + } + + // Fail the job once the poll has recorded what it could. 
+ if (subgraphQueryFailed) { + throw new DataRetentionDependencyError("PDP subgraph query failed for one or more provider batches"); } } diff --git a/apps/backend/src/jobs/jobs.service.ts b/apps/backend/src/jobs/jobs.service.ts index 957ce65a..a3531e9b 100644 --- a/apps/backend/src/jobs/jobs.service.ts +++ b/apps/backend/src/jobs/jobs.service.ts @@ -668,7 +668,11 @@ export class JobsService implements OnModuleInit, OnApplicationShutdown { message: "Chain integration disabled; skipping provider refresh job.", }); } else { - await this.walletSdkService.loadProviders(); + // loadProviders() swallows on-chain failures and returns false; that is a job failure. + const loaded = await this.walletSdkService.loadProviders(); + if (!loaded) { + throw new Error("Provider refresh failed: unable to load providers from on-chain registry"); + } } await this.updateStorageProviderGauges(); return "success"; diff --git a/apps/backend/src/wallet-sdk/wallet-sdk.service.ts b/apps/backend/src/wallet-sdk/wallet-sdk.service.ts index 2ec9401e..470345c5 100644 --- a/apps/backend/src/wallet-sdk/wallet-sdk.service.ts +++ b/apps/backend/src/wallet-sdk/wallet-sdk.service.ts @@ -87,10 +87,9 @@ export class WalletSdkService implements OnModuleInit { * This allows dealbot to test all FWSS SPs, even those not yet approved * Only loads active, approved providers that support the PDP product */ - async loadProviders(): Promise { + async loadProviders(): Promise { if (this.providersLoadPromise) { - await this.providersLoadPromise; - return; + return this.providersLoadPromise; } this.providersLoadPromise = this.loadProvidersInternal(); @@ -99,6 +98,7 @@ export class WalletSdkService implements OnModuleInit { if (success) { this.providersLoadedOnce = true; } + return success; } finally { this.providersLoadPromise = null; }