diff --git a/apps/backend/src/config/app.config.ts b/apps/backend/src/config/app.config.ts index 7929ed87..762c9e29 100644 --- a/apps/backend/src/config/app.config.ts +++ b/apps/backend/src/config/app.config.ts @@ -68,6 +68,21 @@ export const configValidationSchema = Joi.object({ DATA_SET_CREATION_JOB_TIMEOUT_SECONDS: Joi.number().min(60).default(300), // 5 minutes max runtime for dataset creation jobs IPFS_BLOCK_FETCH_CONCURRENCY: Joi.number().integer().min(1).max(32).default(6), + // Piece Cleanup + MAX_DATASET_STORAGE_SIZE_BYTES: Joi.number() + .integer() + .min(1) + .default(24 * 1024 * 1024 * 1024), // 24 GiB per SP + TARGET_DATASET_STORAGE_SIZE_BYTES: Joi.number() + .integer() + .min(1) + .default(20 * 1024 * 1024 * 1024), // 20 GiB per SP + JOB_PIECE_CLEANUP_PER_SP_PER_HOUR: Joi.number() + .min(0.001) + .max(20) + .default(1 / 24), // ~once per day + MAX_PIECE_CLEANUP_RUNTIME_SECONDS: Joi.number().min(60).default(300), // 5 minutes max runtime for cleanup jobs + // Dataset DEALBOT_LOCAL_DATASETS_PATH: Joi.string().default(DEFAULT_LOCAL_DATASETS_PATH), RANDOM_PIECE_SIZES: Joi.string().default("10485760"), // 10 MiB @@ -204,6 +219,20 @@ export interface IJobsConfig { * Uses AbortController to actively cancel job execution. */ retrievalJobTimeoutSeconds: number; + /** + * Target number of piece cleanup runs per storage provider per hour. + * + * Increasing this makes cleanup more aggressive at the cost of more SP API calls. + * Only used when `DEALBOT_JOBS_MODE=pgboss`. + */ + pieceCleanupPerSpPerHour: number; + /** + * Maximum runtime (seconds) for piece cleanup jobs before forced abort. + * + * Uses AbortController to actively cancel job execution. + * Only used when `DEALBOT_JOBS_MODE=pgboss`. + */ + maxPieceCleanupRuntimeSeconds: number; } export interface IDatasetConfig { @@ -223,6 +252,11 @@ export interface IRetrievalConfig { ipfsBlockFetchConcurrency: number; } +export interface IPieceCleanupConfig { + maxDatasetStorageSizeBytes: number; + targetDatasetStorageSizeBytes: number; +} + export interface IConfig { app: IAppConfig; database: IDatabaseConfig; @@ -232,6 +266,7 @@ export interface IConfig { dataset: IDatasetConfig; timeouts: ITimeoutConfig; retrieval: IRetrievalConfig; + pieceCleanup: IPieceCleanupConfig; } export function loadConfig(): IConfig { @@ -293,6 +328,8 @@ export function loadConfig(): IConfig { dealJobTimeoutSeconds: Number.parseInt(process.env.DEAL_JOB_TIMEOUT_SECONDS || "360", 10), retrievalJobTimeoutSeconds: Number.parseInt(process.env.RETRIEVAL_JOB_TIMEOUT_SECONDS || "60", 10), dataSetCreationJobTimeoutSeconds: Number.parseInt(process.env.DATA_SET_CREATION_JOB_TIMEOUT_SECONDS || "300", 10), + pieceCleanupPerSpPerHour: Number.parseFloat(process.env.JOB_PIECE_CLEANUP_PER_SP_PER_HOUR || "1"), + maxPieceCleanupRuntimeSeconds: Number.parseInt(process.env.MAX_PIECE_CLEANUP_RUNTIME_SECONDS || "300", 10), }, dataset: { localDatasetsPath: process.env.DEALBOT_LOCAL_DATASETS_PATH || DEFAULT_LOCAL_DATASETS_PATH, @@ -322,5 +359,15 @@ export function loadConfig(): IConfig { retrieval: { ipfsBlockFetchConcurrency: Number.parseInt(process.env.IPFS_BLOCK_FETCH_CONCURRENCY || "6", 10), }, + pieceCleanup: { + maxDatasetStorageSizeBytes: Number.parseInt( + process.env.MAX_DATASET_STORAGE_SIZE_BYTES || String(24 * 1024 * 1024 * 1024), + 10, + ), + targetDatasetStorageSizeBytes: Number.parseInt( + process.env.TARGET_DATASET_STORAGE_SIZE_BYTES || String(20 * 1024 * 1024 * 1024), + 10, + ), + }, }; } diff --git a/apps/backend/src/database/entities/deal.entity.ts 
b/apps/backend/src/database/entities/deal.entity.ts index d3c95d4a..0216a5d6 100644 --- a/apps/backend/src/database/entities/deal.entity.ts +++ b/apps/backend/src/database/entities/deal.entity.ts @@ -132,6 +132,13 @@ export class Deal { @Column({ name: "retry_count", default: 0 }) retryCount: number; + // Piece cleanup tracking + @Column({ name: "cleaned_up", default: false }) + cleanedUp: boolean; + + @Column({ name: "cleaned_up_at", type: "timestamptz", nullable: true }) + cleanedUpAt: Date; + @CreateDateColumn({ name: "created_at", type: "timestamptz" }) createdAt: Date; diff --git a/apps/backend/src/database/entities/job-schedule-state.entity.ts b/apps/backend/src/database/entities/job-schedule-state.entity.ts index 28c6cd0c..4f5dafd1 100644 --- a/apps/backend/src/database/entities/job-schedule-state.entity.ts +++ b/apps/backend/src/database/entities/job-schedule-state.entity.ts @@ -7,7 +7,8 @@ export type JobType = | "metrics" | "metrics_cleanup" | "providers_refresh" - | "data_retention_poll"; + | "data_retention_poll" + | "piece_cleanup"; @Entity("job_schedule_state") @Index("job_schedule_state_job_type_sp_unique", ["jobType", "spAddress"], { unique: true }) diff --git a/apps/backend/src/database/migrations/1761500000002-AddPieceCleanupColumns.ts b/apps/backend/src/database/migrations/1761500000002-AddPieceCleanupColumns.ts new file mode 100644 index 00000000..7777a2da --- /dev/null +++ b/apps/backend/src/database/migrations/1761500000002-AddPieceCleanupColumns.ts @@ -0,0 +1,18 @@ +import type { MigrationInterface, QueryRunner } from "typeorm"; + +export class AddPieceCleanupColumns1761500000002 implements MigrationInterface { + name = "AddPieceCleanupColumns1761500000002"; + + public async up(queryRunner: QueryRunner): Promise { + await queryRunner.query(` + ALTER TABLE deals + ADD COLUMN IF NOT EXISTS cleaned_up BOOLEAN NOT NULL DEFAULT false, + ADD COLUMN IF NOT EXISTS cleaned_up_at TIMESTAMPTZ NULL + `); + } + + public async down(queryRunner: QueryRunner): Promise { + await queryRunner.query(`ALTER TABLE deals DROP COLUMN IF EXISTS cleaned_up_at`); + await queryRunner.query(`ALTER TABLE deals DROP COLUMN IF EXISTS cleaned_up`); + } +} diff --git a/apps/backend/src/jobs/jobs.module.ts b/apps/backend/src/jobs/jobs.module.ts index 6bb95f32..a8ac50af 100644 --- a/apps/backend/src/jobs/jobs.module.ts +++ b/apps/backend/src/jobs/jobs.module.ts @@ -7,6 +7,7 @@ import { StorageProvider } from "../database/entities/storage-provider.entity.js import { DealModule } from "../deal/deal.module.js"; import { MetricsModule } from "../metrics/metrics.module.js"; import { MetricsWorkerModule } from "../metrics/metrics-worker.module.js"; +import { PieceCleanupModule } from "../piece-cleanup/piece-cleanup.module.js"; import { RetrievalModule } from "../retrieval/retrieval.module.js"; import { WalletSdkModule } from "../wallet-sdk/wallet-sdk.module.js"; import { JobsService } from "./jobs.service.js"; @@ -25,6 +26,7 @@ const metricsModule = runMode === "worker" ? 
MetricsWorkerModule : MetricsModule metricsModule, WalletSdkModule, DataRetentionModule, + PieceCleanupModule, ], providers: [JobsService, JobScheduleRepository], }) diff --git a/apps/backend/src/jobs/jobs.service.spec.ts b/apps/backend/src/jobs/jobs.service.spec.ts index 0744175e..1db920cd 100644 --- a/apps/backend/src/jobs/jobs.service.spec.ts +++ b/apps/backend/src/jobs/jobs.service.spec.ts @@ -31,16 +31,16 @@ describe("JobsService schedule rows", () => { }; let dataRetentionServiceMock: { pollDataRetention: ReturnType }; let metricsMocks: { - jobsQueuedGauge: JobsServiceDeps[8]; - jobsRetryScheduledGauge: JobsServiceDeps[9]; - oldestQueuedAgeGauge: JobsServiceDeps[10]; - oldestInFlightAgeGauge: JobsServiceDeps[11]; - jobsInFlightGauge: JobsServiceDeps[12]; - jobsEnqueueAttemptsCounter: JobsServiceDeps[13]; - jobsStartedCounter: JobsServiceDeps[14]; - jobsCompletedCounter: JobsServiceDeps[15]; - jobsPausedGauge: JobsServiceDeps[16]; - jobDuration: JobsServiceDeps[17]; + jobsQueuedGauge: JobsServiceDeps[9]; + jobsRetryScheduledGauge: JobsServiceDeps[10]; + oldestQueuedAgeGauge: JobsServiceDeps[11]; + oldestInFlightAgeGauge: JobsServiceDeps[12]; + jobsInFlightGauge: JobsServiceDeps[13]; + jobsEnqueueAttemptsCounter: JobsServiceDeps[14]; + jobsStartedCounter: JobsServiceDeps[15]; + jobsCompletedCounter: JobsServiceDeps[16]; + jobsPausedGauge: JobsServiceDeps[17]; + jobDuration: JobsServiceDeps[18]; }; let baseConfigValues: Partial; let configService: JobsServiceDeps[0]; @@ -54,16 +54,17 @@ describe("JobsService schedule rows", () => { metricsSchedulerService: JobsServiceDeps[5]; walletSdkService: JobsServiceDeps[6]; dataRetentionService: JobsServiceDeps[7]; - jobsQueuedGauge: JobsServiceDeps[8]; - jobsRetryScheduledGauge: JobsServiceDeps[9]; - oldestQueuedAgeGauge: JobsServiceDeps[10]; - oldestInFlightAgeGauge: JobsServiceDeps[11]; - jobsInFlightGauge: JobsServiceDeps[12]; - jobsEnqueueAttemptsCounter: JobsServiceDeps[13]; - jobsStartedCounter: JobsServiceDeps[14]; - jobsCompletedCounter: JobsServiceDeps[15]; - jobsPausedGauge: JobsServiceDeps[16]; - jobDuration: JobsServiceDeps[17]; + pieceCleanupService: JobsServiceDeps[8]; + jobsQueuedGauge: JobsServiceDeps[9]; + jobsRetryScheduledGauge: JobsServiceDeps[10]; + oldestQueuedAgeGauge: JobsServiceDeps[11]; + oldestInFlightAgeGauge: JobsServiceDeps[12]; + jobsInFlightGauge: JobsServiceDeps[13]; + jobsEnqueueAttemptsCounter: JobsServiceDeps[14]; + jobsStartedCounter: JobsServiceDeps[15]; + jobsCompletedCounter: JobsServiceDeps[16]; + jobsPausedGauge: JobsServiceDeps[17]; + jobDuration: JobsServiceDeps[18]; }>, ) => JobsService; @@ -92,16 +93,16 @@ describe("JobsService schedule rows", () => { }; metricsMocks = { - jobsQueuedGauge: { set: vi.fn() } as unknown as JobsServiceDeps[8], - jobsRetryScheduledGauge: { set: vi.fn() } as unknown as JobsServiceDeps[9], - oldestQueuedAgeGauge: { set: vi.fn() } as unknown as JobsServiceDeps[10], - oldestInFlightAgeGauge: { set: vi.fn() } as unknown as JobsServiceDeps[11], - jobsInFlightGauge: { set: vi.fn() } as unknown as JobsServiceDeps[12], - jobsEnqueueAttemptsCounter: { inc: vi.fn() } as unknown as JobsServiceDeps[13], - jobsStartedCounter: { inc: vi.fn() } as unknown as JobsServiceDeps[14], - jobsCompletedCounter: { inc: vi.fn() } as unknown as JobsServiceDeps[15], - jobsPausedGauge: { set: vi.fn() } as unknown as JobsServiceDeps[16], - jobDuration: { observe: vi.fn() } as unknown as JobsServiceDeps[17], + jobsQueuedGauge: { set: vi.fn() } as unknown as JobsServiceDeps[9], + jobsRetryScheduledGauge: 
{ set: vi.fn() } as unknown as JobsServiceDeps[10], + oldestQueuedAgeGauge: { set: vi.fn() } as unknown as JobsServiceDeps[11], + oldestInFlightAgeGauge: { set: vi.fn() } as unknown as JobsServiceDeps[12], + jobsInFlightGauge: { set: vi.fn() } as unknown as JobsServiceDeps[13], + jobsEnqueueAttemptsCounter: { inc: vi.fn() } as unknown as JobsServiceDeps[14], + jobsStartedCounter: { inc: vi.fn() } as unknown as JobsServiceDeps[15], + jobsCompletedCounter: { inc: vi.fn() } as unknown as JobsServiceDeps[16], + jobsPausedGauge: { set: vi.fn() } as unknown as JobsServiceDeps[17], + jobDuration: { observe: vi.fn() } as unknown as JobsServiceDeps[18], }; baseConfigValues = { @@ -120,6 +121,8 @@ describe("JobsService schedule rows", () => { pgbossSchedulerEnabled: true, workerPollSeconds: 60, dataSetCreationJobTimeoutSeconds: 300, + pieceCleanupPerSpPerHour: 1, + maxPieceCleanupRuntimeSeconds: 300, } as IConfig["jobs"], database: { host: "localhost", @@ -128,6 +131,9 @@ describe("JobsService schedule rows", () => { password: "pass", database: "dealbot", } as IConfig["database"], + pieceCleanup: { + maxDatasetStorageSizeBytes: 24 * 1024 * 1024 * 1024, + } as IConfig["pieceCleanup"], }; configService = { @@ -144,6 +150,8 @@ describe("JobsService schedule rows", () => { overrides.metricsSchedulerService ?? ({} as JobsServiceDeps[5]), overrides.walletSdkService ?? ({} as JobsServiceDeps[6]), overrides.dataRetentionService ?? (dataRetentionServiceMock as unknown as JobsServiceDeps[7]), + overrides.pieceCleanupService ?? + ({ isProviderOverQuota: vi.fn().mockResolvedValue(false) } as unknown as JobsServiceDeps[8]), overrides.jobsQueuedGauge ?? metricsMocks.jobsQueuedGauge, overrides.jobsRetryScheduledGauge ?? metricsMocks.jobsRetryScheduledGauge, overrides.oldestQueuedAgeGauge ?? 
metricsMocks.oldestQueuedAgeGauge, @@ -613,8 +621,13 @@ describe("JobsService schedule rows", () => { // Check upserts for providerB const upsertCalls = jobScheduleRepositoryMock.upsertSchedule.mock.calls; const upsertsForB = upsertCalls.filter((call) => call[1] === providerB.address); - expect(upsertsForB).toHaveLength(3); - expect(upsertsForB.map((call) => call[0]).sort()).toEqual(["data_set_creation", "deal", "retrieval"]); + expect(upsertsForB).toHaveLength(4); + expect(upsertsForB.map((call) => call[0]).sort()).toEqual([ + "data_set_creation", + "deal", + "piece_cleanup", + "retrieval", + ]); }); it("deletes schedule rows for providers no longer present", async () => { @@ -913,6 +926,120 @@ describe("JobsService schedule rows", () => { ); }); + it("deal job skips deal creation when SP is over storage quota", async () => { + vi.useFakeTimers(); + vi.setSystemTime(new Date("2024-01-01T12:00:00Z")); + + const dealService = { + createDealForProvider: vi.fn(), + getTestingDealOptions: vi.fn(() => ({ enableIpni: false })), + getBaseDataSetMetadata: vi.fn(() => ({})), + checkDataSetExists: vi.fn(async () => false), + }; + + const walletSdkService = { + getTestingProviders: vi.fn(() => [{ serviceProvider: "0xaaa" }]), + ensureWalletAllowances: vi.fn(), + loadProviders: vi.fn(), + getProviderInfo: vi.fn(() => ({ id: 1 })), + }; + + const pieceCleanupService = { + isProviderOverQuota: vi.fn().mockResolvedValue(true), + }; + + service = buildService({ + dealService: dealService as unknown as ConstructorParameters[3], + walletSdkService: walletSdkService as unknown as ConstructorParameters[6], + pieceCleanupService: pieceCleanupService as unknown as JobsServiceDeps[8], + }); + + await callPrivate(service, "handleDealJob", { + id: "job-over-quota", + data: { jobType: "deal", spAddress: "0xaaa", intervalSeconds: 60 }, + }); + + expect(pieceCleanupService.isProviderOverQuota).toHaveBeenCalledWith("0xaaa"); + expect(dealService.createDealForProvider).not.toHaveBeenCalled(); + }); + + it("deal job proceeds with deal creation when quota check throws (fail-open)", async () => { + vi.useFakeTimers(); + vi.setSystemTime(new Date("2024-01-01T12:00:00Z")); + + const dealService = { + createDealForProvider: vi.fn(async () => ({})), + getTestingDealOptions: vi.fn(() => ({ enableIpni: false })), + getBaseDataSetMetadata: vi.fn(() => ({})), + checkDataSetExists: vi.fn(async () => false), + }; + + const walletSdkService = { + getTestingProviders: vi.fn(() => [{ serviceProvider: "0xaaa" }]), + ensureWalletAllowances: vi.fn(), + loadProviders: vi.fn(), + getProviderInfo: vi.fn(() => ({ id: 1, name: "test-provider" })), + }; + + const pieceCleanupService = { + isProviderOverQuota: vi.fn().mockRejectedValue(new Error("DB connection failed")), + }; + + service = buildService({ + dealService: dealService as unknown as ConstructorParameters[3], + walletSdkService: walletSdkService as unknown as ConstructorParameters[6], + pieceCleanupService: pieceCleanupService as unknown as JobsServiceDeps[8], + }); + + await callPrivate(service, "handleDealJob", { + id: "job-quota-error", + data: { jobType: "deal", spAddress: "0xaaa", intervalSeconds: 60 }, + }); + + expect(pieceCleanupService.isProviderOverQuota).toHaveBeenCalledWith("0xaaa"); + // Should proceed despite the quota check failure (fail-open) + expect(dealService.createDealForProvider).toHaveBeenCalledTimes(1); + }); + + it("deal job uses live provider data for quota (DB/provider drift: live says OK → deal proceeds)", async () => { + vi.useFakeTimers(); + 
vi.setSystemTime(new Date("2024-01-01T12:00:00Z")); + + const dealService = { + createDealForProvider: vi.fn(async () => ({})), + getTestingDealOptions: vi.fn(() => ({ enableIpni: false })), + getBaseDataSetMetadata: vi.fn(() => ({})), + checkDataSetExists: vi.fn(async () => false), + }; + + const walletSdkService = { + getTestingProviders: vi.fn(() => [{ serviceProvider: "0xaaa" }]), + ensureWalletAllowances: vi.fn(), + loadProviders: vi.fn(), + getProviderInfo: vi.fn(() => ({ id: 1, name: "test-provider" })), + }; + + // Live provider data says NOT over quota (even if DB SUM disagrees) + const pieceCleanupService = { + isProviderOverQuota: vi.fn().mockResolvedValue(false), + }; + + service = buildService({ + dealService: dealService as unknown as ConstructorParameters[3], + walletSdkService: walletSdkService as unknown as ConstructorParameters[6], + pieceCleanupService: pieceCleanupService as unknown as JobsServiceDeps[8], + }); + + await callPrivate(service, "handleDealJob", { + id: "job-drift-ok", + data: { jobType: "deal", spAddress: "0xaaa", intervalSeconds: 60 }, + }); + + expect(pieceCleanupService.isProviderOverQuota).toHaveBeenCalledWith("0xaaa"); + // Live data says under quota, so deal creation should proceed + expect(dealService.createDealForProvider).toHaveBeenCalledTimes(1); + }); + it("deal job creates deal without metadata when minNumDataSetsForChecks is 1", async () => { vi.useFakeTimers(); vi.setSystemTime(new Date("2024-01-01T12:00:00Z")); diff --git a/apps/backend/src/jobs/jobs.service.ts b/apps/backend/src/jobs/jobs.service.ts index 4a31a732..60a657ef 100644 --- a/apps/backend/src/jobs/jobs.service.ts +++ b/apps/backend/src/jobs/jobs.service.ts @@ -13,6 +13,7 @@ import type { JobType } from "../database/entities/job-schedule-state.entity.js" import { StorageProvider } from "../database/entities/storage-provider.entity.js"; import { DealService } from "../deal/deal.service.js"; import { MetricsSchedulerService } from "../metrics/services/metrics-scheduler.service.js"; +import { PieceCleanupService } from "../piece-cleanup/piece-cleanup.service.js"; import { RetrievalService } from "../retrieval/retrieval.service.js"; import { WalletSdkService } from "../wallet-sdk/wallet-sdk.service.js"; import { provisionNextMissingDataSet } from "./data-set-creation.handler.js"; @@ -25,8 +26,8 @@ import { } from "./job-queues.js"; import { JobScheduleRepository } from "./repositories/job-schedule.repository.js"; -type SpJobType = "deal" | "retrieval" | "data_set_creation"; -const SP_JOB_TYPES: ReadonlySet = new Set(["deal", "retrieval", "data_set_creation"]); +type SpJobType = "deal" | "retrieval" | "data_set_creation" | "piece_cleanup"; +const SP_JOB_TYPES: ReadonlySet = new Set(["deal", "retrieval", "data_set_creation", "piece_cleanup"]); function isSpJobType(jobType: string): jobType is SpJobType { return SP_JOB_TYPES.has(jobType); } @@ -66,6 +67,7 @@ export class JobsService implements OnModuleInit, OnApplicationShutdown { private readonly metricsSchedulerService: MetricsSchedulerService, private readonly walletSdkService: WalletSdkService, private readonly dataRetentionService: DataRetentionService, + private readonly pieceCleanupService: PieceCleanupService, @InjectMetric("jobs_queued") private readonly jobsQueuedGauge: Gauge, @InjectMetric("jobs_retry_scheduled") @@ -290,6 +292,10 @@ export class JobsService implements OnModuleInit, OnApplicationShutdown { await this.handleDataSetCreationJob(job); return; } + if (job.data.jobType === "piece_cleanup") { + await 
this.handlePieceCleanupJob(job); + return; + } this.logger.warn({ event: "unknown_sp_job_type", message: "Skipping unknown SP job type", @@ -432,6 +438,26 @@ export class JobsService implements OnModuleInit, OnApplicationShutdown { return; } + // Over-quota gating: avoid adding more pieces while already above quota + try { + const overQuota = await this.pieceCleanupService.isProviderOverQuota(spAddress); + if (overQuota) { + this.logger.warn({ + event: "deal_job_over_quota", + message: `Deal job skipped: SP ${spAddress} is over the storage quota; cleanup must run first`, + spAddress, + }); + return; + } + } catch (error) { + this.logger.warn({ + event: "deal_job_quota_check_failed", + message: `Failed to check storage quota for ${spAddress}; proceeding with deal creation`, + spAddress, + error: toStructuredError(error), + }); + } + // Create AbortController for job timeout enforcement const abortController = new AbortController(); const timeoutSeconds = this.configService.get("jobs").dealJobTimeoutSeconds; @@ -642,6 +668,62 @@ export class JobsService implements OnModuleInit, OnApplicationShutdown { }); } + private async handlePieceCleanupJob(job: SpJob): Promise { + const data = job.data; + const spAddress = data.spAddress; + const now = new Date(); + const maintenance = this.getMaintenanceWindowStatus(now); + if (maintenance.active) { + this.logMaintenanceSkip(`piece_cleanup job for ${spAddress}`, maintenance.window?.label, { + jobId: job.id, + providerAddress: spAddress, + providerId: this.walletSdkService.getProviderInfo(spAddress)?.id, + }); + await this.deferJobForMaintenance("piece_cleanup", data, maintenance, now); + return; + } + + const abortController = new AbortController(); + const jobsConfig = this.configService.get("jobs"); + const timeoutSeconds = jobsConfig.maxPieceCleanupRuntimeSeconds; + const timeoutMs = Math.max(60000, timeoutSeconds * 1000); + const effectiveTimeoutSeconds = Math.round(timeoutMs / 1000); + const abortReason = new Error(`Piece cleanup job timeout (${effectiveTimeoutSeconds}s) for ${spAddress}`); + const timeoutId = setTimeout(() => { + abortController.abort(abortReason); + }, timeoutMs); + + await this.recordJobExecution("piece_cleanup", async () => { + try { + await this.pieceCleanupService.cleanupPiecesForProvider(spAddress, abortController.signal); + return "success"; + } catch (error) { + if (abortController.signal.aborted) { + const reason = abortController.signal.reason; + const reasonMessage = reason instanceof Error ? reason.message : String(reason ?? ""); + this.logger.warn({ + event: "piece_cleanup_job_aborted", + message: + reasonMessage || `Piece cleanup job aborted after timeout (${effectiveTimeoutSeconds}s) for ${spAddress}`, + spAddress, + timeoutSeconds: effectiveTimeoutSeconds, + error: toStructuredError(reason ?? 
error), + }); + return "aborted"; + } + this.logger.error({ + event: "piece_cleanup_job_failed", + message: `Piece cleanup job failed for ${spAddress}`, + spAddress, + error: toStructuredError(error), + }); + throw error; + } finally { + clearTimeout(timeoutId); + } + }); + } + private async handleDataSetCreationJob(job: SpJob): Promise { const data = job.data; const spAddress = data.spAddress; @@ -803,6 +885,7 @@ export class JobsService implements OnModuleInit, OnApplicationShutdown { metricsCleanupIntervalSeconds: number; dataRetentionPollIntervalSeconds: number; providersRefreshIntervalSeconds: number; + pieceCleanupIntervalSeconds: number; } { const jobsConfig = this.configService.get("jobs", { infer: true }); const scheduling = this.configService.get("scheduling", { infer: true }); @@ -814,11 +897,13 @@ export class JobsService implements OnModuleInit, OnApplicationShutdown { const retrievalsPerHour = jobsConfig.retrievalsPerSpPerHour; const metricsPerHour = jobsConfig.metricsPerHour; const dataSetCreationsPerHour = jobsConfig.dataSetCreationsPerSpPerHour; + const pieceCleanupPerHour = jobsConfig.pieceCleanupPerSpPerHour; const dealIntervalSeconds = Math.max(1, Math.round(3600 / dealsPerHour)); const retrievalIntervalSeconds = Math.max(1, Math.round(3600 / retrievalsPerHour)); const metricsIntervalSeconds = Math.max(1, Math.round(3600 / metricsPerHour)); const dataSetCreationIntervalSeconds = Math.max(1, Math.round(3600 / dataSetCreationsPerHour)); + const pieceCleanupIntervalSeconds = Math.max(1, Math.round(3600 / pieceCleanupPerHour)); const metricsCleanupIntervalSeconds = defaultMetricsCleanupIntervalSeconds; const dataRetentionPollIntervalSeconds = scheduling.dataRetentionPollIntervalSeconds; const providersRefreshIntervalSeconds = scheduling.providersRefreshIntervalSeconds; @@ -831,6 +916,7 @@ export class JobsService implements OnModuleInit, OnApplicationShutdown { metricsCleanupIntervalSeconds, dataRetentionPollIntervalSeconds, providersRefreshIntervalSeconds, + pieceCleanupIntervalSeconds, }; } @@ -851,6 +937,7 @@ export class JobsService implements OnModuleInit, OnApplicationShutdown { metricsCleanupIntervalSeconds, dataRetentionPollIntervalSeconds, providersRefreshIntervalSeconds, + pieceCleanupIntervalSeconds, } = this.getIntervalSecondsForRates(); const useOnlyApprovedProviders = this.configService.get("blockchain").useOnlyApprovedProviders; @@ -871,6 +958,8 @@ export class JobsService implements OnModuleInit, OnApplicationShutdown { const minDataSets = this.configService.get("blockchain").minNumDataSetsForChecks; + const cleanupStartAt = new Date(now.getTime() + phaseMs); + for (const address of providerAddresses) { await this.jobScheduleRepository.upsertSchedule("deal", address, dealIntervalSeconds, dealStartAt); await this.jobScheduleRepository.upsertSchedule("retrieval", address, retrievalIntervalSeconds, retrievalStartAt); @@ -882,6 +971,12 @@ export class JobsService implements OnModuleInit, OnApplicationShutdown { dataSetCreationStartAt, ); } + await this.jobScheduleRepository.upsertSchedule( + "piece_cleanup", + address, + pieceCleanupIntervalSeconds, + cleanupStartAt, + ); } if (providerAddresses.length > 0) { @@ -1013,6 +1108,8 @@ export class JobsService implements OnModuleInit, OnApplicationShutdown { return SP_WORK_QUEUE; case "data_set_creation": return SP_WORK_QUEUE; + case "piece_cleanup": + return SP_WORK_QUEUE; case "metrics": return METRICS_QUEUE; case "metrics_cleanup": @@ -1029,7 +1126,12 @@ export class JobsService implements OnModuleInit, 
OnApplicationShutdown { } private mapJobPayload(row: ScheduleRow): SpJobData | MetricsJobData | ProvidersRefreshJobData { - if (row.job_type === "deal" || row.job_type === "retrieval" || row.job_type === "data_set_creation") { + if ( + row.job_type === "deal" || + row.job_type === "retrieval" || + row.job_type === "data_set_creation" || + row.job_type === "piece_cleanup" + ) { return { jobType: row.job_type, spAddress: row.sp_address, intervalSeconds: row.interval_seconds }; } return { intervalSeconds: row.interval_seconds }; @@ -1104,6 +1206,7 @@ export class JobsService implements OnModuleInit, OnApplicationShutdown { "metrics_cleanup", "data_retention_poll", "providers_refresh", + "piece_cleanup", ]; for (const jobType of jobTypes) { this.jobsQueuedGauge.set({ job_type: jobType }, 0); diff --git a/apps/backend/src/piece-cleanup/piece-cleanup.module.ts b/apps/backend/src/piece-cleanup/piece-cleanup.module.ts new file mode 100644 index 00000000..91f6f87f --- /dev/null +++ b/apps/backend/src/piece-cleanup/piece-cleanup.module.ts @@ -0,0 +1,12 @@ +import { Module } from "@nestjs/common"; +import { TypeOrmModule } from "@nestjs/typeorm"; +import { Deal } from "../database/entities/deal.entity.js"; +import { WalletSdkModule } from "../wallet-sdk/wallet-sdk.module.js"; +import { PieceCleanupService } from "./piece-cleanup.service.js"; + +@Module({ + imports: [TypeOrmModule.forFeature([Deal]), WalletSdkModule], + providers: [PieceCleanupService], + exports: [PieceCleanupService], +}) +export class PieceCleanupModule {} diff --git a/apps/backend/src/piece-cleanup/piece-cleanup.service.spec.ts b/apps/backend/src/piece-cleanup/piece-cleanup.service.spec.ts new file mode 100644 index 00000000..f8093a93 --- /dev/null +++ b/apps/backend/src/piece-cleanup/piece-cleanup.service.spec.ts @@ -0,0 +1,478 @@ +import { ConfigService } from "@nestjs/config"; +import { Test, TestingModule } from "@nestjs/testing"; +import { getRepositoryToken } from "@nestjs/typeorm"; +import { afterEach, beforeEach, describe, expect, it, vi } from "vitest"; +import type { IConfig } from "../config/app.config.js"; +import { Deal } from "../database/entities/deal.entity.js"; +import { DealStatus } from "../database/types.js"; +import { WalletSdkService } from "../wallet-sdk/wallet-sdk.service.js"; +import { PieceCleanupService, type StorageContext } from "./piece-cleanup.service.js"; + +vi.mock("@filoz/synapse-sdk", async (importOriginal) => { + const actual = await importOriginal(); + return { + ...actual, + RPC_URLS: { + calibration: { http: "http://localhost:1234" }, + }, + Synapse: { + create: vi.fn().mockReturnValue({ + storage: { + createContext: vi.fn().mockResolvedValue({ + deletePiece: vi.fn(), + } as unknown as StorageContext), + }, + }), + }, + }; +}); + +vi.mock("filecoin-pin/core/data-set", () => ({ + listDataSets: vi.fn().mockResolvedValue([]), + calculateActualStorage: vi.fn().mockResolvedValue({ + totalBytes: 0n, + dataSetCount: 0, + dataSetsProcessed: 0, + pieceCount: 0, + warnings: [], + }), +})); + +describe("PieceCleanupService", () => { + let service: PieceCleanupService; + let dealRepoMock: ReturnType; + let walletSdkMock: ReturnType; + + const MiB = 1024 * 1024; + const THRESHOLD_BYTES = 100 * MiB; // 100 MiB for tests + + function createDealRepoMock() { + return { + find: vi.fn(), + save: vi.fn(), + createQueryBuilder: vi.fn(), + }; + } + + function createWalletSdkMock() { + return { + getProviderInfo: vi.fn().mockReturnValue({ id: 9, name: "Test SP" }), + }; + } + + const TARGET_BYTES = 80 * MiB; // 80 
MiB low-water mark for tests + + function createConfigMock() { + return { + get: vi.fn((key: keyof IConfig) => { + if (key === "pieceCleanup") { + return { + maxDatasetStorageSizeBytes: THRESHOLD_BYTES, + targetDatasetStorageSizeBytes: TARGET_BYTES, + }; + } + if (key === "blockchain") { + return { + walletPrivateKey: "0x1234567890123456789012345678901234567890123456789012345678901234", + network: "calibration", + walletAddress: "0x123", + }; + } + return undefined; + }), + }; + } + + function makeDeal(overrides: Partial = {}): Deal { + const deal = new Deal(); + deal.id = overrides.id ?? `deal-${Math.random().toString(36).slice(2)}`; + deal.spAddress = overrides.spAddress ?? "0xProvider"; + deal.status = overrides.status ?? DealStatus.DEAL_CREATED; + deal.pieceId = Object.hasOwn(overrides, "pieceId") ? (overrides.pieceId as number) : 42; + deal.dataSetId = Object.hasOwn(overrides, "dataSetId") ? (overrides.dataSetId as bigint) : 1n; + deal.pieceCid = overrides.pieceCid ?? "bafk-piece"; + deal.pieceSize = overrides.pieceSize ?? 10 * MiB; + deal.fileSize = overrides.fileSize ?? 10 * MiB; + deal.cleanedUp = overrides.cleanedUp ?? false; + deal.createdAt = overrides.createdAt ?? new Date("2024-01-01T00:00:00Z"); + deal.walletAddress = overrides.walletAddress ?? "0x123"; + deal.fileName = overrides.fileName ?? "test.bin"; + return deal; + } + + function mockQueryBuilder(totalBytes: number) { + const qb = { + select: vi.fn().mockReturnThis(), + where: vi.fn().mockReturnThis(), + andWhere: vi.fn().mockReturnThis(), + getRawOne: vi.fn().mockResolvedValue({ totalBytes: String(totalBytes) }), + }; + dealRepoMock.createQueryBuilder.mockReturnValue(qb); + return qb; + } + + beforeEach(async () => { + dealRepoMock = createDealRepoMock(); + walletSdkMock = createWalletSdkMock(); + + const module: TestingModule = await Test.createTestingModule({ + providers: [ + PieceCleanupService, + { provide: ConfigService, useValue: createConfigMock() }, + { provide: getRepositoryToken(Deal), useValue: dealRepoMock }, + { provide: WalletSdkService, useValue: walletSdkMock }, + ], + }).compile(); + + service = module.get(PieceCleanupService); + }); + + afterEach(() => { + vi.clearAllMocks(); + }); + + describe("getStoredBytesForProvider", () => { + it("returns total bytes from the query builder", async () => { + mockQueryBuilder(50 * MiB); + + const result = await service.getStoredBytesForProvider("0xProvider"); + + expect(result).toBe(50 * MiB); + expect(dealRepoMock.createQueryBuilder).toHaveBeenCalledWith("deal"); + }); + + it("returns 0 when no deals exist", async () => { + mockQueryBuilder(0); + + const result = await service.getStoredBytesForProvider("0xProvider"); + + expect(result).toBe(0); + }); + + it("returns 0 when query result is null", async () => { + const qb = { + select: vi.fn().mockReturnThis(), + where: vi.fn().mockReturnThis(), + andWhere: vi.fn().mockReturnThis(), + getRawOne: vi.fn().mockResolvedValue(null), + }; + dealRepoMock.createQueryBuilder.mockReturnValue(qb); + + const result = await service.getStoredBytesForProvider("0xProvider"); + + expect(result).toBe(0); + }); + }); + + describe("isProviderOverQuota", () => { + it("returns true when live stored bytes exceed threshold", async () => { + vi.spyOn(service, "getLiveStoredBytesForProvider").mockResolvedValue(THRESHOLD_BYTES + 1); + + const result = await service.isProviderOverQuota("0xProvider"); + + expect(result).toBe(true); + }); + + it("returns false when live stored bytes are at threshold", async () => { + vi.spyOn(service, 
"getLiveStoredBytesForProvider").mockResolvedValue(THRESHOLD_BYTES); + + const result = await service.isProviderOverQuota("0xProvider"); + + expect(result).toBe(false); + }); + + it("returns false when live stored bytes are below threshold", async () => { + vi.spyOn(service, "getLiveStoredBytesForProvider").mockResolvedValue(THRESHOLD_BYTES - 1); + + const result = await service.isProviderOverQuota("0xProvider"); + + expect(result).toBe(false); + }); + + it("falls back to DB when live query fails", async () => { + vi.spyOn(service, "getLiveStoredBytesForProvider").mockRejectedValue(new Error("network error")); + mockQueryBuilder(THRESHOLD_BYTES + 1); + + const result = await service.isProviderOverQuota("0xProvider"); + + expect(result).toBe(true); + }); + }); + + describe("getCleanupCandidates", () => { + it("queries for oldest completed deals with piece IDs", async () => { + const deals = [makeDeal({ createdAt: new Date("2024-01-01") }), makeDeal({ createdAt: new Date("2024-01-02") })]; + dealRepoMock.find.mockResolvedValue(deals); + + const result = await service.getCleanupCandidates("0xProvider", 10); + + expect(result).toEqual(deals); + expect(dealRepoMock.find).toHaveBeenCalledWith( + expect.objectContaining({ + where: expect.objectContaining({ + spAddress: "0xProvider", + status: DealStatus.DEAL_CREATED, + cleanedUp: false, + }), + order: { createdAt: "ASC" }, + take: 10, + }), + ); + }); + + it("respects the limit parameter", async () => { + dealRepoMock.find.mockResolvedValue([]); + + await service.getCleanupCandidates("0xProvider", 5); + + expect(dealRepoMock.find).toHaveBeenCalledWith( + expect.objectContaining({ + take: 5, + }), + ); + }); + }); + + describe("cleanupPiecesForProvider", () => { + it("skips cleanup when stored bytes are below threshold", async () => { + vi.spyOn(service, "getLiveStoredBytesForProvider").mockResolvedValue(50 * MiB); // 50 MiB < 100 MiB threshold + + const result = await service.cleanupPiecesForProvider("0xProvider"); + + expect(result.skipped).toBe(true); + expect(result.deleted).toBe(0); + expect(result.failed).toBe(0); + expect(result.storedBytes).toBe(50 * MiB); + expect(result.thresholdBytes).toBe(THRESHOLD_BYTES); + }); + + it("skips cleanup when stored bytes equal threshold", async () => { + vi.spyOn(service, "getLiveStoredBytesForProvider").mockResolvedValue(THRESHOLD_BYTES); // exactly at threshold + + const result = await service.cleanupPiecesForProvider("0xProvider"); + + expect(result.skipped).toBe(true); + expect(result.deleted).toBe(0); + }); + + it("returns cleanup result with no candidates when above threshold but no eligible deals", async () => { + vi.spyOn(service, "getLiveStoredBytesForProvider").mockResolvedValue(200 * MiB); // above threshold + dealRepoMock.find.mockResolvedValue([]); // no candidates + + const result = await service.cleanupPiecesForProvider("0xProvider"); + + expect(result.skipped).toBe(false); + expect(result.deleted).toBe(0); + expect(result.failed).toBe(0); + }); + + it("deletes pieces until excess is cleared (down to low-water mark)", async () => { + // storedBytes = 130 MiB, target = 80 MiB, so excess = 50 MiB to delete + vi.spyOn(service, "getLiveStoredBytesForProvider").mockResolvedValue(130 * MiB); + + const deal1 = makeDeal({ id: "deal-1", pieceId: 1, pieceSize: 10 * MiB }); + const deal2 = makeDeal({ id: "deal-2", pieceId: 2, pieceSize: 10 * MiB }); + const deal3 = makeDeal({ id: "deal-3", pieceId: 3, pieceSize: 10 * MiB }); + const deal4 = makeDeal({ id: "deal-4", pieceId: 4, pieceSize: 10 * MiB }); 
+ const deal5 = makeDeal({ id: "deal-5", pieceId: 5, pieceSize: 10 * MiB }); + const deal6 = makeDeal({ id: "deal-6", pieceId: 6, pieceSize: 10 * MiB }); + dealRepoMock.find.mockResolvedValue([deal1, deal2, deal3, deal4, deal5, deal6]); + + const deletePieceSpy = vi.spyOn(service, "deletePiece").mockResolvedValue(undefined); + + const result = await service.cleanupPiecesForProvider("0xProvider"); + + expect(result.deleted).toBe(5); // 50 MiB = 5 × 10 MiB + expect(result.failed).toBe(0); + expect(result.skipped).toBe(false); + expect(deletePieceSpy).toHaveBeenCalledTimes(5); + }); + + it("continues deleting after individual piece failure", async () => { + vi.spyOn(service, "getLiveStoredBytesForProvider").mockResolvedValue(200 * MiB); + + const deal1 = makeDeal({ id: "deal-1", pieceId: 1, pieceSize: 10 * MiB }); + const deal2 = makeDeal({ id: "deal-2", pieceId: 2, pieceSize: 10 * MiB }); + // First batch returns both deals; second batch returns empty + dealRepoMock.find.mockResolvedValueOnce([deal1, deal2]).mockResolvedValueOnce([]); + + vi.spyOn(service, "deletePiece").mockRejectedValueOnce(new Error("SDK error")).mockResolvedValueOnce(undefined); + + const result = await service.cleanupPiecesForProvider("0xProvider"); + + expect(result.deleted).toBe(1); + expect(result.failed).toBe(1); + }); + + it("respects abort signal", async () => { + vi.spyOn(service, "getLiveStoredBytesForProvider").mockResolvedValue(200 * MiB); + + const deal1 = makeDeal({ id: "deal-1", pieceId: 1, pieceSize: 10 * MiB }); + dealRepoMock.find.mockResolvedValue([deal1]); + + const abortController = new AbortController(); + abortController.abort(new Error("aborted")); + + await expect(service.cleanupPiecesForProvider("0xProvider", abortController.signal)).rejects.toThrow("aborted"); + }); + + it("bails out when all deletions in a batch fail", async () => { + vi.spyOn(service, "getLiveStoredBytesForProvider").mockResolvedValue(200 * MiB); + + const deal1 = makeDeal({ id: "deal-1", pieceId: 1, pieceSize: 10 * MiB }); + const deal2 = makeDeal({ id: "deal-2", pieceId: 2, pieceSize: 10 * MiB }); + dealRepoMock.find.mockResolvedValue([deal1, deal2]); + + vi.spyOn(service, "deletePiece").mockRejectedValue(new Error("persistent failure")); + + const result = await service.cleanupPiecesForProvider("0xProvider"); + + expect(result.deleted).toBe(0); + expect(result.failed).toBe(2); + expect(result.skipped).toBe(false); + }); + + it("credits 0 bytes and bails out when pieceSize is 0", async () => { + // storedBytes = 110 MiB, target = 80 MiB, excess = 30 MiB to delete + vi.spyOn(service, "getLiveStoredBytesForProvider").mockResolvedValue(THRESHOLD_BYTES + 10 * MiB); + + const deal1 = makeDeal({ id: "deal-1", pieceId: 1, pieceSize: 0, fileSize: 10 * MiB }); + // First batch returns the deal, second batch returns [] + dealRepoMock.find.mockResolvedValueOnce([deal1]).mockResolvedValueOnce([]); + + const deletePieceSpy = vi.spyOn(service, "deletePiece").mockResolvedValue(undefined); + + const result = await service.cleanupPiecesForProvider("0xProvider"); + + // Piece is still deleted + expect(result.deleted).toBe(1); + expect(deletePieceSpy).toHaveBeenCalledTimes(1); + // pieceSize=0 credits 0 bytes, so the loop fetches a second batch to confirm no more candidates + expect(dealRepoMock.find).toHaveBeenCalledTimes(2); + }); + + it("loops through multiple batches when excess spans batches", async () => { + // storedBytes = 100 MiB + 20 MiB = 120 MiB, target = 80 MiB, excess = 40 MiB to delete + vi.spyOn(service, 
"getLiveStoredBytesForProvider").mockResolvedValue(120 * MiB); + + // First batch returns 1 deal (10 MiB freed, still 30 MiB excess) + const deal1 = makeDeal({ id: "deal-1", pieceId: 1, pieceSize: 10 * MiB }); + // Second batch returns 3 more deals (30 MiB freed, excess cleared) + const deal2 = makeDeal({ id: "deal-2", pieceId: 2, pieceSize: 10 * MiB }); + const deal3 = makeDeal({ id: "deal-3", pieceId: 3, pieceSize: 10 * MiB }); + const deal4 = makeDeal({ id: "deal-4", pieceId: 4, pieceSize: 10 * MiB }); + dealRepoMock.find.mockResolvedValueOnce([deal1]).mockResolvedValueOnce([deal2, deal3, deal4]); + + const deletePieceSpy = vi.spyOn(service, "deletePiece").mockResolvedValue(undefined); + + const result = await service.cleanupPiecesForProvider("0xProvider"); + + expect(result.deleted).toBe(4); + expect(deletePieceSpy).toHaveBeenCalledTimes(4); + // find should have been called twice (two batches) + expect(dealRepoMock.find).toHaveBeenCalledTimes(2); + }); + }); + + describe("deletePiece", () => { + it("throws when deal is missing pieceId", async () => { + const deal = makeDeal({ pieceId: undefined }); + + await expect(service.deletePiece(deal)).rejects.toThrow("missing pieceId"); + }); + + it("calls Synapse SDK to delete piece and marks deal as cleaned up", async () => { + const { Synapse } = await import("@filoz/synapse-sdk"); + const deletePieceMock = vi.fn().mockResolvedValue(undefined); + const createContextMock = vi.fn().mockResolvedValue({ + deletePiece: deletePieceMock, + }); + (Synapse.create as ReturnType).mockReturnValue({ + storage: { + createContext: createContextMock, + }, + }); + + const deal = makeDeal({ pieceId: 42, dataSetId: 1n, spAddress: "0xProvider" }); + dealRepoMock.save.mockResolvedValue(deal); + + await service.deletePiece(deal); + + expect(createContextMock).toHaveBeenCalledWith({ + providerId: 9, + }); + expect(deletePieceMock).toHaveBeenCalledWith({ piece: 42n }); + expect(deal.cleanedUp).toBe(true); + expect(deal.cleanedUpAt).toBeInstanceOf(Date); + expect(dealRepoMock.save).toHaveBeenCalledWith(deal); + }); + + it("treats 'Can only schedule removal of live pieces' revert as idempotent success", async () => { + const { Synapse } = await import("@filoz/synapse-sdk"); + const deletePieceMock = vi.fn().mockRejectedValue(new Error("Can only schedule removal of live pieces")); + const createContextMock = vi.fn().mockResolvedValue({ + deletePiece: deletePieceMock, + }); + (Synapse.create as ReturnType).mockReturnValue({ + storage: { + createContext: createContextMock, + }, + }); + + const deal = makeDeal({ pieceId: 42, dataSetId: 1n, spAddress: "0xProvider" }); + dealRepoMock.save.mockResolvedValue(deal); + + await service.deletePiece(deal); + + expect(deal.cleanedUp).toBe(true); + expect(deal.cleanedUpAt).toBeInstanceOf(Date); + expect(dealRepoMock.save).toHaveBeenCalledWith(deal); + }); + + it("treats 'Piece ID already scheduled for removal' revert as idempotent success", async () => { + const { Synapse } = await import("@filoz/synapse-sdk"); + const deletePieceMock = vi.fn().mockRejectedValue(new Error("Piece ID already scheduled for removal")); + const createContextMock = vi.fn().mockResolvedValue({ + deletePiece: deletePieceMock, + }); + (Synapse.create as ReturnType).mockReturnValue({ + storage: { + createContext: createContextMock, + }, + }); + + const deal = makeDeal({ pieceId: 42, dataSetId: 1n, spAddress: "0xProvider" }); + dealRepoMock.save.mockResolvedValue(deal); + + await service.deletePiece(deal); + + expect(deal.cleanedUp).toBe(true); + 
expect(dealRepoMock.save).toHaveBeenCalledWith(deal); + }); + + it("rethrows non-idempotent errors", async () => { + const { Synapse } = await import("@filoz/synapse-sdk"); + const deletePieceMock = vi.fn().mockRejectedValue(new Error("network timeout")); + const createContextMock = vi.fn().mockResolvedValue({ + deletePiece: deletePieceMock, + }); + (Synapse.create as ReturnType).mockReturnValue({ + storage: { + createContext: createContextMock, + }, + }); + + const deal = makeDeal({ pieceId: 42, dataSetId: 1n, spAddress: "0xProvider" }); + + await expect(service.deletePiece(deal)).rejects.toThrow("network timeout"); + }); + + it("respects abort signal before SDK call", async () => { + const deal = makeDeal(); + const abortController = new AbortController(); + abortController.abort(new Error("cancelled")); + + await expect(service.deletePiece(deal, abortController.signal)).rejects.toThrow("cancelled"); + }); + }); +}); diff --git a/apps/backend/src/piece-cleanup/piece-cleanup.service.ts b/apps/backend/src/piece-cleanup/piece-cleanup.service.ts new file mode 100644 index 00000000..e0e62b1c --- /dev/null +++ b/apps/backend/src/piece-cleanup/piece-cleanup.service.ts @@ -0,0 +1,376 @@ +import { calibration, mainnet, Synapse } from "@filoz/synapse-sdk"; +import { Injectable, Logger, type OnModuleDestroy, type OnModuleInit } from "@nestjs/common"; +import { ConfigService } from "@nestjs/config"; +import { InjectRepository } from "@nestjs/typeorm"; +import { calculateActualStorage, listDataSets } from "filecoin-pin/core/data-set"; +import { IsNull, Not, Repository } from "typeorm"; +import { privateKeyToAccount } from "viem/accounts"; +import { toStructuredError } from "../common/logging.js"; +import type { IBlockchainConfig, IConfig } from "../config/app.config.js"; +import { Deal } from "../database/entities/deal.entity.js"; +import { DealStatus } from "../database/types.js"; +import { WalletSdkService } from "../wallet-sdk/wallet-sdk.service.js"; + +export type StorageContext = Awaited>; + +export interface CleanupResult { + /** Number of pieces successfully deleted. */ + deleted: number; + /** Number of pieces that failed to delete. */ + failed: number; + /** Whether cleanup was skipped (below threshold). */ + skipped: boolean; + /** Total stored bytes before cleanup. */ + storedBytes: number; + /** Threshold in bytes. 
*/ + thresholdBytes: number; +} + +@Injectable() +export class PieceCleanupService implements OnModuleInit, OnModuleDestroy { + private readonly logger = new Logger(PieceCleanupService.name); + private readonly blockchainConfig: IBlockchainConfig; + private sharedSynapse?: Synapse; + + constructor( + private readonly configService: ConfigService, + @InjectRepository(Deal) + private readonly dealRepository: Repository, + private readonly walletSdkService: WalletSdkService, + ) { + this.blockchainConfig = this.configService.get("blockchain"); + } + + async onModuleInit(): Promise { + if (process.env.DEALBOT_DISABLE_CHAIN === "true") { + this.logger.warn("Chain integration disabled; skipping Synapse initialization for piece cleanup."); + return; + } + try { + this.logger.log("Initializing shared Synapse instance for piece cleanup."); + this.sharedSynapse = this.createSynapseInstance(); + } catch (error) { + this.logger.error({ + event: "piece_cleanup_synapse_init_failed", + message: "Failed to initialize shared Synapse instance for piece cleanup; will create on demand", + error: toStructuredError(error), + }); + } + } + + async onModuleDestroy(): Promise { + if (this.sharedSynapse) { + this.sharedSynapse = undefined; + } + } + + private createSynapseInstance(): Synapse { + try { + return Synapse.create({ + account: privateKeyToAccount(this.blockchainConfig.walletPrivateKey), + chain: this.blockchainConfig.network === "mainnet" ? mainnet : calibration, + source: "dealbot", + }); + } catch (error) { + this.logger.error({ + event: "synapse_init_failed", + message: "Failed to initialize Synapse for piece cleanup", + error: toStructuredError(error), + }); + throw error; + } + } + + /** + * Check whether a provider is over the configured storage quota. + * Uses live data from the provider with DB fallback. + * Used by the deal handler to gate new deal creation. + */ + async isProviderOverQuota(spAddress: string): Promise { + const thresholdBytes = this.configService.get("pieceCleanup").maxDatasetStorageSizeBytes; + try { + const liveBytes = await this.getLiveStoredBytesForProvider(spAddress); + return liveBytes > thresholdBytes; + } catch (error) { + this.logger.warn({ + event: "piece_cleanup_live_query_failed", + message: `Failed to query live storage for SP ${spAddress}; falling back to DB`, + spAddress, + error: toStructuredError(error), + }); + const storedBytes = await this.getStoredBytesForProvider(spAddress); + return storedBytes > thresholdBytes; + } + } + + /** + * Run cleanup for a single SP. + * 1. Query live storage (falls back to DB if unavailable) + * 2. If live usage > MAX threshold, start cleanup + * 3. Select oldest completed pieces from DB as deletion candidates + * 4. For each piece, call deletePiece() via Synapse SDK + * 5. Mark the deal record as cleaned up + * 6. 
Repeat until usage drops below TARGET or runtime cap is reached + */ + async cleanupPiecesForProvider(spAddress: string, signal?: AbortSignal): Promise { + const { maxDatasetStorageSizeBytes: thresholdBytes, targetDatasetStorageSizeBytes: targetBytes } = + this.configService.get("pieceCleanup"); + + let storedBytes: number; + try { + storedBytes = await this.getLiveStoredBytesForProvider(spAddress); + } catch (error) { + this.logger.warn({ + event: "piece_cleanup_live_query_failed", + message: `Failed to query live storage for SP ${spAddress}; falling back to DB`, + spAddress, + error: toStructuredError(error), + }); + storedBytes = await this.getStoredBytesForProvider(spAddress); + } + + if (storedBytes <= thresholdBytes) { + this.logger.debug({ + event: "piece_cleanup_below_threshold", + message: `SP ${spAddress}: ${this.formatBytes(storedBytes)} stored, threshold ${this.formatBytes(thresholdBytes)}; skipping cleanup`, + spAddress, + storedBytes, + thresholdBytes, + }); + return { deleted: 0, failed: 0, skipped: true, storedBytes, thresholdBytes }; + } + + const excessBytes = storedBytes - targetBytes; + this.logger.log({ + event: "piece_cleanup_started", + message: `SP ${spAddress}: ${this.formatBytes(storedBytes)} stored exceeds threshold ${this.formatBytes(thresholdBytes)} by ${this.formatBytes(excessBytes)}; starting cleanup`, + spAddress, + storedBytes, + thresholdBytes, + excessBytes, + }); + + let deleted = 0; + let failed = 0; + let bytesRemoved = 0; + + const synapse = this.sharedSynapse ?? this.createSynapseInstance(); + const providerId = this.walletSdkService.getProviderInfo(spAddress)?.id; + if (providerId === undefined) { + throw new Error(`Provider ID not found for SP address ${spAddress}`); + } + const storage = await synapse.storage.createContext({ + providerId, + }); + + // Fetch candidates in batches. Keep deleting until back under quota or runtime cap. 
+ while (bytesRemoved < excessBytes) { + signal?.throwIfAborted(); + + const candidates = await this.getCleanupCandidates(spAddress, 50); + + if (candidates.length === 0) { + this.logger.warn({ + event: "piece_cleanup_no_candidates", + message: `SP ${spAddress}: above threshold but no more cleanup candidates found`, + spAddress, + }); + break; + } + + let batchDeletedCount = 0; + + for (const deal of candidates) { + signal?.throwIfAborted(); + + if (bytesRemoved >= excessBytes) { + this.logger.debug({ + event: "piece_cleanup_excess_cleared", + message: `SP ${spAddress}: removed ${this.formatBytes(bytesRemoved)} which clears the excess; stopping`, + spAddress, + bytesRemoved, + excessBytes, + }); + break; + } + + try { + await this.deletePiece(deal, signal, storage); + deleted++; + batchDeletedCount++; + bytesRemoved += Number(deal.pieceSize || 0); + this.logger.log({ + event: "piece_cleanup_piece_deleted", + message: `Deleted piece ${deal.pieceId} (pieceCid: ${deal.pieceCid}) from SP ${spAddress}`, + spAddress, + dealId: deal.id, + pieceId: deal.pieceId, + pieceCid: deal.pieceCid, + dataSetId: deal.dataSetId, + pieceSize: deal.pieceSize, + }); + } catch (error) { + failed++; + this.logger.error({ + event: "piece_cleanup_piece_delete_failed", + message: `Failed to delete piece ${deal.pieceId} from SP ${spAddress}`, + spAddress, + dealId: deal.id, + pieceId: deal.pieceId, + pieceCid: deal.pieceCid, + dataSetId: deal.dataSetId, + error: toStructuredError(error), + }); + // Continue to next piece + } + } + + if (batchDeletedCount === 0) { + this.logger.warn({ + event: "piece_cleanup_no_progress", + message: `SP ${spAddress}: no pieces were deleted in the last batch; stopping to avoid infinite loop`, + spAddress, + failed, + }); + break; + } + } + + this.logger.log({ + event: "piece_cleanup_completed", + message: `SP ${spAddress}: cleanup completed — ${deleted} deleted, ${failed} failed, ${this.formatBytes(bytesRemoved)} freed`, + spAddress, + deleted, + failed, + bytesRemoved, + storedBytes, + thresholdBytes, + }); + + return { deleted, failed, skipped: false, storedBytes, thresholdBytes }; + } + + /** + * Query the provider's actual live storage via filecoin-pin. + */ + async getLiveStoredBytesForProvider(spAddress: string, signal?: AbortSignal): Promise { + const synapse = this.sharedSynapse ?? this.createSynapseInstance(); + + const datasets = await listDataSets(synapse, { + filter: (ds) => ds.serviceProvider === spAddress, + }); + + if (datasets.length === 0) { + this.logger.debug({ + event: "piece_cleanup_no_live_datasets", + message: `SP ${spAddress}: no live datasets found on provider`, + spAddress, + }); + return 0; + } + + const result = await calculateActualStorage(synapse, datasets, { signal }); + + this.logger.debug({ + event: "piece_cleanup_live_storage_queried", + message: `SP ${spAddress}: live storage = ${this.formatBytes(Number(result.totalBytes))} across ${result.dataSetCount} datasets (${result.pieceCount} pieces)`, + spAddress, + totalBytes: Number(result.totalBytes), + dataSetCount: result.dataSetCount, + pieceCount: result.pieceCount, + timedOut: result.timedOut, + }); + + return Number(result.totalBytes); + } + + /** + * Calculate total stored bytes for a provider from the deals table. + * Only counts completed deals that have not already been cleaned up. 
+ */ + async getStoredBytesForProvider(spAddress: string): Promise { + const result = await this.dealRepository + .createQueryBuilder("deal") + .select("COALESCE(SUM(deal.piece_size), 0)", "totalBytes") + .where("deal.sp_address = :spAddress", { spAddress }) + .andWhere("deal.status = :status", { status: DealStatus.DEAL_CREATED }) + .andWhere("deal.piece_id IS NOT NULL") + .andWhere("deal.data_set_id IS NOT NULL") + .andWhere("deal.cleaned_up = :cleanedUp", { cleanedUp: false }) + .getRawOne<{ totalBytes: string }>(); + + return Number(result?.totalBytes ?? 0); + } + + /** + * Get the oldest completed deals (candidates for cleanup). + */ + async getCleanupCandidates(spAddress: string, limit: number): Promise { + return this.dealRepository.find({ + where: { + spAddress, + status: DealStatus.DEAL_CREATED, + pieceId: Not(IsNull()), + dataSetId: Not(IsNull()), + cleanedUp: false, + }, + order: { createdAt: "ASC" }, + take: limit, + }); + } + + /** + * Delete a single piece via Synapse SDK and mark the deal as cleaned up. + */ + async deletePiece(deal: Deal, signal?: AbortSignal, existingStorage?: StorageContext): Promise { + if (deal.pieceId == null) { + throw new Error(`Deal ${deal.id} is missing pieceId`); + } + + signal?.throwIfAborted(); + + const providerId = this.walletSdkService.getProviderInfo(deal.spAddress)?.id; + if (providerId === undefined) { + throw new Error(`Provider ID not found for SP address ${deal.spAddress}`); + } + const storage = + existingStorage ?? + (await (this.sharedSynapse ?? this.createSynapseInstance()).storage.createContext({ + providerId, + })); + + try { + await storage.deletePiece({ piece: BigInt(deal.pieceId) }); + } catch (error) { + const message = error instanceof Error ? error.message : String(error); + // Idempotent: treat "piece already gone" contract reverts as success. 
+ const isAlreadyDeleted = + message.includes("Can only schedule removal of live pieces") || + message.includes("Piece ID already scheduled for removal"); + + if (isAlreadyDeleted) { + this.logger.debug({ + event: "piece_cleanup_already_deleted", + message: `Piece ${deal.pieceId} on SP ${deal.spAddress} already deleted; treating as success`, + dealId: deal.id, + pieceId: deal.pieceId, + spAddress: deal.spAddress, + }); + } else { + throw error; + } + } + + // Mark the deal as cleaned up + deal.cleanedUp = true; + deal.cleanedUpAt = new Date(); + await this.dealRepository.save(deal); + } + + private formatBytes(bytes: number): string { + if (bytes >= 1024 * 1024 * 1024) { + return `${(bytes / (1024 * 1024 * 1024)).toFixed(1)} GiB`; + } + return `${(bytes / (1024 * 1024)).toFixed(1)} MiB`; + } +} diff --git a/apps/backend/src/retrieval/retrieval.service.spec.ts b/apps/backend/src/retrieval/retrieval.service.spec.ts index 8a079330..39d7db08 100644 --- a/apps/backend/src/retrieval/retrieval.service.spec.ts +++ b/apps/backend/src/retrieval/retrieval.service.spec.ts @@ -319,3 +319,63 @@ describe("RetrievalService timeouts", () => { expect(mockRetrievalMetrics.observeCheckDuration).toHaveBeenCalledWith(labels, 900); }); }); + +describe("RetrievalService DB/provider drift", () => { + const mockConfigService = { + get: vi.fn((key: string) => { + if (key === "jobs") return { mode: "cron" }; + if (key === "blockchain") return { useOnlyApprovedProviders: false }; + if (key === "dataset") return { randomDatasetSizes: [10] }; + if (key === "timeouts") return { ipniVerificationTimeoutMs: 10_000, ipniVerificationPollingMs: 2_000 }; + return undefined; + }), + }; + + function createMockQueryBuilder() { + const calls: Array<{ clause: string; params?: Record }> = []; + const qb = { + innerJoin: vi.fn().mockReturnThis(), + where: vi.fn().mockReturnThis(), + andWhere: vi.fn((clause: string, params?: Record) => { + calls.push({ clause, params }); + return qb; + }), + orderBy: vi.fn().mockReturnThis(), + take: vi.fn().mockReturnThis(), + limit: vi.fn().mockReturnThis(), + getMany: vi.fn().mockResolvedValue([]), + getOne: vi.fn().mockResolvedValue(null), + }; + return { qb, calls }; + } + + async function createServiceWithQb(mockQb: ReturnType["qb"]) { + const module: TestingModule = await Test.createTestingModule({ + providers: [ + RetrievalService, + { provide: RetrievalAddonsService, useValue: {} }, + { provide: getRepositoryToken(Deal), useValue: { createQueryBuilder: vi.fn().mockReturnValue(mockQb) } }, + { provide: getRepositoryToken(Retrieval), useValue: {} }, + { provide: getRepositoryToken(StorageProvider), useValue: {} }, + { provide: RetrievalCheckMetrics, useValue: {} }, + { provide: DiscoverabilityCheckMetrics, useValue: {} }, + { provide: IpniVerificationService, useValue: {} }, + { provide: ConfigService, useValue: mockConfigService }, + ], + }).compile(); + return module.get(RetrievalService); + } + + it("selectRandomSuccessfulDealForProvider excludes cleaned-up deals", async () => { + const { qb, calls } = createMockQueryBuilder(); + const svc = (await createServiceWithQb(qb)) as unknown as { + selectRandomSuccessfulDealForProvider: (spAddress: string) => Promise; + }; + + await svc.selectRandomSuccessfulDealForProvider("0xSP"); + + const cleanedUpCall = calls.find((c) => c.clause.includes("cleaned_up")); + expect(cleanedUpCall).toBeDefined(); + expect(cleanedUpCall?.params).toEqual({ cleanedUp: false }); + }); +}); diff --git a/apps/backend/src/retrieval/retrieval.service.ts 
index cedf1ab3..6cb4a325 100644
--- a/apps/backend/src/retrieval/retrieval.service.ts
+++ b/apps/backend/src/retrieval/retrieval.service.ts
@@ -323,7 +323,8 @@ export class RetrievalService {
           statuses: [DealStatus.DEAL_CREATED],
         })
         .andWhere("deal.metadata -> 'ipfs_pin' ->> 'enabled' = 'true'")
-        .andWhere("deal.metadata -> 'ipfs_pin' ->> 'rootCID' IS NOT NULL");
+        .andWhere("deal.metadata -> 'ipfs_pin' ->> 'rootCID' IS NOT NULL")
+        .andWhere("deal.cleaned_up = :cleanedUp", { cleanedUp: false });
     if (randomDatasetSizes.length > 0) {
       query.andWhere("(deal.metadata -> 'ipfs_pin' ->> 'originalSize')::bigint IN (:...sizes)", {
         sizes: randomDatasetSizes,
diff --git a/docs/checks/production-configuration-and-approval-methodology.md b/docs/checks/production-configuration-and-approval-methodology.md
index 63f58709..6da7c92f 100644
--- a/docs/checks/production-configuration-and-approval-methodology.md
+++ b/docs/checks/production-configuration-and-approval-methodology.md
@@ -82,7 +82,7 @@ With the current configuration, Dealbot will add this much synthetic load on SPs
 Over the course of a day this means:
 * 75 proof challenges
 * 960 MB of SP download bandwidth in support of adding new pieces
-* 960 MB of disk space for the pieces.
+* 960 MB of disk space for the pieces. Piece cleanup removes the oldest pieces once total stored data per SP exceeds a configurable threshold (see [`MAX_DATASET_STORAGE_SIZE_BYTES`](../environment-variables.md#max_dataset_storage_size_bytes)).
 * 1,920 MB of SP upload bandwidth in support of retrievals
 
 ## Appendix
@@ -97,7 +97,7 @@ This is in a private repo because it includes other infrastructure configuration
 
 ### Does dealbot cleanup old pieces?
 
-No, not currently. See issue [#284](https://github.com/FilOzone/dealbot/issues/284) for more details.
+Yes. Dealbot runs a periodic piece cleanup job per SP that monitors total stored data and removes the oldest pieces when a configurable threshold is exceeded. This produces headroom so new pieces can continue to be added. See the [Piece Cleanup](../environment-variables.md#piece-cleanup) environment variables for configuration details.
 
 ## How are data storage and retrieval check statistics/thresholds calculated?
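To make the cleanup behaviour described in that answer concrete, here is a minimal TypeScript sketch of the per-SP pass: delete the oldest pieces once the high-water mark is exceeded and stop at the low-water mark. It is illustrative only and not part of this diff; the `CleanupHelpers` interface, the `pieceSize` field name, and the hard-coded byte thresholds are assumptions standing in for the real piece-cleanup service and the `MAX_DATASET_STORAGE_SIZE_BYTES` / `TARGET_DATASET_STORAGE_SIZE_BYTES` settings documented below.

```typescript
// Illustrative sketch only; not part of this change. Names below are assumptions
// standing in for the piece-cleanup service and job handler added by this diff.
interface CleanupDeal {
  id: string;
  pieceSize: number; // bytes (assumed field name)
}

interface CleanupHelpers {
  getStoredBytesForProvider(spAddress: string): Promise<number>;
  getCleanupCandidates(spAddress: string, limit: number): Promise<CleanupDeal[]>;
  deletePiece(deal: CleanupDeal, signal?: AbortSignal): Promise<void>;
}

const MAX_BYTES = 24 * 1024 ** 3;    // high-water mark (MAX_DATASET_STORAGE_SIZE_BYTES default)
const TARGET_BYTES = 20 * 1024 ** 3; // low-water mark (TARGET_DATASET_STORAGE_SIZE_BYTES default)

async function cleanupProvider(helpers: CleanupHelpers, spAddress: string, signal?: AbortSignal): Promise<void> {
  // DB-based usage is shown here; the real job prefers live provider data and
  // only falls back to the SUM(piece_size) query when the live lookup fails.
  let stored = await helpers.getStoredBytesForProvider(spAddress);
  if (stored <= MAX_BYTES) return; // under the high-water mark: nothing to do

  while (stored > TARGET_BYTES) {
    signal?.throwIfAborted(); // honor the job's abort deadline
    const batch = await helpers.getCleanupCandidates(spAddress, 10); // oldest deals first
    if (batch.length === 0) break;

    for (const deal of batch) {
      await helpers.deletePiece(deal, signal); // marks the deal cleaned_up on success
      stored -= deal.pieceSize;
      if (stored <= TARGET_BYTES) break;
    }
  }
}
```

Per the environment-variable documentation below, the real job uses live provider usage from `filecoin-pin` as the source of truth, falls back to the DB sum only when that query fails, and runs under the abort deadline set by `MAX_PIECE_CLEANUP_RUNTIME_SECONDS`.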
diff --git a/docs/environment-variables.md b/docs/environment-variables.md
index 7d8e0a8e..4b308d93 100644
--- a/docs/environment-variables.md
+++ b/docs/environment-variables.md
@@ -14,6 +14,7 @@ This document provides a comprehensive guide to all environment variables used b
 | [Jobs (pg-boss)](#jobs-pg-boss) | `DEALBOT_PGBOSS_SCHEDULER_ENABLED`, `DEALBOT_PGBOSS_POOL_MAX`, `DEALS_PER_SP_PER_HOUR`, `DATASET_CREATIONS_PER_SP_PER_HOUR`, `RETRIEVALS_PER_SP_PER_HOUR`, `METRICS_PER_HOUR`, `JOB_SCHEDULER_POLL_SECONDS`, `JOB_WORKER_POLL_SECONDS`, `PG_BOSS_LOCAL_CONCURRENCY`, `JOB_CATCHUP_MAX_ENQUEUE`, `JOB_SCHEDULE_PHASE_SECONDS`, `JOB_ENQUEUE_JITTER_SECONDS`, `DEAL_JOB_TIMEOUT_SECONDS`, `RETRIEVAL_JOB_TIMEOUT_SECONDS`, `IPFS_BLOCK_FETCH_CONCURRENCY` |
 | [Dataset](#dataset-configuration) | `DEALBOT_LOCAL_DATASETS_PATH`, `RANDOM_PIECE_SIZES` |
 | [Timeouts](#timeout-configuration) | `CONNECT_TIMEOUT_MS`, `HTTP_REQUEST_TIMEOUT_MS`, `HTTP2_REQUEST_TIMEOUT_MS`, `IPNI_VERIFICATION_TIMEOUT_MS`, `IPNI_VERIFICATION_POLLING_MS` |
+| [Piece Cleanup](#piece-cleanup) | `MAX_DATASET_STORAGE_SIZE_BYTES`, `TARGET_DATASET_STORAGE_SIZE_BYTES`, `JOB_PIECE_CLEANUP_PER_SP_PER_HOUR`, `MAX_PIECE_CLEANUP_RUNTIME_SECONDS` |
 | [Web Frontend](#web-frontend) | `VITE_API_BASE_URL`, `VITE_PLAUSIBLE_DATA_DOMAIN`, `DEALBOT_API_BASE_URL` |
 
 ---
@@ -783,6 +784,112 @@ Use this to stagger multiple dealbot deployments that are not sharing a database
 **Note**: This affects the number of concurrent `/ipfs/` requests per retrieval.
 
 ---
+
+## Piece Cleanup
+
+These variables control the automatic cleanup of old pieces from storage providers to prevent
+unbounded data growth. Cleanup runs as a periodic pg-boss job per SP.
+
+The cleanup flow uses **live provider data** (via `filecoin-pin`'s `calculateActualStorage()`) as the source of truth for how much data an SP is storing. When live usage exceeds the high-water mark (`MAX_DATASET_STORAGE_SIZE_BYTES`), the cleanup job deletes the oldest pieces until usage drops below the low-water mark (`TARGET_DATASET_STORAGE_SIZE_BYTES`). This high-water/low-water approach prevents thrashing near the threshold.
+
+If the live query fails, cleanup falls back to DB-based `SUM(piece_size)` for the quota decision.
+
+SPs that are over quota will also have new deal creation skipped until cleanup runs.
+
+### `MAX_DATASET_STORAGE_SIZE_BYTES`
+
+- **Type**: `number` (integer, bytes)
+- **Required**: No
+- **Default**: `25769803776` (24 GiB)
+- **Minimum**: `1`
+
+**Role**: **High-water mark.** Maximum total stored data per SP (in bytes) before cleanup kicks in. When live storage for a provider exceeds this value, the cleanup job triggers and deletes the oldest pieces until usage drops below `TARGET_DATASET_STORAGE_SIZE_BYTES` (the low-water mark).
+
+**When to update**:
+
+- Increase for longer runway before cleanup kicks in (e.g. months vs weeks)
+- Decrease if SP storage is constrained or costs are a concern
+
+**Example**:
+
+```bash
+MAX_DATASET_STORAGE_SIZE_BYTES=12884901888 # 12 GiB per SP
+```
+
+---
+
+### `TARGET_DATASET_STORAGE_SIZE_BYTES`
+
+- **Type**: `number` (integer, bytes)
+- **Required**: No
+- **Default**: `21474836480` (20 GiB)
+- **Minimum**: `1`
+
+**Role**: **Low-water mark.** When cleanup triggers (live usage exceeds `MAX_DATASET_STORAGE_SIZE_BYTES`), pieces are deleted until usage drops below this target. The gap between MAX and TARGET creates headroom so cleanup doesn't re-trigger immediately.
+
+**Headroom math**: At 4 deals/SP/hour × 10 MiB = ~960 MiB/day growth. With 4 GiB headroom (24 GiB MAX − 20 GiB TARGET), cleanup provides ~4 days of breathing room per run, which aligns with the daily default cadence.
+
+**When to update**:
+
+- Decrease for more aggressive cleanup (larger gap = more headroom)
+- Increase toward MAX for minimal cleanup (smaller gap = less headroom)
+- Must be less than `MAX_DATASET_STORAGE_SIZE_BYTES` for cleanup to have effect
+
+**Example**:
+
+```bash
+TARGET_DATASET_STORAGE_SIZE_BYTES=16106127360 # 15 GiB per SP (9 GiB headroom)
+```
+
+---
+
+### `JOB_PIECE_CLEANUP_PER_SP_PER_HOUR`
+
+- **Type**: `number`
+- **Required**: No
+- **Default**: `0.0417` (~1/24, approximately once per day)
+- **Minimum**: `0.001`
+- **Maximum**: `20`
+
+**Role**: Target number of piece cleanup runs per storage provider per hour. Controls how frequently the cleanup job runs for each SP. The rate is converted to an interval internally (e.g. 1/hr = every 3600s, 1/24/hr ≈ every 86400s = once per day).
+
+Only used when `DEALBOT_JOBS_MODE=pgboss`.
+
+**When to update**:
+
+- Increase to run cleanup more frequently when SPs are frequently over quota
+- Decrease to reduce scheduling overhead
+
+**Example**:
+
+```bash
+# Once per hour (more aggressive)
+JOB_PIECE_CLEANUP_PER_SP_PER_HOUR=1
+
+# Once per week (very conservative)
+JOB_PIECE_CLEANUP_PER_SP_PER_HOUR=0.006
+```
+
+---
+
+### `MAX_PIECE_CLEANUP_RUNTIME_SECONDS`
+
+- **Type**: `number`
+- **Required**: No
+- **Default**: `300` (5 minutes)
+- **Minimum**: `60`
+
+**Role**: Maximum runtime for a cleanup job before forced abort via `AbortController`. Prevents stuck cleanup jobs from blocking the SP work queue.
+
+Only used when `DEALBOT_JOBS_MODE=pgboss`.
+
+**When to update**:
+
+- Increase if piece deletion calls to the Synapse SDK are known to be slow
+- Decrease for faster abort detection on stuck jobs
+
+---
+
 ## Dataset Configuration
 
 ### `DEALBOT_LOCAL_DATASETS_PATH`
diff --git a/docs/jobs.md b/docs/jobs.md
index c4b6fbef..fca8d595 100644
--- a/docs/jobs.md
+++ b/docs/jobs.md
@@ -15,7 +15,7 @@ This doc explains what a "job" is in dealbot, how jobs are defined, how they're
 | --- | --- | --- |
 | `job_schedule_state` | One per `` plus global rows | Schedule state owned by dealbot. |
 | Storage provider (SP) | One per SP in registry | Filtered by `USE_ONLY_APPROVED_PROVIDERS` when enabled. |
-| Job type | `deal`, `retrieval`, `metrics`, `metrics_cleanup` | `deal` corresponds to "data storage check" externally; we keep `deal` in code/DB for compatibility. |
+| Job type | `deal`, `retrieval`, `piece_cleanup`, `metrics`, `metrics_cleanup` | `deal` corresponds to "data storage check" externally; we keep `deal` in code/DB for compatibility. |
 | pg-boss queue | `sp.work`, `metrics.run`, `metrics.cleanup` | `sp.work` is a singleton queue. |
 | Dealbot scheduler | One per process (when enabled) | Runs the scheduling loop. |
 | Dealbot worker process | One Node.js process with `DEALBOT_RUN_MODE=worker` or `both` | Hosts pg-boss workers. |
@@ -34,6 +34,7 @@ This doc explains what a "job" is in dealbot, how jobs are defined, how they're
 | --- | --- | --- | --- |
 | `deal` | `sp.work` | [`JobsService.handleDealJob`](../apps/backend/src/jobs/jobs.service.ts) | `{ jobType: 'deal', spAddress, intervalSeconds }` |
 | `retrieval` | `sp.work` | [`JobsService.handleRetrievalJob`](../apps/backend/src/jobs/jobs.service.ts) | `{ jobType: 'retrieval', spAddress, intervalSeconds }` |
+| `piece_cleanup` | `sp.work` | [`JobsService.handlePieceCleanupJob`](../apps/backend/src/jobs/jobs.service.ts) | `{ jobType: 'piece_cleanup', spAddress, intervalSeconds }` |
 | `metrics` | `metrics.run` | [`JobsService.handleMetricsJob`](../apps/backend/src/jobs/jobs.service.ts) | `{ intervalSeconds }` |
 | `metrics_cleanup` | `metrics.cleanup` | [`JobsService.handleMetricsCleanupJob`](../apps/backend/src/jobs/jobs.service.ts) | `{ intervalSeconds }` |
 
@@ -137,7 +138,7 @@ Use these formulas to reason about whether the system can keep up and how much b
 Per-SP capacity (one job per SP at a time):
 
-- Per-SP execution-minutes per hour = `(deals_per_sp_per_hour * deal_max_minutes) + (retrievals_per_sp_per_hour * retrieval_max_minutes)`
+- Per-SP execution-minutes per hour = `(deals_per_sp_per_hour * deal_max_minutes) + (retrievals_per_sp_per_hour * retrieval_max_minutes) + (piece_cleanup_per_sp_per_hour * cleanup_max_minutes)`
 - If per-SP execution-minutes per hour > 60, that SP can never catch up (backlog grows), even if we had infinite dealbot workers.
 - If per-SP execution-minutes per hour <= 60, backlog should eventually drain, assuming there are enough dealbot workers (headroom = `60 - per-SP execution-minutes per hour`).
 
@@ -190,7 +191,7 @@ See the "Jobs (pg-boss)" section in `docs/environment-variables.md` for full def
 
 - [`DEALBOT_PGBOSS_SCHEDULER_ENABLED`](./environment-variables.md#dealbot_pgboss_scheduler_enabled)
 - [`DEALBOT_RUN_MODE`](./environment-variables.md#dealbot_run_mode)
-- [`DEALS_PER_SP_PER_HOUR`](./environment-variables.md#deals_per_sp_per_hour), [`RETRIEVALS_PER_SP_PER_HOUR`](./environment-variables.md#retrievals_per_sp_per_hour), [`METRICS_PER_HOUR`](./environment-variables.md#metrics_per_hour)
+- [`DEALS_PER_SP_PER_HOUR`](./environment-variables.md#deals_per_sp_per_hour), [`RETRIEVALS_PER_SP_PER_HOUR`](./environment-variables.md#retrievals_per_sp_per_hour), [`METRICS_PER_HOUR`](./environment-variables.md#metrics_per_hour), [`JOB_PIECE_CLEANUP_PER_SP_PER_HOUR`](./environment-variables.md#job_piece_cleanup_per_sp_per_hour)
 - [`JOB_SCHEDULER_POLL_SECONDS`](./environment-variables.md#job_scheduler_poll_seconds), [`JOB_WORKER_POLL_SECONDS`](./environment-variables.md#job_worker_poll_seconds)
 - [`JOB_CATCHUP_MAX_ENQUEUE`](./environment-variables.md#job_catchup_max_enqueue)
 - [`JOB_SCHEDULE_PHASE_SECONDS`](./environment-variables.md#job_schedule_phase_seconds)
diff --git a/docs/runbooks/jobs.md b/docs/runbooks/jobs.md
index 099433b0..73e2c7fa 100644
--- a/docs/runbooks/jobs.md
+++ b/docs/runbooks/jobs.md
@@ -14,7 +14,7 @@ For routine daily maintenance windows, prefer `DEALBOT_MAINTENANCE_WINDOWS_UTC`
 -- Pause all deal and retrieval jobs
 UPDATE job_schedule_state
 SET paused = true, updated_at = NOW()
-WHERE job_type IN ('deal', 'retrieval');
+WHERE job_type IN ('deal', 'retrieval', 'piece_cleanup');
 
 -- Pause metrics jobs
 UPDATE job_schedule_state
@@ -27,7 +27,7 @@ To pause a single provider:
 ```sql
 UPDATE job_schedule_state
 SET paused = true, updated_at = NOW()
-WHERE job_type IN ('deal', 'retrieval')
+WHERE job_type IN ('deal', 'retrieval', 'piece_cleanup')
   AND sp_address = '';
 ```
 
@@ -36,7 +36,7 @@
 ```sql
 UPDATE job_schedule_state
 SET paused = false, next_run_at = NOW(), updated_at = NOW()
-WHERE job_type IN ('deal', 'retrieval', 'metrics', 'metrics_cleanup');
+WHERE job_type IN ('deal', 'retrieval', 'piece_cleanup', 'metrics', 'metrics_cleanup');
 ```
 
 To resume a single provider:
@@ -44,7 +44,7 @@ To resume a single provider:
 ```sql
 UPDATE job_schedule_state
 SET paused = false, next_run_at = NOW(), updated_at = NOW()
-WHERE job_type IN ('deal', 'retrieval')
+WHERE job_type IN ('deal', 'retrieval', 'piece_cleanup')
   AND sp_address = '';
 ```
 
@@ -91,6 +91,15 @@ WHERE job_type = 'retrieval'
   AND sp_address = '';
 ```
 
+Run piece cleanup for a specific SP:
+
+```sql
+UPDATE job_schedule_state
+SET paused = false, next_run_at = NOW(), updated_at = NOW()
+WHERE job_type = 'piece_cleanup'
+  AND sp_address = '';
+```
+
 - Offsets (`*_START_OFFSET_SECONDS`) are ignored in pg-boss mode.
 - Job schedules are rate-based (per hour) and persist across restarts.
 - Paused schedules remain paused until explicitly resumed. Pausing is strictly for manual/admin use.
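As a tuning aid for the rate-based schedules above, the sketch below shows how a per-hour rate maps to a scheduling interval and how the per-SP execution-minutes budget from `docs/jobs.md` adds up once the `piece_cleanup` term is included. The helper functions are illustrative only, not dealbot APIs, and the deal/retrieval numbers in the final example are made-up inputs rather than defaults.

```typescript
// Illustrative helpers only; these are not dealbot APIs. Rates are "runs per SP
// per hour" as configured by DEALS_PER_SP_PER_HOUR, RETRIEVALS_PER_SP_PER_HOUR
// and JOB_PIECE_CLEANUP_PER_SP_PER_HOUR.

// A rate of r runs/hour corresponds to one run every 3600 / r seconds.
function rateToIntervalSeconds(runsPerHour: number): number {
  return Math.round(3600 / runsPerHour);
}

// Per-SP execution-minutes per hour (docs/jobs.md). Keep this under 60 or the
// SP's singleton work queue can never catch up.
function perSpExecutionMinutesPerHour(
  dealsPerHour: number,
  dealMaxMinutes: number,
  retrievalsPerHour: number,
  retrievalMaxMinutes: number,
  cleanupsPerHour: number,
  cleanupMaxMinutes: number,
): number {
  return (
    dealsPerHour * dealMaxMinutes +
    retrievalsPerHour * retrievalMaxMinutes +
    cleanupsPerHour * cleanupMaxMinutes
  );
}

console.log(rateToIntervalSeconds(1 / 24)); // 86400s: the default ~once-per-day cleanup cadence
// Example inputs only (deal/retrieval figures are illustrative, not defaults):
console.log(perSpExecutionMinutesPerHour(4, 6, 2, 1, 1 / 24, 5)); // ≈ 26.2, well under 60
```

At the defaults (one cleanup run per day capped at 5 minutes), the new term adds only about 0.2 execution-minutes per hour per SP, so it should not materially change existing capacity planning.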