Skip to content
139 changes: 135 additions & 4 deletions apps/backend/src/data-retention/data-retention.service.spec.ts
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
import type { ConfigService } from "@nestjs/config";
import type { Counter } from "prom-client";
import type { Counter, Gauge } from "prom-client";
import { Repository } from "typeorm";
import { beforeEach, describe, expect, it, vi } from "vitest";
import type { IConfig } from "../config/app.config.js";
Expand All @@ -20,6 +20,12 @@ const makeProvider = (overrides: Partial<ProviderEntry> = {}): ProviderEntry =>
address: PROVIDER_A,
totalFaultedPeriods: 10n,
totalProvingPeriods: 100n,
proofSets: [
{
nextDeadline: 900n,
maxProvingPeriod: 100n,
},
],
...overrides,
});

Expand All @@ -38,6 +44,11 @@ describe("DataRetentionService", () => {
inc: ReturnType<typeof vi.fn>;
remove: ReturnType<typeof vi.fn>;
};
let gaugeMock: {
labels: ReturnType<typeof vi.fn>;
set: ReturnType<typeof vi.fn>;
remove: ReturnType<typeof vi.fn>;
};
let mockBaselineRepository: {
find: ReturnType<typeof vi.fn>;
upsert: ReturnType<typeof vi.fn>;
Expand Down Expand Up @@ -85,14 +96,22 @@ describe("DataRetentionService", () => {
fetchProvidersWithDatasets: vi.fn().mockResolvedValue([]),
};

const incMock = vi.fn();
const counterIncMock = vi.fn();
const removeMock = vi.fn();
counterMock = {
labels: vi.fn().mockReturnValue({ inc: incMock }),
inc: incMock,
labels: vi.fn().mockReturnValue({ inc: counterIncMock }),
inc: counterIncMock,
remove: removeMock,
};

const setMock = vi.fn();
const gaugeIncMock = vi.fn();
gaugeMock = {
labels: vi.fn().mockReturnValue({ set: setMock, inc: gaugeIncMock }),
set: setMock,
remove: vi.fn(),
};

mockBaselineRepository = {
find: vi.fn().mockResolvedValue([]),
upsert: vi.fn().mockResolvedValue(undefined),
Expand All @@ -106,6 +125,7 @@ describe("DataRetentionService", () => {
mockBaselineRepository as unknown as Repository<DataRetentionBaseline>,
mockSPRepository as unknown as Repository<StorageProvider>,
counterMock as unknown as Counter,
gaugeMock as unknown as Gauge,
);
});

Expand Down Expand Up @@ -143,6 +163,7 @@ describe("DataRetentionService", () => {

expect(pdpSubgraphServiceMock.fetchSubgraphMeta).toHaveBeenCalled();
expect(pdpSubgraphServiceMock.fetchProvidersWithDatasets).toHaveBeenCalledWith({
blockNumber: 1200,
addresses: [PROVIDER_A, PROVIDER_B],
});

Expand Down Expand Up @@ -409,6 +430,7 @@ describe("DataRetentionService", () => {
expect(pdpSubgraphServiceMock.fetchProvidersWithDatasets).toHaveBeenCalledTimes(2);
expect(pdpSubgraphServiceMock.fetchProvidersWithDatasets).toHaveBeenNthCalledWith(1, {
addresses: expect.arrayContaining([expect.any(String)]),
blockNumber: 1200,
});
expect(pdpSubgraphServiceMock.fetchProvidersWithDatasets.mock.calls[0][0].addresses).toHaveLength(50);
expect(pdpSubgraphServiceMock.fetchProvidersWithDatasets.mock.calls[1][0].addresses).toHaveLength(25);
Expand Down Expand Up @@ -928,4 +950,113 @@ describe("DataRetentionService", () => {
expect(mockBaselineRepository.delete).toHaveBeenCalledWith({ providerAddress: PROVIDER_A });
});
});

// Tests for the pdp_provider_estimated_overdue_periods gauge emitted by pollDataRetention.
// makeProvider() defaults: one proofSet with nextDeadline=900n, maxProvingPeriod=100n; mocked currentBlock=1200.
describe("overdue periods gauge", () => {
it("emits overdue gauge on first poll (baseline-only)", async () => {
// Provider is overdue: currentBlock=1200, default proofSet has nextDeadline=900, maxProvingPeriod=100.
// estimatedOverduePeriods = (1200 - 901) / 100 = 2.99, floored to 2 by bigint integer division.
pdpSubgraphServiceMock.fetchProvidersWithDatasets.mockResolvedValueOnce([makeProvider()]);

await service.pollDataRetention();

expect(gaugeMock.labels).toHaveBeenCalledWith(
expect.objectContaining({
checkType: "dataRetention",
providerId: "1",
providerName: "Provider A",
providerStatus: "approved",
}),
);
expect(gaugeMock.set).toHaveBeenCalledWith(2);
});

it("emits overdue gauge = 0 when provider is not overdue", async () => {
// Empty proofSets array: the subgraph query only returns proof sets whose
// nextDeadline is already in the past, so no entries means nothing is overdue
// and the reduce yields 0.
pdpSubgraphServiceMock.fetchProvidersWithDatasets.mockResolvedValueOnce([makeProvider({ proofSets: [] })]);

await service.pollDataRetention();

expect(gaugeMock.set).toHaveBeenCalledWith(0);
});

it("emits overdue gauge even on negative delta (baseline reset)", async () => {
// First poll: high values
pdpSubgraphServiceMock.fetchProvidersWithDatasets.mockResolvedValueOnce([
makeProvider({ totalFaultedPeriods: 100n, totalProvingPeriods: 200n }),
]);
await service.pollDataRetention();
gaugeMock.labels.mockClear();
gaugeMock.set.mockClear();

// Second poll: lower values (negative delta) but still overdue
pdpSubgraphServiceMock.fetchProvidersWithDatasets.mockResolvedValueOnce([
makeProvider({ totalFaultedPeriods: 50n, totalProvingPeriods: 100n }),
]);
await service.pollDataRetention();

// Gauge should still be emitted despite negative deltas on counters
expect(gaugeMock.labels).toHaveBeenCalled();
expect(gaugeMock.set).toHaveBeenCalled();
});

it("naturally resets gauge to 0 when subgraph catches up", async () => {
// First poll: provider is overdue (currentBlock=1200, default proofSet nextDeadline=900)
pdpSubgraphServiceMock.fetchProvidersWithDatasets.mockResolvedValueOnce([makeProvider()]);
await service.pollDataRetention();

expect(gaugeMock.set).toHaveBeenCalledWith(2);

gaugeMock.labels.mockClear();
gaugeMock.set.mockClear();

// Second poll: subgraph caught up — the query (which only returns proof sets with
// past deadlines) now returns none, so proofSets is empty.
pdpSubgraphServiceMock.fetchProvidersWithDatasets.mockResolvedValueOnce([
makeProvider({
totalFaultedPeriods: 12n,
totalProvingPeriods: 102n,
proofSets: [],
}),
]);

await service.pollDataRetention();

// Gauge should reset to 0 because no overdue proof sets remain
expect(gaugeMock.set).toHaveBeenCalledWith(0);
});

it("removes overdue gauge when stale provider is cleaned up", async () => {
// First poll: establish baseline for PROVIDER_A
pdpSubgraphServiceMock.fetchProvidersWithDatasets.mockResolvedValueOnce([makeProvider({ address: PROVIDER_A })]);
await service.pollDataRetention();

// Second poll: PROVIDER_A removed from active list
walletSdkServiceMock.getTestingProviders.mockReturnValueOnce([
{ id: 2, serviceProvider: PROVIDER_B, name: "Provider B", isApproved: false },
]);

mockSPRepository.find.mockResolvedValueOnce([
{ address: PROVIDER_A, name: "Provider A", providerId: 1, isApproved: true },
]);

pdpSubgraphServiceMock.fetchProvidersWithDatasets.mockResolvedValueOnce([makeProvider({ address: PROVIDER_B })]);

await service.pollDataRetention();

// Should remove overdue gauge for stale provider (both approved and unapproved labels)
const approvedLabels = buildCheckMetricLabels({
checkType: "dataRetention",
providerId: 1n,
providerName: "Provider A",
providerIsApproved: true,
});
const unapprovedLabels = buildCheckMetricLabels({
checkType: "dataRetention",
providerId: 1n,
providerName: "Provider A",
providerIsApproved: false,
});
expect(gaugeMock.remove).toHaveBeenCalledWith(approvedLabels);
expect(gaugeMock.remove).toHaveBeenCalledWith(unapprovedLabels);
});
});
});
95 changes: 77 additions & 18 deletions apps/backend/src/data-retention/data-retention.service.ts
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@ import { Injectable, Logger } from "@nestjs/common";
import { ConfigService } from "@nestjs/config";
import { InjectRepository } from "@nestjs/typeorm";
import { InjectMetric } from "@willsoto/nestjs-prometheus";
import { Counter } from "prom-client";
import { Counter, Gauge } from "prom-client";
import { Raw, Repository } from "typeorm";
import { toStructuredError } from "../common/logging.js";
import { IConfig } from "../config/app.config.js";
Expand Down Expand Up @@ -50,6 +50,8 @@ export class DataRetentionService {
private readonly storageProviderRepository: Repository<StorageProvider>,
@InjectMetric("dataSetChallengeStatus")
private readonly dataSetChallengeStatusCounter: Counter,
@InjectMetric("pdp_provider_estimated_overdue_periods")
private readonly estimatedOverduePeriodsGauge: Gauge,
) {
this.providerCumulativeTotals = new Map();
}
Expand Down Expand Up @@ -105,6 +107,7 @@ export class DataRetentionService {

try {
const providersFromSubgraph = await this.pdpSubgraphService.fetchProvidersWithDatasets({
blockNumber,
addresses: batchAddresses,
});

Expand All @@ -119,7 +122,7 @@ export class DataRetentionService {
),
);
}
return this.processProvider(provider, providerInfo);
return this.processProvider(provider, providerInfo, blockNumberBigInt);
}),
);

Expand Down Expand Up @@ -187,9 +190,9 @@ export class DataRetentionService {

/**
* Removes stale provider entries from the cumulative totals map and their associated
* Prometheus counter metrics.
* Prometheus counter and gauge metrics.
*
* CRITICAL: Local baselines are ONLY deleted if the Prometheus metric is successfully
* CRITICAL: Local baselines are ONLY deleted if the Prometheus metrics are successfully
* removed. This prevents massive metric inflation (double-counting) if a provider
* temporarily drops offline and returns later.
*
Expand Down Expand Up @@ -256,6 +259,8 @@ export class DataRetentionService {
this.dataSetChallengeStatusCounter.remove({ ...approvedLabels, value: "failure" });
this.dataSetChallengeStatusCounter.remove({ ...unapprovedLabels, value: "success" });
this.dataSetChallengeStatusCounter.remove({ ...unapprovedLabels, value: "failure" });
this.estimatedOverduePeriodsGauge.remove(approvedLabels);
this.estimatedOverduePeriodsGauge.remove(unapprovedLabels);

// Only delete local memory if Prometheus removal succeeded without throwing
this.providerCumulativeTotals.delete(address);
Expand Down Expand Up @@ -311,12 +316,17 @@ export class DataRetentionService {
private async processProvider(
provider: ProviderDataSetResponse["providers"][number],
pdpProvider: PDPProviderEx,
currentBlock: bigint,
): Promise<{ faultedPeriods: bigint; successPeriods: bigint }> {
const { address, totalFaultedPeriods, totalProvingPeriods } = provider;
// Use only subgraph-confirmed totals. Speculative overdue estimation was removed
// because it systematically inflated fault counts: overdue periods were pessimistically
// counted as faults, but when the subgraph later confirmed them as successes, the
// negative delta guard silently discarded the correction.
const { address, totalFaultedPeriods, totalProvingPeriods, proofSets } = provider;
// Note: Query filters proofSets with nextDeadline_lt: $blockNumber, so all deadlines are in the past
const estimatedOverduePeriods = proofSets.reduce((acc, proofSet) => {
if (proofSet.maxProvingPeriod === 0n) {
return acc;
}
return acc + (currentBlock - (proofSet.nextDeadline + 1n)) / proofSet.maxProvingPeriod;
}, 0n);

const confirmedTotalSuccess = totalProvingPeriods - totalFaultedPeriods;

const normalizedAddress = address.toLowerCase();
Expand All @@ -327,8 +337,27 @@ export class DataRetentionService {
successPeriods: confirmedTotalSuccess,
};

// First time seeing this provider (fresh deploy or newly added provider).
// Set baseline without emitting counters to avoid dumping full cumulative history.
const providerLabels = buildCheckMetricLabels({
checkType: "dataRetention",
providerId: pdpProvider.id,
providerName: pdpProvider.name,
providerIsApproved: pdpProvider.isApproved,
});

// Emit overdue periods gauge on every poll — this is a separate signal from the
// confirmed counters. It reflects estimated unrecorded faults in real time and
// naturally resets to 0 when NextProvingPeriod fires and the subgraph catches up.
// Note: Safe to cast under normal conditions (1 period = 240 blocks). However, we
// check for overflow to handle edge cases like proving period changes or fast finality.
this.safeSetGauge(
this.estimatedOverduePeriodsGauge,
providerLabels,
estimatedOverduePeriods,
address,
pdpProvider.id,
pdpProvider.name,
);

if (previous === undefined) {
this.logger.log({
event: "baseline_initialized",
Expand Down Expand Up @@ -365,13 +394,6 @@ export class DataRetentionService {
return newBaseline;
}

const providerLabels = buildCheckMetricLabels({
checkType: "dataRetention",
providerId: pdpProvider.id,
providerName: pdpProvider.name,
providerIsApproved: pdpProvider.isApproved,
});

if (faultedChallengesDelta > 0n) {
this.safeIncrementCounter(this.dataSetChallengeStatusCounter, providerLabels, "failure", faultedChallengesDelta);
}
Expand All @@ -381,6 +403,7 @@ export class DataRetentionService {
}

this.providerCumulativeTotals.set(normalizedAddress, newBaseline);

return newBaseline;
}

Expand Down Expand Up @@ -475,4 +498,40 @@ export class DataRetentionService {
remaining -= chunk;
}
}

/**
 * Safely sets a Prometheus gauge from a BigInt value.
 *
 * Prometheus gauges only accept JS numbers, so the BigInt must be converted.
 * Values above Number.MAX_SAFE_INTEGER are clamped to MAX_SAFE_INTEGER and a
 * structured warning is logged (the original value is preserved in the log as
 * a string); everything else converts losslessly.
 *
 * @param gauge - The Prometheus gauge to set
 * @param labels - The label set for the gauge
 * @param value - The BigInt value to set
 * @param providerAddress - Provider address for logging
 * @param providerId - Provider ID for logging
 * @param providerName - Provider name for logging
 */
private safeSetGauge(
  gauge: Gauge,
  labels: CheckMetricLabels,
  value: bigint,
  providerAddress: string,
  providerId: bigint,
  providerName: string,
): void {
  const overflows = value > BigInt(Number.MAX_SAFE_INTEGER);

  if (overflows) {
    // Keep the exact value in the log even though the metric itself is clamped.
    this.logger.warn({
      event: "overdue_periods_overflow",
      message: "Estimated overdue periods exceeds safe integer range. Clamping to MAX_SAFE_INTEGER.",
      providerAddress,
      providerId,
      providerName,
      estimatedOverduePeriods: value.toString(),
    });
  }

  gauge.labels(labels).set(overflows ? Number.MAX_SAFE_INTEGER : Number(value));
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -202,6 +202,11 @@ const metricProviders = [
help: "Provider dataset challenge status",
labelNames: ["checkType", "providerId", "providerName", "providerStatus", "value"] as const,
}),
makeGaugeProvider({
name: "pdp_provider_estimated_overdue_periods",
help: "Estimated number of unrecorded overdue proving periods per provider. Resets to 0 when the subgraph catches up.",
labelNames: ["checkType", "providerId", "providerName", "providerStatus"] as const,
}),
// Storage provider metrics: absolute counts, independent of query filters.
makeGaugeProvider({
name: "storage_providers_active",
Expand Down
Loading
Loading