diff --git a/apps/backend/src/data-retention/data-retention.service.spec.ts b/apps/backend/src/data-retention/data-retention.service.spec.ts index 024f6a72..ce9b280c 100644 --- a/apps/backend/src/data-retention/data-retention.service.spec.ts +++ b/apps/backend/src/data-retention/data-retention.service.spec.ts @@ -1,5 +1,5 @@ import type { ConfigService } from "@nestjs/config"; -import type { Counter } from "prom-client"; +import type { Counter, Gauge } from "prom-client"; import { Repository } from "typeorm"; import { beforeEach, describe, expect, it, vi } from "vitest"; import type { IConfig } from "../config/app.config.js"; @@ -20,6 +20,12 @@ const makeProvider = (overrides: Partial = {}): ProviderEntry => address: PROVIDER_A, totalFaultedPeriods: 10n, totalProvingPeriods: 100n, + proofSets: [ + { + nextDeadline: 900n, + maxProvingPeriod: 100n, + }, + ], ...overrides, }); @@ -38,6 +44,11 @@ describe("DataRetentionService", () => { inc: ReturnType; remove: ReturnType; }; + let gaugeMock: { + labels: ReturnType; + set: ReturnType; + remove: ReturnType; + }; let mockBaselineRepository: { find: ReturnType; upsert: ReturnType; @@ -85,14 +96,22 @@ describe("DataRetentionService", () => { fetchProvidersWithDatasets: vi.fn().mockResolvedValue([]), }; - const incMock = vi.fn(); + const counterIncMock = vi.fn(); const removeMock = vi.fn(); counterMock = { - labels: vi.fn().mockReturnValue({ inc: incMock }), - inc: incMock, + labels: vi.fn().mockReturnValue({ inc: counterIncMock }), + inc: counterIncMock, remove: removeMock, }; + const setMock = vi.fn(); + const gaugeIncMock = vi.fn(); + gaugeMock = { + labels: vi.fn().mockReturnValue({ set: setMock, inc: gaugeIncMock }), + set: setMock, + remove: vi.fn(), + }; + mockBaselineRepository = { find: vi.fn().mockResolvedValue([]), upsert: vi.fn().mockResolvedValue(undefined), @@ -106,6 +125,7 @@ describe("DataRetentionService", () => { mockBaselineRepository as unknown as Repository, mockSPRepository as unknown as Repository, 
counterMock as unknown as Counter, + gaugeMock as unknown as Gauge, ); }); @@ -143,6 +163,7 @@ describe("DataRetentionService", () => { expect(pdpSubgraphServiceMock.fetchSubgraphMeta).toHaveBeenCalled(); expect(pdpSubgraphServiceMock.fetchProvidersWithDatasets).toHaveBeenCalledWith({ + blockNumber: 1200, addresses: [PROVIDER_A, PROVIDER_B], }); @@ -409,6 +430,7 @@ describe("DataRetentionService", () => { expect(pdpSubgraphServiceMock.fetchProvidersWithDatasets).toHaveBeenCalledTimes(2); expect(pdpSubgraphServiceMock.fetchProvidersWithDatasets).toHaveBeenNthCalledWith(1, { addresses: expect.arrayContaining([expect.any(String)]), + blockNumber: 1200, }); expect(pdpSubgraphServiceMock.fetchProvidersWithDatasets.mock.calls[0][0].addresses).toHaveLength(50); expect(pdpSubgraphServiceMock.fetchProvidersWithDatasets.mock.calls[1][0].addresses).toHaveLength(25); @@ -928,4 +950,113 @@ describe("DataRetentionService", () => { expect(mockBaselineRepository.delete).toHaveBeenCalledWith({ providerAddress: PROVIDER_A }); }); }); + + describe("overdue periods gauge", () => { + it("emits overdue gauge on first poll (baseline-only)", async () => { + // Provider is overdue: currentBlock=1200, + // estimatedOverduePeriods = (1200 - 901) / 100 = 2.99 -> 2 + pdpSubgraphServiceMock.fetchProvidersWithDatasets.mockResolvedValueOnce([makeProvider()]); + + await service.pollDataRetention(); + + expect(gaugeMock.labels).toHaveBeenCalledWith( + expect.objectContaining({ + checkType: "dataRetention", + providerId: "1", + providerName: "Provider A", + providerStatus: "approved", + }), + ); + expect(gaugeMock.set).toHaveBeenCalledWith(2); + }); + + it("emits overdue gauge = 0 when provider is not overdue", async () => { + // nextDeadline=2000 > currentBlock=1200 + pdpSubgraphServiceMock.fetchProvidersWithDatasets.mockResolvedValueOnce([makeProvider({ proofSets: [] })]); + + await service.pollDataRetention(); + + expect(gaugeMock.set).toHaveBeenCalledWith(0); + }); + + it("emits overdue gauge 
even on negative delta (baseline reset)", async () => { + // First poll: high values + pdpSubgraphServiceMock.fetchProvidersWithDatasets.mockResolvedValueOnce([ + makeProvider({ totalFaultedPeriods: 100n, totalProvingPeriods: 200n }), + ]); + await service.pollDataRetention(); + gaugeMock.labels.mockClear(); + gaugeMock.set.mockClear(); + + // Second poll: lower values (negative delta) but still overdue + pdpSubgraphServiceMock.fetchProvidersWithDatasets.mockResolvedValueOnce([ + makeProvider({ totalFaultedPeriods: 50n, totalProvingPeriods: 100n }), + ]); + await service.pollDataRetention(); + + // Gauge should still be emitted despite negative deltas on counters + expect(gaugeMock.labels).toHaveBeenCalled(); + expect(gaugeMock.set).toHaveBeenCalled(); + }); + + it("naturally resets gauge to 0 when subgraph catches up", async () => { + // First poll: provider is overdue (currentBlock=1200, nextDeadline=1000) + pdpSubgraphServiceMock.fetchProvidersWithDatasets.mockResolvedValueOnce([makeProvider()]); + await service.pollDataRetention(); + + expect(gaugeMock.set).toHaveBeenCalledWith(2); + + gaugeMock.labels.mockClear(); + gaugeMock.set.mockClear(); + + // Second poll: subgraph caught up, nextDeadline advanced past currentBlock + pdpSubgraphServiceMock.fetchProvidersWithDatasets.mockResolvedValueOnce([ + makeProvider({ + totalFaultedPeriods: 12n, + totalProvingPeriods: 102n, + proofSets: [], + }), + ]); + + await service.pollDataRetention(); + + // Gauge should reset to 0 because nextDeadline (1300) > currentBlock (1200) + expect(gaugeMock.set).toHaveBeenCalledWith(0); + }); + + it("removes overdue gauge when stale provider is cleaned up", async () => { + // First poll: establish baseline for PROVIDER_A + pdpSubgraphServiceMock.fetchProvidersWithDatasets.mockResolvedValueOnce([makeProvider({ address: PROVIDER_A })]); + await service.pollDataRetention(); + + // Second poll: PROVIDER_A removed from active list + 
walletSdkServiceMock.getTestingProviders.mockReturnValueOnce([ + { id: 2, serviceProvider: PROVIDER_B, name: "Provider B", isApproved: false }, + ]); + + mockSPRepository.find.mockResolvedValueOnce([ + { address: PROVIDER_A, name: "Provider A", providerId: 1, isApproved: true }, + ]); + + pdpSubgraphServiceMock.fetchProvidersWithDatasets.mockResolvedValueOnce([makeProvider({ address: PROVIDER_B })]); + + await service.pollDataRetention(); + + // Should remove overdue gauge for stale provider (both approved and unapproved labels) + const approvedLabels = buildCheckMetricLabels({ + checkType: "dataRetention", + providerId: 1n, + providerName: "Provider A", + providerIsApproved: true, + }); + const unapprovedLabels = buildCheckMetricLabels({ + checkType: "dataRetention", + providerId: 1n, + providerName: "Provider A", + providerIsApproved: false, + }); + expect(gaugeMock.remove).toHaveBeenCalledWith(approvedLabels); + expect(gaugeMock.remove).toHaveBeenCalledWith(unapprovedLabels); + }); + }); }); diff --git a/apps/backend/src/data-retention/data-retention.service.ts b/apps/backend/src/data-retention/data-retention.service.ts index dcfc313e..42ef72c1 100644 --- a/apps/backend/src/data-retention/data-retention.service.ts +++ b/apps/backend/src/data-retention/data-retention.service.ts @@ -2,7 +2,7 @@ import { Injectable, Logger } from "@nestjs/common"; import { ConfigService } from "@nestjs/config"; import { InjectRepository } from "@nestjs/typeorm"; import { InjectMetric } from "@willsoto/nestjs-prometheus"; -import { Counter } from "prom-client"; +import { Counter, Gauge } from "prom-client"; import { Raw, Repository } from "typeorm"; import { toStructuredError } from "../common/logging.js"; import { IConfig } from "../config/app.config.js"; @@ -50,6 +50,8 @@ export class DataRetentionService { private readonly storageProviderRepository: Repository, @InjectMetric("dataSetChallengeStatus") private readonly dataSetChallengeStatusCounter: Counter, + 
@InjectMetric("pdp_provider_estimated_overdue_periods") + private readonly estimatedOverduePeriodsGauge: Gauge, ) { this.providerCumulativeTotals = new Map(); } @@ -105,6 +107,7 @@ export class DataRetentionService { try { const providersFromSubgraph = await this.pdpSubgraphService.fetchProvidersWithDatasets({ + blockNumber, addresses: batchAddresses, }); @@ -119,7 +122,7 @@ export class DataRetentionService { ), ); } - return this.processProvider(provider, providerInfo); + return this.processProvider(provider, providerInfo, blockNumberBigInt); }), ); @@ -187,9 +190,9 @@ export class DataRetentionService { /** * Removes stale provider entries from the cumulative totals map and their associated - * Prometheus counter metrics. + * Prometheus counter and gauge metrics. * - * CRITICAL: Local baselines are ONLY deleted if the Prometheus metric is successfully + * CRITICAL: Local baselines are ONLY deleted if the Prometheus metrics are successfully * removed. This prevents massive metric inflation (double-counting) if a provider * temporarily drops offline and returns later. 
* @@ -256,6 +259,8 @@ export class DataRetentionService { this.dataSetChallengeStatusCounter.remove({ ...approvedLabels, value: "failure" }); this.dataSetChallengeStatusCounter.remove({ ...unapprovedLabels, value: "success" }); this.dataSetChallengeStatusCounter.remove({ ...unapprovedLabels, value: "failure" }); + this.estimatedOverduePeriodsGauge.remove(approvedLabels); + this.estimatedOverduePeriodsGauge.remove(unapprovedLabels); // Only delete local memory if Prometheus removal succeeded without throwing this.providerCumulativeTotals.delete(address); @@ -311,12 +316,17 @@ export class DataRetentionService { private async processProvider( provider: ProviderDataSetResponse["providers"][number], pdpProvider: PDPProviderEx, + currentBlock: bigint, ): Promise<{ faultedPeriods: bigint; successPeriods: bigint }> { - const { address, totalFaultedPeriods, totalProvingPeriods } = provider; - // Use only subgraph-confirmed totals. Speculative overdue estimation was removed - // because it systematically inflated fault counts: overdue periods were pessimistically - // counted as faults, but when the subgraph later confirmed them as successes, the - // negative delta guard silently discarded the correction. + const { address, totalFaultedPeriods, totalProvingPeriods, proofSets } = provider; + // Note: Query filters proofSets with nextDeadline_lt: $blockNumber, so all deadlines are in the past + const estimatedOverduePeriods = proofSets.reduce((acc, proofSet) => { + if (proofSet.maxProvingPeriod === 0n) { + return acc; + } + return acc + (currentBlock - (proofSet.nextDeadline + 1n)) / proofSet.maxProvingPeriod; + }, 0n); + const confirmedTotalSuccess = totalProvingPeriods - totalFaultedPeriods; const normalizedAddress = address.toLowerCase(); @@ -327,8 +337,27 @@ export class DataRetentionService { successPeriods: confirmedTotalSuccess, }; - // First time seeing this provider (fresh deploy or newly added provider). 
- // Set baseline without emitting counters to avoid dumping full cumulative history. + const providerLabels = buildCheckMetricLabels({ + checkType: "dataRetention", + providerId: pdpProvider.id, + providerName: pdpProvider.name, + providerIsApproved: pdpProvider.isApproved, + }); + + // Emit overdue periods gauge on every poll — this is a separate signal from the + // confirmed counters. It reflects estimated unrecorded faults in real time and + // naturally resets to 0 when NextProvingPeriod fires and the subgraph catches up. + // Note: Safe to cast under normal conditions (1 period = 240 blocks). However, we + // check for overflow to handle edge cases like proving period changes or fast finality. + this.safeSetGauge( + this.estimatedOverduePeriodsGauge, + providerLabels, + estimatedOverduePeriods, + address, + pdpProvider.id, + pdpProvider.name, + ); + if (previous === undefined) { this.logger.log({ event: "baseline_initialized", @@ -365,13 +394,6 @@ export class DataRetentionService { return newBaseline; } - const providerLabels = buildCheckMetricLabels({ - checkType: "dataRetention", - providerId: pdpProvider.id, - providerName: pdpProvider.name, - providerIsApproved: pdpProvider.isApproved, - }); - if (faultedChallengesDelta > 0n) { this.safeIncrementCounter(this.dataSetChallengeStatusCounter, providerLabels, "failure", faultedChallengesDelta); } @@ -381,6 +403,7 @@ export class DataRetentionService { } this.providerCumulativeTotals.set(normalizedAddress, newBaseline); + return newBaseline; } @@ -475,4 +498,40 @@ export class DataRetentionService { remaining -= chunk; } } + + /** + * Safely sets a Prometheus gauge with a BigInt value. + * If the value exceeds Number.MAX_SAFE_INTEGER, clamps to MAX_SAFE_INTEGER and logs a warning. 
+ * + * @param gauge - The Prometheus gauge to set + * @param labels - The label set for the gauge + * @param value - The BigInt value to set + * @param providerAddress - Provider address for logging + * @param providerId - Provider ID for logging + * @param providerName - Provider name for logging + */ + private safeSetGauge( + gauge: Gauge, + labels: CheckMetricLabels, + value: bigint, + providerAddress: string, + providerId: bigint, + providerName: string, + ): void { + const MAX_SAFE_INTEGER_BIGINT = BigInt(Number.MAX_SAFE_INTEGER); + + if (value > MAX_SAFE_INTEGER_BIGINT) { + this.logger.warn({ + event: "overdue_periods_overflow", + message: "Estimated overdue periods exceeds safe integer range. Clamping to MAX_SAFE_INTEGER.", + providerAddress, + providerId, + providerName, + estimatedOverduePeriods: value.toString(), + }); + gauge.labels(labels).set(Number.MAX_SAFE_INTEGER); + } else { + gauge.labels(labels).set(Number(value)); + } + } } diff --git a/apps/backend/src/metrics-prometheus/metrics-prometheus.module.ts b/apps/backend/src/metrics-prometheus/metrics-prometheus.module.ts index 516bb091..18bda30d 100644 --- a/apps/backend/src/metrics-prometheus/metrics-prometheus.module.ts +++ b/apps/backend/src/metrics-prometheus/metrics-prometheus.module.ts @@ -202,6 +202,11 @@ const metricProviders = [ help: "Provider dataset challenge status", labelNames: ["checkType", "providerId", "providerName", "providerStatus", "value"] as const, }), + makeGaugeProvider({ + name: "pdp_provider_estimated_overdue_periods", + help: "Estimated number of unrecorded overdue proving periods per provider. Resets to 0 when the subgraph catches up.", + labelNames: ["checkType", "providerId", "providerName", "providerStatus"] as const, + }), // Storage provider metrics: absolute counts, independent of query filters. 
makeGaugeProvider({ name: "storage_providers_active", diff --git a/apps/backend/src/pdp-subgraph/pdp-subgraph.service.spec.ts b/apps/backend/src/pdp-subgraph/pdp-subgraph.service.spec.ts index 8f48ed81..cd3a1ea8 100644 --- a/apps/backend/src/pdp-subgraph/pdp-subgraph.service.spec.ts +++ b/apps/backend/src/pdp-subgraph/pdp-subgraph.service.spec.ts @@ -14,6 +14,14 @@ const makeValidProvider = (overrides: Record = {}) => ({ address: VALID_ADDRESS, totalFaultedPeriods: "10", totalProvingPeriods: "100", + proofSets: [ + { + totalFaultedPeriods: "2", + currentDeadlineCount: "5", + nextDeadline: "1000", + maxProvingPeriod: "100", + }, + ], ...overrides, }); @@ -62,6 +70,7 @@ describe("PDPSubgraphService", () => { }); const providers = await service.fetchProvidersWithDatasets({ + blockNumber: 5000, addresses: [VALID_ADDRESS], }); @@ -75,6 +84,7 @@ describe("PDPSubgraphService", () => { expect(providers[0].address).toBe(VALID_ADDRESS); expect(providers[0].totalFaultedPeriods).toBe(10n); expect(providers[0].totalProvingPeriods).toBe(100n); + expect(providers[0].proofSets[0].maxProvingPeriod).toBe(100n); }); it("returns empty array when no providers exist", async () => { @@ -84,6 +94,7 @@ describe("PDPSubgraphService", () => { }); const providers = await service.fetchProvidersWithDatasets({ + blockNumber: 5000, addresses: [VALID_ADDRESS], }); expect(providers).toEqual([]); @@ -91,6 +102,7 @@ describe("PDPSubgraphService", () => { it("returns empty array when addresses array is empty", async () => { const providers = await service.fetchProvidersWithDatasets({ + blockNumber: 5000, addresses: [], }); @@ -105,6 +117,7 @@ describe("PDPSubgraphService", () => { }); const promise = service.fetchProvidersWithDatasets({ + blockNumber: 5000, addresses: [VALID_ADDRESS], }); @@ -127,6 +140,7 @@ describe("PDPSubgraphService", () => { }); const promise = service.fetchProvidersWithDatasets({ + blockNumber: 5000, addresses: [VALID_ADDRESS], }); promise.catch(() => {}); @@ -142,6 +156,7 @@ 
describe("PDPSubgraphService", () => { fetchMock.mockRejectedValueOnce(new Error("Network error")); const promise = service.fetchProvidersWithDatasets({ + blockNumber: 5000, addresses: [VALID_ADDRESS], }); promise.catch(() => {}); @@ -163,6 +178,7 @@ describe("PDPSubgraphService", () => { await expect( service.fetchProvidersWithDatasets({ + blockNumber: 5000, addresses: [VALID_ADDRESS], }), ).rejects.toThrow("Data validation failed"); @@ -181,6 +197,7 @@ describe("PDPSubgraphService", () => { await expect( service.fetchProvidersWithDatasets({ + blockNumber: 5000, addresses: [VALID_ADDRESS], }), ).rejects.toThrow("Data validation failed"); @@ -189,6 +206,21 @@ describe("PDPSubgraphService", () => { expect(fetchMock).toHaveBeenCalledTimes(1); }); + it("sends blockNumber as string in the GraphQL variables", async () => { + fetchMock.mockResolvedValueOnce({ + ok: true, + json: async () => makeSubgraphResponse([makeValidProvider()]), + }); + + await service.fetchProvidersWithDatasets({ + blockNumber: 12345, + addresses: [VALID_ADDRESS], + }); + + const body = JSON.parse(fetchMock.mock.calls[0][1].body); + expect(body.variables.blockNumber).toBe("12345"); + }); + it("retries network errors but not validation errors", async () => { // First attempt: network error (should retry) fetchMock.mockRejectedValueOnce(new Error("Network timeout")); @@ -202,6 +234,7 @@ describe("PDPSubgraphService", () => { }); const promise = service.fetchProvidersWithDatasets({ + blockNumber: 5000, addresses: [VALID_ADDRESS], }); promise.catch(() => {}); @@ -223,6 +256,7 @@ describe("PDPSubgraphService", () => { const addresses = [VALID_ADDRESS, "0xAb5801a7D398351b8bE11C439e05C5B3259aeC9B"]; await service.fetchProvidersWithDatasets({ + blockNumber: 5000, addresses, }); @@ -240,6 +274,7 @@ describe("PDPSubgraphService", () => { }); await service.fetchProvidersWithDatasets({ + blockNumber: 5000, addresses, }); @@ -255,6 +290,7 @@ describe("PDPSubgraphService", () => { }); const promise = 
service.fetchProvidersWithDatasets({ + blockNumber: 5000, addresses: [VALID_ADDRESS], }); @@ -286,6 +322,7 @@ describe("PDPSubgraphService", () => { }); const fetchPromise = service.fetchProvidersWithDatasets({ + blockNumber: 5000, addresses, }); diff --git a/apps/backend/src/pdp-subgraph/pdp-subgraph.service.ts b/apps/backend/src/pdp-subgraph/pdp-subgraph.service.ts index b37d39ac..aedd8bce 100644 --- a/apps/backend/src/pdp-subgraph/pdp-subgraph.service.ts +++ b/apps/backend/src/pdp-subgraph/pdp-subgraph.service.ts @@ -124,29 +124,32 @@ export class PDPSubgraphService { /** * Fetch provider-level totals from subgraph with batching, pagination, and rate limiting * - * @param options - Options containing provider addresses to query - * @returns Array of providers with their cumulative proving period totals + * @param options - Options containing block number and provider addresses + * @returns Array of providers with their data sets currently proving */ async fetchProvidersWithDatasets( options: ProvidersWithDataSetsOptions, ): Promise { - const { addresses } = options; + const { blockNumber, addresses } = options; if (addresses.length === 0) { return []; } if (addresses.length <= PDPSubgraphService.MAX_PROVIDERS_PER_QUERY) { - return this.fetchWithRetry(addresses); + return this.fetchWithRetry(blockNumber, addresses); } - return this.fetchMultipleBatchesWithRateLimit(addresses); + return this.fetchMultipleBatchesWithRateLimit(blockNumber, addresses); } /** * Fetch multiple batches with rate limiting and concurrency control */ - private async fetchMultipleBatchesWithRateLimit(addresses: string[]): Promise { + private async fetchMultipleBatchesWithRateLimit( + blockNumber: number, + addresses: string[], + ): Promise { const batches: string[][] = []; for (let i = 0; i < addresses.length; i += PDPSubgraphService.MAX_PROVIDERS_PER_QUERY) { const addressesLimit = Math.min(addresses.length, i + PDPSubgraphService.MAX_PROVIDERS_PER_QUERY); @@ -158,7 +161,7 @@ export class 
PDPSubgraphService { for (let i = 0; i < batches.length; i += PDPSubgraphService.MAX_CONCURRENT_REQUESTS) { const batchGroup = batches.slice(i, i + PDPSubgraphService.MAX_CONCURRENT_REQUESTS); - const results = await Promise.all(batchGroup.map((batch) => this.fetchWithRetry(batch))); + const results = await Promise.all(batchGroup.map((batch) => this.fetchWithRetry(blockNumber, batch))); allProviders.push(...results.flat()); } @@ -171,6 +174,7 @@ export class PDPSubgraphService { * Assuming initial request to be first attempt */ private async fetchWithRetry( + blockNumber: number, addresses: string[], attempt: number = 1, ): Promise { @@ -179,6 +183,7 @@ export class PDPSubgraphService { } const variables = { + blockNumber: blockNumber.toString(), addresses, }; @@ -242,13 +247,14 @@ export class PDPSubgraphService { error: toStructuredError(error), }); await new Promise((resolve) => setTimeout(resolve, delay)); - return this.fetchWithRetry(addresses, attempt + 1); + return this.fetchWithRetry(blockNumber, addresses, attempt + 1); } this.logger.error({ event: "subgraph_provider_request_failed", message: "Subgraph provider request failed after maximum retries", maxRetries: PDPSubgraphService.MAX_RETRIES, + blockNumber, addressCount: addresses.length, error: toStructuredError(error), }); diff --git a/apps/backend/src/pdp-subgraph/queries.ts b/apps/backend/src/pdp-subgraph/queries.ts index 9f69cb0a..a21a3991 100644 --- a/apps/backend/src/pdp-subgraph/queries.ts +++ b/apps/backend/src/pdp-subgraph/queries.ts @@ -1,10 +1,14 @@ export const Queries = { GET_PROVIDERS_WITH_DATASETS: ` - query GetProvidersWithDataSet($addresses: [Bytes!]) { + query GetProvidersWithDataSet($addresses: [Bytes!], $blockNumber: BigInt!) 
{ providers(where: {address_in: $addresses}) { address totalFaultedPeriods totalProvingPeriods + proofSets (where: {nextDeadline_lt: $blockNumber, status: PROVING}) { + nextDeadline + maxProvingPeriod + } } } `, diff --git a/apps/backend/src/pdp-subgraph/types.spec.ts b/apps/backend/src/pdp-subgraph/types.spec.ts index 9b81c6e1..02e6eee0 100644 --- a/apps/backend/src/pdp-subgraph/types.spec.ts +++ b/apps/backend/src/pdp-subgraph/types.spec.ts @@ -8,6 +8,12 @@ const makeValidProvider = (overrides: Record = {}) => ({ address: VALID_ADDRESS, totalFaultedPeriods: "10", totalProvingPeriods: "100", + proofSets: [ + { + nextDeadline: "1000", + maxProvingPeriod: "100", + }, + ], ...overrides, }); @@ -24,6 +30,10 @@ describe("validateProviderDataSetResponse", () => { expect(provider.address).toBe(VALID_ADDRESS); expect(provider.totalFaultedPeriods).toBe(10n); expect(provider.totalProvingPeriods).toBe(100n); + + const proofSet = provider.proofSets[0]; + expect(proofSet.nextDeadline).toBe(1000n); + expect(proofSet.maxProvingPeriod).toBe(100n); }); it("converts string numbers to bigint", () => { @@ -46,6 +56,11 @@ describe("validateProviderDataSetResponse", () => { expect(result.providers).toEqual([]); }); + it("accepts a provider with empty proofSets", () => { + const result = validateProviderDataSetResponse(makeValidResponse([makeValidProvider({ proofSets: [] })])); + expect(result.providers[0].proofSets).toEqual([]); + }); + it("preserves unknown fields (schema uses .unknown(true))", () => { const result = validateProviderDataSetResponse(makeValidResponse([makeValidProvider({ extraField: "hello" })])); expect((result.providers[0] as Record).extraField).toBe("hello"); @@ -85,6 +100,18 @@ describe("validateProviderDataSetResponse", () => { ).toThrow("Invalid provider dataset response format"); }); + it("throws on missing proofSet fields", () => { + expect(() => + validateProviderDataSetResponse( + makeValidResponse([ + makeValidProvider({ + proofSets: [{ totalFaultedPeriods: 
"1" }], + }), + ]), + ), + ).toThrow("Invalid provider dataset response format"); + }); + it("validates multiple providers in a single response", () => { const provider1 = makeValidProvider({ address: VALID_ADDRESS, totalFaultedPeriods: "5" }); const provider2 = makeValidProvider({ @@ -105,12 +132,19 @@ describe("validateProviderDataSetResponse", () => { makeValidProvider({ totalFaultedPeriods: "0", totalProvingPeriods: "0", + proofSets: [ + { + nextDeadline: "0", + maxProvingPeriod: "0", + }, + ], }), ]), ); expect(result.providers[0].totalFaultedPeriods).toBe(0n); expect(result.providers[0].totalProvingPeriods).toBe(0n); + expect(result.providers[0].proofSets[0].maxProvingPeriod).toBe(0n); }); }); diff --git a/apps/backend/src/pdp-subgraph/types.ts b/apps/backend/src/pdp-subgraph/types.ts index 78d81b2e..ad8dcdc4 100644 --- a/apps/backend/src/pdp-subgraph/types.ts +++ b/apps/backend/src/pdp-subgraph/types.ts @@ -14,10 +14,11 @@ export type GraphQLResponse = { }; /** - * Options for fetching provider-level totals from the PDP subgraph + * Options for fetching providers with data sets */ export type ProvidersWithDataSetsOptions = { addresses: string[]; + blockNumber: number; }; /** @@ -31,6 +32,15 @@ export type SubgraphMeta = { }; }; +/** + * A single proof set within a provider, representing deadline-related proving data. + * All numeric fields are bigints converted from the subgraph string representation. + */ +export type DataSet = { + nextDeadline: bigint; + maxProvingPeriod: bigint; +}; + /** * Validated and transformed response from the PDP subgraph providers query. * Numeric fields are converted from subgraph string representation to bigint. 
@@ -40,6 +50,7 @@ export type ProviderDataSetResponse = { address: Hex; totalFaultedPeriods: bigint; totalProvingPeriods: bigint; + proofSets: DataSet[]; }[]; }; @@ -86,6 +97,11 @@ const metaSchema = Joi.object({ .unknown(true) .required(); +const dataSetSchema = Joi.object({ + nextDeadline: Joi.string().pattern(/^\d+$/).required().custom(toBigInt), + maxProvingPeriod: Joi.string().pattern(/^\d+$/).required().custom(toBigInt), +}).unknown(true); + const providerDataSetResponseSchema = Joi.object({ providers: Joi.array() .items( @@ -93,6 +109,7 @@ const providerDataSetResponseSchema = Joi.object({ address: Joi.string().required().custom(toEthereumAddress), totalFaultedPeriods: Joi.string().pattern(/^\d+$/).required().custom(toBigInt), totalProvingPeriods: Joi.string().pattern(/^\d+$/).required().custom(toBigInt), + proofSets: Joi.array().items(dataSetSchema).required(), }).unknown(true), ) .required(), diff --git a/docs/checks/data-retention.md b/docs/checks/data-retention.md index dbcd7ecc..804190c6 100644 --- a/docs/checks/data-retention.md +++ b/docs/checks/data-retention.md @@ -13,7 +13,7 @@ The Data Retention check monitors storage providers' ability to retain data over Every data retention check cycle, dealbot: 1. Queries the [PDP subgraph](https://docs.filecoin.io/smart-contracts/advanced/proof-of-data-possession) for provider-level challenge statistics -2. Computes confirmed successful proving periods from the subgraph totals +2. Computes confirmed successful proving periods from the subgraph totals with estimated overdue periods for real-time monitoring 3. Calculates deltas since the last poll 4. 
Records metrics to track provider reliability over time @@ -40,18 +40,31 @@ From `GET_PROVIDERS_WITH_DATASETS` query for each provider: - `address` - Provider address - `totalFaultedPeriods` - Cumulative count of faulted proving periods across all data sets (maintained by the subgraph's `NextProvingPeriod` event handler) - `totalProvingPeriods` - Cumulative count of all proving periods (successful + faulted) across all data sets +- `proofSets` - Array of proof sets where `nextDeadline < currentBlock` (overdue deadlines), each containing: + - `nextDeadline` - Next deadline block number + - `maxProvingPeriod` - Maximum proving period duration + +> **Note**: The subgraph query uses the field name `proofSets`, but this refers to "dataSets" in the current codebase. The terminology was updated from "proof set" to "data set" but the subgraph schema retains the old naming. Source: [`pdp-subgraph.service.ts` (`fetchSubgraphMeta`, `fetchProvidersWithDatasets`)](../../apps/backend/src/pdp-subgraph/pdp-subgraph.service.ts) -### 2. Compute Challenge Totals +### 2. Compute Challenge Totals and Overdue Estimates -Dealbot uses the subgraph-confirmed totals directly: +Dealbot uses the subgraph-confirmed totals directly for cumulative counters: ``` confirmedTotalSuccess = totalProvingPeriods - totalFaultedPeriods ``` -> **Note:** An earlier implementation estimated overdue periods (periods elapsed since the last recorded deadline) and pessimistically counted them as faults. This was removed because the speculative estimation systematically inflated fault rates — overdue periods were counted as faults immediately, but when the subgraph later confirmed them as successes, the correction was discarded by the negative-delta guard. +Additionally, dealbot calculates **estimated overdue periods** for real-time monitoring via a separate gauge metric. 
For each proof set where the deadline has passed (`nextDeadline < currentBlock`): + +``` +estimatedOverduePeriods = (currentBlock - (nextDeadline + 1)) / maxProvingPeriod +``` + +This gauge provides immediate visibility into providers that are behind on submitting proofs, even before the subgraph confirms the faults. The gauge naturally resets to 0 when providers submit their proofs and the subgraph catches up. + +**Key distinction**: The overdue gauge is independent of the cumulative counter baselines. It reflects the current state on every poll, while counters track confirmed changes over time. ### 3. Calculate Deltas @@ -108,8 +121,9 @@ To prevent unbounded memory growth, dealbot periodically removes baseline data f 1. Identify providers in the baseline map but not in the current active list 2. Fetch provider info from the database 3. Remove Prometheus counter metrics for both success and fault labels -4. Delete baseline entry from memory **only if** counter removal succeeds -5. Delete baseline entry from database (non-blocking, logged on failure) +4. Remove Prometheus gauge metric for overdue periods +5. Delete baseline entry from memory **only if** metric removal succeeds +6. Delete baseline entry from database (non-blocking, logged on failure) **Critical safeguard**: Baselines are retained if: @@ -154,6 +168,8 @@ Source: [`pdp-subgraph.service.ts` (`enforceRateLimit`)](../../apps/backend/src/ ## Metrics Recorded +### Counter: `dataSetChallengeStatus` + See [`dataSetChallengeStatus`](./events-and-metrics.md#dataSetChallengeStatus) for more info. **Increment behavior**: @@ -162,6 +178,17 @@ See [`dataSetChallengeStatus`](./events-and-metrics.md#dataSetChallengeStatus) f - Increments by the delta amount (not always 1) - Handles large values (>MAX_SAFE_INTEGER) via chunked increments +### Gauge: `pdp_provider_estimated_overdue_periods` + +See [`pdp_provider_estimated_overdue_periods`](./events-and-metrics.md#pdp_provider_estimated_overdue_periods) for more info.
+ +**Emission behavior**: + +- Emitted on every poll, independent of counter deltas +- Reflects estimated unrecorded overdue proving periods in real-time +- Naturally resets to 0 when providers submit proofs and the subgraph catches up +- Handles large values (>MAX_SAFE_INTEGER) by clamping to `Number.MAX_SAFE_INTEGER` and logging a warning + ## Configuration Key environment variables that control data retention check behavior: @@ -212,7 +239,8 @@ flowchart TD BatchLoop --> FetchData[Fetch Provider Totals from Subgraph] FetchData --> ProcessParallel[Process Providers in Parallel] ProcessParallel --> CalcTotals[Compute Success from Confirmed Totals] - CalcTotals --> CheckBaseline{Has Prior
Baseline?} + CalcTotals --> EmitGauge[Emit Overdue Periods Gauge] + EmitGauge --> CheckBaseline{Has Prior
Baseline?} CheckBaseline -->|No| InitBaseline[Initialize Baseline. No Metric Emission] InitBaseline --> PersistBaseline CheckBaseline -->|Yes| CalcDeltas[Calculate Deltas from Baseline] diff --git a/docs/checks/events-and-metrics.md b/docs/checks/events-and-metrics.md index 76e4ce5c..8ededa34 100644 --- a/docs/checks/events-and-metrics.md +++ b/docs/checks/events-and-metrics.md @@ -105,3 +105,4 @@ sequenceDiagram | `retrievalStatus` | Data Storage, Retrieval | [`ipfsRetrievalIntegrityChecked`](#ipfsRetrievalIntegrityChecked) | `success`, `failure.timedout`, `failure.other` from [Data Storage Sub-status meanings](./data-storage.md#sub-status-meanings). | | | `dataSetCreationStatus` | Data-Set Creation | Not tied to an [event above](#event-list) but rather to data-set creation start (`pending`) and completion (`success`/`failure.*`) | `pending`, `success`, `failure.timedout`, `failure.other` | [`deal.service.ts`](../../apps/backend/src/deal/deal.service.ts) | | `dataSetChallengeStatus` | Data Retention | Not tied to an [event above](#event-list) but rather to the periodic chain-checking done in the [Data Retention Check](./data-retention.md) | `success`, `failure` | [`data-retention.service.ts`](../../apps/backend/src/data-retention/data-retention.service.ts) | +| `pdp_provider_estimated_overdue_periods` | Data Retention | Emitted on every poll | Gauge value (estimated overdue periods) | [`data-retention.service.ts`](../../apps/backend/src/data-retention/data-retention.service.ts) |