A real-time cryptocurrency token data aggregation service that fetches meme coin data from multiple DEX (Decentralized Exchange) APIs, implements efficient caching with Redis, and provides live updates via WebSockets.
Live deployment: https://dexflow-realtime.onrender.com
- Multi-Source Aggregation: Fetches data from DexScreener and Jupiter APIs
- Smart Caching: Redis-based caching with configurable TTL (30s default)
- Real-time Updates: WebSocket support for live price and volume updates
- Rate Limiting: Intelligent rate limiting with exponential backoff
- Token Deduplication: Merges duplicate tokens across different DEXs
- Advanced Filtering: Filter by time period, volume, and search
- Sorting Options: Sort by volume, price change, market cap, or liquidity
- Cursor Pagination: Efficient pagination for large token lists
- Error Handling: Comprehensive error handling and recovery
- Runtime: Node.js with TypeScript
- Web Framework: Express.js
- WebSocket: Socket.io
- Cache: Redis (ioredis client)
- HTTP Client: Axios
- Task Scheduling: node-cron
- Testing: Jest + Supertest
src/
├── api/                        # External API clients
│   ├── dexscreener.api.ts      # DexScreener API wrapper
│   └── jupiter.api.ts          # Jupiter API wrapper
├── config/                     # Configuration files
│   └── index.ts                # App configuration
├── controllers/                # Route controllers
│   ├── token.controller.ts     # Token endpoints logic
│   └── routes.ts               # API routes
├── middleware/                 # Express middleware
│   └── errorHandler.ts         # Error handling middleware
├── services/                   # Business logic
│   ├── aggregator.service.ts   # Token aggregation
│   ├── cache.service.ts        # Redis caching
│   └── websocket.service.ts    # WebSocket real-time updates
├── types/                      # TypeScript type definitions
│   └── index.ts
├── utils/                      # Utility functions
│   ├── logger.ts               # Logging utility
│   └── rateLimiter.ts          # Rate limiting utility
└── server.ts                   # Main application entry point
- Node.js >= 18.x
- Redis >= 6.x
- npm or yarn
- Clone the repository

      git clone https://github.com/raunaksarawgi/dexflow_realtime.git
      cd dexflow_realtime

- Install dependencies

      npm install

- Set up environment variables

      cp .env.example .env

  Edit .env with your configuration:

      NODE_ENV=development
      PORT=3000
      REDIS_URL=redis://localhost:6379
      CACHE_TTL=30

- Start Redis (if not already running)

      # Using Docker
      docker run -d -p 6379:6379 redis
      # Or locally installed Redis
      redis-server

- Build the project

      npm run build

- Start the server

      # Development mode (with hot reload)
      npm run dev
      # Production mode
      npm start

The server will start on http://localhost:3000
A modern black-themed WebSocket client:
Features:
- Real-time token price updates
- Live volume spike notifications
- Advanced sorting and filtering
- Modern dark UI with smooth animations
- Lightweight and responsive
Base URL: http://localhost:3000/api
GET /api/tokens

Query Parameters:

| Parameter | Type | Default | Description |
|---|---|---|---|
| limit | number | 30 | Page size (1-100) |
| cursor | string | - | Pagination cursor |
| sortBy | string | volume | Sort field: volume, price_change, market_cap, liquidity |
| order | string | desc | Sort order: asc, desc |
| period | string | 24h | Time period: 1h, 24h, 7d |

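The parameter table above can be turned into a small normalizing function. The sketch below is only an illustration: `parseTokenQuery` and `oneOf` are hypothetical names, and it assumes out-of-range or unknown values fall back to the documented defaults. The real service may instead reject them with a 400 response.

```typescript
// Hypothetical helper, not taken from the project source.
type SortField = "volume" | "price_change" | "market_cap" | "liquidity";
type SortOrder = "asc" | "desc";
type Period = "1h" | "24h" | "7d";

interface TokenQuery {
  limit: number;
  cursor?: string;
  sortBy: SortField;
  order: SortOrder;
  period: Period;
}

const SORT_FIELDS: readonly SortField[] = ["volume", "price_change", "market_cap", "liquidity"];
const ORDERS: readonly SortOrder[] = ["asc", "desc"];
const PERIODS: readonly Period[] = ["1h", "24h", "7d"];

// Return `value` if it is one of `allowed`, otherwise the fallback.
function oneOf<T extends string>(value: string | undefined, allowed: readonly T[], fallback: T): T {
  const match = allowed.find((candidate) => candidate === value);
  return match ?? fallback;
}

function parseTokenQuery(raw: Record<string, string | undefined>): TokenQuery {
  const parsed = Number.parseInt(raw["limit"] ?? "", 10);
  // Assumption: clamp to the documented 1-100 range, default to 30 when missing or invalid.
  const limit = Number.isNaN(parsed) ? 30 : Math.min(100, Math.max(1, parsed));
  return {
    limit,
    cursor: raw["cursor"],
    sortBy: oneOf(raw["sortBy"], SORT_FIELDS, "volume"),
    order: oneOf(raw["order"], ORDERS, "desc"),
    period: oneOf(raw["period"], PERIODS, "24h"),
  };
}
```

In an Express handler, the string-valued entries of `req.query` could be passed in as `raw`.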
Example Request:
curl "http://localhost:3000/api/tokens?limit=10&sortBy=volume&order=desc"

Example Response:
{
"success": true,
"data": {
"data": [
{
"token_address": "576P1t7XsRL4ZVj38LV2eYWxXRPguBADA8BxcNz1xo8y",
"token_name": "PIPE CTO",
"token_ticker": "PIPE",
"price_sol": 4.4141209798877615e-7,
"market_cap_sol": 441.41,
"volume_sol": 1322.43,
"liquidity_sol": 149.36,
"transaction_count": 2205,
"price_24hr_change": 120.61,
"protocol": "Raydium CLMM"
}
],
"pagination": {
"nextCursor": "10",
"total": 50,
"limit": 10
}
},
"timestamp": 1700000000000
}

GET /api/tokens/:address

Path Parameters:

| Parameter | Type | Required | Description |
|---|---|---|---|
| address | string | Yes | Token address (32+ characters) |

Example Request:
curl "http://localhost:3000/api/tokens/576P1t7XsRL4ZVj38LV2eYWxXRPguBADA8BxcNz1xo8y"

Example Response:
{
"success": true,
"data": {
"token_address": "576P1t7XsRL4ZVj38LV2eYWxXRPguBADA8BxcNz1xo8y",
"token_name": "PIPE CTO",
"token_ticker": "PIPE",
"price_sol": 4.4141209798877615e-7,
"market_cap_sol": 441.41,
"volume_sol": 1322.43,
"liquidity_sol": 149.36,
"transaction_count": 2205,
"price_24hr_change": 120.61,
"protocol": "Raydium CLMM"
},
"timestamp": 1700000000000
}

GET /api/search?q={query}

Query Parameters:

| Parameter | Type | Required | Description |
|---|---|---|---|
| q | string | Yes | Search query (token name or ticker) |

Example Request:
curl "http://localhost:3000/api/search?q=PIPE"

Example Response:
{
"success": true,
"data": [
{
"token_address": "576P1t7XsRL4ZVj38LV2eYWxXRPguBADA8BxcNz1xo8y",
"token_name": "PIPE CTO",
"token_ticker": "PIPE",
"price_sol": 4.4141209798877615e-7,
"volume_sol": 1322.43,
"liquidity_sol": 149.36
}
],
"timestamp": 1700000000000
}

GET /api/health

Description: Returns server health status, uptime, and memory usage.

Example Request:
curl "http://localhost:3000/api/health"

Example Response:
{
"success": true,
"data": {
"status": "healthy",
"timestamp": 1700000000000,
"uptime": 12345.67,
"memory": {
"rss": 50331648,
"heapTotal": 20971520,
"heapUsed": 15728640,
"external": 1048576
}
}
}

// Assumes `io` from socket.io-client is in scope,
// e.g. import { io } from 'socket.io-client';
const socket = io('http://localhost:3000');
socket.on('connect', () => {
  console.log('Connected to WebSocket server!');
});
socket.on('disconnect', () => {
  console.log('Disconnected from server');
});

Sent immediately after connection with top tokens.
socket.on('initial_data', (event) => {
  console.log('Initial tokens:', event.data);
  // event = { type: 'initial_data', data: Array<Token>, timestamp: number }
});

Sent when any token data changes (price, volume, liquidity, etc.).
socket.on('tokens_updated', (event) => {
  console.log('Updated tokens:', event.data);
  // event = { type: 'tokens_updated', data: Array<Token>, timestamp: number }
});

Sent when token prices change.
socket.on('price_update', (event) => {
  console.log('Price changes:', event.data);
// event = {
// type: 'price_update',
// data: Array<{
// token_address: string,
// old_price: number,
// new_price: number,
// change_percent: number
// }>,
// timestamp: number
// }
});

Sent when volume increases significantly (by more than 20%).
socket.on('volume_spike', (event) => {
  console.log('Volume spikes:', event.data);
// event = {
// type: 'volume_spike',
// data: Array<{
// token_address: string,
// old_volume: number,
// new_volume: number,
// spike_percent: number
// }>,
// timestamp: number
// }
});

Sent when a new token is discovered.
socket.on('new_token', (event) => {
  console.log('New token:', event.data);
  // event = { type: 'new_token', data: Token, timestamp: number }
});

Monitor specific tokens by subscribing to their addresses.
socket.emit('subscribe', ['token_address_1', 'token_address_2']);
socket.on('subscribed', (data) => {
  console.log('Subscribed to:', data.tokens);
});

Stop monitoring specific tokens.
socket.emit('unsubscribe', ['token_address_1']);
socket.on('unsubscribed', (data) => {
  console.log('Unsubscribed from:', data.tokens);
});

# Run all tests
npm test
# Run with coverage report
npm run test:coverage
# Run in watch mode (auto-rerun on changes)
npm run test:watch
# Run linter
npm run lint
# Fix linting issues automatically
npm run lint:fix

Test coverage includes:
- API endpoint tests
- Service layer tests
- Cache functionality tests
- Error handling tests
A comprehensive Postman collection (postman_collection.json) is included with 19 requests:
- Successful requests return 200 status codes with data
- Error test cases return 400/404 status codes with error messages (expected behavior)
- Includes custom token address testing for flexible validation

Import Instructions:
- Open Postman, click "Import", and upload postman_collection.json
- Set the baseUrl variable to https://dexflow-realtime.onrender.com (or http://localhost:3000 for local)
- Run requests to test API functionality
To demonstrate API performance with rapid calls:
# Make sure server is running first
npm run dev
# In another terminal, run performance test
node performance-test.js

This will make 10 rapid API calls and show:
- Individual response times for each endpoint
- Average/min/max response times
- Cache performance improvements
- Success rate
Expected results:
- First call (cache miss): ~100-300ms
- Cached calls: ~10-50ms (90%+ faster)
- Average response time: <100ms
- Dual-layer caching: API responses (configurable TTL, default 30s) and aggregated results (configurable TTL, default 30s)
- Cache key pattern: Organized by source and type (e.g., dexscreener:search:SOL, aggregated:popular)
- Redis-backed: Fast in-memory caching with automatic expiration
- Graceful degradation: System continues working if Redis is unavailable (cache misses fall back to the API)
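
The caching behaviour described above follows the cache-aside pattern. The sketch below shows one way it might look. `CacheLike`, `cacheKey`, and `getOrFetch` are hypothetical names, and the interface is a stand-in that a Redis client wrapper could implement. It is not the project's actual cache service.

```typescript
// Minimal async cache interface (assumption); a Redis-backed class could implement it.
interface CacheLike {
  get(key: string): Promise<string | null>;
  set(key: string, value: string, ttlSeconds: number): Promise<void>;
}

// Build keys in the documented "source:type[:id]" shape, e.g. dexscreener:search:SOL.
function cacheKey(source: string, type: string, id?: string): string {
  return id === undefined ? `${source}:${type}` : `${source}:${type}:${id}`;
}

// Cache-aside read: try the cache first, fall back to the fetcher,
// and never let a cache failure fail the request.
async function getOrFetch<T>(
  cache: CacheLike,
  key: string,
  ttlSeconds: number,
  fetcher: () => Promise<T>,
): Promise<T> {
  try {
    const hit = await cache.get(key);
    if (hit !== null) return JSON.parse(hit) as T;
  } catch {
    // Cache unavailable or entry unreadable: treat it as a miss.
  }
  const fresh = await fetcher();
  try {
    await cache.set(key, JSON.stringify(fresh), ttlSeconds);
  } catch {
    // Ignore write failures; the caller still gets fresh data.
  }
  return fresh;
}
```

Swallowing cache errors on both the read and the write path is what lets the service keep answering from the upstream APIs when Redis is down, at the cost of higher latency.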
- Exponential backoff: Prevents API rate limit violations with intelligent retry logic
- Per-API limiters: Separate rate limiters for DexScreener (300/min) and Jupiter (600/min)
- Request queuing: Queues requests when approaching limits to avoid HTTP 429 (Too Many Requests) errors
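
As an illustration of the exponential backoff mentioned above, here is a minimal retry helper. The function names, the 500 ms base delay, the 30 s cap, and the retry count are all assumptions made for the sketch. The project's rateLimiter.ts may use different values and may add jitter or queuing.

```typescript
// Delay before retry number `attempt` (0-based): baseMs * 2^attempt, capped at maxMs.
function backoffDelayMs(attempt: number, baseMs = 500, maxMs = 30_000): number {
  return Math.min(maxMs, baseMs * 2 ** attempt);
}

const sleep = (ms: number): Promise<void> =>
  new Promise((resolve) => setTimeout(resolve, ms));

// Call `fn`, retrying up to `maxRetries` times and waiting longer after each failure.
async function withRetry<T>(fn: () => Promise<T>, maxRetries = 3, baseMs = 500): Promise<T> {
  for (let attempt = 0; ; attempt += 1) {
    try {
      return await fn();
    } catch (err) {
      // Out of retries: surface the last error to the caller.
      if (attempt >= maxRetries) throw err;
      await sleep(backoffDelayMs(attempt, baseMs));
    }
  }
}
```

A call to an upstream API could then be wrapped as `withRetry(() => fetchTokens())`, where `fetchTokens` stands for any promise-returning request function.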
- Cache-aligned polling: WebSocket updates sync with cache refresh cycle (every 10-30s configurable)
- Change detection: Compares current vs previous data to identify meaningful changes
- Smart broadcasting: Only emits events for significant changes (price changes, volume spikes >20%)
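
The change detection step can be sketched as a pure function that compares two snapshots. The field names follow the event payloads documented earlier. `detectChanges` and `percentChange` are hypothetical names, and treating any price difference as a change is an assumption.

```typescript
interface TokenSnapshot {
  token_address: string;
  price_sol: number;
  volume_sol: number;
}

interface PriceChange { token_address: string; old_price: number; new_price: number; change_percent: number; }
interface VolumeSpike { token_address: string; old_volume: number; new_volume: number; spike_percent: number; }

// Percentage change from `before` to `after`; 0 when `before` is 0 to avoid dividing by zero.
function percentChange(before: number, after: number): number {
  return before === 0 ? 0 : ((after - before) / before) * 100;
}

function detectChanges(previous: TokenSnapshot[], current: TokenSnapshot[], spikeThresholdPercent = 20) {
  const before = new Map<string, TokenSnapshot>();
  for (const token of previous) before.set(token.token_address, token);

  const priceChanges: PriceChange[] = [];
  const volumeSpikes: VolumeSpike[] = [];
  const newTokens: TokenSnapshot[] = [];

  for (const token of current) {
    const old = before.get(token.token_address);
    if (old === undefined) {
      // Not seen in the previous cycle: report it as a new token.
      newTokens.push(token);
      continue;
    }
    if (old.price_sol !== token.price_sol) {
      priceChanges.push({
        token_address: token.token_address,
        old_price: old.price_sol,
        new_price: token.price_sol,
        change_percent: percentChange(old.price_sol, token.price_sol),
      });
    }
    const spike = percentChange(old.volume_sol, token.volume_sol);
    if (spike > spikeThresholdPercent) {
      volumeSpikes.push({
        token_address: token.token_address,
        old_volume: old.volume_sol,
        new_volume: token.volume_sol,
        spike_percent: spike,
      });
    }
  }
  return { priceChanges, volumeSpikes, newTokens };
}
```

Each non-empty array would map to one broadcast (`price_update`, `volume_spike`, `new_token`), so a polling cycle with no changes emits nothing.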
- Address-based matching: Primary key is token address (case-insensitive)
- Multi-source merging: Combines data from DexScreener and Jupiter, preferring most complete info
- Conflict resolution: Uses latest/most reliable data when sources conflict
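
A minimal sketch of address-based deduplication follows. The merge policy shown (the first source wins and later sources only fill missing fields) is an assumption standing in for the "most complete info" rule, and `TokenRecord`, `mergeRecords`, and `dedupeTokens` are hypothetical names.

```typescript
interface TokenRecord {
  token_address: string;
  token_name?: string;
  token_ticker?: string;
  price_sol?: number;
  volume_sol?: number;
  liquidity_sol?: number;
}

// Merge two records for the same token: keep fields from `a`, fill gaps from `b`.
function mergeRecords(a: TokenRecord, b: TokenRecord): TokenRecord {
  return {
    token_address: a.token_address,
    token_name: a.token_name ?? b.token_name,
    token_ticker: a.token_ticker ?? b.token_ticker,
    price_sol: a.price_sol ?? b.price_sol,
    volume_sol: a.volume_sol ?? b.volume_sol,
    liquidity_sol: a.liquidity_sol ?? b.liquidity_sol,
  };
}

// Combine token lists from several sources, one entry per address.
function dedupeTokens(sources: TokenRecord[][]): TokenRecord[] {
  const byAddress = new Map<string, TokenRecord>();
  for (const source of sources) {
    for (const token of source) {
      // Lowercasing mirrors the case-insensitive matching described above.
      const key = token.token_address.toLowerCase();
      const existing = byAddress.get(key);
      byAddress.set(key, existing ? mergeRecords(existing, token) : token);
    }
  }
  return Array.from(byAddress.values());
}
```

Passing the DexScreener results first and the Jupiter results second would make DexScreener the preferred source under this policy.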
- Graceful failures: API failures don't crash the server
- Promise.allSettled: Parallel API calls continue even if one fails
- Consistent error format: All errors follow the standard { success, error: { code, message } } structure
- Error logging: Comprehensive error tracking with context
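
The two ideas above, tolerating partial upstream failure and returning a uniform error envelope, might be combined as in this sketch. `fetchFromAll` and `errorResponse` are hypothetical names, and the error code used in the usage note is invented for illustration.

```typescript
interface ApiError {
  success: false;
  error: { code: string; message: string };
}

// Build the documented error envelope: { success, error: { code, message } }.
function errorResponse(code: string, message: string): ApiError {
  return { success: false, error: { code, message } };
}

// Run all fetchers in parallel and keep the results of those that succeed.
async function fetchFromAll<T>(
  fetchers: Array<() => Promise<T[]>>,
): Promise<{ items: T[]; failures: string[] }> {
  const settled = await Promise.allSettled(fetchers.map((run) => run()));
  const items: T[] = [];
  const failures: string[] = [];
  for (const result of settled) {
    if (result.status === "fulfilled") items.push(...result.value);
    else failures.push(String(result.reason)); // collect for logging, do not rethrow
  }
  return { items, failures };
}
```

A route handler could respond with the collected items whenever at least one source succeeded, and with something like `errorResponse("UPSTREAM_UNAVAILABLE", "All sources failed")` only when every fetcher rejected.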
- Parallel API calls: Multiple APIs fetched simultaneously using Promise.allSettled
- Connection pooling: Reuses HTTP connections via axios keep-alive
- Cursor pagination: Memory-efficient pagination that doesn't re-scan entire dataset
- Selective updates: Only broadcasts tokens that actually changed
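
The cursor pagination above can be illustrated with a small slicing helper. The example response earlier shows `nextCursor: "10"` for `limit=10`, so this sketch assumes the cursor is the index of the next item in an already-sorted list. Returning `null` on the last page and the name `paginate` are assumptions.

```typescript
interface Page<T> {
  data: T[];
  pagination: { nextCursor: string | null; total: number; limit: number };
}

function paginate<T>(items: readonly T[], limit: number, cursor?: string): Page<T> {
  const parsed = cursor === undefined ? 0 : Number.parseInt(cursor, 10);
  // Treat a missing or malformed cursor as the start of the list.
  const start = Number.isNaN(parsed) || parsed < 0 ? 0 : parsed;
  const end = Math.min(items.length, start + limit);
  return {
    data: items.slice(start, end),
    pagination: {
      // Assumption: null signals that there are no further pages.
      nextCursor: end < items.length ? String(end) : null,
      total: items.length,
      limit,
    },
  };
}
```

A client keeps passing the returned `nextCursor` back as the `cursor` query parameter until it receives `null`.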
| Variable | Description | Default |
|---|---|---|
| NODE_ENV | Environment mode | development |
| PORT | Server port | 3000 |
| REDIS_URL | Redis connection URL | redis://localhost:6379 |
| CACHE_TTL | Cache TTL in seconds | 30 |
| API_CACHE_TTL | API cache TTL in seconds | 30 |
| RATE_LIMIT_PER_MINUTE | Max requests per minute | 300 |
| WS_UPDATE_INTERVAL | WebSocket update interval (ms) | 30000 |
| CORS_ORIGIN | CORS origin | * |
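
The table of variables and defaults could be loaded along the following lines. This is a sketch, not the contents of the project's config/index.ts. `AppConfig`, `intFrom`, and `loadConfig` are hypothetical names, and falling back to the default on a non-numeric value is an assumption.

```typescript
interface AppConfig {
  nodeEnv: string;
  port: number;
  redisUrl: string;
  cacheTtl: number;
  apiCacheTtl: number;
  rateLimitPerMinute: number;
  wsUpdateIntervalMs: number;
  corsOrigin: string;
}

type Env = Record<string, string | undefined>;

// Read an integer variable, falling back to the default when it is unset or not a number.
function intFrom(env: Env, name: string, fallback: number): number {
  const parsed = Number.parseInt(env[name] ?? "", 10);
  return Number.isNaN(parsed) ? fallback : parsed;
}

function loadConfig(env: Env): AppConfig {
  return {
    nodeEnv: env["NODE_ENV"] ?? "development",
    port: intFrom(env, "PORT", 3000),
    redisUrl: env["REDIS_URL"] ?? "redis://localhost:6379",
    cacheTtl: intFrom(env, "CACHE_TTL", 30),
    apiCacheTtl: intFrom(env, "API_CACHE_TTL", 30),
    rateLimitPerMinute: intFrom(env, "RATE_LIMIT_PER_MINUTE", 300),
    wsUpdateIntervalMs: intFrom(env, "WS_UPDATE_INTERVAL", 30000),
    corsOrigin: env["CORS_ORIGIN"] ?? "*",
  };
}
```

In a Node.js process this would typically be called once at startup as `loadConfig(process.env)`.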
# Clean build
rm -rf dist/
npm run build