From 177f002474e2d69071d77df4c588574e8bc93999 Mon Sep 17 00:00:00 2001 From: silent-cipher Date: Wed, 3 Jun 2026 22:51:34 +0530 Subject: [PATCH 1/3] fix: don't swallow check dependency failure --- .../data-retention.service.spec.ts | 66 ++++- .../data-retention/data-retention.service.ts | 257 ++++++++++-------- apps/backend/src/jobs/jobs.service.ts | 6 +- .../src/wallet-sdk/wallet-sdk.service.ts | 6 +- 4 files changed, 203 insertions(+), 132 deletions(-) diff --git a/apps/backend/src/data-retention/data-retention.service.spec.ts b/apps/backend/src/data-retention/data-retention.service.spec.ts index 87ced66a..269521dc 100644 --- a/apps/backend/src/data-retention/data-retention.service.spec.ts +++ b/apps/backend/src/data-retention/data-retention.service.spec.ts @@ -10,7 +10,7 @@ import { buildCheckMetricLabels } from "../metrics-prometheus/check-metric-label import type { PDPSubgraphService } from "../pdp-subgraph/pdp-subgraph.service.js"; import type { ProviderDataSetResponse } from "../pdp-subgraph/types.js"; import type { WalletSdkService } from "../wallet-sdk/wallet-sdk.service.js"; -import { DataRetentionService } from "./data-retention.service.js"; +import { DataRetentionDependencyError, DataRetentionService } from "./data-retention.service.js"; const PROVIDER_A = "0xd8da6bf26964af9d7eed9e03e53415d37aa96045" as const; const PROVIDER_B = "0xab5801a7d398351b8be11c439e05c5b3259aec9b" as const; @@ -155,13 +155,12 @@ describe("DataRetentionService", () => { ); }); - it("returns early when pdpSubgraphEndpoint is empty", async () => { + it("throws early when pdpSubgraphEndpoint is empty", async () => { (configServiceMock.get as ReturnType).mockReturnValue({ pdpSubgraphEndpoint: "", }); - await service.pollDataRetention(); - + await expect(service.pollDataRetention()).rejects.toBeInstanceOf(DataRetentionDependencyError); expect(pdpSubgraphServiceMock.fetchSubgraphMeta).not.toHaveBeenCalled(); 
expect(pdpSubgraphServiceMock.fetchProvidersWithDatasets).not.toHaveBeenCalled(); }); @@ -369,10 +368,51 @@ describe("DataRetentionService", () => { }); }); - it("catches and logs errors without rethrowing", async () => { + it("fails the job when the subgraph query is unavailable", async () => { pdpSubgraphServiceMock.fetchProvidersWithDatasets.mockRejectedValueOnce(new Error("subgraph down")); - // Should not throw + // A dependency outage must surface as a job failure, not a silent success. + await expect(service.pollDataRetention()).rejects.toBeInstanceOf(DataRetentionDependencyError); + }); + + it("fails the job when fetching subgraph meta fails", async () => { + pdpSubgraphServiceMock.fetchSubgraphMeta.mockRejectedValueOnce(new Error("subgraph meta down")); + + await expect(service.pollDataRetention()).rejects.toBeInstanceOf(DataRetentionDependencyError); + expect(pdpSubgraphServiceMock.fetchProvidersWithDatasets).not.toHaveBeenCalled(); + }); + + it("fails the job when baselines cannot be loaded from the database", async () => { + mockBaselineRepository.find.mockRejectedValueOnce(new Error("DB connection failed")); + + await expect(service.pollDataRetention()).rejects.toBeInstanceOf(DataRetentionDependencyError); + // Aborts before touching the subgraph. 
+ expect(pdpSubgraphServiceMock.fetchSubgraphMeta).not.toHaveBeenCalled(); + }); + + it("fails the job when the subgraph endpoint is missing in production", async () => { + (configServiceMock.get as ReturnType).mockImplementation((key: string) => { + if (key === "blockchain") return { pdpSubgraphEndpoint: "" }; + if (key === "app") return { env: "production" }; + if (key === "spBlocklists") return { ids: new Set(), addresses: new Set() }; + return undefined; + }); + + await expect(service.pollDataRetention()).rejects.toBeInstanceOf(DataRetentionDependencyError); + }); + + it("stays a success when the provider set is empty but healthy", async () => { + walletSdkServiceMock.getTestingProviders.mockReturnValueOnce([]); + + await expect(service.pollDataRetention()).resolves.toBeUndefined(); + }); + + it("stays a success when a single provider fails to process (transient per-provider failure)", async () => { + // Provider returned from subgraph but absent from the local cache rejects in processing, + // which is a transient per-provider failure and must not fail the whole job. 
+ const PROVIDER_C = "0x1234567890123456789012345678901234567890"; + pdpSubgraphServiceMock.fetchProvidersWithDatasets.mockResolvedValueOnce([makeProvider({ address: PROVIDER_C })]); + await expect(service.pollDataRetention()).resolves.toBeUndefined(); }); @@ -489,7 +529,7 @@ describe("DataRetentionService", () => { expect(pdpSubgraphServiceMock.fetchProvidersWithDatasets.mock.calls[1][0].addresses).toHaveLength(25); }); - it("continues processing next batch if one batch fails", async () => { + it("processes all remaining batches before failing when one batch fails", async () => { const manyProviders = Array.from({ length: 75 }, (_, i) => ({ id: i + 1, serviceProvider: `0x${i.toString().padStart(40, "0")}`, @@ -503,7 +543,9 @@ describe("DataRetentionService", () => { .mockRejectedValueOnce(new Error("Subgraph timeout")) .mockResolvedValueOnce([]); - await service.pollDataRetention(); + // The poll attempts every batch so healthy data is recorded, then fails the job + // because the subgraph dependency errored. 
+ await expect(service.pollDataRetention()).rejects.toBeInstanceOf(DataRetentionDependencyError); // Both batches should be attempted expect(pdpSubgraphServiceMock.fetchProvidersWithDatasets).toHaveBeenCalledTimes(2); @@ -823,10 +865,10 @@ describe("DataRetentionService", () => { { id: 2, serviceProvider: PROVIDER_B, name: "Provider B", isApproved: false }, ]); - // Simulate processing error + // Simulate a subgraph query failure pdpSubgraphServiceMock.fetchProvidersWithDatasets.mockRejectedValueOnce(new Error("Processing failed")); - await service.pollDataRetention(); + await expect(service.pollDataRetention()).rejects.toBeInstanceOf(DataRetentionDependencyError); // Should NOT attempt cleanup due to processing errors expect(mockSPRepository.find).not.toHaveBeenCalled(); @@ -1005,8 +1047,8 @@ describe("DataRetentionService", () => { pdpSubgraphServiceMock.fetchProvidersWithDatasets.mockResolvedValue([makeProvider()]); - // First poll: DB load fails, poll bails out to avoid emitting bloated values - await service.pollDataRetention(); + // First poll: DB load fails, poll fails the job to avoid emitting bloated values + await expect(service.pollDataRetention()).rejects.toBeInstanceOf(DataRetentionDependencyError); expect(mockBaselineRepository.find).toHaveBeenCalledTimes(1); expect(pdpSubgraphServiceMock.fetchSubgraphMeta).not.toHaveBeenCalled(); expect(counterMock.labels).not.toHaveBeenCalled(); diff --git a/apps/backend/src/data-retention/data-retention.service.ts b/apps/backend/src/data-retention/data-retention.service.ts index c6ece7b5..accea2d0 100644 --- a/apps/backend/src/data-retention/data-retention.service.ts +++ b/apps/backend/src/data-retention/data-retention.service.ts @@ -12,10 +12,22 @@ import { DataRetentionBaseline } from "../database/entities/data-retention-basel import { StorageProvider } from "../database/entities/storage-provider.entity.js"; import { buildCheckMetricLabels, CheckMetricLabels } from "../metrics-prometheus/check-metric-labels.js"; 
import { PDPSubgraphService } from "../pdp-subgraph/pdp-subgraph.service.js"; -import { type ProviderDataSetResponse } from "../pdp-subgraph/types.js"; +import { type ProviderDataSetResponse, type SubgraphMeta } from "../pdp-subgraph/types.js"; import { WalletSdkService } from "../wallet-sdk/wallet-sdk.service.js"; import { type PDPProviderEx } from "../wallet-sdk/wallet-sdk.types.js"; +/** + * Thrown when the data-retention check cannot run because one of its dependencies + * (the PDP subgraph or the persisted baselines) is unavailable. Transient per-provider + * failures do NOT raise this — they leave the job a success with partial results. + */ +export class DataRetentionDependencyError extends Error { + constructor(message: string, options?: { cause?: unknown }) { + super(message, options); + this.name = "DataRetentionDependencyError"; + } +} + type ProviderBaseline = { faultedPeriods: bigint; successPeriods: bigint; @@ -61,141 +73,154 @@ export class DataRetentionService { async pollDataRetention(): Promise { const pdpSubgraphEndpoint = this.configService.get("blockchain").pdpSubgraphEndpoint; if (!pdpSubgraphEndpoint) { - this.logger.warn({ + this.logger.error({ event: "pdp_subgraph_endpoint_not_configured", message: "No PDP subgraph endpoint configured", }); - return; + throw new DataRetentionDependencyError("PDP subgraph endpoint is not configured"); } const baselines = await this.loadBaselinesFromDb(); if (baselines === null) { - // Cannot safely compute deltas without persisted baselines. - return; + // Cannot safely compute deltas without persisted baselines. The DB dependency is unavailable. 
+ throw new DataRetentionDependencyError("Failed to load data-retention baselines from database"); } + let subgraphMeta: SubgraphMeta; try { - const subgraphMeta = await this.pdpSubgraphService.fetchSubgraphMeta(); - const allProviderInfos = this.walletSdkService.getTestingProviders(); - const spBlocklists = this.configService.get("spBlocklists"); - const providerInfos = allProviderInfos?.filter((p) => !isSpBlocked(spBlocklists, p.serviceProvider, p.id)); - - if (!providerInfos || providerInfos.length === 0) { - this.logger.warn({ - event: "no_testing_providers_configured", - message: "No testing providers configured", - }); - return; - } + subgraphMeta = await this.pdpSubgraphService.fetchSubgraphMeta(); + } catch (error) { + this.logger.error({ + event: "data_retention_poll_failed", + message: "Failed to fetch subgraph meta", + error: toStructuredError(error), + }); + throw new DataRetentionDependencyError("Failed to fetch PDP subgraph meta", { cause: error }); + } - const blockNumber = subgraphMeta._meta.block.number; - const blockNumberBigInt = BigInt(blockNumber); - // Create snapshot of provider cache to avoid race condition if loadProviders() clears cache - // Normalize addresses to lowercase for consistent lookups - const providerInfoMap = new Map(providerInfos.map((info) => [info.serviceProvider.toLowerCase(), info])); - const providerAddresses = Array.from(providerInfoMap.keys()); + const allProviderInfos = this.walletSdkService.getTestingProviders(); + const spBlocklists = this.configService.get("spBlocklists"); + const providerInfos = allProviderInfos?.filter((p) => !isSpBlocked(spBlocklists, p.serviceProvider, p.id)); - let hasProcessingErrors = false; + if (!providerInfos || providerInfos.length === 0) { + // An empty-but-healthy provider set is a successful no-op poll, not a failure. 
+ this.logger.warn({ + event: "no_testing_providers_configured", + message: "No testing providers configured", + }); + return; + } - for (let i = 0; i < providerAddresses.length; i += DataRetentionService.MAX_PROVIDER_BATCH_LENGTH) { - const batchAddresses = providerAddresses.slice( - i, - Math.min(providerAddresses.length, i + DataRetentionService.MAX_PROVIDER_BATCH_LENGTH), - ); + const blockNumber = subgraphMeta._meta.block.number; + const blockNumberBigInt = BigInt(blockNumber); + // Create snapshot of provider cache to avoid race condition if loadProviders() clears cache + // Normalize addresses to lowercase for consistent lookups + const providerInfoMap = new Map(providerInfos.map((info) => [info.serviceProvider.toLowerCase(), info])); + const providerAddresses = Array.from(providerInfoMap.keys()); + + // Per-provider processing failures are transient and keep the job a success. + let hasProcessingErrors = false; + // A subgraph query failure means the check could not run against its dependency, its a job failure. 
+ let subgraphQueryFailed = false; + + for (let i = 0; i < providerAddresses.length; i += DataRetentionService.MAX_PROVIDER_BATCH_LENGTH) { + const batchAddresses = providerAddresses.slice( + i, + Math.min(providerAddresses.length, i + DataRetentionService.MAX_PROVIDER_BATCH_LENGTH), + ); - try { - const providersFromSubgraph = await this.pdpSubgraphService.fetchProvidersWithDatasets({ - blockNumber, - addresses: batchAddresses, - }); + try { + const providersFromSubgraph = await this.pdpSubgraphService.fetchProvidersWithDatasets({ + blockNumber, + addresses: batchAddresses, + }); - // Process providers in parallel - const processingResults = await Promise.allSettled( - providersFromSubgraph.map((provider) => { - const providerInfo = providerInfoMap.get(provider.address.toLowerCase()); - if (!providerInfo) { - return Promise.reject( - new Error( - `Provider ${provider.address} returned from subgraph but not found in local cache - data inconsistency`, - ), - ); - } - return this.processProvider(provider, providerInfo, blockNumberBigInt, baselines); - }), - ); - - await Promise.all( - processingResults.map(async (result, index) => { - if (result.status === "rejected") { - hasProcessingErrors = true; - const addr = providersFromSubgraph[index].address; - const providerInfo = providerInfoMap.get(addr.toLowerCase()); - this.logger.error({ - event: "provider_processing_failed", - message: "Failed to process provider", - providerAddress: addr, - providerId: providerInfo?.id, - providerName: providerInfo?.name, - error: toStructuredError(result.reason), - }); - return; - } - - try { - await this.persistBaseline(result.value.providerAddress, result.value.baseline, blockNumberBigInt); - } catch (error) { - hasProcessingErrors = true; - // Leave stale cleanup for a later poll so DB-backed baselines and local state do not diverge further. 
- this.logger.warn({ - event: "baseline_persist_failed", - message: "Failed to persist baseline to database", - providerAddress: result.value.providerAddress, - error: toStructuredError(error), - }); - return; - } - - try { - this.applyPersistedProviderResult(result.value, baselines); - } catch (error) { - hasProcessingErrors = true; - this.logger.error({ - event: "provider_result_apply_failed", - message: "Failed to apply persisted provider result", - providerAddress: result.value.providerAddress, - error: toStructuredError(error), - }); - } - }), - ); - } catch (error) { - hasProcessingErrors = true; - this.logger.error({ - event: "provider_batch_fetch_failed", - message: "Failed to fetch batch", - batchStartIndex: i, - error: toStructuredError(error), - }); - // Continue processing next batch - } - } + // Process providers in parallel + const processingResults = await Promise.allSettled( + providersFromSubgraph.map((provider) => { + const providerInfo = providerInfoMap.get(provider.address.toLowerCase()); + if (!providerInfo) { + return Promise.reject( + new Error( + `Provider ${provider.address} returned from subgraph but not found in local cache - data inconsistency`, + ), + ); + } + return this.processProvider(provider, providerInfo, blockNumberBigInt, baselines); + }), + ); - // Only cleanup stale providers after successful poll to preserve baselines during transient failures - if (!hasProcessingErrors) { - await this.cleanupStaleProviders(providerAddresses, baselines); - } else { - this.logger.warn({ - event: "stale_provider_cleanup_skipped", - message: "Skipping stale provider cleanup due to processing errors", + await Promise.all( + processingResults.map(async (result, index) => { + if (result.status === "rejected") { + hasProcessingErrors = true; + const addr = providersFromSubgraph[index].address; + const providerInfo = providerInfoMap.get(addr.toLowerCase()); + this.logger.error({ + event: "provider_processing_failed", + message: "Failed to process 
provider", + providerAddress: addr, + providerId: providerInfo?.id, + providerName: providerInfo?.name, + error: toStructuredError(result.reason), + }); + return; + } + + try { + await this.persistBaseline(result.value.providerAddress, result.value.baseline, blockNumberBigInt); + } catch (error) { + hasProcessingErrors = true; + // Leave stale cleanup for a later poll so DB-backed baselines and local state do not diverge further. + this.logger.warn({ + event: "baseline_persist_failed", + message: "Failed to persist baseline to database", + providerAddress: result.value.providerAddress, + error: toStructuredError(error), + }); + return; + } + + try { + this.applyPersistedProviderResult(result.value, baselines); + } catch (error) { + hasProcessingErrors = true; + this.logger.error({ + event: "provider_result_apply_failed", + message: "Failed to apply persisted provider result", + providerAddress: result.value.providerAddress, + error: toStructuredError(error), + }); + } + }), + ); + } catch (error) { + // Subgraph query failure: a dependency outage, not a per-provider issue. + subgraphQueryFailed = true; + this.logger.error({ + event: "provider_batch_fetch_failed", + message: "Failed to fetch batch", + batchStartIndex: i, + error: toStructuredError(error), }); + // Continue processing next batch } - } catch (error) { - this.logger.error({ - event: "data_retention_poll_failed", - message: "Failed to poll data retention", - error: toStructuredError(error), + } + + // Only cleanup stale providers after a fully clean poll to preserve baselines during transient failures + if (!hasProcessingErrors && !subgraphQueryFailed) { + await this.cleanupStaleProviders(providerAddresses, baselines); + } else { + this.logger.warn({ + event: "stale_provider_cleanup_skipped", + message: "Skipping stale provider cleanup due to processing errors", }); } + + // The subgraph dependency failed for at least one batch: fail the job so the outage alarms. 
+ if (subgraphQueryFailed) { + throw new DataRetentionDependencyError("PDP subgraph query failed for one or more provider batches"); + } } /** diff --git a/apps/backend/src/jobs/jobs.service.ts b/apps/backend/src/jobs/jobs.service.ts index 957ce65a..a3531e9b 100644 --- a/apps/backend/src/jobs/jobs.service.ts +++ b/apps/backend/src/jobs/jobs.service.ts @@ -668,7 +668,11 @@ export class JobsService implements OnModuleInit, OnApplicationShutdown { message: "Chain integration disabled; skipping provider refresh job.", }); } else { - await this.walletSdkService.loadProviders(); + // loadProviders() swallows on-chain failures and returns false; treat that as a job failure. + const loaded = await this.walletSdkService.loadProviders(); + if (!loaded) { + throw new Error("Provider refresh failed: unable to load providers from on-chain registry"); + } } await this.updateStorageProviderGauges(); return "success"; diff --git a/apps/backend/src/wallet-sdk/wallet-sdk.service.ts b/apps/backend/src/wallet-sdk/wallet-sdk.service.ts index 2ec9401e..470345c5 100644 --- a/apps/backend/src/wallet-sdk/wallet-sdk.service.ts +++ b/apps/backend/src/wallet-sdk/wallet-sdk.service.ts @@ -87,10 +87,9 @@ export class WalletSdkService implements OnModuleInit { * This allows dealbot to test all FWSS SPs, even those not yet approved * Only loads active, approved providers that support the PDP product */ - async loadProviders(): Promise<void> { + async loadProviders(): Promise<boolean> { if (this.providersLoadPromise) { - await this.providersLoadPromise; - return; + return this.providersLoadPromise; } this.providersLoadPromise = this.loadProvidersInternal(); @@ -99,6 +98,7 @@ export class WalletSdkService implements OnModuleInit { if (success) { this.providersLoadedOnce = true; } + return success; } finally { this.providersLoadPromise = null; } From f78c45d9f9989988174c197d811aa30df8c8d012 Mon Sep 17 00:00:00 2001 From: silent-cipher Date: Wed, 3 Jun 2026 23:33:05 +0530 Subject: [PATCH 2/3] chore: minimum diff ---
.../data-retention/data-retention.service.ts | 249 +++++++++--------- 1 file changed, 125 insertions(+), 124 deletions(-) diff --git a/apps/backend/src/data-retention/data-retention.service.ts b/apps/backend/src/data-retention/data-retention.service.ts index accea2d0..1e3945c6 100644 --- a/apps/backend/src/data-retention/data-retention.service.ts +++ b/apps/backend/src/data-retention/data-retention.service.ts @@ -12,7 +12,7 @@ import { DataRetentionBaseline } from "../database/entities/data-retention-basel import { StorageProvider } from "../database/entities/storage-provider.entity.js"; import { buildCheckMetricLabels, CheckMetricLabels } from "../metrics-prometheus/check-metric-labels.js"; import { PDPSubgraphService } from "../pdp-subgraph/pdp-subgraph.service.js"; -import { type ProviderDataSetResponse, type SubgraphMeta } from "../pdp-subgraph/types.js"; +import { type ProviderDataSetResponse } from "../pdp-subgraph/types.js"; import { WalletSdkService } from "../wallet-sdk/wallet-sdk.service.js"; import { type PDPProviderEx } from "../wallet-sdk/wallet-sdk.types.js"; @@ -77,147 +77,148 @@ export class DataRetentionService { event: "pdp_subgraph_endpoint_not_configured", message: "No PDP subgraph endpoint configured", }); - throw new DataRetentionDependencyError("PDP subgraph endpoint is not configured"); + throw new DataRetentionDependencyError("PDP subgraph endpoint not configured"); } const baselines = await this.loadBaselinesFromDb(); if (baselines === null) { - // Cannot safely compute deltas without persisted baselines. The DB dependency is unavailable. - throw new DataRetentionDependencyError("Failed to load data-retention baselines from database"); + // Cannot safely compute deltas without persisted baselines. DB dependency is unavailable. 
+ throw new DataRetentionDependencyError("Failed to load data retention baselines from database"); } - let subgraphMeta: SubgraphMeta; - try { - subgraphMeta = await this.pdpSubgraphService.fetchSubgraphMeta(); - } catch (error) { - this.logger.error({ - event: "data_retention_poll_failed", - message: "Failed to fetch subgraph meta", - error: toStructuredError(error), - }); - throw new DataRetentionDependencyError("Failed to fetch PDP subgraph meta", { cause: error }); - } - - const allProviderInfos = this.walletSdkService.getTestingProviders(); - const spBlocklists = this.configService.get("spBlocklists"); - const providerInfos = allProviderInfos?.filter((p) => !isSpBlocked(spBlocklists, p.serviceProvider, p.id)); - - if (!providerInfos || providerInfos.length === 0) { - // An empty-but-healthy provider set is a successful no-op poll, not a failure. - this.logger.warn({ - event: "no_testing_providers_configured", - message: "No testing providers configured", - }); - return; - } - - const blockNumber = subgraphMeta._meta.block.number; - const blockNumberBigInt = BigInt(blockNumber); - // Create snapshot of provider cache to avoid race condition if loadProviders() clears cache - // Normalize addresses to lowercase for consistent lookups - const providerInfoMap = new Map(providerInfos.map((info) => [info.serviceProvider.toLowerCase(), info])); - const providerAddresses = Array.from(providerInfoMap.keys()); - - // Per-provider processing failures are transient and keep the job a success. - let hasProcessingErrors = false; // A subgraph query failure means the check could not run against its dependency, its a job failure. 
let subgraphQueryFailed = false; - for (let i = 0; i < providerAddresses.length; i += DataRetentionService.MAX_PROVIDER_BATCH_LENGTH) { - const batchAddresses = providerAddresses.slice( - i, - Math.min(providerAddresses.length, i + DataRetentionService.MAX_PROVIDER_BATCH_LENGTH), - ); - - try { - const providersFromSubgraph = await this.pdpSubgraphService.fetchProvidersWithDatasets({ - blockNumber, - addresses: batchAddresses, + try { + const subgraphMeta = await this.pdpSubgraphService.fetchSubgraphMeta(); + const allProviderInfos = this.walletSdkService.getTestingProviders(); + const spBlocklists = this.configService.get("spBlocklists"); + const providerInfos = allProviderInfos?.filter((p) => !isSpBlocked(spBlocklists, p.serviceProvider, p.id)); + + if (!providerInfos || providerInfos.length === 0) { + // An empty-but-healthy provider set is a successful no-op poll, not a failure. + this.logger.warn({ + event: "no_testing_providers_configured", + message: "No testing providers configured", }); + return; + } - // Process providers in parallel - const processingResults = await Promise.allSettled( - providersFromSubgraph.map((provider) => { - const providerInfo = providerInfoMap.get(provider.address.toLowerCase()); - if (!providerInfo) { - return Promise.reject( - new Error( - `Provider ${provider.address} returned from subgraph but not found in local cache - data inconsistency`, - ), - ); - } - return this.processProvider(provider, providerInfo, blockNumberBigInt, baselines); - }), - ); + const blockNumber = subgraphMeta._meta.block.number; + const blockNumberBigInt = BigInt(blockNumber); + // Create snapshot of provider cache to avoid race condition if loadProviders() clears cache + // Normalize addresses to lowercase for consistent lookups + const providerInfoMap = new Map(providerInfos.map((info) => [info.serviceProvider.toLowerCase(), info])); + const providerAddresses = Array.from(providerInfoMap.keys()); + + let hasProcessingErrors = false; - await 
Promise.all( - processingResults.map(async (result, index) => { - if (result.status === "rejected") { - hasProcessingErrors = true; - const addr = providersFromSubgraph[index].address; - const providerInfo = providerInfoMap.get(addr.toLowerCase()); - this.logger.error({ - event: "provider_processing_failed", - message: "Failed to process provider", - providerAddress: addr, - providerId: providerInfo?.id, - providerName: providerInfo?.name, - error: toStructuredError(result.reason), - }); - return; - } - - try { - await this.persistBaseline(result.value.providerAddress, result.value.baseline, blockNumberBigInt); - } catch (error) { - hasProcessingErrors = true; - // Leave stale cleanup for a later poll so DB-backed baselines and local state do not diverge further. - this.logger.warn({ - event: "baseline_persist_failed", - message: "Failed to persist baseline to database", - providerAddress: result.value.providerAddress, - error: toStructuredError(error), - }); - return; - } - - try { - this.applyPersistedProviderResult(result.value, baselines); - } catch (error) { - hasProcessingErrors = true; - this.logger.error({ - event: "provider_result_apply_failed", - message: "Failed to apply persisted provider result", - providerAddress: result.value.providerAddress, - error: toStructuredError(error), - }); - } - }), + for (let i = 0; i < providerAddresses.length; i += DataRetentionService.MAX_PROVIDER_BATCH_LENGTH) { + const batchAddresses = providerAddresses.slice( + i, + Math.min(providerAddresses.length, i + DataRetentionService.MAX_PROVIDER_BATCH_LENGTH), ); - } catch (error) { - // Subgraph query failure: a dependency outage, not a per-provider issue. 
- subgraphQueryFailed = true; - this.logger.error({ - event: "provider_batch_fetch_failed", - message: "Failed to fetch batch", - batchStartIndex: i, - error: toStructuredError(error), - }); - // Continue processing next batch + + try { + const providersFromSubgraph = await this.pdpSubgraphService.fetchProvidersWithDatasets({ + blockNumber, + addresses: batchAddresses, + }); + + // Process providers in parallel + const processingResults = await Promise.allSettled( + providersFromSubgraph.map((provider) => { + const providerInfo = providerInfoMap.get(provider.address.toLowerCase()); + if (!providerInfo) { + return Promise.reject( + new Error( + `Provider ${provider.address} returned from subgraph but not found in local cache - data inconsistency`, + ), + ); + } + return this.processProvider(provider, providerInfo, blockNumberBigInt, baselines); + }), + ); + + await Promise.all( + processingResults.map(async (result, index) => { + if (result.status === "rejected") { + hasProcessingErrors = true; + const addr = providersFromSubgraph[index].address; + const providerInfo = providerInfoMap.get(addr.toLowerCase()); + this.logger.error({ + event: "provider_processing_failed", + message: "Failed to process provider", + providerAddress: addr, + providerId: providerInfo?.id, + providerName: providerInfo?.name, + error: toStructuredError(result.reason), + }); + return; + } + + try { + await this.persistBaseline(result.value.providerAddress, result.value.baseline, blockNumberBigInt); + } catch (error) { + hasProcessingErrors = true; + // Leave stale cleanup for a later poll so DB-backed baselines and local state do not diverge further. 
+ this.logger.warn({ + event: "baseline_persist_failed", + message: "Failed to persist baseline to database", + providerAddress: result.value.providerAddress, + error: toStructuredError(error), + }); + return; + } + + try { + this.applyPersistedProviderResult(result.value, baselines); + } catch (error) { + hasProcessingErrors = true; + this.logger.error({ + event: "provider_result_apply_failed", + message: "Failed to apply persisted provider result", + providerAddress: result.value.providerAddress, + error: toStructuredError(error), + }); + } + }), + ); + } catch (error) { + hasProcessingErrors = true; + subgraphQueryFailed = true; + this.logger.error({ + event: "provider_batch_fetch_failed", + message: "Failed to fetch batch", + batchStartIndex: i, + error: toStructuredError(error), + }); + // Continue processing next batch + } } - } - // Only cleanup stale providers after a fully clean poll to preserve baselines during transient failures - if (!hasProcessingErrors && !subgraphQueryFailed) { - await this.cleanupStaleProviders(providerAddresses, baselines); - } else { - this.logger.warn({ - event: "stale_provider_cleanup_skipped", - message: "Skipping stale provider cleanup due to processing errors", + // Only cleanup stale providers after successful poll to preserve baselines during transient failures + if (!hasProcessingErrors) { + await this.cleanupStaleProviders(providerAddresses, baselines); + } else { + this.logger.warn({ + event: "stale_provider_cleanup_skipped", + message: "Skipping stale provider cleanup due to processing errors", + }); + } + } catch (error) { + this.logger.error({ + event: "data_retention_poll_failed", + message: "Failed to poll data retention", + error: toStructuredError(error), }); + // Re-throw as a dependency failure + throw error instanceof DataRetentionDependencyError + ? 
error + : new DataRetentionDependencyError("Failed to poll data retention", { cause: error }); } - // The subgraph dependency failed for at least one batch: fail the job so the outage alarms. + // Fail the job once the poll has recorded what it could. if (subgraphQueryFailed) { throw new DataRetentionDependencyError("PDP subgraph query failed for one or more provider batches"); } From 97cb4a34fdee3df70026da23b59bf45b507f5e63 Mon Sep 17 00:00:00 2001 From: silent-cipher Date: Thu, 4 Jun 2026 00:21:16 +0530 Subject: [PATCH 3/3] fix: preserve non-dependency error types --- .../data-retention.service.spec.ts | 10 ++++++++++ .../data-retention/data-retention.service.ts | 19 +++++++++++++------ 2 files changed, 23 insertions(+), 6 deletions(-) diff --git a/apps/backend/src/data-retention/data-retention.service.spec.ts b/apps/backend/src/data-retention/data-retention.service.spec.ts index 269521dc..2d74dc1a 100644 --- a/apps/backend/src/data-retention/data-retention.service.spec.ts +++ b/apps/backend/src/data-retention/data-retention.service.spec.ts @@ -382,6 +382,16 @@ describe("DataRetentionService", () => { expect(pdpSubgraphServiceMock.fetchProvidersWithDatasets).not.toHaveBeenCalled(); }); + it("preserves the original error type for unexpected (non-dependency) failures", async () => { + // A logic/programming error must not be mislabeled as a dependency outage. 
+ const bug = new TypeError("unexpected bug"); + walletSdkServiceMock.getTestingProviders.mockImplementationOnce(() => { + throw bug; + }); + + await expect(service.pollDataRetention()).rejects.toBe(bug); + }); + it("fails the job when baselines cannot be loaded from the database", async () => { mockBaselineRepository.find.mockRejectedValueOnce(new Error("DB connection failed")); diff --git a/apps/backend/src/data-retention/data-retention.service.ts b/apps/backend/src/data-retention/data-retention.service.ts index 1e3945c6..97cc0555 100644 --- a/apps/backend/src/data-retention/data-retention.service.ts +++ b/apps/backend/src/data-retention/data-retention.service.ts @@ -12,7 +12,7 @@ import { DataRetentionBaseline } from "../database/entities/data-retention-basel import { StorageProvider } from "../database/entities/storage-provider.entity.js"; import { buildCheckMetricLabels, CheckMetricLabels } from "../metrics-prometheus/check-metric-labels.js"; import { PDPSubgraphService } from "../pdp-subgraph/pdp-subgraph.service.js"; -import { type ProviderDataSetResponse } from "../pdp-subgraph/types.js"; +import { type ProviderDataSetResponse, type SubgraphMeta } from "../pdp-subgraph/types.js"; import { WalletSdkService } from "../wallet-sdk/wallet-sdk.service.js"; import { type PDPProviderEx } from "../wallet-sdk/wallet-sdk.types.js"; @@ -90,7 +90,14 @@ export class DataRetentionService { let subgraphQueryFailed = false; try { - const subgraphMeta = await this.pdpSubgraphService.fetchSubgraphMeta(); + let subgraphMeta: SubgraphMeta; + try { + subgraphMeta = await this.pdpSubgraphService.fetchSubgraphMeta(); + } catch (error) { + // The subgraph is a hard dependency for the poll; label this precisely so the + // outer catch (which now preserves error type) rethrows it as a dependency failure. 
+ throw new DataRetentionDependencyError("Failed to fetch PDP subgraph meta", { cause: error }); + } const allProviderInfos = this.walletSdkService.getTestingProviders(); const spBlocklists = this.configService.get("spBlocklists"); const providerInfos = allProviderInfos?.filter((p) => !isSpBlocked(spBlocklists, p.serviceProvider, p.id)); @@ -212,10 +219,10 @@ export class DataRetentionService { message: "Failed to poll data retention", error: toStructuredError(error), }); - // Re-throw as a dependency failure - throw error instanceof DataRetentionDependencyError - ? error - : new DataRetentionDependencyError("Failed to poll data retention", { cause: error }); + // Preserve the original failure type: genuine dependency failures are already + // DataRetentionDependencyError, while anything else (e.g. a logic error) must not be + // mislabeled as a dependency outage. Wrap only non-Error throwables. + throw error instanceof Error ? error : new Error(String(error)); } // Fail the job once the poll has recorded what it could.