Skip to content

Commit 651047c

Browse files
authored
[HUMAN App] Implement exponential backoff for exchange oracle jobs fetching (#2752)
* Exponential backoff for exchange oracle jobs fetching * Remove max request retries * Removed retriesCount and executionsToSkip from oracles response
1 parent be1435d commit 651047c

File tree

12 files changed

+123
-113
lines changed

12 files changed

+123
-113
lines changed

packages/apps/human-app/server/src/app.module.ts

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -90,7 +90,7 @@ const JOI_BOOLEAN_STRING_SCHEMA = Joi.string().valid('true', 'false');
9090
JOB_ASSIGNMENTS_DATA_RETENTION_DAYS: Joi.number(),
9191
CACHE_TTL_EXCHANGE_ORACLE_URL: Joi.number(),
9292
CACHE_TTL_EXCHANGE_ORACLE_REGISTRATION_NEEDED: Joi.number(),
93-
MAX_REQUEST_RETRIES: Joi.number(),
93+
MAX_EXECUTIONS_TO_SKIP: Joi.number(),
9494
FEATURE_FLAG_JOBS_DISCOVERY: JOI_BOOLEAN_STRING_SCHEMA,
9595
}),
9696
}),

packages/apps/human-app/server/src/common/config/environment-config.service.ts

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -11,7 +11,7 @@ const DEFAULT_CORS_ALLOWED_ORIGIN = 'http://localhost:5173';
1111
const DEFAULT_CORS_ALLOWED_HEADERS =
1212
'Content-Type,Authorization,X-Requested-With,Accept,Origin';
1313
const DEFAULT_CACHE_TTL_EXCHANGE_ORACLE_URL = 24 * 60 * 60;
14-
const DEFAULT_MAX_REQUEST_RETRIES = 5;
14+
const DEFAULT_MAX_EXECUTIONS_TO_SKIP = 32;
1515
const DEFAULT_CACHE_TTL_EXCHANGE_ORACLE_REGISTRATION_NEEDED = 24 * 60 * 60;
1616

1717
@Injectable()
@@ -291,13 +291,13 @@ export class EnvironmentConfigService {
291291
}
292292

293293
/**
294-
* The maximum number of retries for requests.
294+
* The maximum number of iteration to skip.
295295
* Default: 5
296296
*/
297-
get maxRequestRetries(): number {
297+
get maxExecutionToSkip(): number {
298298
return this.configService.get<number>(
299-
'MAX_REQUEST_RETRIES',
300-
DEFAULT_MAX_REQUEST_RETRIES,
299+
'MAX_EXECUTIONS_TO_SKIP',
300+
DEFAULT_MAX_EXECUTIONS_TO_SKIP,
301301
);
302302
}
303303

packages/apps/human-app/server/src/common/interceptors/transform-enum.interceptor.spec.ts

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -82,7 +82,7 @@ describe('TransformEnumInterceptor', () => {
8282

8383
// Access the modified request query
8484
const request = executionContext.switchToHttp().getRequest();
85-
console.log(123213, request.query);
85+
8686
// Expectations
8787
expect(request.query.userType).toBe('operator');
8888
expect(request.query).toEqual({

packages/apps/human-app/server/src/main.ts

Lines changed: 7 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,8 +1,12 @@
1-
import { NestFactory } from '@nestjs/core';
1+
import { NestFactory, Reflector } from '@nestjs/core';
22
import { AppModule } from './app.module';
33
import { DocumentBuilder, SwaggerModule } from '@nestjs/swagger';
44
import { ConfigService } from '@nestjs/config';
5-
import { Logger, ValidationPipe } from '@nestjs/common';
5+
import {
6+
ClassSerializerInterceptor,
7+
Logger,
8+
ValidationPipe,
9+
} from '@nestjs/common';
610
import { EnvironmentConfigService } from './common/config/environment-config.service';
711
import { GlobalExceptionsFilter } from './common/filter/global-exceptions.filter';
812
import { CACHE_MANAGER } from '@nestjs/cache-manager';
@@ -37,6 +41,7 @@ async function bootstrap() {
3741
await cacheManager.reset();
3842
}
3943
app.useGlobalFilters(new GlobalExceptionsFilter());
44+
app.useGlobalInterceptors(new ClassSerializerInterceptor(app.get(Reflector)));
4045
app.useGlobalPipes(new ValidationPipe({ transform: true }));
4146

4247
await app.listen(port, host, async () => {

packages/apps/human-app/server/src/modules/cron-job/cron-job.service.ts

Lines changed: 30 additions & 28 deletions
Original file line numberDiff line numberDiff line change
@@ -61,6 +61,17 @@ export class CronJobService {
6161
});
6262

6363
for (const oracle of oracles) {
64+
if (oracle.executionsToSkip > 0) {
65+
this.logger.log(
66+
`Skipping execution for oracle: ${oracle.address}. Remaining skips: ${oracle.executionsToSkip}`,
67+
);
68+
69+
await this.updateOracleInCache(oracle, {
70+
executionsToSkip: oracle.executionsToSkip - 1,
71+
});
72+
continue;
73+
}
74+
6475
await this.updateJobsListCache(
6576
oracle,
6677
'Bearer ' + response.access_token,
@@ -109,9 +120,11 @@ export class CronJobService {
109120
allResults = this.mergeJobs(allResults, response.results);
110121
}
111122

112-
await this.resetRetriesCount(oracle);
123+
await this.updateOracleInCache(oracle, {
124+
retriesCount: 0,
125+
executionsToSkip: 0,
126+
});
113127

114-
// Cache the job results (original behavior)
115128
await this.cacheManager.set(
116129
`${JOB_DISCOVERY_CACHE_KEY}:${oracle.address}`,
117130
allResults,
@@ -122,16 +135,18 @@ export class CronJobService {
122135
}
123136
}
124137

125-
private async resetRetriesCount(oracleData: OracleDiscoveryResponse) {
126-
oracleData.retriesCount = 0;
127-
138+
private async updateOracleInCache(
139+
oracleData: OracleDiscoveryResponse,
140+
updates: Partial<OracleDiscoveryResponse>,
141+
) {
142+
const updatedOracle = { ...oracleData, ...updates };
128143
const chainId = oracleData.chainId;
129144
const cachedOracles =
130145
await this.cacheManager.get<OracleDiscoveryResponse[]>(chainId);
131146

132147
if (cachedOracles) {
133148
const updatedOracles = cachedOracles.map((oracle) =>
134-
oracle.address === oracleData.address ? oracleData : oracle,
149+
oracle.address === oracleData.address ? updatedOracle : oracle,
135150
);
136151
await this.cacheManager.set(
137152
chainId,
@@ -142,29 +157,16 @@ export class CronJobService {
142157
}
143158

144159
private async handleJobListError(oracleData: OracleDiscoveryResponse) {
145-
const chainId = oracleData.chainId;
146-
const cachedOracles =
147-
await this.cacheManager.get<OracleDiscoveryResponse[]>(chainId);
148-
149-
if (cachedOracles) {
150-
const cachedOracle = cachedOracles.find(
151-
(oracle) => oracle.address === oracleData.address,
152-
);
153-
154-
if (cachedOracle) {
155-
cachedOracle.retriesCount = (cachedOracle.retriesCount || 0) + 1;
156-
157-
const updatedOracles = cachedOracles.map((oracle) =>
158-
oracle.address === cachedOracle.address ? cachedOracle : oracle,
159-
);
160+
const retriesCount = oracleData.retriesCount || 0;
161+
const newExecutionsToSkip = Math.min(
162+
(oracleData.executionsToSkip || 0) + Math.pow(2, retriesCount),
163+
this.configService.maxExecutionToSkip,
164+
);
160165

161-
await this.cacheManager.set(
162-
chainId,
163-
updatedOracles,
164-
this.configService.cacheTtlOracleDiscovery,
165-
);
166-
}
167-
}
166+
await this.updateOracleInCache(oracleData, {
167+
retriesCount: retriesCount + 1,
168+
executionsToSkip: newExecutionsToSkip,
169+
});
168170
}
169171

170172
private mergeJobs(

packages/apps/human-app/server/src/modules/cron-job/spec/cron-job.service.spec.ts

Lines changed: 27 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -54,6 +54,7 @@ describe('CronJobService', () => {
5454
cacheTtlOracleDiscovery: 600,
5555
chainIdsEnabled: ['137', '1'],
5656
jobsDiscoveryFlag: false,
57+
maxExecutionToSkip: 32,
5758
};
5859

5960
const module: TestingModule = await Test.createTestingModule({
@@ -168,6 +169,7 @@ describe('CronJobService', () => {
168169
role: 'validator',
169170
chainId: '137',
170171
retriesCount: 0,
172+
executionsToSkip: 0,
171173
};
172174
const token = 'Bearer token';
173175
const initialResponse = {
@@ -195,6 +197,7 @@ describe('CronJobService', () => {
195197
role: 'validator',
196198
chainId: '137',
197199
retriesCount: 0,
200+
executionsToSkip: 0,
198201
};
199202
const token = 'Bearer token';
200203
const error = new Error('Test error');
@@ -220,6 +223,7 @@ describe('CronJobService', () => {
220223
role: 'validator',
221224
chainId: '137',
222225
retriesCount: 3,
226+
executionsToSkip: 0,
223227
};
224228
const token = 'Bearer token';
225229
const initialResponse = {
@@ -230,14 +234,17 @@ describe('CronJobService', () => {
230234
initialResponse,
231235
);
232236

233-
const resetRetriesCountSpy = jest.spyOn(
237+
const updateOracleInCacheSpy = jest.spyOn(
234238
service as any,
235-
'resetRetriesCount',
239+
'updateOracleInCache',
236240
);
237241

238242
await service.updateJobsListCache(oracle, token);
239243

240-
expect(resetRetriesCountSpy).toHaveBeenCalledWith(oracle);
244+
expect(updateOracleInCacheSpy).toHaveBeenCalledWith(oracle, {
245+
retriesCount: 0,
246+
executionsToSkip: 0,
247+
});
241248
});
242249
});
243250

@@ -309,71 +316,74 @@ describe('CronJobService', () => {
309316
});
310317
});
311318

312-
describe('resetRetriesCount', () => {
313-
it('should reset retries count and activate oracle', async () => {
319+
describe('updateOracleInCache', () => {
320+
it('should update oracle in cache', async () => {
314321
const oracleData: OracleDiscoveryResponse = {
315322
address: 'mockAddress1',
316323
role: 'validator',
317324
chainId: '137',
318325
retriesCount: 5,
326+
executionsToSkip: 2,
319327
};
320328

321329
cacheManagerMock.get.mockResolvedValue([oracleData]);
322330

323-
await (service as any).resetRetriesCount(oracleData);
331+
await (service as any).updateOracleInCache(oracleData, {
332+
retriesCount: 0,
333+
executionsToSkip: 0,
334+
});
324335

325-
expect(oracleData.retriesCount).toBe(0);
326336
expect(cacheManagerMock.set).toHaveBeenCalledWith(
327337
oracleData.chainId,
328-
[oracleData],
338+
[{ ...oracleData, retriesCount: 0, executionsToSkip: 0 }],
329339
configServiceMock.cacheTtlOracleDiscovery,
330340
);
331341
});
332342
});
333343

334344
describe('handleJobListError', () => {
335-
it('should increment retries count and deactivate oracle after 5 failures', async () => {
345+
it('should increment retries count and executions to skip but not exceed the limit', async () => {
336346
const oracleData: OracleDiscoveryResponse = {
337347
address: 'mockAddress1',
338348
role: 'validator',
339349
chainId: '137',
340-
retriesCount: 4,
350+
retriesCount: 6,
351+
executionsToSkip: 0,
341352
};
342353

343354
cacheManagerMock.get.mockResolvedValue([oracleData]);
344355

345356
await (service as any).handleJobListError(oracleData);
346357

347-
expect(oracleData.retriesCount).toBe(5);
348358
expect(cacheManagerMock.set).toHaveBeenCalledWith(
349359
oracleData.chainId,
350-
[oracleData],
360+
[{ ...oracleData, retriesCount: 7, executionsToSkip: 32 }],
351361
configServiceMock.cacheTtlOracleDiscovery,
352362
);
353363
});
354364

355-
it('should increment retries count but keep oracle active if less than 5 failures', async () => {
365+
it('should increment retries count and executions to skip', async () => {
356366
const oracleData: OracleDiscoveryResponse = {
357367
address: 'mockAddress1',
358368
role: 'validator',
359369
chainId: '137',
360370
retriesCount: 2,
371+
executionsToSkip: 0,
361372
};
362373

363374
cacheManagerMock.get.mockResolvedValue([oracleData]);
364375

365376
await (service as any).handleJobListError(oracleData);
366377

367-
expect(oracleData.retriesCount).toBe(3);
368378
expect(cacheManagerMock.set).toHaveBeenCalledWith(
369379
oracleData.chainId,
370-
[oracleData],
380+
[{ ...oracleData, retriesCount: 3, executionsToSkip: 4 }],
371381
configServiceMock.cacheTtlOracleDiscovery,
372382
);
373383
});
374384

375-
it('should do nothing if oracle is not found in cache', async () => {
376-
cacheManagerMock.get.mockResolvedValue([]);
385+
it('should do nothing if chainId is not found in cache', async () => {
386+
cacheManagerMock.get.mockResolvedValue(undefined);
377387

378388
await (service as any).handleJobListError('unknownAddress');
379389

packages/apps/human-app/server/src/modules/jobs-discovery/jobs-discovery.controller.ts

Lines changed: 8 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -5,7 +5,12 @@ import {
55
HttpStatus,
66
Query,
77
} from '@nestjs/common';
8-
import { ApiBearerAuth, ApiOperation, ApiTags } from '@nestjs/swagger';
8+
import {
9+
ApiBearerAuth,
10+
ApiOkResponse,
11+
ApiOperation,
12+
ApiTags,
13+
} from '@nestjs/swagger';
914
import { InjectMapper } from '@automapper/nestjs';
1015
import { Mapper } from '@automapper/core';
1116
import { JobsDiscoveryService } from './jobs-discovery.service';
@@ -22,19 +27,20 @@ import { JwtUserData } from '../../common/utils/jwt-token.model';
2227
import { EnvironmentConfigService } from '../../common/config/environment-config.service';
2328

2429
@Controller()
30+
@ApiTags('Jobs-Discovery')
2531
export class JobsDiscoveryController {
2632
constructor(
2733
private readonly service: JobsDiscoveryService,
2834
private readonly environmentConfigService: EnvironmentConfigService,
2935
@InjectMapper() private readonly mapper: Mapper,
3036
) {}
3137

32-
@ApiTags('Jobs-Discovery')
3338
@Get('/jobs')
3439
@ApiBearerAuth()
3540
@ApiOperation({
3641
summary: 'Retrieve a list of jobs for given Exchange Oracle',
3742
})
43+
@ApiOkResponse({ type: JobsDiscoveryResponse, description: 'List of jobs' })
3844
public async getJobs(
3945
@Query() jobsDiscoveryParamsDto: JobsDiscoveryParamsDto,
4046
@JwtPayload() jwtPayload: JwtUserData,

packages/apps/human-app/server/src/modules/oracle-discovery/model/oracle-discovery.model.ts

Lines changed: 26 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,18 +1,42 @@
11
import { IOperator } from '@human-protocol/sdk';
22
import { AutoMap } from '@automapper/classes';
3-
import { ApiPropertyOptional } from '@nestjs/swagger';
3+
import { ApiProperty, ApiPropertyOptional } from '@nestjs/swagger';
44
import { IsArray, IsOptional } from 'class-validator';
5-
import { Transform } from 'class-transformer';
5+
import { Exclude, Transform } from 'class-transformer';
66

77
export class OracleDiscoveryResponse implements IOperator {
8+
@ApiProperty({ description: 'Address of the oracle operator' })
89
address: string;
10+
11+
@ApiProperty({ description: 'Chain ID where the oracle is registered' })
912
chainId: string;
13+
14+
@ApiPropertyOptional({ description: 'Role of the oracle operator' })
1015
role?: string;
16+
17+
@ApiPropertyOptional({ description: 'URL of the oracle operator' })
1118
url?: string;
19+
20+
@ApiPropertyOptional({
21+
type: [String],
22+
description: 'Types of jobs the oracle supports',
23+
})
1224
jobTypes?: string[];
25+
26+
@ApiPropertyOptional({ description: 'Indicates if registration is needed' })
1327
registrationNeeded?: boolean;
28+
29+
@ApiPropertyOptional({
30+
description: 'Instructions for registration, if needed',
31+
})
1432
registrationInstructions?: string;
33+
34+
@Exclude()
1535
retriesCount = 0;
36+
37+
@Exclude()
38+
executionsToSkip = 0;
39+
1640
constructor(
1741
address: string,
1842
chainId: string,

0 commit comments

Comments
 (0)