Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 4 additions & 3 deletions apps/aggregator/.env.example
Original file line number Diff line number Diff line change
@@ -1,8 +1,9 @@
# Server Configuration
PORT=3001

# Ingestor Services (for receiving raw data)
# INGESTOR_URL=http://localhost:3000
# Ingestor Service Configuration
INGESTOR_WS_URL=ws://localhost:3000
INGESTOR_HTTP_URL=http://localhost:3000

# Signer Service (for publishing aggregated data)
# SIGNER_URL=http://localhost:3002
SIGNER_URL=http://localhost:3002
9 changes: 9 additions & 0 deletions apps/aggregator/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -285,6 +285,15 @@ Confidence score (0-100) is calculated based on:
- 5 sources, 1% spread, low deviation β†’ ~90% confidence
- 3 sources, 10% spread, high deviation β†’ ~30% confidence

## Features

### Data Reception Layer
Implemented via `DataReceptionService`, this layer connects to Ingestor services to receive real-time and historical data.
- **WebSocket Client**: Real-time price streaming with exponential backoff reconnection.
- **HTTP Fallback**: Retrieval of historical data and latest price snapshots.
- **Event-Driven**: Emits `price.received` events using `EventEmitter2`.
- **Validation**: Schema-based validation using `class-validator`.

## Getting Started

### Prerequisites
Expand Down
11 changes: 10 additions & 1 deletion apps/aggregator/package.json
Original file line number Diff line number Diff line change
Expand Up @@ -15,18 +15,27 @@
"test:cov": "jest --coverage"
},
"dependencies": {
"@nestjs/axios": "^4.0.1",
"@nestjs/common": "^10.0.0",
"@nestjs/config": "^4.0.2",
"@nestjs/core": "^10.0.0",
"@nestjs/event-emitter": "^3.0.1",
"@nestjs/platform-express": "^10.0.0",
"axios": "^1.13.4",
"class-transformer": "^0.5.1",
"class-validator": "^0.14.3",
"reflect-metadata": "^0.1.13",
"rxjs": "^7.8.1"
"rxjs": "^7.8.1",
"ws": "^8.19.0"
},
"devDependencies": {
"@nestjs/cli": "^10.0.0",
"@nestjs/schematics": "^10.0.0",
"@nestjs/testing": "^10.0.0",
"@types/express": "^4.17.17",
"@types/jest": "^30.0.0",
"@types/node": "^20.3.1",
"@types/ws": "^8.18.1",
"@typescript-eslint/eslint-plugin": "^6.0.0",
"@typescript-eslint/parser": "^6.0.0",
"eslint": "^8.42.0",
Expand Down
15 changes: 13 additions & 2 deletions apps/aggregator/src/app.module.ts
Original file line number Diff line number Diff line change
@@ -1,18 +1,29 @@
import { Module } from '@nestjs/common';
import { ConfigModule } from '@nestjs/config';
import { HttpModule } from '@nestjs/axios';
import { EventEmitterModule } from '@nestjs/event-emitter';
import { DataReceptionService } from './services/data-reception.service';
import { AggregationService } from './services/aggregation.service';
import { WeightedAverageAggregator } from './strategies/aggregators/weighted-average.aggregator';
import { MedianAggregator } from './strategies/aggregators/median.aggregator';
import { TrimmedMeanAggregator } from './strategies/aggregators/trimmed-mean.aggregator';

@Module({
imports: [],
imports: [
ConfigModule.forRoot({
isGlobal: true,
}),
HttpModule,
EventEmitterModule.forRoot(),
],
controllers: [],
providers: [
DataReceptionService,
AggregationService,
WeightedAverageAggregator,
MedianAggregator,
TrimmedMeanAggregator,
],
exports: [AggregationService],
})
export class AppModule {}
export class AppModule { }
16 changes: 16 additions & 0 deletions apps/aggregator/src/dto/price-input.dto.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,16 @@
import { IsString, IsNumber, IsPositive, IsISO8601 } from 'class-validator';

export class PriceInputDto {
@IsString()
symbol: string;

@IsNumber()
@IsPositive()
price: number;

@IsString()
source: string;

@IsISO8601()
timestamp: string;
}
197 changes: 197 additions & 0 deletions apps/aggregator/src/services/data-reception.service.spec.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,197 @@
import { Test, TestingModule } from '@nestjs/testing';
import { ConfigService } from '@nestjs/config';
import { HttpService } from '@nestjs/axios';
import { EventEmitter2 } from '@nestjs/event-emitter';
import { DataReceptionService } from './data-reception.service';
import { of, throwError } from 'rxjs';
import { WebSocket } from 'ws';

jest.mock('ws');

describe('DataReceptionService', () => {
let service: DataReceptionService;
let httpService: HttpService;
let eventEmitter: EventEmitter2;

const mockConfigService = {
get: jest.fn((key: string): any => {
if (key === 'INGESTOR_WS_URL') return 'ws://localhost:3000';
if (key === 'INGESTOR_HTTP_URL') return 'http://localhost:3000';
return null;
}),
};

const mockHttpService = {
get: jest.fn(),
};

const mockEventEmitter = {
emit: jest.fn(),
};

beforeEach(async () => {
const module: TestingModule = await Test.createTestingModule({
providers: [
DataReceptionService,
{ provide: ConfigService, useValue: mockConfigService },
{ provide: HttpService, useValue: mockHttpService },
{ provide: EventEmitter2, useValue: mockEventEmitter },
],
}).compile();

service = module.get<DataReceptionService>(DataReceptionService);
httpService = module.get<HttpService>(HttpService);
eventEmitter = module.get<EventEmitter2>(EventEmitter2);
});

afterEach(() => {
jest.clearAllMocks();
service.onModuleDestroy();
});

it('should be defined', () => {
expect(service).toBeDefined();
});

describe('WebSocket Connection', () => {
it('should initialize WebSocket connection on init', () => {
service.onModuleInit();
expect(WebSocket).toHaveBeenCalledWith('ws://localhost:3000');
});

it('should log error if INGESTOR_WS_URL is missing', () => {
mockConfigService.get.mockReturnValueOnce(null);
const loggerSpy = jest.spyOn((service as any).logger, 'error');
service.onModuleInit();
expect(loggerSpy).toHaveBeenCalledWith('INGESTOR_WS_URL is not defined in the configuration');
});
});

describe('fetchHistoricalData', () => {
it('should fetch historical data successfully', async () => {
const mockData = [{ symbol: 'BTC', price: 50000, source: 'base', timestamp: '2026-01-27T13:00:00Z' }];
mockHttpService.get.mockReturnValue(of({ data: mockData }));

const result = await service.fetchHistoricalData('BTC');
expect(result[0].symbol).toBe('BTC');
expect(httpService.get).toHaveBeenCalledWith('http://localhost:3000/prices/historical/BTC');
});

it('should return empty array if response is not an array', async () => {
mockHttpService.get.mockReturnValue(of({ data: null }));
const result = await service.fetchHistoricalData('BTC');
expect(result).toEqual([]);
});

it('should throw error if INGESTOR_HTTP_URL is missing', async () => {
mockConfigService.get.mockImplementation((key) => key === 'INGESTOR_HTTP_URL' ? null : 'value');
await expect(service.fetchHistoricalData('BTC')).rejects.toThrow('INGESTOR_HTTP_URL is not defined');
});

it('should throw error if fetch fails', async () => {
mockConfigService.get.mockReturnValue('http://localhost:3000');
mockHttpService.get.mockReturnValue(throwError(() => new Error('HTTP Error')));
await expect(service.fetchHistoricalData('BTC')).rejects.toThrow('HTTP Error');
});
});

describe('getLatestSnapshot', () => {
it('should fetch latest snapshot successfully', async () => {
const mockData = { symbol: 'BTC', price: 50000, source: 'base', timestamp: '2026-01-27T13:00:00Z' };
mockHttpService.get.mockReturnValue(of({ data: mockData }));

const result = await service.getLatestSnapshot('BTC');
expect(result.symbol).toBe('BTC');
expect(httpService.get).toHaveBeenCalledWith('http://localhost:3000/prices/latest/BTC');
});

it('should throw error if snapshot fetch fails', async () => {
mockHttpService.get.mockReturnValue(throwError(() => new Error('Snapshot Error')));
await expect(service.getLatestSnapshot('BTC')).rejects.toThrow('Snapshot Error');
});
});

describe('WebSocket reconnection', () => {
let wsInstance: any;

beforeEach(() => {
jest.useFakeTimers();
service.onModuleInit();
wsInstance = (WebSocket as any).mock.instances[0];
});

afterEach(() => {
jest.useRealTimers();
});

it('should attempt reconnection on close', () => {
const connectSpy = jest.spyOn(service as any, 'connectWebSocket');

// Get the close handler
const closeHandler = wsInstance.on.mock.calls.find((call: any) => call[0] === 'close')[1];
closeHandler();

expect((service as any).reconnectAttempts).toBe(1);

// Fast-forward time for backoff
jest.advanceTimersByTime(2000);

expect(connectSpy).toHaveBeenCalledTimes(1);
});

it('should stop reconnecting after max attempts', () => {
(service as any).reconnectAttempts = 5;
const loggerSpy = jest.spyOn((service as any).logger, 'error');

(service as any).handleReconnection();

expect(loggerSpy).toHaveBeenCalledWith('Max reconnection attempts reached or service stopping.');
});
});

describe('process incoming messages', () => {
let wsInstance: any;

beforeEach(() => {
service.onModuleInit();
wsInstance = (WebSocket as any).mock.instances[0];
});

it('should validate and emit event for valid price data', async () => {
const validPayload = {
symbol: 'ETH',
price: 2500,
source: 'binance',
timestamp: '2026-01-27T13:00:00Z',
};

const messageHandler = wsInstance.on.mock.calls.find((call: any) => call[0] === 'message')[1];
await messageHandler(JSON.stringify(validPayload));

expect(eventEmitter.emit).toHaveBeenCalledWith('price.received', expect.any(Object));
});

it('should log error for invalid JSON', async () => {
const loggerSpy = jest.spyOn((service as any).logger, 'error');
const messageHandler = wsInstance.on.mock.calls.find((call: any) => call[0] === 'message')[1];

await messageHandler('invalid-json');

expect(loggerSpy).toHaveBeenCalledWith(expect.stringContaining('Error processing message'));
});

it('should log error for invalid price data', async () => {
const invalidPayload = {
symbol: 'ETH',
price: -100,
};

const loggerSpy = jest.spyOn((service as any).logger, 'error');
const messageHandler = wsInstance.on.mock.calls.find((call: any) => call[0] === 'message')[1];
await messageHandler(JSON.stringify(invalidPayload));

expect(loggerSpy).toHaveBeenCalledWith(expect.stringContaining('Validation failed'));
expect(eventEmitter.emit).not.toHaveBeenCalled();
});
});
});
Loading