Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
47 changes: 47 additions & 0 deletions apps/backend/src/config/app.config.ts
Original file line number Diff line number Diff line change
Expand Up @@ -68,6 +68,21 @@ export const configValidationSchema = Joi.object({
DATA_SET_CREATION_JOB_TIMEOUT_SECONDS: Joi.number().min(60).default(300), // 5 minutes max runtime for dataset creation jobs
IPFS_BLOCK_FETCH_CONCURRENCY: Joi.number().integer().min(1).max(32).default(6),

// Piece Cleanup
MAX_DATASET_STORAGE_SIZE_BYTES: Joi.number()
.integer()
.min(1)
.default(24 * 1024 * 1024 * 1024), // 24 GiB per SP
TARGET_DATASET_STORAGE_SIZE_BYTES: Joi.number()
.integer()
.min(1)
.default(20 * 1024 * 1024 * 1024), // 20 GiB per SP
JOB_PIECE_CLEANUP_PER_SP_PER_HOUR: Joi.number()
.min(0.001)
.max(20)
.default(1 / 24), // ~once per day
MAX_PIECE_CLEANUP_RUNTIME_SECONDS: Joi.number().min(60).default(300), // 5 minutes max runtime for cleanup jobs

// Dataset
DEALBOT_LOCAL_DATASETS_PATH: Joi.string().default(DEFAULT_LOCAL_DATASETS_PATH),
RANDOM_PIECE_SIZES: Joi.string().default("10485760"), // 10 MiB
Expand Down Expand Up @@ -204,6 +219,20 @@ export interface IJobsConfig {
* Uses AbortController to actively cancel job execution.
*/
retrievalJobTimeoutSeconds: number;
/**
* Target number of piece cleanup runs per storage provider per hour.
*
* Increasing this makes cleanup more aggressive at the cost of more SP API calls.
* Only used when `DEALBOT_JOBS_MODE=pgboss`.
*/
pieceCleanupPerSpPerHour: number;
/**
* Maximum runtime (seconds) for piece cleanup jobs before forced abort.
*
* Uses AbortController to actively cancel job execution.
* Only used when `DEALBOT_JOBS_MODE=pgboss`.
*/
maxPieceCleanupRuntimeSeconds: number;
}

export interface IDatasetConfig {
Expand All @@ -223,6 +252,11 @@ export interface IRetrievalConfig {
ipfsBlockFetchConcurrency: number;
}

/**
 * Per-storage-provider dataset storage limits consumed by the piece-cleanup
 * feature. Populated in `loadConfig()` from the
 * `MAX_DATASET_STORAGE_SIZE_BYTES` / `TARGET_DATASET_STORAGE_SIZE_BYTES`
 * environment variables (defaults: 24 GiB / 20 GiB per SP).
 */
export interface IPieceCleanupConfig {
  /** Hard cap on dataset storage per SP, in bytes (default 24 GiB). */
  maxDatasetStorageSizeBytes: number;
  /**
   * Storage size, in bytes, that cleanup presumably drives usage back down to
   * once the cap is exceeded (default 20 GiB). NOTE(review): target semantics
   * inferred from the env-var comments — confirm against the cleanup service.
   */
  targetDatasetStorageSizeBytes: number;
}

export interface IConfig {
app: IAppConfig;
database: IDatabaseConfig;
Expand All @@ -232,6 +266,7 @@ export interface IConfig {
dataset: IDatasetConfig;
timeouts: ITimeoutConfig;
retrieval: IRetrievalConfig;
pieceCleanup: IPieceCleanupConfig;
}

export function loadConfig(): IConfig {
Expand Down Expand Up @@ -293,6 +328,8 @@ export function loadConfig(): IConfig {
dealJobTimeoutSeconds: Number.parseInt(process.env.DEAL_JOB_TIMEOUT_SECONDS || "360", 10),
retrievalJobTimeoutSeconds: Number.parseInt(process.env.RETRIEVAL_JOB_TIMEOUT_SECONDS || "60", 10),
dataSetCreationJobTimeoutSeconds: Number.parseInt(process.env.DATA_SET_CREATION_JOB_TIMEOUT_SECONDS || "300", 10),
pieceCleanupPerSpPerHour: Number.parseFloat(process.env.JOB_PIECE_CLEANUP_PER_SP_PER_HOUR || "1"),
maxPieceCleanupRuntimeSeconds: Number.parseInt(process.env.MAX_PIECE_CLEANUP_RUNTIME_SECONDS || "300", 10),
},
dataset: {
localDatasetsPath: process.env.DEALBOT_LOCAL_DATASETS_PATH || DEFAULT_LOCAL_DATASETS_PATH,
Expand Down Expand Up @@ -322,5 +359,15 @@ export function loadConfig(): IConfig {
retrieval: {
ipfsBlockFetchConcurrency: Number.parseInt(process.env.IPFS_BLOCK_FETCH_CONCURRENCY || "6", 10),
},
pieceCleanup: {
maxDatasetStorageSizeBytes: Number.parseInt(
process.env.MAX_DATASET_STORAGE_SIZE_BYTES || String(24 * 1024 * 1024 * 1024),
10,
),
targetDatasetStorageSizeBytes: Number.parseInt(
process.env.TARGET_DATASET_STORAGE_SIZE_BYTES || String(20 * 1024 * 1024 * 1024),
10,
),
},
};
}
7 changes: 7 additions & 0 deletions apps/backend/src/database/entities/deal.entity.ts
Original file line number Diff line number Diff line change
Expand Up @@ -132,6 +132,13 @@ export class Deal {
@Column({ name: "retry_count", default: 0 })
retryCount: number;

// Piece cleanup tracking
@Column({ name: "cleaned_up", default: false })
cleanedUp: boolean;

@Column({ name: "cleaned_up_at", type: "timestamptz", nullable: true })
cleanedUpAt: Date;

@CreateDateColumn({ name: "created_at", type: "timestamptz" })
createdAt: Date;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,8 @@ export type JobType =
| "metrics"
| "metrics_cleanup"
| "providers_refresh"
| "data_retention_poll";
| "data_retention_poll"
| "piece_cleanup";

@Entity("job_schedule_state")
@Index("job_schedule_state_job_type_sp_unique", ["jobType", "spAddress"], { unique: true })
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,18 @@
import type { MigrationInterface, QueryRunner } from "typeorm";

/**
 * Adds piece-cleanup tracking columns to the `deals` table:
 * - `cleaned_up`    — whether the deal's piece has been cleaned up (defaults to false)
 * - `cleaned_up_at` — timestamp of the cleanup, NULL until it happens
 *
 * Mirrors the `cleanedUp` / `cleanedUpAt` fields added to the Deal entity.
 */
export class AddPieceCleanupColumns1761500000002 implements MigrationInterface {
  // Explicit name so TypeORM records a stable identifier in its migrations table.
  name = "AddPieceCleanupColumns1761500000002";

  public async up(queryRunner: QueryRunner): Promise<void> {
    // IF NOT EXISTS keeps the migration idempotent if the columns were
    // already created (e.g. a partially applied or re-run migration).
    await queryRunner.query(`
      ALTER TABLE deals
      ADD COLUMN IF NOT EXISTS cleaned_up BOOLEAN NOT NULL DEFAULT false,
      ADD COLUMN IF NOT EXISTS cleaned_up_at TIMESTAMPTZ NULL
    `);
  }

  public async down(queryRunner: QueryRunner): Promise<void> {
    // Drop in reverse order of addition; IF EXISTS makes the rollback
    // safe to repeat.
    await queryRunner.query(`ALTER TABLE deals DROP COLUMN IF EXISTS cleaned_up_at`);
    await queryRunner.query(`ALTER TABLE deals DROP COLUMN IF EXISTS cleaned_up`);
  }
}
2 changes: 2 additions & 0 deletions apps/backend/src/jobs/jobs.module.ts
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ import { StorageProvider } from "../database/entities/storage-provider.entity.js
import { DealModule } from "../deal/deal.module.js";
import { MetricsModule } from "../metrics/metrics.module.js";
import { MetricsWorkerModule } from "../metrics/metrics-worker.module.js";
import { PieceCleanupModule } from "../piece-cleanup/piece-cleanup.module.js";
import { RetrievalModule } from "../retrieval/retrieval.module.js";
import { WalletSdkModule } from "../wallet-sdk/wallet-sdk.module.js";
import { JobsService } from "./jobs.service.js";
Expand All @@ -25,6 +26,7 @@ const metricsModule = runMode === "worker" ? MetricsWorkerModule : MetricsModule
metricsModule,
WalletSdkModule,
DataRetentionModule,
PieceCleanupModule,
],
providers: [JobsService, JobScheduleRepository],
})
Expand Down
191 changes: 159 additions & 32 deletions apps/backend/src/jobs/jobs.service.spec.ts
Original file line number Diff line number Diff line change
Expand Up @@ -31,16 +31,16 @@ describe("JobsService schedule rows", () => {
};
let dataRetentionServiceMock: { pollDataRetention: ReturnType<typeof vi.fn> };
let metricsMocks: {
jobsQueuedGauge: JobsServiceDeps[8];
jobsRetryScheduledGauge: JobsServiceDeps[9];
oldestQueuedAgeGauge: JobsServiceDeps[10];
oldestInFlightAgeGauge: JobsServiceDeps[11];
jobsInFlightGauge: JobsServiceDeps[12];
jobsEnqueueAttemptsCounter: JobsServiceDeps[13];
jobsStartedCounter: JobsServiceDeps[14];
jobsCompletedCounter: JobsServiceDeps[15];
jobsPausedGauge: JobsServiceDeps[16];
jobDuration: JobsServiceDeps[17];
jobsQueuedGauge: JobsServiceDeps[9];
jobsRetryScheduledGauge: JobsServiceDeps[10];
oldestQueuedAgeGauge: JobsServiceDeps[11];
oldestInFlightAgeGauge: JobsServiceDeps[12];
jobsInFlightGauge: JobsServiceDeps[13];
jobsEnqueueAttemptsCounter: JobsServiceDeps[14];
jobsStartedCounter: JobsServiceDeps[15];
jobsCompletedCounter: JobsServiceDeps[16];
jobsPausedGauge: JobsServiceDeps[17];
jobDuration: JobsServiceDeps[18];
};
let baseConfigValues: Partial<IConfig>;
let configService: JobsServiceDeps[0];
Expand All @@ -54,16 +54,17 @@ describe("JobsService schedule rows", () => {
metricsSchedulerService: JobsServiceDeps[5];
walletSdkService: JobsServiceDeps[6];
dataRetentionService: JobsServiceDeps[7];
jobsQueuedGauge: JobsServiceDeps[8];
jobsRetryScheduledGauge: JobsServiceDeps[9];
oldestQueuedAgeGauge: JobsServiceDeps[10];
oldestInFlightAgeGauge: JobsServiceDeps[11];
jobsInFlightGauge: JobsServiceDeps[12];
jobsEnqueueAttemptsCounter: JobsServiceDeps[13];
jobsStartedCounter: JobsServiceDeps[14];
jobsCompletedCounter: JobsServiceDeps[15];
jobsPausedGauge: JobsServiceDeps[16];
jobDuration: JobsServiceDeps[17];
pieceCleanupService: JobsServiceDeps[8];
jobsQueuedGauge: JobsServiceDeps[9];
jobsRetryScheduledGauge: JobsServiceDeps[10];
oldestQueuedAgeGauge: JobsServiceDeps[11];
oldestInFlightAgeGauge: JobsServiceDeps[12];
jobsInFlightGauge: JobsServiceDeps[13];
jobsEnqueueAttemptsCounter: JobsServiceDeps[14];
jobsStartedCounter: JobsServiceDeps[15];
jobsCompletedCounter: JobsServiceDeps[16];
jobsPausedGauge: JobsServiceDeps[17];
jobDuration: JobsServiceDeps[18];
}>,
) => JobsService;

Expand Down Expand Up @@ -92,16 +93,16 @@ describe("JobsService schedule rows", () => {
};

metricsMocks = {
jobsQueuedGauge: { set: vi.fn() } as unknown as JobsServiceDeps[8],
jobsRetryScheduledGauge: { set: vi.fn() } as unknown as JobsServiceDeps[9],
oldestQueuedAgeGauge: { set: vi.fn() } as unknown as JobsServiceDeps[10],
oldestInFlightAgeGauge: { set: vi.fn() } as unknown as JobsServiceDeps[11],
jobsInFlightGauge: { set: vi.fn() } as unknown as JobsServiceDeps[12],
jobsEnqueueAttemptsCounter: { inc: vi.fn() } as unknown as JobsServiceDeps[13],
jobsStartedCounter: { inc: vi.fn() } as unknown as JobsServiceDeps[14],
jobsCompletedCounter: { inc: vi.fn() } as unknown as JobsServiceDeps[15],
jobsPausedGauge: { set: vi.fn() } as unknown as JobsServiceDeps[16],
jobDuration: { observe: vi.fn() } as unknown as JobsServiceDeps[17],
jobsQueuedGauge: { set: vi.fn() } as unknown as JobsServiceDeps[9],
jobsRetryScheduledGauge: { set: vi.fn() } as unknown as JobsServiceDeps[10],
oldestQueuedAgeGauge: { set: vi.fn() } as unknown as JobsServiceDeps[11],
oldestInFlightAgeGauge: { set: vi.fn() } as unknown as JobsServiceDeps[12],
jobsInFlightGauge: { set: vi.fn() } as unknown as JobsServiceDeps[13],
jobsEnqueueAttemptsCounter: { inc: vi.fn() } as unknown as JobsServiceDeps[14],
jobsStartedCounter: { inc: vi.fn() } as unknown as JobsServiceDeps[15],
jobsCompletedCounter: { inc: vi.fn() } as unknown as JobsServiceDeps[16],
jobsPausedGauge: { set: vi.fn() } as unknown as JobsServiceDeps[17],
jobDuration: { observe: vi.fn() } as unknown as JobsServiceDeps[18],
};

baseConfigValues = {
Expand All @@ -120,6 +121,8 @@ describe("JobsService schedule rows", () => {
pgbossSchedulerEnabled: true,
workerPollSeconds: 60,
dataSetCreationJobTimeoutSeconds: 300,
pieceCleanupPerSpPerHour: 1,
maxPieceCleanupRuntimeSeconds: 300,
} as IConfig["jobs"],
database: {
host: "localhost",
Expand All @@ -128,6 +131,9 @@ describe("JobsService schedule rows", () => {
password: "pass",
database: "dealbot",
} as IConfig["database"],
pieceCleanup: {
maxDatasetStorageSizeBytes: 24 * 1024 * 1024 * 1024,
} as IConfig["pieceCleanup"],
};

configService = {
Expand All @@ -144,6 +150,8 @@ describe("JobsService schedule rows", () => {
overrides.metricsSchedulerService ?? ({} as JobsServiceDeps[5]),
overrides.walletSdkService ?? ({} as JobsServiceDeps[6]),
overrides.dataRetentionService ?? (dataRetentionServiceMock as unknown as JobsServiceDeps[7]),
overrides.pieceCleanupService ??
({ isProviderOverQuota: vi.fn().mockResolvedValue(false) } as unknown as JobsServiceDeps[8]),
overrides.jobsQueuedGauge ?? metricsMocks.jobsQueuedGauge,
overrides.jobsRetryScheduledGauge ?? metricsMocks.jobsRetryScheduledGauge,
overrides.oldestQueuedAgeGauge ?? metricsMocks.oldestQueuedAgeGauge,
Expand Down Expand Up @@ -613,8 +621,13 @@ describe("JobsService schedule rows", () => {
// Check upserts for providerB
const upsertCalls = jobScheduleRepositoryMock.upsertSchedule.mock.calls;
const upsertsForB = upsertCalls.filter((call) => call[1] === providerB.address);
expect(upsertsForB).toHaveLength(3);
expect(upsertsForB.map((call) => call[0]).sort()).toEqual(["data_set_creation", "deal", "retrieval"]);
expect(upsertsForB).toHaveLength(4);
expect(upsertsForB.map((call) => call[0]).sort()).toEqual([
"data_set_creation",
"deal",
"piece_cleanup",
"retrieval",
]);
});

it("deletes schedule rows for providers no longer present", async () => {
Expand Down Expand Up @@ -913,6 +926,120 @@ describe("JobsService schedule rows", () => {
);
});

it("deal job skips deal creation when SP is over storage quota", async () => {
vi.useFakeTimers();
vi.setSystemTime(new Date("2024-01-01T12:00:00Z"));

const dealService = {
createDealForProvider: vi.fn(),
getTestingDealOptions: vi.fn(() => ({ enableIpni: false })),
getBaseDataSetMetadata: vi.fn(() => ({})),
checkDataSetExists: vi.fn(async () => false),
};

const walletSdkService = {
getTestingProviders: vi.fn(() => [{ serviceProvider: "0xaaa" }]),
ensureWalletAllowances: vi.fn(),
loadProviders: vi.fn(),
getProviderInfo: vi.fn(() => ({ id: 1 })),
};

const pieceCleanupService = {
isProviderOverQuota: vi.fn().mockResolvedValue(true),
};

service = buildService({
dealService: dealService as unknown as ConstructorParameters<typeof JobsService>[3],
walletSdkService: walletSdkService as unknown as ConstructorParameters<typeof JobsService>[6],
pieceCleanupService: pieceCleanupService as unknown as JobsServiceDeps[8],
});

await callPrivate(service, "handleDealJob", {
id: "job-over-quota",
data: { jobType: "deal", spAddress: "0xaaa", intervalSeconds: 60 },
});

expect(pieceCleanupService.isProviderOverQuota).toHaveBeenCalledWith("0xaaa");
expect(dealService.createDealForProvider).not.toHaveBeenCalled();
});

it("deal job proceeds with deal creation when quota check throws (fail-open)", async () => {
vi.useFakeTimers();
vi.setSystemTime(new Date("2024-01-01T12:00:00Z"));

const dealService = {
createDealForProvider: vi.fn(async () => ({})),
getTestingDealOptions: vi.fn(() => ({ enableIpni: false })),
getBaseDataSetMetadata: vi.fn(() => ({})),
checkDataSetExists: vi.fn(async () => false),
};

const walletSdkService = {
getTestingProviders: vi.fn(() => [{ serviceProvider: "0xaaa" }]),
ensureWalletAllowances: vi.fn(),
loadProviders: vi.fn(),
getProviderInfo: vi.fn(() => ({ id: 1, name: "test-provider" })),
};

const pieceCleanupService = {
isProviderOverQuota: vi.fn().mockRejectedValue(new Error("DB connection failed")),
};

service = buildService({
dealService: dealService as unknown as ConstructorParameters<typeof JobsService>[3],
walletSdkService: walletSdkService as unknown as ConstructorParameters<typeof JobsService>[6],
pieceCleanupService: pieceCleanupService as unknown as JobsServiceDeps[8],
});

await callPrivate(service, "handleDealJob", {
id: "job-quota-error",
data: { jobType: "deal", spAddress: "0xaaa", intervalSeconds: 60 },
});

expect(pieceCleanupService.isProviderOverQuota).toHaveBeenCalledWith("0xaaa");
// Should proceed despite the quota check failure (fail-open)
expect(dealService.createDealForProvider).toHaveBeenCalledTimes(1);
});

it("deal job uses live provider data for quota (DB/provider drift: live says OK → deal proceeds)", async () => {
vi.useFakeTimers();
vi.setSystemTime(new Date("2024-01-01T12:00:00Z"));

const dealService = {
createDealForProvider: vi.fn(async () => ({})),
getTestingDealOptions: vi.fn(() => ({ enableIpni: false })),
getBaseDataSetMetadata: vi.fn(() => ({})),
checkDataSetExists: vi.fn(async () => false),
};

const walletSdkService = {
getTestingProviders: vi.fn(() => [{ serviceProvider: "0xaaa" }]),
ensureWalletAllowances: vi.fn(),
loadProviders: vi.fn(),
getProviderInfo: vi.fn(() => ({ id: 1, name: "test-provider" })),
};

// Live provider data says NOT over quota (even if DB SUM disagrees)
const pieceCleanupService = {
isProviderOverQuota: vi.fn().mockResolvedValue(false),
};

service = buildService({
dealService: dealService as unknown as ConstructorParameters<typeof JobsService>[3],
walletSdkService: walletSdkService as unknown as ConstructorParameters<typeof JobsService>[6],
pieceCleanupService: pieceCleanupService as unknown as JobsServiceDeps[8],
});

await callPrivate(service, "handleDealJob", {
id: "job-drift-ok",
data: { jobType: "deal", spAddress: "0xaaa", intervalSeconds: 60 },
});

expect(pieceCleanupService.isProviderOverQuota).toHaveBeenCalledWith("0xaaa");
// Live data says under quota, so deal creation should proceed
expect(dealService.createDealForProvider).toHaveBeenCalledTimes(1);
});

it("deal job creates deal without metadata when minNumDataSetsForChecks is 1", async () => {
vi.useFakeTimers();
vi.setSystemTime(new Date("2024-01-01T12:00:00Z"));
Expand Down
Loading
Loading