diff --git a/README.md b/README.md index 9bb4244..2273d05 100644 --- a/README.md +++ b/README.md @@ -175,6 +175,7 @@ akash query wasm contract-state smart $CONTRACT_ADDRESS '{"get_config":{}}' | `WALLET_SECRET` | Yes | - | Either `privateKey:` or `mnemonic:<12/24 words>` | | `HERMES_ENDPOINT` | No | `https://hermes.pyth.network` | Pyth Hermes API | | `PRICE_DEVIATION_TOLERANCE` | No | 0 | absolute or percentage value for price deviations which should be ignored (e.g., `100` or `10%`) | +| `PRICE_FETCHING_METHOD` | No | polling | `polling` or `sse` | | `UPDATE_INTERVAL_MS` | No | `5000` | Update interval (default 5 sec) | | `GAS_PRICE` | No | `0.025uakt` | Gas price | | `DENOM` | No | `uakt` | Token denomination | diff --git a/package-lock.json b/package-lock.json index c7550fe..2093e7b 100644 --- a/package-lock.json +++ b/package-lock.json @@ -20,6 +20,7 @@ "@opentelemetry/resources": "^2.5.0", "@opentelemetry/sdk-node": "^0.211.0", "commander": "^12.0.0", + "fetch-event-stream": "^0.1.6", "zod": "^4.3.6" }, "bin": { @@ -3338,6 +3339,12 @@ } } }, + "node_modules/fetch-event-stream": { + "version": "0.1.6", + "resolved": "https://registry.npmjs.org/fetch-event-stream/-/fetch-event-stream-0.1.6.tgz", + "integrity": "sha512-GREtJ5HNikdU2AXtZ6E/5bk+aslMU6ie5mPG6H9nvsdDkkHQ6m5lHwmmmDTOBexok9hApQ7EprsXCdmz9ZC68w==", + "license": "MIT" + }, "node_modules/file-entry-cache": { "version": "8.0.0", "resolved": "https://registry.npmjs.org/file-entry-cache/-/file-entry-cache-8.0.0.tgz", diff --git a/package.json b/package.json index c19a05a..084a3d4 100644 --- a/package.json +++ b/package.json @@ -44,6 +44,7 @@ "@opentelemetry/resources": "^2.5.0", "@opentelemetry/sdk-node": "^0.211.0", "commander": "^12.0.0", + "fetch-event-stream": "^0.1.6", "zod": "^4.3.6" }, "devDependencies": { diff --git a/src/cli-commands/command-config.ts b/src/cli-commands/command-config.ts index 68633f6..ad8eec7 100644 --- a/src/cli-commands/command-config.ts +++ b/src/cli-commands/command-config.ts @@ -2,7 +2,8 @@ import { z } from "zod"; import { HermesClient, type HermesConfig } from "../hermes-client.ts"; import { validateContractAddress, validateWalletSecret } from "../validation.ts"; import type { PriceProducerFactoryOptions } from "../types.ts"; -import { pollPriceStream } from "../price-stream/polling-price-stream.ts"; +import { pollPriceStream } from "../price-stream/polling-price-stream/polling-price-stream.ts"; +import { priceSSEStream } from "../price-stream/price-sse-stream/price-sse-stream.ts"; export interface CommandConfig extends HermesConfig { createHermesClient: (config: HermesConfig) => Promise; @@ -41,6 +42,7 @@ const configSchema = z.object({ }); } }).optional(), + PRICE_FETCHING_METHOD: z.enum(["polling", "sse"]).default("polling"), UPDATE_INTERVAL_MS: z.coerce.number().int().nonnegative().default(5 * 1000), // Default to 5 seconds HEALTHCHECK_PORT: z.coerce.number().int().min(1).max(65535).default(3000), GAS_PRICE: z.string().regex(/^(\d+)(\.\d+)?uakt$/, { message: 'GAS_PRICE must be a valid number with unit (e.g., "0.025uakt")' }).default("0.025uakt"), @@ -71,6 +73,13 @@ export function parseConfig(config: Record): ParseCo priceDeviationTolerance: result.data.PRICE_DEVIATION_TOLERANCE, smartContractConfigCacheTTLMs: result.data.SMART_CONTRACT_CONFIG_CACHE_TTL_MS, priceProducerFactory(options: PriceProducerFactoryOptions) { + if (result.data.PRICE_FETCHING_METHOD === "sse") { + return priceSSEStream({ + ...options, + unsafeAllowInsecureEndpoints, + baseUrl: result.data.HERMES_ENDPOINT, + }); + } return pollPriceStream({ ...options, unsafeAllowInsecureEndpoints, diff --git a/src/hermes-client.test.ts b/src/hermes-client.test.ts index 9f437f3..2b487ea 100644 --- a/src/hermes-client.test.ts +++ b/src/hermes-client.test.ts @@ -128,6 +128,7 @@ describe("SEC-08: Sensitive data in config exposure", () => { contractAddress: "akash1qypqxpq9qcrsszg2pvxq6rs0zqg3yyc5lzv7xu", priceFeedId: "test-feed-id", address: expect.stringMatching(/^akash1[0-9a-z]{38}$/), + lastPriceUpdateReceivedAt: undefined, }); expect(JSON.stringify(status)).not.toContain("abandon"); }); @@ -786,6 +787,45 @@ describe(HermesClient.name, () => { } }); + it("sets lastPriceUpdateReceivedAt in ISO-8601 format after receiving a price update", async () => { + const priceUpdate = buildPriceFeed("123.45", -8, 1234567890); + const factory = blockingFactory(priceUpdate); + const { client, stargateClient } = setup({ priceProducerFactory: factory }); + const ac = new AbortController(); + + stargateClient.queryContractSmart + .mockResolvedValueOnce({ price_feed_id: "test-feed-id", update_fee: "1", wormhole_contract: "akash1wormhole", admin: "akash1admin", default_denom: "uakt", default_base_denom: "akt", data_sources: [] }) + .mockResolvedValueOnce({ price: "12345", conf: "10", expo: -8, publish_time: 1234567880 }); + stargateClient.execute.mockResolvedValueOnce({ + transactionHash: "TX_TS", + gasUsed: 500000n, + gasWanted: 600000n, + height: 100, + events: [], + logs: [], + }); + + const startPromise = client.start({ signal: ac.signal }); + await vi.waitFor(async () => { + const status = await client.getStatus(); + expect(status.lastPriceUpdateReceivedAt).toBeDefined(); + }); + + const status = await client.getStatus(); + expect(status.lastPriceUpdateReceivedAt).toMatch(/^\d{4}-\d{2}-\d{2}T\d{2}:\d{2}:\d{2}.\d{3}Z$/); + + ac.abort(); + await startPromise; + }); + + it("lastPriceUpdateReceivedAt is undefined before any price update is received", async () => { + const { client } = setup(); + await client.initialize(); + + const status = await client.getStatus(); + expect(status.lastPriceUpdateReceivedAt).toBeUndefined(); + }); + it("processes latest price update from stream when updates arrive faster than consumption", async () => { const priceUpdate1 = buildPriceFeed("10000", -2, 2000); const priceUpdate2 = buildPriceFeed("10100", -2, 3000); diff --git a/src/hermes-client.ts b/src/hermes-client.ts index 81cbd66..37dea3b 100644 --- a/src/hermes-client.ts +++ b/src/hermes-client.ts @@ -164,6 +164,7 @@ export class HermesClient { #senderAddress?: string; readonly #config: Required>; #isRunning = false; + #lastPriceUpdateReceivedAt?: string; #logger: Exclude; #connectWithSigner: typeof SigningCosmWasmClient.connectWithSigner; #smartContractConfig: { @@ -528,6 +529,18 @@ export class HermesClient { const consumePrices = async () => { for await (const priceUpdate of priceStream) { priceUpdates.set(priceUpdate); + + const price = priceUpdate.priceData.price; + this.#logger?.log( + `Received price from Hermes: ${price.price} (expo: ${price.expo})`, + ); + this.#logger?.log( + ` Confidence: ${price.conf}, Publish time: ${price.publish_time}`, + ); + this.#logger?.log( + ` VAA size: ${priceUpdate.vaa.length} bytes (base64)`, + ); + this.#lastPriceUpdateReceivedAt = new Date().toISOString(); } controller.abort(); }; @@ -558,6 +571,7 @@ export class HermesClient { address?: string; priceFeedId?: string; contractAddress: string; + lastPriceUpdateReceivedAt?: string; }> { // SEC-08: Only return non-sensitive operational status fields. // Never include mnemonic, gasPrice, rpcEndpoint, or full config. @@ -568,6 +582,7 @@ export class HermesClient { address: this.#senderAddress, priceFeedId: smartContractConfig.price_feed_id, contractAddress: this.#config.contractAddress, + lastPriceUpdateReceivedAt: this.#lastPriceUpdateReceivedAt, }; } } diff --git a/src/price-stream/polling-price-stream.test.ts b/src/price-stream/polling-price-stream/polling-price-stream.test.ts similarity index 91% rename from src/price-stream/polling-price-stream.test.ts rename to src/price-stream/polling-price-stream/polling-price-stream.test.ts index 2247341..c6c93e9 100644 --- a/src/price-stream/polling-price-stream.test.ts +++ b/src/price-stream/polling-price-stream/polling-price-stream.test.ts @@ -1,5 +1,5 @@ import { describe, expect, it, vi } from "vitest"; -import type { HermesResponse } from "../types.ts"; +import type { HermesResponse } from "../../types.ts"; import { pollPriceStream, type PollPriceStreamOptions } from "./polling-price-stream.ts"; describe("pollPriceStream", () => { @@ -106,22 +106,6 @@ describe("pollPriceStream", () => { expect(result.value).toEqual({ priceData: goodData.parsed[0], vaa: goodData.binary.data[0] }); }); - it("logs price details on successful fetch", async () => { - const logger = { log: vi.fn(), error: vi.fn(), warn: vi.fn() }; - const data = createHermesResponse(); - const options = createOptions({ - fetch: vi.fn().mockResolvedValueOnce(mockFetchResponse(data)), - logger, - }); - - const gen = pollPriceStream(options); - await gen.next(); - - expect(logger.log).toHaveBeenCalledWith(expect.stringContaining("Fetched price from Hermes: 1000")); - expect(logger.log).toHaveBeenCalledWith(expect.stringContaining("Confidence: 10")); - expect(logger.log).toHaveBeenCalledWith(expect.stringContaining("VAA size:")); - }); - it("polls repeatedly yielding updates", async () => { const data1 = createHermesResponse(); const data2 = createHermesResponse({ diff --git a/src/price-stream/polling-price-stream.ts b/src/price-stream/polling-price-stream/polling-price-stream.ts similarity index 79% rename from src/price-stream/polling-price-stream.ts rename to src/price-stream/polling-price-stream/polling-price-stream.ts index d8a7fd3..cc36e2d 100644 --- a/src/price-stream/polling-price-stream.ts +++ b/src/price-stream/polling-price-stream/polling-price-stream.ts @@ -3,9 +3,10 @@ import https from "node:https"; import { performance } from "node:perf_hooks"; import { Readable } from "node:stream"; import { setTimeout as delay } from "node:timers/promises"; -import { hermesFetchDuration } from "../metrics.ts"; -import type { HermesResponse, PriceProducerFactoryOptions, PriceUpdate, PythPriceData } from "../types.ts"; -import { validateEndpointUrl } from "../validation.ts"; +import { hermesFetchDuration } from "../../metrics.ts"; +import type { HermesResponse, PriceProducerFactoryOptions, PriceUpdate } from "../../types.ts"; +import { validateEndpointUrl } from "../../validation.ts"; +import { parsePriceUpdate } from "../utils.ts"; export async function *pollPriceStream(options: PollPriceStreamOptions): AsyncGenerator { if (!options.priceFeedId) { @@ -37,7 +38,6 @@ export async function *pollPriceStream(options: PollPriceStreamOptions): AsyncGe continue; } finally { hermesFetchDuration.record(performance.now() - fetchStart, { status }); - console.log(`Fetch from Hermes completed with status ${status} in ${performance.now() - fetchStart} ms`); } if (!response.ok) { @@ -48,32 +48,22 @@ export async function *pollPriceStream(options: PollPriceStreamOptions): AsyncGe continue; } - const data = await response.json() as HermesResponse; - - if (!data.parsed || data.parsed.length === 0) { - options.logger?.error("No price data returned from Hermes"); + let parsedData: HermesResponse; + try { + parsedData = await response.json() as HermesResponse; + } catch (error) { + options.logger?.error(`Error parsing JSON from Hermes: ${(error as Error).message}`); continue; } - if (!data.binary?.data || data.binary.data.length === 0) { - options.logger?.error("No VAA binary data returned from Hermes"); + const priceUpdateResult = parsePriceUpdate(parsedData); + + if (!priceUpdateResult.ok) { + options.logger?.error(priceUpdateResult.message); continue; } - const priceData: PythPriceData = data.parsed[0]; - const vaa: string = data.binary.data[0]; - - options.logger?.log( - `Fetched price from Hermes: ${priceData.price.price} (expo: ${priceData.price.expo})`, - ); - options.logger?.log( - ` Confidence: ${priceData.price.conf}, Publish time: ${priceData.price.publish_time}`, - ); - options.logger?.log( - ` VAA size: ${vaa.length} bytes (base64)`, - ); - - yield { priceData, vaa }; + yield priceUpdateResult.value; if (options.pollingIntervalMs > 0) { await delay(options.pollingIntervalMs, undefined, { signal: options.signal }) .catch((error) => options.logger?.warn(`Polling delay interrupted: ${(error as Error).message}`)); diff --git a/src/price-stream/price-sse-stream/price-sse-stream.test.ts b/src/price-stream/price-sse-stream/price-sse-stream.test.ts new file mode 100644 index 0000000..b1d983c --- /dev/null +++ b/src/price-stream/price-sse-stream/price-sse-stream.test.ts @@ -0,0 +1,451 @@ +import { describe, expect, it, vi } from "vitest"; +import type { HermesResponse } from "../../types.ts"; +import { priceSSEStream, type PriceSSEStreamOptions } from "./price-sse-stream.ts"; + +describe(priceSSEStream.name, () => { + it("throws when priceFeedId is not provided", async () => { + const options = createOptions({ priceFeedId: "" }); + const gen = priceSSEStream(options); + + await expect(gen.next()).rejects.toThrow("Price feed ID not provided"); + }); + + it("throws when baseUrl is invalid", async () => { + const options = createOptions({ baseUrl: "not-a-url", unsafeAllowInsecureEndpoints: false }); + const gen = priceSSEStream(options); + + await expect(gen.next()).rejects.toThrow("not a valid URL"); + }); + + it("throws when baseUrl is not HTTPS and insecure endpoints are not allowed", async () => { + const options = createOptions({ baseUrl: "http://example.com", unsafeAllowInsecureEndpoints: false }); + const gen = priceSSEStream(options); + + await expect(gen.next()).rejects.toThrow("only HTTPS endpoints are allowed"); + }); + + it("yields price update on successful SSE event", async () => { + const data = createHermesResponse(); + const options = createOptions({ + fetch: vi.fn().mockResolvedValueOnce(mockFetchResponse()), + events: mockEvents([ + { data: JSON.stringify(data), id: "1" }, + ]), + }); + + const gen = priceSSEStream(options); + const result = await gen.next(); + + expect(result.value).toEqual({ + priceData: data.parsed[0], + vaa: data.binary.data[0], + }); + }); + + it("constructs the correct URL with query params", async () => { + const fetchMock = vi.fn().mockResolvedValueOnce(mockFetchResponse()); + const options = createOptions({ + fetch: fetchMock, + priceFeedId: "feed-xyz", + events: mockEvents([ + { data: JSON.stringify(createHermesResponse()), id: "1" }, + ]), + }); + + const gen = priceSSEStream(options); + await gen.next(); + + const calledUrl = fetchMock.mock.calls[0][0] as string; + expect(calledUrl).toContain("/v2/updates/price/stream?"); + expect(calledUrl).toContain("ids%5B%5D=feed-xyz"); + expect(calledUrl).toContain("encoding=base64"); + }); + + it("passes signal to fetch", async () => { + const controller = new AbortController(); + const fetchMock = vi.fn().mockResolvedValueOnce(mockFetchResponse()); + const options = createOptions({ + fetch: fetchMock, + signal: controller.signal, + events: mockEvents([ + { data: JSON.stringify(createHermesResponse()), id: "1" }, + ]), + }); + + const gen = priceSSEStream(options); + await gen.next(); + + expect(fetchMock.mock.calls[0][1]).toEqual( + expect.objectContaining({ signal: controller.signal }), + ); + }); + + it("throws on non-ok HTTP response after max retries", async () => { + const fetchMock = vi.fn().mockResolvedValue(mockFetchResponse(500)); + const options = createOptions({ fetch: fetchMock }); + + const gen = priceSSEStream(options); + await expect(gen.next()).rejects.toThrow("Unable to connect to Hermes price stream after 3 attempts"); + expect(fetchMock).toHaveBeenCalledTimes(4); // 1 initial + 3 retries + }); + + it("logs error on non-ok HTTP response", async () => { + const logger = createLogger(); + const data = createHermesResponse(); + const fetchMock = vi.fn() + .mockResolvedValueOnce(mockFetchResponse(500)) + .mockResolvedValueOnce(mockFetchResponse()); + const options = createOptions({ + fetch: fetchMock, + logger, + events: mockEvents([ + { data: JSON.stringify(data), id: "1" }, + ]), + }); + + const gen = priceSSEStream(options); + await gen.next(); + + expect(logger.error).toHaveBeenCalledWith( + expect.stringContaining("HTTP 500"), + ); + }); + + it("retries and recovers after fetch failure", async () => { + const data = createHermesResponse(); + const fetchMock = vi.fn() + .mockRejectedValueOnce(new Error("network error")) + .mockResolvedValueOnce(mockFetchResponse()); + const options = createOptions({ + fetch: fetchMock, + events: mockEvents([ + { data: JSON.stringify(data), id: "1" }, + ]), + }); + + const gen = priceSSEStream(options); + const result = await gen.next(); + + expect(result.value).toEqual({ + priceData: data.parsed[0], + vaa: data.binary.data[0], + }); + }); + + it("skips events without data", async () => { + const data = createHermesResponse(); + const options = createOptions({ + fetch: vi.fn().mockResolvedValueOnce(mockFetchResponse()), + events: mockEvents([ + { id: "1" }, + { data: "", id: "2" }, + { data: JSON.stringify(data), id: "3" }, + ]), + }); + + const gen = priceSSEStream(options); + const result = await gen.next(); + + expect(result.value).toEqual({ + priceData: data.parsed[0], + vaa: data.binary.data[0], + }); + }); + + it("skips events with invalid JSON and logs error", async () => { + const logger = createLogger(); + const data = createHermesResponse(); + const options = createOptions({ + fetch: vi.fn().mockResolvedValueOnce(mockFetchResponse()), + logger, + events: mockEvents([ + { data: "not-json", id: "1" }, + { data: JSON.stringify(data), id: "2" }, + ]), + }); + + const gen = priceSSEStream(options); + const result = await gen.next(); + + expect(logger.error).toHaveBeenCalledWith( + expect.stringContaining("Error parsing JSON"), + ); + expect(result.value).toEqual({ + priceData: data.parsed[0], + vaa: data.binary.data[0], + }); + }); + + it("skips events with empty parsed data and logs error", async () => { + const logger = createLogger(); + const goodData = createHermesResponse(); + const options = createOptions({ + fetch: vi.fn().mockResolvedValueOnce(mockFetchResponse()), + logger, + events: mockEvents([ + { data: JSON.stringify(createHermesResponse({ parsed: [] })), id: "1" }, + { data: JSON.stringify(goodData), id: "2" }, + ]), + }); + + const gen = priceSSEStream(options); + const result = await gen.next(); + + expect(logger.error).toHaveBeenCalledWith("No price data returned from Hermes"); + expect(result.value).toEqual({ priceData: goodData.parsed[0], vaa: goodData.binary.data[0] }); + }); + + it("skips events with empty binary data and logs error", async () => { + const logger = createLogger(); + const goodData = createHermesResponse(); + const options = createOptions({ + fetch: vi.fn().mockResolvedValueOnce(mockFetchResponse()), + logger, + events: mockEvents([ + { data: JSON.stringify(createHermesResponse({ binary: { data: [] } })), id: "1" }, + { data: JSON.stringify(goodData), id: "2" }, + ]), + }); + + const gen = priceSSEStream(options); + const result = await gen.next(); + + expect(logger.error).toHaveBeenCalledWith("No VAA binary data returned from Hermes"); + expect(result.value).toEqual({ priceData: goodData.parsed[0], vaa: goodData.binary.data[0] }); + }); + + it("sends Last-Event-ID header on reconnect", async () => { + const data = createHermesResponse(); + const fetchMock = vi.fn() + .mockResolvedValueOnce(mockFetchResponse()) + .mockResolvedValueOnce(mockFetchResponse()); + const eventsFn = vi.fn() + .mockReturnValueOnce(toAsyncGenerator([ + { data: JSON.stringify(data), id: "42" }, + ])) + .mockReturnValueOnce(toAsyncGenerator([ + { data: JSON.stringify(data), id: "43" }, + ])); + const options = createOptions({ fetch: fetchMock, events: eventsFn }); + + const gen = priceSSEStream(options); + await gen.next(); + await gen.next(); + + const secondCallHeaders = fetchMock.mock.calls[1][1]?.headers as Record; + expect(secondCallHeaders["Last-Event-ID"]).toBe("42"); + }); + + it("does not send Last-Event-ID header on first connection", async () => { + const data = createHermesResponse(); + const fetchMock = vi.fn().mockResolvedValueOnce(mockFetchResponse()); + const options = createOptions({ + fetch: fetchMock, + events: mockEvents([ + { data: JSON.stringify(data), id: "1" }, + ]), + }); + + const gen = priceSSEStream(options); + await gen.next(); + + const firstCallHeaders = fetchMock.mock.calls[0][1]?.headers as Record; + expect(firstCallHeaders["Last-Event-ID"]).toBeUndefined(); + }); + + it("updates retry delay when event contains retry directive", async () => { + const logger = createLogger(); + const data = createHermesResponse(); + const options = createOptions({ + fetch: vi.fn().mockResolvedValueOnce(mockFetchResponse()), + logger, + events: mockEvents([ + { data: JSON.stringify(data), id: "1", retry: 5000 }, + ]), + }); + + const gen = priceSSEStream(options); + await gen.next(); + + expect(logger.log).toHaveBeenCalledWith( + expect.stringContaining("retry directive"), + ); + expect(logger.log).toHaveBeenCalledWith( + expect.stringContaining("5000"), + ); + }); + + it("stops immediately when signal is already aborted", async () => { + const fetchMock = vi.fn(); + const options = createOptions({ + fetch: fetchMock, + signal: AbortSignal.abort(), + }); + + const gen = priceSSEStream(options); + const result = await gen.next(); + + expect(result.done).toBe(true); + expect(result.value).toBeUndefined(); + expect(fetchMock).not.toHaveBeenCalled(); + }); + + it("stops when fetch throws AbortError", async () => { + const abortError = new DOMException("The operation was aborted", "AbortError"); + const fetchMock = vi.fn().mockRejectedValueOnce(abortError); + const options = createOptions({ fetch: fetchMock }); + + const gen = priceSSEStream(options); + const result = await gen.next(); + + expect(result.done).toBe(true); + expect(result.value).toBeUndefined(); + }); + + it("resets retry count after successful connection", async () => { + const data = createHermesResponse(); + const fetchMock = vi.fn() + .mockRejectedValueOnce(new Error("connection refused")) + .mockResolvedValueOnce(mockFetchResponse()) + .mockRejectedValueOnce(new Error("connection refused")) + .mockResolvedValueOnce(mockFetchResponse()); + + const eventsFn = vi.fn() + .mockReturnValueOnce(toAsyncGenerator([ + { data: JSON.stringify(data), id: "1" }, + ])) + .mockReturnValueOnce(toAsyncGenerator([ + { data: JSON.stringify(data), id: "2" }, + ])); + + const options = createOptions({ fetch: fetchMock, events: eventsFn }); + const gen = priceSSEStream(options); + + const first = await gen.next(); + expect(first.value).toEqual({ priceData: data.parsed[0], vaa: data.binary.data[0] }); + + const second = await gen.next(); + expect(second.value).toEqual({ priceData: data.parsed[0], vaa: data.binary.data[0] }); + }); + + it("reconnects when stream ends without error", async () => { + const data1 = createHermesResponse(); + const data2 = createHermesResponse({ + parsed: [{ + id: "abc123", + price: { price: "2000", conf: "20", expo: -8, publish_time: 1700000001 }, + ema_price: { price: "1999", conf: "21", expo: -8, publish_time: 1700000001 }, + }], + binary: { data: ["BAUG"] }, + }); + + const fetchMock = vi.fn() + .mockResolvedValueOnce(mockFetchResponse()) + .mockResolvedValueOnce(mockFetchResponse()); + const eventsFn = vi.fn() + .mockReturnValueOnce(toAsyncGenerator([ + { data: JSON.stringify(data1), id: "1" }, + ])) + .mockReturnValueOnce(toAsyncGenerator([ + { data: JSON.stringify(data2), id: "2" }, + ])); + + const options = createOptions({ fetch: fetchMock, events: eventsFn }); + const gen = priceSSEStream(options); + + const first = await gen.next(); + const second = await gen.next(); + + expect(first.value).toEqual({ priceData: data1.parsed[0], vaa: data1.binary.data[0] }); + expect(second.value).toEqual({ priceData: data2.parsed[0], vaa: data2.binary.data[0] }); + expect(fetchMock).toHaveBeenCalledTimes(2); + }); + + it("logs connection message", async () => { + const logger = createLogger(); + const data = createHermesResponse(); + const options = createOptions({ + fetch: vi.fn().mockResolvedValueOnce(mockFetchResponse()), + logger, + events: mockEvents([ + { data: JSON.stringify(data), id: "1" }, + ]), + }); + + const gen = priceSSEStream(options); + await gen.next(); + + expect(logger.log).toHaveBeenCalledWith( + expect.stringContaining("Connecting to Hermes price stream"), + ); + }); + + it("passes signal to events() parser", async () => { + const controller = new AbortController(); + const fetchResponse = mockFetchResponse(); + const fetchMock = vi.fn().mockResolvedValueOnce(fetchResponse); + const data = createHermesResponse(); + const eventsFn = mockEvents([ + { data: JSON.stringify(data), id: "1" }, + ]); + const options = createOptions({ fetch: fetchMock, signal: controller.signal, events: eventsFn }); + + const gen = priceSSEStream(options); + await gen.next(); + + expect(eventsFn).toHaveBeenCalledWith(fetchResponse, controller.signal); + }); +}); + +interface SSEEvent { + data?: string; + id?: string | number; + retry?: number; + event?: string; + comment?: string; +} + +function createHermesResponse(overrides?: Partial): HermesResponse { + return { + parsed: [{ + id: "abc123", + price: { price: "1000", conf: "10", expo: -8, publish_time: 1700000000 }, + ema_price: { price: "999", conf: "11", expo: -8, publish_time: 1700000000 }, + }], + binary: { data: ["AQID"] }, + ...overrides, + }; +} + +function createOptions(overrides?: Partial): PriceSSEStreamOptions { + return { + priceFeedId: "abc123", + baseUrl: "http://localhost:4000", + unsafeAllowInsecureEndpoints: true, + fetch: vi.fn(), + delay: vi.fn().mockResolvedValue(undefined), + events: mockEvents([]), + ...overrides, + }; +} + +function mockFetchResponse(status = 200): Response { + return { + ok: status >= 200 && status < 300, + status, + } as unknown as Response; +} + +function mockEvents(items: SSEEvent[]) { + return vi.fn().mockReturnValueOnce(toAsyncGenerator(items)); +} + +async function* toAsyncGenerator(items: T[]): AsyncGenerator { + for (const item of items) { + yield item; + } +} + +function createLogger() { + return { log: vi.fn(), error: vi.fn(), warn: vi.fn() }; +} diff --git a/src/price-stream/price-sse-stream/price-sse-stream.ts b/src/price-stream/price-sse-stream/price-sse-stream.ts new file mode 100644 index 0000000..edc986b --- /dev/null +++ b/src/price-stream/price-sse-stream/price-sse-stream.ts @@ -0,0 +1,97 @@ +import { events } from "fetch-event-stream"; +import { setTimeout } from "timers/promises"; +import type { HermesResponse, PriceProducerFactoryOptions, PriceUpdate } from "../../types.ts"; +import { validateEndpointUrl } from "../../validation.ts"; +import { parsePriceUpdate } from "../utils.ts"; + +export async function *priceSSEStream(options: PriceSSEStreamOptions): AsyncGenerator { + if (!options.priceFeedId) { + throw new Error("Price feed ID not provided to PriceSSEStream"); + } + + validateEndpointUrl(options.baseUrl, "Hermes endpoint", !options.unsafeAllowInsecureEndpoints); + + // Request base64 encoding for VAA data (compatible with CosmWasm Binary) + const params = new URLSearchParams({ + "ids[]": options.priceFeedId, + encoding: "base64", + }); + const fetch = options.fetch ?? globalThis.fetch; + const delay = options.delay ?? setTimeout; + const parseEvents = options.events ?? events; + const maxRetries = 3; + let retryDelayMs = 2000; + let retryCount = 0; + let lastEventId: string | number | undefined = undefined; + while (!options.signal?.aborted) { + try { + const headers: Record = {}; + if (lastEventId) { + headers["Last-Event-ID"] = String(lastEventId); + } + + options.logger?.log(`Connecting to Hermes price stream at ${options.baseUrl}${lastEventId ? ` (Last-Event-ID: ${lastEventId})` : ""}...`); + const response = await fetch(`${options.baseUrl}/v2/updates/price/stream?${params.toString()}`, { + headers, + signal: options.signal, + }); + if (!response.ok) { + const statusText = response.status ? ` (HTTP ${response.status})` : ""; + throw new Error( + `Failed to connect to Hermes price stream${statusText}: price data unavailable`, + ); + } + + retryCount = 0; + const stream = parseEvents(response, options.signal); + for await (const event of stream) { + if (event.id !== undefined) { + lastEventId = event.id; + } + + if (event.retry !== undefined) { + retryDelayMs = event.retry; + options.logger?.log(`Received retry directive from Hermes stream: ${retryDelayMs} ms`); + } + + if (!event.data) continue; + + let parsedData: HermesResponse; + try { + parsedData = JSON.parse(event.data); + } catch (error) { + options.logger?.error(`Error parsing JSON from Hermes stream: ${(error as Error).message}`); + continue; + } + + const priceUpdateResult = parsePriceUpdate(parsedData); + if (!priceUpdateResult.ok) { + options.logger?.error(priceUpdateResult.message); + continue; + } + + yield priceUpdateResult.value; + } + } catch (error) { + if (error instanceof Error && error.name === "AbortError") { + break; + } + options.logger?.error(`Error connecting to Hermes price stream: ${(error as Error).message}`); + if (++retryCount > maxRetries) { + options.logger?.error(`Exceeded maximum retry attempts (${maxRetries}) for connecting to Hermes price stream`); + throw new Error(`Unable to connect to Hermes price stream after ${maxRetries} attempts`); + } + options.logger?.log(`Retrying connection to Hermes price stream (attempt ${retryCount}/${maxRetries})...`); + await delay(retryDelayMs, undefined, { signal: options.signal }) + .catch((error) => options.logger?.warn(`Retry delay interrupted: ${(error as Error).message}`)); + } + } +} + +export interface PriceSSEStreamOptions extends PriceProducerFactoryOptions { + baseUrl: string; + unsafeAllowInsecureEndpoints?: boolean; + fetch?: typeof globalThis.fetch; + delay?: typeof setTimeout; + events?: typeof events; +} diff --git a/src/price-stream/utils.ts b/src/price-stream/utils.ts new file mode 100644 index 0000000..c1a6a1a --- /dev/null +++ b/src/price-stream/utils.ts @@ -0,0 +1,20 @@ +import type { HermesResponse, PriceUpdate, PythPriceData } from "../types.ts"; + +export function parsePriceUpdate(data: HermesResponse): PriceUpdateResult { + if (!data.parsed || data.parsed.length === 0) { + return { ok: false, message: "No price data returned from Hermes" }; + } + + if (!data.binary?.data || data.binary.data.length === 0) { + return { ok: false, message: "No VAA binary data returned from Hermes" }; + } + + const priceData: PythPriceData = data.parsed[0]; + const vaa: string = data.binary.data[0]; + + return { ok: true, value: { priceData, vaa } }; +} + +export type PriceUpdateResult = + | { ok: true; value: PriceUpdate } + | { ok: false; message: string };