Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
76 changes: 64 additions & 12 deletions apps/backend/src/data-retention/data-retention.service.spec.ts
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ import { buildCheckMetricLabels } from "../metrics-prometheus/check-metric-label
import type { PDPSubgraphService } from "../pdp-subgraph/pdp-subgraph.service.js";
import type { ProviderDataSetResponse } from "../pdp-subgraph/types.js";
import type { WalletSdkService } from "../wallet-sdk/wallet-sdk.service.js";
import { DataRetentionService } from "./data-retention.service.js";
import { DataRetentionDependencyError, DataRetentionService } from "./data-retention.service.js";

const PROVIDER_A = "0xd8da6bf26964af9d7eed9e03e53415d37aa96045" as const;
const PROVIDER_B = "0xab5801a7d398351b8be11c439e05c5b3259aec9b" as const;
Expand Down Expand Up @@ -155,13 +155,12 @@ describe("DataRetentionService", () => {
);
});

it("returns early when pdpSubgraphEndpoint is empty", async () => {
it("throws early when pdpSubgraphEndpoint is empty", async () => {
(configServiceMock.get as ReturnType<typeof vi.fn>).mockReturnValue({
pdpSubgraphEndpoint: "",
});

await service.pollDataRetention();

await expect(service.pollDataRetention()).rejects.toBeInstanceOf(DataRetentionDependencyError);
expect(pdpSubgraphServiceMock.fetchSubgraphMeta).not.toHaveBeenCalled();
expect(pdpSubgraphServiceMock.fetchProvidersWithDatasets).not.toHaveBeenCalled();
});
Expand Down Expand Up @@ -369,10 +368,61 @@ describe("DataRetentionService", () => {
});
});

it("catches and logs errors without rethrowing", async () => {
it("fails the job when the subgraph query is unavailable", async () => {
pdpSubgraphServiceMock.fetchProvidersWithDatasets.mockRejectedValueOnce(new Error("subgraph down"));

// Should not throw
// A dependency outage must surface as a job failure, not a silent success.
await expect(service.pollDataRetention()).rejects.toBeInstanceOf(DataRetentionDependencyError);
});

it("fails the job when fetching subgraph meta fails", async () => {
pdpSubgraphServiceMock.fetchSubgraphMeta.mockRejectedValueOnce(new Error("subgraph meta down"));

await expect(service.pollDataRetention()).rejects.toBeInstanceOf(DataRetentionDependencyError);
expect(pdpSubgraphServiceMock.fetchProvidersWithDatasets).not.toHaveBeenCalled();
});

it("preserves the original error type for unexpected (non-dependency) failures", async () => {
// A logic/programming error must not be mislabeled as a dependency outage.
const bug = new TypeError("unexpected bug");
walletSdkServiceMock.getTestingProviders.mockImplementationOnce(() => {
throw bug;
});

await expect(service.pollDataRetention()).rejects.toBe(bug);
});

it("fails the job when baselines cannot be loaded from the database", async () => {
mockBaselineRepository.find.mockRejectedValueOnce(new Error("DB connection failed"));

await expect(service.pollDataRetention()).rejects.toBeInstanceOf(DataRetentionDependencyError);
// Aborts before touching the subgraph.
expect(pdpSubgraphServiceMock.fetchSubgraphMeta).not.toHaveBeenCalled();
});

it("fails the job when the subgraph endpoint is missing in production", async () => {
(configServiceMock.get as ReturnType<typeof vi.fn>).mockImplementation((key: string) => {
if (key === "blockchain") return { pdpSubgraphEndpoint: "" };
if (key === "app") return { env: "production" };
if (key === "spBlocklists") return { ids: new Set(), addresses: new Set() };
return undefined;
});

await expect(service.pollDataRetention()).rejects.toBeInstanceOf(DataRetentionDependencyError);
});

it("stays a success when the provider set is empty but healthy", async () => {
walletSdkServiceMock.getTestingProviders.mockReturnValueOnce([]);

await expect(service.pollDataRetention()).resolves.toBeUndefined();
});

it("stays a success when a single provider fails to process (transient per-provider failure)", async () => {
// Provider returned from subgraph but absent from the local cache rejects in processing,
// which is a transient per-provider failure and must not fail the whole job.
const PROVIDER_C = "0x1234567890123456789012345678901234567890";
pdpSubgraphServiceMock.fetchProvidersWithDatasets.mockResolvedValueOnce([makeProvider({ address: PROVIDER_C })]);

await expect(service.pollDataRetention()).resolves.toBeUndefined();
});

Expand Down Expand Up @@ -489,7 +539,7 @@ describe("DataRetentionService", () => {
expect(pdpSubgraphServiceMock.fetchProvidersWithDatasets.mock.calls[1][0].addresses).toHaveLength(25);
});

it("continues processing next batch if one batch fails", async () => {
it("processes all remaining batches before failing when one batch fails", async () => {
const manyProviders = Array.from({ length: 75 }, (_, i) => ({
id: i + 1,
serviceProvider: `0x${i.toString().padStart(40, "0")}`,
Expand All @@ -503,7 +553,9 @@ describe("DataRetentionService", () => {
.mockRejectedValueOnce(new Error("Subgraph timeout"))
.mockResolvedValueOnce([]);

await service.pollDataRetention();
// The poll attempts every batch so healthy data is recorded, then fails the job
// because the subgraph dependency errored.
await expect(service.pollDataRetention()).rejects.toBeInstanceOf(DataRetentionDependencyError);

// Both batches should be attempted
expect(pdpSubgraphServiceMock.fetchProvidersWithDatasets).toHaveBeenCalledTimes(2);
Expand Down Expand Up @@ -823,10 +875,10 @@ describe("DataRetentionService", () => {
{ id: 2, serviceProvider: PROVIDER_B, name: "Provider B", isApproved: false },
]);

// Simulate processing error
// Simulate a subgraph query failure
pdpSubgraphServiceMock.fetchProvidersWithDatasets.mockRejectedValueOnce(new Error("Processing failed"));

await service.pollDataRetention();
await expect(service.pollDataRetention()).rejects.toBeInstanceOf(DataRetentionDependencyError);

// Should NOT attempt cleanup due to processing errors
expect(mockSPRepository.find).not.toHaveBeenCalled();
Expand Down Expand Up @@ -1005,8 +1057,8 @@ describe("DataRetentionService", () => {

pdpSubgraphServiceMock.fetchProvidersWithDatasets.mockResolvedValue([makeProvider()]);

// First poll: DB load fails, poll bails out to avoid emitting bloated values
await service.pollDataRetention();
// First poll: DB load fails, poll fails the job to avoid emitting bloated values
await expect(service.pollDataRetention()).rejects.toBeInstanceOf(DataRetentionDependencyError);
expect(mockBaselineRepository.find).toHaveBeenCalledTimes(1);
expect(pdpSubgraphServiceMock.fetchSubgraphMeta).not.toHaveBeenCalled();
expect(counterMock.labels).not.toHaveBeenCalled();
Expand Down
45 changes: 39 additions & 6 deletions apps/backend/src/data-retention/data-retention.service.ts
Original file line number Diff line number Diff line change
Expand Up @@ -12,10 +12,22 @@ import { DataRetentionBaseline } from "../database/entities/data-retention-basel
import { StorageProvider } from "../database/entities/storage-provider.entity.js";
import { buildCheckMetricLabels, CheckMetricLabels } from "../metrics-prometheus/check-metric-labels.js";
import { PDPSubgraphService } from "../pdp-subgraph/pdp-subgraph.service.js";
import { type ProviderDataSetResponse } from "../pdp-subgraph/types.js";
import { type ProviderDataSetResponse, type SubgraphMeta } from "../pdp-subgraph/types.js";
import { WalletSdkService } from "../wallet-sdk/wallet-sdk.service.js";
import { type PDPProviderEx } from "../wallet-sdk/wallet-sdk.types.js";

/**
* Thrown when the data-retention check cannot run because one of its dependencies
* (the PDP subgraph or the persisted baselines) is unavailable. Transient per-provider
* failures do NOT raise this — they leave the job a success with partial results.
*/
export class DataRetentionDependencyError extends Error {
constructor(message: string, options?: { cause?: unknown }) {
super(message, options);
this.name = "DataRetentionDependencyError";
}
}

type ProviderBaseline = {
faultedPeriods: bigint;
successPeriods: bigint;
Expand Down Expand Up @@ -61,26 +73,37 @@ export class DataRetentionService {
async pollDataRetention(): Promise<void> {
const pdpSubgraphEndpoint = this.configService.get("blockchain").pdpSubgraphEndpoint;
if (!pdpSubgraphEndpoint) {
this.logger.warn({
this.logger.error({
event: "pdp_subgraph_endpoint_not_configured",
message: "No PDP subgraph endpoint configured",
});
return;
throw new DataRetentionDependencyError("PDP subgraph endpoint not configured");
}

const baselines = await this.loadBaselinesFromDb();
if (baselines === null) {
// Cannot safely compute deltas without persisted baselines.
return;
// Cannot safely compute deltas without persisted baselines. DB dependency is unavailable.
throw new DataRetentionDependencyError("Failed to load data retention baselines from database");
}

// A subgraph query failure means the check could not run against its dependency, its a job failure.
let subgraphQueryFailed = false;

try {
const subgraphMeta = await this.pdpSubgraphService.fetchSubgraphMeta();
let subgraphMeta: SubgraphMeta;
try {
subgraphMeta = await this.pdpSubgraphService.fetchSubgraphMeta();
} catch (error) {
// The subgraph is a hard dependency for the poll; label this precisely so the
// outer catch (which now preserves error type) rethrows it as a dependency failure.
throw new DataRetentionDependencyError("Failed to fetch PDP subgraph meta", { cause: error });
}
const allProviderInfos = this.walletSdkService.getTestingProviders();
const spBlocklists = this.configService.get("spBlocklists");
const providerInfos = allProviderInfos?.filter((p) => !isSpBlocked(spBlocklists, p.serviceProvider, p.id));

if (!providerInfos || providerInfos.length === 0) {
// An empty-but-healthy provider set is a successful no-op poll, not a failure.
this.logger.warn({
event: "no_testing_providers_configured",
message: "No testing providers configured",
Expand Down Expand Up @@ -170,6 +193,7 @@ export class DataRetentionService {
);
} catch (error) {
hasProcessingErrors = true;
subgraphQueryFailed = true;
this.logger.error({
event: "provider_batch_fetch_failed",
Comment thread
silent-cipher marked this conversation as resolved.
message: "Failed to fetch batch",
Expand All @@ -195,6 +219,15 @@ export class DataRetentionService {
message: "Failed to poll data retention",
error: toStructuredError(error),
});
// Preserve the original failure type: genuine dependency failures are already
// DataRetentionDependencyError, while anything else (e.g. a logic error) must not be
// mislabeled as a dependency outage. Wrap only non-Error throwables.
throw error instanceof Error ? error : new Error(String(error));
}

// Fail the job once the poll has recorded what it could.
if (subgraphQueryFailed) {
throw new DataRetentionDependencyError("PDP subgraph query failed for one or more provider batches");
Comment thread
silent-cipher marked this conversation as resolved.
}
}

Expand Down
6 changes: 5 additions & 1 deletion apps/backend/src/jobs/jobs.service.ts
Original file line number Diff line number Diff line change
Expand Up @@ -668,7 +668,11 @@ export class JobsService implements OnModuleInit, OnApplicationShutdown {
message: "Chain integration disabled; skipping provider refresh job.",
});
} else {
await this.walletSdkService.loadProviders();
// loadProviders() swallows on-chain failures and returns false, that is a job failure.
const loaded = await this.walletSdkService.loadProviders();
if (!loaded) {
throw new Error("Provider refresh failed: unable to load providers from on-chain registry");
}
Comment thread
silent-cipher marked this conversation as resolved.
}
await this.updateStorageProviderGauges();
return "success";
Expand Down
6 changes: 3 additions & 3 deletions apps/backend/src/wallet-sdk/wallet-sdk.service.ts
Original file line number Diff line number Diff line change
Expand Up @@ -87,10 +87,9 @@ export class WalletSdkService implements OnModuleInit {
* This allows dealbot to test all FWSS SPs, even those not yet approved
* Only loads active, approved providers that support the PDP product
*/
async loadProviders(): Promise<void> {
async loadProviders(): Promise<boolean> {
if (this.providersLoadPromise) {
await this.providersLoadPromise;
return;
return this.providersLoadPromise;
}

this.providersLoadPromise = this.loadProvidersInternal();
Expand All @@ -99,6 +98,7 @@ export class WalletSdkService implements OnModuleInit {
if (success) {
this.providersLoadedOnce = true;
}
return success;
} finally {
this.providersLoadPromise = null;
}
Expand Down