diff --git a/README.md b/README.md index dd682b8..9bb4244 100644 --- a/README.md +++ b/README.md @@ -175,11 +175,12 @@ akash query wasm contract-state smart $CONTRACT_ADDRESS '{"get_config":{}}' | `WALLET_SECRET` | Yes | - | Either `privateKey:` or `mnemonic:<12/24 words>` | | `HERMES_ENDPOINT` | No | `https://hermes.pyth.network` | Pyth Hermes API | | `PRICE_DEVIATION_TOLERANCE` | No | 0 | absolute or percentage value for price deviations which should be ignored (e.g., `100` or `10%`) | -| `UPDATE_INTERVAL_MS` | No | `300000` | Update interval (5 min) | +| `UPDATE_INTERVAL_MS` | No | `5000` | Update interval (default 5 sec) | | `GAS_PRICE` | No | `0.025uakt` | Gas price | | `DENOM` | No | `uakt` | Token denomination | | `HEALTHCHECK_PORT` | No | 3000 | healthcheck server port | | `OTEL_RESOURCE_ATTRIBUTES` | No | | additional attributes attached to all metrics (e.g., `service.name=hermes,service.version=1.1.0,deployment.environment=production`) | +| `SMART_CONTRACT_CONFIG_CACHE_TTL_MS` | No | 3600000 (1h) | smart contract config cache ttl in milliseconds | ### Instrumentation diff --git a/src/cli-commands/command-config.test.ts b/src/cli-commands/command-config.test.ts index 9e0c006..680ccdb 100644 --- a/src/cli-commands/command-config.test.ts +++ b/src/cli-commands/command-config.test.ts @@ -64,32 +64,32 @@ describe("parseConfig", () => { expect((result as Extract).value.rpcEndpoint).toBe("https://custom-rpc:443"); }); - it("passes HERMES_ENDPOINT to config", () => { + it("accepts HERMES_ENDPOINT and produces priceProducerFactory", () => { const result = parseConfig(validEnv({ HERMES_ENDPOINT: "https://hermes.example.com" })); expect(result.ok).toBe(true); - expect((result as Extract).value.hermesEndpoint).toBe("https://hermes.example.com"); + expect((result as Extract).value.priceProducerFactory).toBeTypeOf("function"); }); - it("uses default hermesEndpoint when HERMES_ENDPOINT is not provided", () => { + it("uses default HERMES_ENDPOINT and produces priceProducerFactory", () => { const result = parseConfig(validEnv()); expect(result.ok).toBe(true); - expect((result as Extract).value.hermesEndpoint).toBe("https://hermes.pyth.network"); + expect((result as Extract).value.priceProducerFactory).toBeTypeOf("function"); }); - it("parses UPDATE_INTERVAL_MS as integer", () => { + it("accepts UPDATE_INTERVAL_MS and produces priceProducerFactory", () => { const result = parseConfig(validEnv({ UPDATE_INTERVAL_MS: "5000" })); expect(result.ok).toBe(true); - expect((result as Extract).value.updateIntervalMs).toBe(5000); + expect((result as Extract).value.priceProducerFactory).toBeTypeOf("function"); }); - it("uses default updateIntervalMs when UPDATE_INTERVAL_MS is not provided", () => { + it("uses default UPDATE_INTERVAL_MS and produces priceProducerFactory", () => { const result = parseConfig(validEnv()); expect(result.ok).toBe(true); - expect((result as Extract).value.updateIntervalMs).toBe(5 * 60 * 1000); + expect((result as Extract).value.priceProducerFactory).toBeTypeOf("function"); }); it("returns error when UPDATE_INTERVAL_MS is not a valid integer", () => { diff --git a/src/cli-commands/command-config.ts b/src/cli-commands/command-config.ts index 71c82c8..68633f6 100644 --- a/src/cli-commands/command-config.ts +++ b/src/cli-commands/command-config.ts @@ -1,11 +1,14 @@ import { z } from "zod"; import { HermesClient, type HermesConfig } from "../hermes-client.ts"; import { validateContractAddress, validateWalletSecret } from "../validation.ts"; +import type { PriceProducerFactoryOptions } from "../types.ts"; +import { pollPriceStream } from "../price-stream/polling-price-stream.ts"; export interface CommandConfig extends HermesConfig { createHermesClient: (config: HermesConfig) => Promise; signal: AbortSignal; healthcheckPort: number; + rawConfig: z.infer; } const configSchema = z.object({ @@ -38,11 +41,12 @@ const configSchema = z.object({ }); } }).optional(), - UPDATE_INTERVAL_MS: z.coerce.number().int().min(1000).positive().default(5 * 60 * 1000), // Default to 5 minutes + UPDATE_INTERVAL_MS: z.coerce.number().int().nonnegative().default(5 * 1000), // Default to 5 seconds HEALTHCHECK_PORT: z.coerce.number().int().min(1).max(65535).default(3000), GAS_PRICE: z.string().regex(/^(\d+)(\.\d+)?uakt$/, { message: 'GAS_PRICE must be a valid number with unit (e.g., "0.025uakt")' }).default("0.025uakt"), DENOM: z.string().default("uakt"), NODE_ENV: z.enum(["development", "production"]).optional(), + SMART_CONTRACT_CONFIG_CACHE_TTL_MS: z.coerce.number().int().min(1000).positive().default(60 * 60 * 1000), }); type ParsedConfig = Omit; @@ -54,17 +58,26 @@ export function parseConfig(config: Record): ParseCo return { ok: false, error: z.prettifyError(result.error) }; } + const unsafeAllowInsecureEndpoints = result.data.NODE_ENV === "development"; // Enforce secure endpoints in production const parsedConfig: ParsedConfig = { - unsafeAllowInsecureEndpoints: result.data.NODE_ENV === "development", // Enforce secure endpoints in production + rawConfig: result.data, + unsafeAllowInsecureEndpoints, rpcEndpoint: result.data.RPC_ENDPOINT, - hermesEndpoint: result.data.HERMES_ENDPOINT, contractAddress: result.data.CONTRACT_ADDRESS, walletSecret: result.data.WALLET_SECRET, - updateIntervalMs: result.data.UPDATE_INTERVAL_MS, healthcheckPort: result.data.HEALTHCHECK_PORT, gasPrice: result.data.GAS_PRICE, denom: result.data.DENOM, priceDeviationTolerance: result.data.PRICE_DEVIATION_TOLERANCE, + smartContractConfigCacheTTLMs: result.data.SMART_CONTRACT_CONFIG_CACHE_TTL_MS, + priceProducerFactory(options: PriceProducerFactoryOptions) { + return pollPriceStream({ + ...options, + unsafeAllowInsecureEndpoints, + baseUrl: result.data.HERMES_ENDPOINT, + pollingIntervalMs: result.data.UPDATE_INTERVAL_MS, + }); + }, createHermesClient: (cfg: HermesConfig) => HermesClient.connect(cfg), }; diff --git a/src/cli-commands/daemon-command.test.ts b/src/cli-commands/daemon-command.test.ts index 728aabb..a462a44 100644 --- a/src/cli-commands/daemon-command.test.ts +++ b/src/cli-commands/daemon-command.test.ts @@ -40,13 +40,13 @@ describe("daemonCommand", () => { const promise = daemonCommand(config); await waitForServer(logger); - const reponse = await fetch(`http://localhost:${config.healthcheckPort}/health`); + const port = getServerPort(logger); + const reponse = await fetch(`http://localhost:${port}/health`); expect(reponse.status).toBe(200); expect(client.getStatus).toHaveBeenCalled(); expect(logger.log).toHaveBeenCalledWith( - `Health check endpoint available at http://localhost:${config.healthcheckPort}/health`, + expect.stringMatching(/Health check endpoint available at http:\/\/localhost:\d+\/health/), ); - expect(logger.log).toHaveBeenCalledWith("Daemon started. Press Ctrl+C to stop.\n"); abortController.abort(); await promise; @@ -57,7 +57,8 @@ describe("daemonCommand", () => { const promise = daemonCommand(config); await waitForServer(logger); - const response = await fetch(`http://localhost:${config.healthcheckPort}/invalid`); + const port = getServerPort(logger); + const response = await fetch(`http://localhost:${port}/invalid`); expect(response.status).toBe(404); abortController.abort(); @@ -74,7 +75,6 @@ describe("daemonCommand", () => { expect(logger.log).toHaveBeenCalledWith("\n\nShutting down daemon..."); expect(logger.log).toHaveBeenCalledWith("\nStopping health check server..."); - expect(logger.log).toHaveBeenCalledWith("Health check server stopped"); }); it("stops server immediately if signal is already aborted on startup", async () => { @@ -91,21 +91,31 @@ describe("daemonCommand", () => { }); } + function getServerPort(logger: Console): number { + const calls = (logger.log as ReturnType).mock.calls; + const call = calls.find((c: unknown[]) => typeof c[0] === "string" && /localhost:\d+/.test(c[0] as string)); + const match = (call![0] as string).match(/localhost:(\d+)/); + return parseInt(match![1], 10); + } + let testAbortController: AbortController | null = null; function setup() { const client = mock(); - client.getStatus.mockReturnValue({ isRunning: true, contractAddress: "", priceFeedId: "", address: "" }); + client.getStatus.mockResolvedValue({ isRunning: true, contractAddress: "", priceFeedId: "", address: "" }); const logger = mock(); const abortController = new AbortController(); testAbortController = abortController; const config: CommandConfig = { rpcEndpoint: "https://rpc.akashnet.net:443", contractAddress: "akash1qypqxpq9qcrsszg2pvxq6rs0zqg3yyc5lzv7xu", - mnemonic: "abandon abandon abandon abandon abandon abandon abandon abandon abandon abandon abandon about", + walletSecret: { type: "mnemonic", value: "abandon abandon abandon abandon abandon abandon abandon abandon abandon abandon abandon about" }, + priceProducerFactory: vi.fn(async function* () {}) as unknown as CommandConfig["priceProducerFactory"], logger, signal: abortController.signal, - healthcheckPort: 3001, + healthcheckPort: 0, createHermesClient: vi.fn(() => Promise.resolve(client)), + smartContractConfigCacheTTLMs: 0, + rawConfig: {} as CommandConfig["rawConfig"], }; return { config, client, logger, abortController }; } diff --git a/src/cli-commands/daemon-command.ts b/src/cli-commands/daemon-command.ts index d3e13d6..5016425 100644 --- a/src/cli-commands/daemon-command.ts +++ b/src/cli-commands/daemon-command.ts @@ -1,18 +1,26 @@ import http from "node:http"; -import { once } from "node:events"; import type { AddressInfo } from "node:net"; import { prometheusExporter } from "../instrumentation/prometheus-exporter.ts"; import type { CommandConfig } from "./command-config.ts"; export async function daemonCommand(config: CommandConfig): Promise { + if (config.signal.aborted) return; + config.logger?.log("Starting daemon mode...\n"); const client = await config.createHermesClient(config); const server = http.createServer((req, res) => { if (req.method === "GET" && req.url === "/health") { - const status = client.getStatus(); - res.writeHead(200, { "Content-Type": "application/json" }); - res.end(JSON.stringify(status)); + client.getStatus() + .then((status) => { + res.writeHead(200, { "Content-Type": "application/json" }); + res.end(JSON.stringify(status)); + }) + .catch((error) => { + config.logger?.log(`Error fetching health status: ${error.message}`); + res.writeHead(500, { "Content-Type": "application/json" }); + res.end(); + }); } else if (req.method === "GET" && req.url === "/metrics") { prometheusExporter.getMetricsRequestHandler(req, res); } else { @@ -23,33 +31,18 @@ export async function daemonCommand(config: CommandConfig): Promise { const abort = () => { config.logger?.log("\n\nShutting down daemon..."); config.logger?.log("\nStopping health check server..."); - return new Promise((resolve) => { - server.close((err) => { - if (err) { - config.logger?.log(`Error stopping health check server: ${err.message}`); - } - resolve(); - config.logger?.log("Health check server stopped"); - }); - }); }; config.signal.addEventListener("abort", abort, { once: true }); - await client.start({ signal: config.signal }); - await new Promise((resolve, reject) => { - if (config.signal.aborted) return resolve(); - server.once("error", reject); - server.listen(config.healthcheckPort, () => { - resolve(); - server.off("error", reject); - if (!config.signal.aborted) { + await Promise.all([ + client.start({ signal: config.signal }), + new Promise((resolve, reject) => { + server.once("error", reject); + server.listen({ port: config.healthcheckPort, signal: config.signal }, () => { config.logger?.log(`Health check endpoint available at http://localhost:${(server.address() as AddressInfo).port}/health`); - } - }); - }); - if (config.signal.aborted && server.listening) { - await abort(); - } else if (server.listening) { - config.logger?.log("Daemon started. Press Ctrl+C to stop.\n"); - await once(server, "close"); - } + server.off("error", reject); + server.once("close", resolve); + }); + }), + ]); + config.signal.removeEventListener("abort", abort); } diff --git a/src/cli-commands/status-command.test.ts b/src/cli-commands/status-command.test.ts index e31777d..f830ddb 100644 --- a/src/cli-commands/status-command.test.ts +++ b/src/cli-commands/status-command.test.ts @@ -1,4 +1,3 @@ -import EventEmitter from "node:events"; import { describe, it, expect, vi } from "vitest"; import { mock } from "vitest-mock-extended"; import type { HermesClient } from "../hermes-client.ts"; @@ -8,22 +7,22 @@ import { statusCommand } from "./status-command.ts"; function setup() { const client = mock(); const logger = mock(); - const config: CommandConfig = { + const config = { rpcEndpoint: "https://rpc.akashnet.net:443", contractAddress: "akash1qypqxpq9qcrsszg2pvxq6rs0zqg3yyc5lzv7xu", - mnemonic: "abandon abandon abandon abandon abandon abandon abandon abandon abandon abandon abandon about", - hermesEndpoint: "https://hermes.pyth.network", + rawConfig: { + HERMES_ENDPOINT: "https://hermes.pyth.network", + }, logger, - process: new EventEmitter(), createHermesClient: vi.fn(() => Promise.resolve(client)), - }; + } as unknown as CommandConfig; return { config, client, logger }; } describe("statusCommand", () => { it("displays client status information", async () => { const { config, client, logger } = setup(); - client.getStatus.mockReturnValueOnce({ + client.getStatus.mockResolvedValueOnce({ address: "akash1sender", contractAddress: "akash1contract", priceFeedId: "feed-123", @@ -41,7 +40,7 @@ describe("statusCommand", () => { it("displays running status as yes when client is running", async () => { const { config, client, logger } = setup(); - client.getStatus.mockReturnValueOnce({ + client.getStatus.mockResolvedValueOnce({ address: "akash1sender", contractAddress: "akash1contract", priceFeedId: "feed-123", @@ -55,7 +54,7 @@ describe("statusCommand", () => { it("displays RPC and Hermes endpoints from config", async () => { const { config, client, logger } = setup(); - client.getStatus.mockReturnValueOnce({ + client.getStatus.mockResolvedValueOnce({ address: "akash1sender", contractAddress: "akash1contract", priceFeedId: "feed-123", @@ -70,8 +69,8 @@ describe("statusCommand", () => { it("uses default Hermes endpoint when not configured", async () => { const { config, client, logger } = setup(); - delete config.hermesEndpoint; - client.getStatus.mockReturnValueOnce({ + (config.rawConfig as Record).HERMES_ENDPOINT = "https://hermes.pyth.network"; + client.getStatus.mockResolvedValueOnce({ address: "akash1sender", contractAddress: "akash1contract", priceFeedId: "feed-123", diff --git a/src/cli-commands/status-command.ts b/src/cli-commands/status-command.ts index 3c31c2f..29551ae 100644 --- a/src/cli-commands/status-command.ts +++ b/src/cli-commands/status-command.ts @@ -4,7 +4,7 @@ export async function statusCommand(config: CommandConfig): Promise { config.logger?.log("Contract Status...\n"); const client = await config.createHermesClient(config); - const status = client.getStatus(); + const status = await client.getStatus(); config.logger?.log("Client Status:"); config.logger?.log("─────────────────────────────"); @@ -13,5 +13,5 @@ export async function statusCommand(config: CommandConfig): Promise { config.logger?.log(`Price Feed ID: ${status.priceFeedId}`); config.logger?.log(`Running: ${status.isRunning ? "yes" : "no"}`); config.logger?.log(`RPC Endpoint: ${config.rpcEndpoint}`); - config.logger?.log(`Hermes Endpoint: ${config.hermesEndpoint || "https://hermes.pyth.network"}`); + config.logger?.log(`Hermes Endpoint: ${config.rawConfig.HERMES_ENDPOINT}`); } diff --git a/src/cli-commands/update-command.test.ts b/src/cli-commands/update-command.test.ts index 02ccee9..1ee83a5 100644 --- a/src/cli-commands/update-command.test.ts +++ b/src/cli-commands/update-command.test.ts @@ -1,19 +1,45 @@ -import EventEmitter from "node:events"; -import { describe, it, expect, vi } from "vitest"; +import { describe, expect, it, vi } from "vitest"; import { mock } from "vitest-mock-extended"; import type { HermesClient } from "../hermes-client.ts"; +import type { PriceUpdate } from "../types.ts"; import type { CommandConfig } from "./command-config.ts"; import { updateCommand } from "./update-command.ts"; +const fakePriceUpdate: PriceUpdate = { + priceData: { + id: "test-feed-id", + price: { price: "100", conf: "1", expo: -8, publish_time: 1000 }, + ema_price: { price: "100", conf: "1", expo: -8, publish_time: 1000 }, + }, + vaa: "dGVzdC12YWE=", +}; + +async function* fakePriceProducer(): AsyncGenerator { + yield fakePriceUpdate; +} + function setup() { const client = mock(); + client.queryConfig.mockResolvedValue({ + admin: "akash1admin", + wormhole_contract: "akash1wormhole", + update_fee: "1", + price_feed_id: "test-feed-id", + default_denom: "uakt", + default_base_denom: "uakt", + data_sources: [], + }); const logger = mock(); const config: CommandConfig = { rpcEndpoint: "https://rpc.akashnet.net:443", contractAddress: "akash1qypqxpq9qcrsszg2pvxq6rs0zqg3yyc5lzv7xu", - mnemonic: "abandon abandon abandon abandon abandon abandon abandon abandon abandon abandon abandon about", + walletSecret: { type: "mnemonic", value: "abandon abandon abandon abandon abandon abandon abandon abandon abandon abandon abandon about" }, logger, - process: new EventEmitter(), + signal: AbortSignal.abort(), + healthcheckPort: 3000, + rawConfig: {} as CommandConfig["rawConfig"], + smartContractConfigCacheTTLMs: 60000, + priceProducerFactory: vi.fn(() => fakePriceProducer()), createHermesClient: vi.fn(() => Promise.resolve(client)), }; return { config, client, logger }; @@ -33,7 +59,7 @@ describe("updateCommand", () => { await updateCommand(config); expect(config.createHermesClient).toHaveBeenCalledWith(config); - expect(client.updatePrice).toHaveBeenCalledOnce(); + expect(client.updatePrice).toHaveBeenCalledWith(fakePriceUpdate); }); it("propagates errors from updatePrice", async () => { diff --git a/src/cli-commands/update-command.ts b/src/cli-commands/update-command.ts index b97f3d8..d00fa65 100644 --- a/src/cli-commands/update-command.ts +++ b/src/cli-commands/update-command.ts @@ -3,6 +3,17 @@ import type { CommandConfig } from "./command-config.ts"; export async function updateCommand(config: CommandConfig): Promise { config.logger?.log("Updating oracle price...\n"); const client = await config.createHermesClient(config); - await client.updatePrice(); - config.logger?.log("\nUpdate completed successfully!"); + const smartCotractConfig = await client.queryConfig(); + const priceStream = config.priceProducerFactory({ + priceFeedId: smartCotractConfig.price_feed_id, + logger: config.logger, + signal: config.signal, + }); + const priceUpdate = await priceStream.next(); + if (priceUpdate.value) { + await client.updatePrice(priceUpdate.value); + config.logger?.log("\nUpdate completed successfully!"); + } else { + config.logger?.log("\nUpdate skipped because no new price was available."); + } } diff --git a/src/hermes-client.test.ts b/src/hermes-client.test.ts index 6a3fb2e..9f437f3 100644 --- a/src/hermes-client.test.ts +++ b/src/hermes-client.test.ts @@ -1,7 +1,8 @@ import { SigningCosmWasmClient } from "@cosmjs/cosmwasm-stargate"; import { afterEach, describe, expect, it, vi } from "vitest"; import { mock } from "vitest-mock-extended"; -import { HermesClient, HermesConfig, HermesResponse } from "./hermes-client"; +import { HermesClient, HermesConfig } from "./hermes-client"; +import type { PriceUpdate, PriceProducerFactory, PriceProducerFactoryOptions } from "./types.ts"; // ============================================================ // SEC-01: Mnemonic must never appear in logs or error messages @@ -47,16 +48,8 @@ describe("SEC-02: Endpoint URL validation in HermesClient", () => { })).toThrow("private or internal addresses are not allowed"); }); - it("rejects SSRF-targeted Hermes endpoints (metadata service)", () => { - expect(() => setup({ - hermesEndpoint: "https://169.254.169.254/metadata", - })).toThrow("private or internal addresses are not allowed"); - }); - it("accepts valid HTTPS endpoints", () => { - const { client } = setup({ - hermesEndpoint: "https://hermes.pyth.network", - }); + const { client } = setup(); expect(client).toBeDefined(); }); }); @@ -66,10 +59,10 @@ describe("SEC-02: Endpoint URL validation in HermesClient", () => { // ============================================================ describe("SEC-04: Error message information leakage", () => { it("updatePrice errors do not leak internal paths or stack traces", async () => { - const { client } = setup(); + const { client, priceUpdate } = setup(); // Without initializing, calling updatePrice should fail gracefully - const result = await client.updatePrice().catch(error => ({ error })); + const result = await client.updatePrice(priceUpdate).catch(error => ({ error })); expect(result).toHaveProperty("error"); const { error } = result as { error: Error }; @@ -118,25 +111,17 @@ describe("SEC-05: Admin input validation", () => { // SEC-08: Config/status must not expose sensitive data // ============================================================ describe("SEC-08: Sensitive data in config exposure", () => { - it("getStatus must not include mnemonic, gasPrice, or internal config", () => { + it("getStatus must not include mnemonic, gasPrice, or internal config", async () => { const { client } = setup(); - const status = client.getStatus(); - - expect(status).toEqual({ - isRunning: false, - contractAddress: "akash1qypqxpq9qcrsszg2pvxq6rs0zqg3yyc5lzv7xu", - priceFeedId: undefined, - address: undefined, - }); - expect(JSON.stringify(status)).not.toContain("abandon"); + await expect(client.getStatus()).rejects.toThrow("Client not initialized"); }); it("getStatus must not include config object or mnemonic when initialized", async () => { const { client } = setup(); await client.initialize(); - const status = client.getStatus(); + const status = await client.getStatus(); expect(status).toEqual({ isRunning: false, @@ -202,7 +187,7 @@ describe(HermesClient.name, () => { "akash1qypqxpq9qcrsszg2pvxq6rs0zqg3yyc5lzv7xu", { get_config: {} }, ); - const status = client.getStatus(); + const status = await client.getStatus(); expect(status.priceFeedId).toBe("test-feed-id"); expect(status.address).toMatch(/^akash1[0-9a-z]{38}$/); }); @@ -218,7 +203,7 @@ describe(HermesClient.name, () => { "akash1qypqxpq9qcrsszg2pvxq6rs0zqg3yyc5lzv7xu", { get_config: {} }, ); - const status = client.getStatus(); + const status = await client.getStatus(); expect(status.priceFeedId).toBe("test-feed-id"); expect(status.address).toMatch(/^akash1[0-9a-z]{38}$/); }); @@ -238,17 +223,16 @@ describe(HermesClient.name, () => { describe("updatePrice()", () => { it("throws when client is not initialized", async () => { - const { client } = setup(); - await expect(client.updatePrice()).rejects.toThrow("Client not initialized"); + const { client, priceUpdate } = setup(); + await expect(client.updatePrice(priceUpdate)).rejects.toThrow("Client not initialized"); }); - it("fetches price from Hermes and submits VAA when price is stale", async () => { - const { client, fetch, stargateClient } = setup(); + it("submits VAA when price is stale", async () => { + const { client, priceUpdate, stargateClient } = setup(); stargateClient.queryContractSmart - .mockResolvedValueOnce({ price_feed_id: "test-feed-id" }) - .mockResolvedValueOnce({ price: "12345", conf: "10", expo: -8, publish_time: 1234567880 }) - .mockResolvedValueOnce({ update_fee: "1", wormhole_contract: "akash1wormhole" }); + .mockResolvedValueOnce({ price_feed_id: "test-feed-id", update_fee: "1", wormhole_contract: "akash1wormhole", admin: "akash1admin", default_denom: "uakt", default_base_denom: "akt", data_sources: [] }) + .mockResolvedValueOnce({ price: "12345", conf: "10", expo: -8, publish_time: 1234567880 }); stargateClient.execute.mockResolvedValueOnce({ transactionHash: "ABCD1234", gasUsed: 500000n, @@ -259,9 +243,8 @@ describe(HermesClient.name, () => { }); await client.initialize(); - await client.updatePrice(); + await client.updatePrice(priceUpdate); - expect(fetch).toHaveBeenCalledTimes(1); expect(stargateClient.execute).toHaveBeenCalledWith( expect.stringMatching(/^akash1/), "akash1qypqxpq9qcrsszg2pvxq6rs0zqg3yyc5lzv7xu", @@ -273,14 +256,14 @@ describe(HermesClient.name, () => { }); it("skips update when price is already up to date", async () => { - const { client, stargateClient, logger } = setup(); + const { client, priceUpdate, stargateClient, logger } = setup(); stargateClient.queryContractSmart .mockResolvedValueOnce({ price_feed_id: "test-feed-id" }) .mockResolvedValueOnce({ publish_time: 1234567890 }); await client.initialize(); - await client.updatePrice(); + await client.updatePrice(priceUpdate); expect(stargateClient.execute).not.toHaveBeenCalled(); expect(logger.log).toHaveBeenCalledWith( @@ -289,49 +272,28 @@ describe(HermesClient.name, () => { }); it("skips update when contract has newer publish_time", async () => { - const { client, stargateClient } = setup(); + const { client, priceUpdate, stargateClient } = setup(); stargateClient.queryContractSmart .mockResolvedValueOnce({ price_feed_id: "test-feed-id" }) .mockResolvedValueOnce({ publish_time: 9999999999 }); await client.initialize(); - await client.updatePrice(); + await client.updatePrice(priceUpdate); expect(stargateClient.execute).not.toHaveBeenCalled(); }); - it("wraps Hermes HTTP errors with sanitized message", async () => { - const { client, fetch } = setup(); - await client.initialize(); - - fetch.mockResolvedValueOnce(new Response("", { status: 500 })); - - await expect(client.updatePrice()).rejects.toThrow("Failed to update price"); - }); - - it("throws when Hermes returns no parsed data", async () => { - const { client, fetch } = setup(); - await client.initialize(); - - fetch.mockResolvedValueOnce(new Response(JSON.stringify({ - parsed: [], - binary: { data: ["abc"] }, - }))); - - await expect(client.updatePrice()).rejects.toThrow("Failed to update price"); - }); - describe("priceDeviationTolerance", () => { it("skips update when absolute deviation is within tolerance", async () => { - const { client, stargateClient, logger } = setup({ + const { client, priceUpdate, stargateClient, logger } = setup({ priceDeviationTolerance: { type: "absolute", value: 1.0 }, priceFeed: buildPriceFeed("10000", -2, 2000), }); mockForSkip(stargateClient, { price: "10050", expo: -2, publish_time: 1000 }); await client.initialize(); - await client.updatePrice(); + await client.updatePrice(priceUpdate); expect(stargateClient.execute).not.toHaveBeenCalled(); expect(logger.log).toHaveBeenCalledWith( @@ -340,27 +302,27 @@ describe(HermesClient.name, () => { }); it("updates when absolute deviation exceeds tolerance", async () => { - const { client, stargateClient } = setup({ + const { client, priceUpdate, stargateClient } = setup({ priceDeviationTolerance: { type: "absolute", value: 1.0 }, priceFeed: buildPriceFeed("10000", -2, 2000), }); mockForUpdate(stargateClient, { price: "10200", expo: -2, publish_time: 1000 }); await client.initialize(); - await client.updatePrice(); + await client.updatePrice(priceUpdate); expect(stargateClient.execute).toHaveBeenCalledTimes(1); }); it("skips update when absolute deviation equals tolerance exactly", async () => { - const { client, stargateClient, logger } = setup({ + const { client, priceUpdate, stargateClient, logger } = setup({ priceDeviationTolerance: { type: "absolute", value: 1.0 }, priceFeed: buildPriceFeed("10000", -2, 2000), }); mockForSkip(stargateClient, { price: "10100", expo: -2, publish_time: 1000 }); await client.initialize(); - await client.updatePrice(); + await client.updatePrice(priceUpdate); expect(stargateClient.execute).not.toHaveBeenCalled(); expect(logger.log).toHaveBeenCalledWith( @@ -369,14 +331,14 @@ describe(HermesClient.name, () => { }); it("skips update when percentage deviation is within tolerance", async () => { - const { client, stargateClient, logger } = setup({ + const { client, priceUpdate, stargateClient, logger } = setup({ priceDeviationTolerance: { type: "percentage", value: 1 }, priceFeed: buildPriceFeed("10000", -2, 2000), }); mockForSkip(stargateClient, { price: "10050", expo: -2, publish_time: 1000 }); await client.initialize(); - await client.updatePrice(); + await client.updatePrice(priceUpdate); expect(stargateClient.execute).not.toHaveBeenCalled(); expect(logger.log).toHaveBeenCalledWith( @@ -385,27 +347,27 @@ describe(HermesClient.name, () => { }); it("updates when percentage deviation exceeds tolerance", async () => { - const { client, stargateClient } = setup({ + const { client, priceUpdate, stargateClient } = setup({ priceDeviationTolerance: { type: "percentage", value: 1 }, priceFeed: buildPriceFeed("10000", -2, 2000), }); mockForUpdate(stargateClient, { price: "10500", expo: -2, publish_time: 1000 }); await client.initialize(); - await client.updatePrice(); + await client.updatePrice(priceUpdate); expect(stargateClient.execute).toHaveBeenCalledTimes(1); }); it("skips update when percentage deviation equals tolerance exactly", async () => { - const { client, stargateClient, logger } = setup({ + const { client, priceUpdate, stargateClient, logger } = setup({ priceDeviationTolerance: { type: "percentage", value: 1 }, priceFeed: buildPriceFeed("10100", -2, 2000), }); mockForSkip(stargateClient, { price: "10000", expo: -2, publish_time: 1000 }); await client.initialize(); - await client.updatePrice(); + await client.updatePrice(priceUpdate); expect(stargateClient.execute).not.toHaveBeenCalled(); expect(logger.log).toHaveBeenCalledWith( @@ -414,65 +376,48 @@ describe(HermesClient.name, () => { }); it("updates on any price difference with default tolerance (absolute 0)", async () => { - const { client, stargateClient } = setup({ + const { client, priceUpdate, stargateClient } = setup({ priceFeed: buildPriceFeed("10001", -2, 2000), }); mockForUpdate(stargateClient, { price: "10000", expo: -2, publish_time: 1000 }); await client.initialize(); - await client.updatePrice(); + await client.updatePrice(priceUpdate); expect(stargateClient.execute).toHaveBeenCalledTimes(1); }); it("handles different exponents between new and current price", async () => { - const { client, stargateClient } = setup({ + const { client, priceUpdate, stargateClient } = setup({ priceDeviationTolerance: { type: "absolute", value: 1.0 }, priceFeed: buildPriceFeed("1000000", -4, 2000), }); mockForUpdate(stargateClient, { price: "10200", expo: -2, publish_time: 1000 }); await client.initialize(); - await client.updatePrice(); + await client.updatePrice(priceUpdate); expect(stargateClient.execute).toHaveBeenCalledTimes(1); }); it("handles zero current price when calculating percentage deviation", async () => { - const { client, stargateClient } = setup({ + const { client, priceUpdate, stargateClient } = setup({ priceDeviationTolerance: { type: "percentage", value: 10 }, priceFeed: buildPriceFeed("10000", -2, 2000), }); mockForUpdate(stargateClient, { price: "0", expo: -2, publish_time: 1000 }); await client.initialize(); - await client.updatePrice(); + await client.updatePrice(priceUpdate); expect(stargateClient.execute).toHaveBeenCalledTimes(1); }); }); - it("throws when Hermes returns no binary data", async () => { - const { client, fetch } = setup(); - await client.initialize(); - - fetch.mockResolvedValueOnce(new Response(JSON.stringify({ - parsed: [{ - id: "test", - price: { price: "100", conf: "1", expo: -8, publish_time: 123 }, - ema_price: { price: "100", conf: "1", expo: -8, publish_time: 123 }, - }], - binary: { data: [] }, - }))); - - await expect(client.updatePrice()).rejects.toThrow("Failed to update price"); - }); - function mockForUpdate(stargateClient: ReturnType["stargateClient"], currentPrice: { price: string; expo: number; publish_time: number }) { stargateClient.queryContractSmart - .mockResolvedValueOnce({ price_feed_id: "test-feed-id" }) - .mockResolvedValueOnce({ price: currentPrice.price, conf: "10", expo: currentPrice.expo, publish_time: currentPrice.publish_time }) - .mockResolvedValueOnce({ update_fee: "1", wormhole_contract: "akash1wormhole" }); + .mockResolvedValueOnce({ price_feed_id: "test-feed-id", update_fee: "1", wormhole_contract: "akash1wormhole", admin: "akash1admin", default_denom: "uakt", default_base_denom: "akt", data_sources: [] }) + .mockResolvedValueOnce({ price: currentPrice.price, conf: "10", expo: currentPrice.expo, publish_time: currentPrice.publish_time }); stargateClient.execute.mockResolvedValueOnce({ transactionHash: "TX_DEV", gasUsed: 500000n, @@ -549,7 +494,7 @@ describe(HermesClient.name, () => { describe("queryConfig()", () => { it("queries contract for config", async () => { - const { client, stargateClient } = setup(); + const { client, stargateClient } = setup({ smartContractConfigCacheTTLMs: -1 }); const expectedConfig = { admin: "akash1admin", wormhole_contract: "akash1wormhole", @@ -561,7 +506,7 @@ describe(HermesClient.name, () => { }; stargateClient.queryContractSmart - .mockResolvedValueOnce({ price_feed_id: "test-feed-id" }) + .mockResolvedValueOnce({ price_feed_id: "test-feed-id", update_fee: "1", wormhole_contract: "akash1wormhole", admin: "akash1admin", default_denom: "uakt", default_base_denom: "akt", data_sources: [] }) .mockResolvedValueOnce(expectedConfig); await client.initialize(); @@ -700,14 +645,15 @@ describe(HermesClient.name, () => { }); it("starts once if called concurrently", async () => { - const { client, stargateClient, fetch } = setup(); + const priceUpdate = buildPriceFeed("123.45", -8, 1234567890); + const factory = blockingFactory(priceUpdate); + const { client, stargateClient } = setup({ priceProducerFactory: factory }); const start = client.start.bind(client); const abortController = new AbortController(); stargateClient.queryContractSmart - .mockResolvedValueOnce({ price_feed_id: "test-feed-id" }) - .mockResolvedValueOnce({ price: "12345", conf: "10", expo: -8, publish_time: 1234567880 }) - .mockResolvedValueOnce({ update_fee: "1", wormhole_contract: "akash1wormhole" }); + .mockResolvedValueOnce({ price_feed_id: "test-feed-id", update_fee: "1", wormhole_contract: "akash1wormhole", admin: "akash1admin", default_denom: "uakt", default_base_denom: "akt", data_sources: [] }) + .mockResolvedValueOnce({ price: "12345", conf: "10", expo: -8, publish_time: 1234567880 }); stargateClient.execute.mockResolvedValueOnce({ transactionHash: "TX_CONCURRENT", gasUsed: 500000n, @@ -717,52 +663,58 @@ describe(HermesClient.name, () => { logs: [], }); - await Promise.all([ + const allPromise = Promise.all([ start({ signal: abortController.signal }), start({ signal: abortController.signal }), start({ signal: abortController.signal }), start({ signal: abortController.signal }), - ]).finally(() => abortController.abort()); + ]); - expect(stargateClient.execute).toHaveBeenCalledTimes(1); - expect(fetch).toHaveBeenCalledTimes(1); + await vi.waitFor(() => { + expect(stargateClient.execute).toHaveBeenCalledTimes(1); + }); + + abortController.abort(); + await allPromise; + + expect(factory).toHaveBeenCalledTimes(1); }); it("logs and returns when already running", async () => { - const { client, logger, stargateClient } = setup(); + const priceUpdate = buildPriceFeed("123.45", -8, 1234567890); + const factory = blockingFactory(priceUpdate); + const { client, logger, stargateClient } = setup({ priceProducerFactory: factory }); const abortController = new AbortController(); stargateClient.queryContractSmart .mockResolvedValueOnce({ price_feed_id: "test-feed-id" }) .mockResolvedValueOnce({ publish_time: 1234567890 }); - try { - await client.start({ signal: abortController.signal }); - await client.start({ signal: abortController.signal }); - } finally { - abortController.abort(); - } + const startPromise = client.start({ signal: abortController.signal }); + await vi.waitFor(async () => { + expect((await client.getStatus()).isRunning).toBe(true); + }); + await client.start({ signal: abortController.signal }); expect(logger.log).toHaveBeenCalledWith("Hermes client is already running"); + + abortController.abort(); + await startPromise; }); it("initializes client when not already initialized", async () => { const { client, stargateClient } = setup(); stargateClient.queryContractSmart - .mockResolvedValueOnce({ price_feed_id: "test-feed-id" }) - .mockResolvedValueOnce({ publish_time: 1234567890 }); + .mockResolvedValueOnce({ price_feed_id: "test-feed-id" }); const abortController = new AbortController(); - try { - await client.start({ signal: abortController.signal }); - const status = client.getStatus(); - expect(status.isRunning).toBe(true); - expect(status.priceFeedId).toBe("test-feed-id"); - expect(status.address).toMatch(/^akash1/); - } finally { - abortController.abort(); - } + await client.start({ signal: abortController.signal }); + abortController.abort(); + + const status = await client.getStatus(); + expect(status.priceFeedId).toBe("test-feed-id"); + expect(status.address).toMatch(/^akash1/); }); it("skips initialization when already initialized", async () => { @@ -773,49 +725,53 @@ describe(HermesClient.name, () => { .mockResolvedValueOnce({ publish_time: 1234567890 }); await client.initialize(); - await callAndAbort(client.start.bind(client)); + const abortController = new AbortController(); + await client.start({ signal: abortController.signal }); + abortController.abort(); expect(stargateClient.queryContractSmart).toHaveBeenCalledTimes(2); }); it("stops when abort signal fires", async () => { - const { client, stargateClient, logger } = setup(); + const priceUpdate = buildPriceFeed("123.45", -8, 1234567890); + const factory = blockingFactory(priceUpdate); + const { client, stargateClient, logger } = setup({ priceProducerFactory: factory }); const ac = new AbortController(); stargateClient.queryContractSmart .mockResolvedValueOnce({ price_feed_id: "test-feed-id" }) .mockResolvedValueOnce({ publish_time: 1234567890 }); - await client.start({ signal: ac.signal }); - expect(client.getStatus().isRunning).toBe(true); + const startPromise = client.start({ signal: ac.signal }); + await vi.waitFor(async () => { + expect((await client.getStatus()).isRunning).toBe(true); + }); ac.abort(); - expect(client.getStatus().isRunning).toBe(false); + await startPromise; + + expect((await client.getStatus()).isRunning).toBe(false); expect(logger.log).toHaveBeenCalledWith("Hermes client stopped"); }); it("continues running when updatePrice throws", async () => { - const { client, stargateClient, fetch, logger } = setup(); + const { client, stargateClient, logger } = setup(); + const abortController = new AbortController(); stargateClient.queryContractSmart - .mockResolvedValueOnce({ price_feed_id: "test-feed-id" }); - fetch.mockResolvedValueOnce(new Response("", { status: 500 })); - const abortController = new AbortController(); + .mockResolvedValueOnce({ price_feed_id: "test-feed-id" }) + .mockRejectedValueOnce(new Error("query failed")); - try { - await client.start({ signal: abortController.signal }); + await client.start({ signal: abortController.signal }); + abortController.abort(); - expect(client.getStatus().isRunning).toBe(true); - expect(logger.error).toHaveBeenCalledWith( - "Error in scheduled update:", - expect.any(Error), - ); - } finally { - abortController.abort(); - } + expect(logger.error).toHaveBeenCalledWith( + "Error in scheduled update:", + expect.any(Error), + ); }); - it("sets isRunning to false when initialization fails", async () => { + it("rejects when initialization fails", async () => { const { client, stargateClient } = setup(); stargateClient.queryContractSmart.mockRejectedValueOnce( @@ -825,55 +781,59 @@ describe(HermesClient.name, () => { const ac = new AbortController(); try { await expect(client.start({ signal: ac.signal })).rejects.toThrow("Failed to start Hermes client"); - expect(client.getStatus().isRunning).toBe(false); } finally { ac.abort(); } }); - it("schedules periodic updates after initial update", async () => { - vi.useFakeTimers(); - const { client, stargateClient, fetch } = setup({ - updateIntervalMs: 10_000, + it("processes latest price update from stream when updates arrive faster than consumption", async () => { + const priceUpdate1 = buildPriceFeed("10000", -2, 2000); + const priceUpdate2 = buildPriceFeed("10100", -2, 3000); + const factory = vi.fn(async function* () { + yield priceUpdate1; + yield priceUpdate2; }); + const { client, stargateClient } = setup({ priceProducerFactory: factory as unknown as PriceProducerFactory }); stargateClient.queryContractSmart - .mockResolvedValueOnce({ price_feed_id: "test-feed-id" }) - .mockResolvedValueOnce({ publish_time: 1234567890 }) - .mockResolvedValueOnce({ publish_time: 1234567890 }); + .mockResolvedValueOnce({ price_feed_id: "test-feed-id", update_fee: "1", wormhole_contract: "akash1wormhole", admin: "akash1admin", default_denom: "uakt", default_base_denom: "akt", data_sources: [] }) + .mockResolvedValueOnce({ price: "9000", conf: "10", expo: -2, publish_time: 1000 }); + stargateClient.execute.mockResolvedValue({ + transactionHash: "TX", + gasUsed: 500000n, + gasWanted: 600000n, + height: 100, + events: [], + logs: [], + }); - const abortController = new AbortController(); - try { - await client.start({ signal: abortController.signal }); - expect(fetch).toHaveBeenCalledTimes(1); + const ac = new AbortController(); + await client.start({ signal: ac.signal }); + ac.abort(); - await vi.advanceTimersByTimeAsync(10000); - expect(fetch).toHaveBeenCalledTimes(2); - } finally { - abortController.abort(); - } + expect(stargateClient.execute).toHaveBeenCalledTimes(1); }); - - function callAndAbort(fn: (input: { signal: AbortSignal }) => Promise) { - const abortController = new AbortController(); - return fn({ signal: abortController.signal }).finally(() => abortController.abort()); - } }); }); function setup(input?: Partial & { - priceFeed?: HermesResponse; + priceFeed?: PriceUpdate; }) { - const fetch = vi.fn(async () => new Response(JSON.stringify(input?.priceFeed ?? { - parsed: [ - { price: { price: "123.45", conf: "0.01", expo: -8, publish_time: 1234567890 }, ema_price: { price: "123.45", conf: "0.01", expo: -8, publish_time: 1234567890 }, id: "test-id" }, - ], - binary: { - data: [btoa("some base 64 endcodable data")], + const priceUpdate: PriceUpdate = input?.priceFeed ?? { + priceData: { + id: "test-id", + price: { price: "123.45", conf: "0.01", expo: -8, publish_time: 1234567890 }, + ema_price: { price: "123.45", conf: "0.01", expo: -8, publish_time: 1234567890 }, }, - } satisfies HermesResponse))); + vaa: btoa("some base 64 endcodable data"), + }; + + const priceProducerFactory = vi.fn(async function* () { + yield priceUpdate; + }); + const stargateClient = mock({ - queryContractSmart: vi.fn(async () => ({ price_feed_id: "test-feed-id" })), + queryContractSmart: vi.fn(async () => ({ price_feed_id: "test-feed-id", update_fee: "1", wormhole_contract: "akash1wormhole", admin: "akash1admin", default_denom: "uakt", default_base_denom: "akt", data_sources: [] })), }); const logger = mock(); const client = new HermesClient({ @@ -881,25 +841,35 @@ function setup(input?: Partial & { contractAddress: input?.contractAddress ?? "akash1qypqxpq9qcrsszg2pvxq6rs0zqg3yyc5lzv7xu", walletSecret: input?.walletSecret ?? { type: "mnemonic", value: "abandon abandon abandon abandon abandon abandon abandon abandon abandon abandon abandon about" }, gasPrice: input?.gasPrice ?? "0.025uakt", - fetch, connectWithSigner: async () => stargateClient, logger, - updateIntervalMs: input?.updateIntervalMs ?? 300_000, - hermesEndpoint: input?.hermesEndpoint ?? "https://hermes.pyth.network", unsafeAllowInsecureEndpoints: input?.unsafeAllowInsecureEndpoints, priceDeviationTolerance: input?.priceDeviationTolerance ?? { type: "absolute", value: 0 }, + priceProducerFactory: (input?.priceProducerFactory ?? priceProducerFactory) as PriceProducerFactory, + smartContractConfigCacheTTLMs: input?.smartContractConfigCacheTTLMs ?? 60_000, }); - return { client, fetch, logger, stargateClient }; + return { client, priceUpdate, priceProducerFactory, logger, stargateClient }; } -function buildPriceFeed(price: string, expo: number, publishTime: number): HermesResponse { +function buildPriceFeed(price: string, expo: number, publishTime: number): PriceUpdate { return { - parsed: [{ + priceData: { id: "test-id", price: { price, conf: "10", expo, publish_time: publishTime }, ema_price: { price, conf: "10", expo, publish_time: publishTime }, - }], - binary: { data: [btoa("vaa-data")] }, + }, + vaa: btoa("vaa-data"), }; } + +function blockingFactory(priceUpdate: PriceUpdate) { + return vi.fn(async function* ({ signal }: PriceProducerFactoryOptions) { + yield priceUpdate; + if (signal && !signal.aborted) { + await new Promise(resolve => { + signal.addEventListener("abort", () => resolve(), { once: true }); + }); + } + }); +} diff --git a/src/hermes-client.ts b/src/hermes-client.ts index 6c171e9..81cbd66 100644 --- a/src/hermes-client.ts +++ b/src/hermes-client.ts @@ -11,7 +11,6 @@ * 4. Relays validated price to x/oracle module */ -import { performance } from "node:perf_hooks"; import { SigningCosmWasmClient } from "@cosmjs/cosmwasm-stargate"; import { DirectSecp256k1HdWallet, DirectSecp256k1Wallet, type OfflineDirectSigner } from "@cosmjs/proto-signing"; import { GasPrice } from "@cosmjs/stargate"; @@ -23,7 +22,9 @@ import { sanitizeErrorMessage, validateWalletSecret, } from "./validation.ts"; -import { priceUpdateCounter, hermesFetchDuration } from "./metrics.ts"; +import { priceUpdateCounter } from "./metrics.ts"; +import type { PriceUpdate, PythPriceData, PriceProducerFactory, Logger } from "./types.ts"; +import { latestValue } from "./price-stream/latest-value/latest-value.ts"; export interface HermesConfig { /** @@ -44,10 +45,9 @@ export interface HermesConfig { /** hex-encoded private key for wallet */ value: string; }; - hermesEndpoint: string; - updateIntervalMs: number; denom?: string; gasPrice?: string; + smartContractConfigCacheTTLMs: number; /** * Optional threshold for skipping updates when the price change is below a tolerance. * @@ -60,46 +60,22 @@ export interface HermesConfig { type: "absolute" | "percentage"; value: number; }; + /** - * `fetch` implementation to use for HTTP requests. Defaults to globalThis.fetch. + * Factory function to create a price producer (AsyncGenerator) that yields price updates. + * This allows for different implementations of price fetching logic (e.g. polling, SSE). */ - fetch?: typeof globalThis.fetch; + priceProducerFactory: PriceProducerFactory; /** * Optional logger for informational messages. Should implement log, error, and warn methods. */ - logger?: Pick; + logger?: Logger; /** * Optional custom connectWithSigner function for testing or advanced use cases. Defaults to SigningCosmWasmClient.connectWithSigner. */ connectWithSigner?: typeof SigningCosmWasmClient.connectWithSigner; } -// Pyth price data from Hermes API -interface PythPriceData { - id: string; - price: { - price: string; - conf: string; - expo: number; - publish_time: number; - }; - ema_price: { - price: string; - conf: string; - expo: number; - publish_time: number; - }; -} - -// Hermes API response with VAA binary data -export interface HermesResponse { - binary: { - // Base64 encoded VAA data array - data: string[]; - }; - parsed: PythPriceData[]; -} - // ===================== // Contract Execute Messages // Matches pyth contract msg.rs @@ -187,12 +163,13 @@ export class HermesClient { #wallet?: OfflineDirectSigner; #senderAddress?: string; readonly #config: Required>; - #priceFeedId?: string; #isRunning = false; - #updateTimer?: NodeJS.Timeout; - #fetch: Exclude; #logger: Exclude; #connectWithSigner: typeof SigningCosmWasmClient.connectWithSigner; + #smartContractConfig: { + expiresAt: number; + value?: Promise; + } = { expiresAt: 0 }; static async connect(config: HermesConfig): Promise { const client = new HermesClient(config); @@ -204,7 +181,6 @@ export class HermesClient { const unsafeAllowInsecureEndpoints = config.unsafeAllowInsecureEndpoints ?? false; validateEndpointUrl(config.rpcEndpoint, "RPC endpoint", !unsafeAllowInsecureEndpoints); - validateEndpointUrl(config.hermesEndpoint, "Hermes endpoint", !unsafeAllowInsecureEndpoints); validateWalletSecret(config.walletSecret); validateContractAddress(config.contractAddress); @@ -215,7 +191,6 @@ export class HermesClient { unsafeAllowInsecureEndpoints, priceDeviationTolerance: config.priceDeviationTolerance ?? DEFAULT_PRICE_DEVIATION_TOLERANCE, }; - this.#fetch = config.fetch ?? globalThis.fetch; this.#logger = config.logger ?? console; this.#connectWithSigner = config.connectWithSigner ?? SigningCosmWasmClient.connectWithSigner; } @@ -242,7 +217,10 @@ export class HermesClient { ); this.#logger.log("Connected to chain successfully"); - await this.#fetchPriceFeedId(); + this.#logger.log("Fetching smart contract configuration..."); + const smartContractConfig = await this.queryConfig(); + this.#logger.log(`Using Pyth Price Feed ID: ${smartContractConfig.price_feed_id}`); + this.#logger.log(`Update fee: ${smartContractConfig.update_fee} ${this.#config.denom}`); this.#logger.log("Hermes client initialized successfully"); } catch (error) { @@ -263,81 +241,6 @@ export class HermesClient { return DirectSecp256k1Wallet.fromKey(privateKeyBytes, prefix); } - /** - * Fetch the price feed ID from the contract - */ - async #fetchPriceFeedId(): Promise { - const config: ConfigResponse = await this.#getCosmClient().queryContractSmart( - this.#config.contractAddress, - { get_config: {} }, - ); - - this.#priceFeedId = config.price_feed_id; - this.#logger.log(`Using Pyth Price Feed ID: ${this.#priceFeedId}`); - this.#logger.log(`Update fee: ${config.update_fee} ${this.#config.denom}`); - } - - /** - * Fetch latest price data with VAA from Hermes - * Returns both parsed price data (for logging) and raw VAA (for contract) - */ - async #fetchPriceFromHermes(): Promise<{ - priceData: PythPriceData; - vaa: string; - }> { - if (!this.#priceFeedId) { - throw new Error("Price feed ID not loaded"); - } - - // Request base64 encoding for VAA data (compatible with CosmWasm Binary) - const params = new URLSearchParams({ - "ids[]": this.#priceFeedId, - encoding: "base64", - }); - - let response: Response; - let status = 0; - const fetchStart = performance.now(); - try { - response = await this.#fetch(`${this.#config.hermesEndpoint}/v2/updates/price/latest?${params.toString()}`); - status = response.status; - } finally { - hermesFetchDuration.record(performance.now() - fetchStart, { status }); - } - - if (!response.ok) { - const statusText = response.status ? ` (HTTP ${response.status})` : ""; - throw new Error( - `Failed to fetch from Hermes${statusText}: price data unavailable`, - ); - } - - const data = await response.json() as HermesResponse; - - if (!data.parsed || data.parsed.length === 0) { - throw new Error("No price data returned from Hermes"); - } - - if (!data.binary?.data || data.binary.data.length === 0) { - throw new Error("No VAA binary data returned from Hermes"); - } - - const priceData: PythPriceData = data.parsed[0]; - const vaa: string = data.binary.data[0]; - - this.#logger.log( - `Fetched price from Hermes: ${priceData.price.price} (expo: ${priceData.price.expo})`, - ); - this.#logger.log( - ` Confidence: ${priceData.price.conf}, Publish time: ${priceData.price.publish_time}`, - ); - this.#logger.log( - ` VAA size: ${vaa.length} bytes (base64)`, - ); - - return { priceData, vaa }; - } - #getCosmClient(): SigningCosmWasmClient { if (!this.#cosmClient) { throw new Error("Client not initialized"); @@ -373,10 +276,18 @@ export class HermesClient { * Query contract configuration */ async queryConfig(): Promise { - const config: ConfigResponse = await this.#getCosmClient().queryContractSmart( - this.#config.contractAddress, - { get_config: {} }, - ); + if (!this.#smartContractConfig.value || Date.now() > this.#smartContractConfig.expiresAt) { + this.#smartContractConfig.value = this.#getCosmClient().queryContractSmart( + this.#config.contractAddress, + { get_config: {} }, + ).catch((error) => { + this.#smartContractConfig.value = undefined; + throw error; + }); + this.#smartContractConfig.expiresAt = Date.now() + this.#config.smartContractConfigCacheTTLMs; + } + + const config = await this.#smartContractConfig.value; return config; } @@ -478,13 +389,15 @@ export class HermesClient { * 3. Send VAA to Pyth contract * 4. Contract verifies VAA via Wormhole, parses Pyth payload, relays to x/oracle */ - async updatePrice(): Promise { + async updatePrice(priceUpdate: PriceUpdate): Promise { if (!this.#senderAddress) { throw new Error("Client not initialized"); } + const startTime = performance.now(); + try { - const { priceData, vaa } = await this.#fetchPriceFromHermes(); + const { priceData, vaa } = priceUpdate; const currentPrice = await this.queryCurrentPrice(); if (this.#canIgnorePriceUpdate(priceData, currentPrice)) { @@ -504,11 +417,7 @@ export class HermesClient { }, }; - // Get config to determine update fee - const config: ConfigResponse = await this.#getCosmClient().queryContractSmart( - this.#config.contractAddress, - { get_config: {} }, - ); + const config = await this.queryConfig(); // Execute update this.#logger.log("Submitting VAA to Pyth contract..."); @@ -532,6 +441,8 @@ export class HermesClient { this.#logger.error(safeMessage); priceUpdateCounter.add(1, { result: "failure" }); throw new Error(safeMessage); + } finally { + this.#logger.log(`Price updated in ${((performance.now() - startTime) / 1000).toFixed(2)} s`); } } @@ -586,45 +497,53 @@ export class HermesClient { return; } + if (options?.signal?.aborted) return; + + const controller = new AbortController(); + const signal = options?.signal ? AbortSignal.any([options.signal, controller.signal]) : controller.signal; + // important to be set before any async operation to prevent multiple concurrent starts this.#isRunning = true; - - options?.signal?.addEventListener("abort", () => { - if (this.#updateTimer) { - clearTimeout(this.#updateTimer); - this.#updateTimer = undefined; - } + signal.addEventListener("abort", () => { this.#isRunning = false; this.#logger.log("Hermes client stopped"); }, { once: true }); try { + this.#logger.log( + "Starting automatic price consumption", + ); + if (!this.#cosmClient) { await this.initialize(); } - this.#logger.log( - `Starting automatic updates every ${this.#config.updateIntervalMs / 1000}s`, - ); - - const updatePrice = async () => { - if (!this.#isRunning) return; - try { - await this.updatePrice(); - } catch (error) { - this.#logger.error("Error in scheduled update:", error); - } finally { - this.#updateTimer = undefined; + const smartContractConfig = await this.queryConfig(); + const priceStream = this.#config.priceProducerFactory({ + priceFeedId: smartContractConfig.price_feed_id, + signal, + logger: this.#logger, + }); + const priceUpdates = latestValue({ signal }); + const consumePrices = async () => { + for await (const priceUpdate of priceStream) { + priceUpdates.set(priceUpdate); } - if (this.#isRunning) { - this.#updateTimer = setTimeout(updatePrice, this.#config.updateIntervalMs); + controller.abort(); + }; + const updatePrices = async () => { + for await (const priceUpdate of priceUpdates) { + try { + await this.updatePrice(priceUpdate); + } catch (error) { + this.#logger.error("Error in scheduled update:", error); + } } }; - // Initial update + Schedule periodic updates - await updatePrice(); + await Promise.all([consumePrices(), updatePrices()]); } catch (error) { - this.#isRunning = false; + controller.abort(); const safeMessage = sanitizeErrorMessage(error, "Failed to start Hermes client"); this.#logger.error(safeMessage); throw new Error(safeMessage); @@ -634,18 +553,20 @@ export class HermesClient { /** * Get client status */ - getStatus(): { + async getStatus(): Promise<{ isRunning: boolean; address?: string; priceFeedId?: string; contractAddress: string; - } { + }> { // SEC-08: Only return non-sensitive operational status fields. // Never include mnemonic, gasPrice, rpcEndpoint, or full config. + const smartContractConfig = await this.queryConfig(); + return { isRunning: this.#isRunning, address: this.#senderAddress, - priceFeedId: this.#priceFeedId, + priceFeedId: smartContractConfig.price_feed_id, contractAddress: this.#config.contractAddress, }; } @@ -653,7 +574,6 @@ export class HermesClient { // Export types for external use export type { - PythPriceData, DataSourceResponse, UpdatePriceFeedMsg, UpdateFeeMsg, diff --git a/src/price-stream/latest-value/latest-value.ts b/src/price-stream/latest-value/latest-value.ts new file mode 100644 index 0000000..8c66e18 --- /dev/null +++ b/src/price-stream/latest-value/latest-value.ts @@ -0,0 +1,36 @@ +export function latestValue(options?: SignalOptions): Signal { + let value: T | undefined; + let notify: (() => void) | null = null; + + return { + set(newValue) { + value = newValue; + notify?.(); + }, + async *[Symbol.asyncIterator]() { + while (!options?.signal?.aborted) { + if (value === undefined) { + const { promise, resolve } = Promise.withResolvers(); + notify = resolve; + const resolveOnAbort = () => resolve(); + options?.signal?.addEventListener("abort", resolveOnAbort, { once: true }); + await promise; + options?.signal?.removeEventListener("abort", resolveOnAbort); + if (value === undefined) continue; + } + const v = value!; + value = undefined; + yield v; + } + }, + }; +} + +export interface Signal { + set(value: T): void; + [Symbol.asyncIterator](): AsyncIterableIterator; +} + +export interface SignalOptions { + signal?: AbortSignal; +} diff --git a/src/price-stream/polling-price-stream.test.ts b/src/price-stream/polling-price-stream.test.ts new file mode 100644 index 0000000..2247341 --- /dev/null +++ b/src/price-stream/polling-price-stream.test.ts @@ -0,0 +1,220 @@ +import { describe, expect, it, vi } from "vitest"; +import type { HermesResponse } from "../types.ts"; +import { pollPriceStream, type PollPriceStreamOptions } from "./polling-price-stream.ts"; + +describe("pollPriceStream", () => { + it("throws when priceFeedId is not provided", async () => { + const options = createOptions({ priceFeedId: "" }); + const gen = pollPriceStream(options); + + await expect(gen.next()).rejects.toThrow("Price feed ID not provided"); + }); + + it("throws when baseUrl is invalid", async () => { + const options = createOptions({ baseUrl: "not-a-url", unsafeAllowInsecureEndpoints: false }); + const gen = pollPriceStream(options); + + await expect(gen.next()).rejects.toThrow("not a valid URL"); + }); + + it("throws when baseUrl is not HTTPS and insecure endpoints are not allowed", async () => { + const options = createOptions({ baseUrl: "http://example.com", unsafeAllowInsecureEndpoints: false }); + const gen = pollPriceStream(options); + + await expect(gen.next()).rejects.toThrow("only HTTPS endpoints are allowed"); + }); + + it("yields price update on successful response", async () => { + const data = createHermesResponse(); + const options = createOptions({ + fetch: vi.fn().mockResolvedValueOnce(mockFetchResponse(data)), + }); + + const gen = pollPriceStream(options); + const result = await gen.next(); + + expect(result.value).toEqual({ + priceData: data.parsed[0], + vaa: data.binary.data[0], + }); + }); + + it("constructs the correct URL with query params", async () => { + const data = createHermesResponse(); + const fetchMock = vi.fn().mockResolvedValueOnce(mockFetchResponse(data)); + const options = createOptions({ fetch: fetchMock, priceFeedId: "feed-xyz" }); + + const gen = pollPriceStream(options); + await gen.next(); + + const calledUrl = fetchMock.mock.calls[0][0] as string; + expect(calledUrl).toContain("/v2/updates/price/latest?"); + expect(calledUrl).toContain("ids%5B%5D=feed-xyz"); + expect(calledUrl).toContain("encoding=base64"); + }); + + it("logs error and retries on non-ok response", async () => { + const logger = { log: vi.fn(), error: vi.fn(), warn: vi.fn() }; + const data = createHermesResponse(); + const fetchMock = vi.fn() + .mockResolvedValueOnce(mockFetchResponse(data, 500)) + .mockResolvedValueOnce(mockFetchResponse(data)); + + const options = createOptions({ fetch: fetchMock, logger }); + const gen = pollPriceStream(options); + const result = await gen.next(); + + expect(logger.error).toHaveBeenCalledWith( + expect.stringContaining("Failed to fetch from Hermes"), + ); + expect(logger.error).toHaveBeenCalledWith( + expect.stringContaining("HTTP 500"), + ); + expect(result.value).toEqual({ + priceData: data.parsed[0], + vaa: data.binary.data[0], + }); + }); + + it("logs error and retries when parsed data is empty", async () => { + const logger = { log: vi.fn(), error: vi.fn(), warn: vi.fn() }; + const goodData = createHermesResponse(); + const fetchMock = vi.fn() + .mockResolvedValueOnce(mockFetchResponse(createHermesResponse({ parsed: [] }))) + .mockResolvedValueOnce(mockFetchResponse(goodData)); + + const options = createOptions({ fetch: fetchMock, logger }); + const gen = pollPriceStream(options); + const result = await gen.next(); + + expect(logger.error).toHaveBeenCalledWith("No price data returned from Hermes"); + expect(result.value).toEqual({ priceData: goodData.parsed[0], vaa: goodData.binary.data[0] }); + }); + + it("logs error and retries when binary data is empty", async () => { + const logger = { log: vi.fn(), error: vi.fn(), warn: vi.fn() }; + const goodData = createHermesResponse(); + const fetchMock = vi.fn() + .mockResolvedValueOnce(mockFetchResponse(createHermesResponse({ binary: { data: [] } }))) + .mockResolvedValueOnce(mockFetchResponse(goodData)); + + const options = createOptions({ fetch: fetchMock, logger }); + const gen = pollPriceStream(options); + const result = await gen.next(); + + expect(logger.error).toHaveBeenCalledWith("No VAA binary data returned from Hermes"); + expect(result.value).toEqual({ priceData: goodData.parsed[0], vaa: goodData.binary.data[0] }); + }); + + it("logs price details on successful fetch", async () => { + const logger = { log: vi.fn(), error: vi.fn(), warn: vi.fn() }; + const data = createHermesResponse(); + const options = createOptions({ + fetch: vi.fn().mockResolvedValueOnce(mockFetchResponse(data)), + logger, + }); + + const gen = pollPriceStream(options); + await gen.next(); + + expect(logger.log).toHaveBeenCalledWith(expect.stringContaining("Fetched price from Hermes: 1000")); + expect(logger.log).toHaveBeenCalledWith(expect.stringContaining("Confidence: 10")); + expect(logger.log).toHaveBeenCalledWith(expect.stringContaining("VAA size:")); + }); + + it("polls repeatedly yielding updates", async () => { + const data1 = createHermesResponse(); + const data2 = createHermesResponse({ + parsed: [{ + id: "abc123", + price: { price: "2000", conf: "20", expo: -8, publish_time: 1700000001 }, + ema_price: { price: "1999", conf: "21", expo: -8, publish_time: 1700000001 }, + }], + binary: { data: ["BAUG"] }, + }); + + const fetchMock = vi.fn() + .mockResolvedValueOnce(mockFetchResponse(data1)) + .mockResolvedValueOnce(mockFetchResponse(data2)); + + const options = createOptions({ fetch: fetchMock, pollingIntervalMs: 10 }); + const gen = pollPriceStream(options); + + const first = await gen.next(); + const second = await gen.next(); + + expect(first.value).toEqual({ priceData: data1.parsed[0], vaa: data1.binary.data[0] }); + expect(second.value).toEqual({ priceData: data2.parsed[0], vaa: data2.binary.data[0] }); + expect(fetchMock).toHaveBeenCalledTimes(2); + }); + + it("stops when signal is aborted during polling delay", async () => { + const controller = new AbortController(); + const data = createHermesResponse(); + const fetchMock = vi.fn().mockResolvedValue(mockFetchResponse(data)); + const options = createOptions({ + fetch: fetchMock, + pollingIntervalMs: 60_000, + signal: controller.signal, + }); + + const gen = pollPriceStream(options); + const first = await gen.next(); + expect(first.done).toBe(false); + + // Abort after gen.next() resumes and enters the delay + const secondPromise = gen.next(); + controller.abort(); + + const second = await secondPromise; + expect(second.done).toBe(true); + expect(second.value).toBeUndefined(); + expect(fetchMock).toHaveBeenCalledTimes(1); + }); + + it("stops immediately when signal is already aborted", async () => { + const fetchMock = vi.fn(); + const options = createOptions({ + fetch: fetchMock, + signal: AbortSignal.abort(), + }); + + const gen = pollPriceStream(options); + const result = await gen.next(); + + expect(result.done).toBe(true); + expect(result.value).toBeUndefined(); + expect(fetchMock).not.toHaveBeenCalled(); + }); +}); + +function createHermesResponse(overrides?: Partial): HermesResponse { + return { + parsed: [{ + id: "abc123", + price: { price: "1000", conf: "10", expo: -8, publish_time: 1700000000 }, + ema_price: { price: "999", conf: "11", expo: -8, publish_time: 1700000000 }, + }], + binary: { data: ["AQID"] }, + ...overrides, + }; +} + +function createOptions(overrides?: Partial): PollPriceStreamOptions { + return { + priceFeedId: "abc123", + baseUrl: "http://localhost:4000", + pollingIntervalMs: 100, + unsafeAllowInsecureEndpoints: true, + fetch: vi.fn(), + ...overrides, + }; +} + +function mockFetchResponse(data: HermesResponse, status = 200): Response { + return { + ok: status >= 200 && status < 300, + status, + json: () => Promise.resolve(data), + } as unknown as Response; +} diff --git a/src/price-stream/polling-price-stream.ts b/src/price-stream/polling-price-stream.ts new file mode 100644 index 0000000..d8a7fd3 --- /dev/null +++ b/src/price-stream/polling-price-stream.ts @@ -0,0 +1,140 @@ +import http from "node:http"; +import https from "node:https"; +import { performance } from "node:perf_hooks"; +import { Readable } from "node:stream"; +import { setTimeout as delay } from "node:timers/promises"; +import { hermesFetchDuration } from "../metrics.ts"; +import type { HermesResponse, PriceProducerFactoryOptions, PriceUpdate, PythPriceData } from "../types.ts"; +import { validateEndpointUrl } from "../validation.ts"; + +export async function *pollPriceStream(options: PollPriceStreamOptions): AsyncGenerator { + if (!options.priceFeedId) { + throw new Error("Price feed ID not provided to PollPriceStream"); + } + + validateEndpointUrl(options.baseUrl, "Hermes endpoint", !options.unsafeAllowInsecureEndpoints); + + // Request base64 encoding for VAA data (compatible with CosmWasm Binary) + const params = new URLSearchParams({ + "ids[]": options.priceFeedId, + encoding: "base64", + }); + const fetch = options.fetch ?? createFetch(); + + let response: Response | undefined; + let status = 0; + while (!options.signal?.aborted) { + const fetchStart = performance.now(); + response = undefined; + try { + response = await fetch(`${options.baseUrl}/v2/updates/price/latest?${params.toString()}`); + status = response.status; + } catch (error) { + if (error instanceof Error && (error.name === "AbortError" || error.message === "AbortError")) { + break; + } + options.logger?.error(`Error fetching from Hermes: ${(error as Error).message}`); + continue; + } finally { + hermesFetchDuration.record(performance.now() - fetchStart, { status }); + console.log(`Fetch from Hermes completed with status ${status} in ${performance.now() - fetchStart} ms`); + } + + if (!response.ok) { + const statusText = response.status ? ` (HTTP ${response.status})` : ""; + options.logger?.error( + `Failed to fetch from Hermes${statusText}: price data unavailable`, + ); + continue; + } + + const data = await response.json() as HermesResponse; + + if (!data.parsed || data.parsed.length === 0) { + options.logger?.error("No price data returned from Hermes"); + continue; + } + + if (!data.binary?.data || data.binary.data.length === 0) { + options.logger?.error("No VAA binary data returned from Hermes"); + continue; + } + + const priceData: PythPriceData = data.parsed[0]; + const vaa: string = data.binary.data[0]; + + options.logger?.log( + `Fetched price from Hermes: ${priceData.price.price} (expo: ${priceData.price.expo})`, + ); + options.logger?.log( + ` Confidence: ${priceData.price.conf}, Publish time: ${priceData.price.publish_time}`, + ); + options.logger?.log( + ` VAA size: ${vaa.length} bytes (base64)`, + ); + + yield { priceData, vaa }; + if (options.pollingIntervalMs > 0) { + await delay(options.pollingIntervalMs, undefined, { signal: options.signal }) + .catch((error) => options.logger?.warn(`Polling delay interrupted: ${(error as Error).message}`)); + } + } +} + +export interface PollPriceStreamOptions extends PriceProducerFactoryOptions { + baseUrl: string; + pollingIntervalMs: number; + unsafeAllowInsecureEndpoints?: boolean; + fetch?: typeof globalThis.fetch; +} + +function createFetch() { + // Agent is created to enable TLS session resumption + const agent = new https.Agent({ keepAlive: true }); + + return function fetch(url: string, options?: RequestInit): Promise { + return new Promise((resolve, reject) => { + if (options?.signal?.aborted) { + reject(createAbortError()); + return; + } + + const parsed = new URL(url); + const isHttps = parsed.protocol === "https:"; + const mod = isHttps ? https : http; + const requestOptions: https.RequestOptions = { + method: "GET", + hostname: parsed.hostname, + port: parsed.port || (isHttps ? 443 : 80), + path: `${parsed.pathname}${parsed.search}`, + headers: { + accept: "application/json", + }, + agent: isHttps ? agent : undefined, + }; + + const req = mod.request(requestOptions, (res: http.IncomingMessage) => { + options?.signal?.removeEventListener("abort", destroyRequest); + resolve(new Response(Readable.toWeb(res) as ReadableStream, { + status: res.statusCode ?? 0, + statusText: res.statusMessage ?? "", + headers: res.headers as Record, + })); + }); + const destroyRequest = () => { + req.destroy(); + reject(createAbortError()); + }; + options?.signal?.addEventListener("abort", destroyRequest, { once: true }); + + req.on("error", reject); + req.end(); + }); + }; +} + +function createAbortError() { + const error = new Error("AbortError"); + error.name = "AbortError"; + return error; +} diff --git a/src/types.ts b/src/types.ts new file mode 100644 index 0000000..e9be847 --- /dev/null +++ b/src/types.ts @@ -0,0 +1,39 @@ +// Pyth price data from Hermes API +export interface PythPriceData { + id: string; + price: { + price: string; + conf: string; + expo: number; + publish_time: number; + }; + ema_price: { + price: string; + conf: string; + expo: number; + publish_time: number; + }; +} + +export interface PriceUpdate { + priceData: PythPriceData; + vaa: string; +} + +export type PriceProducerFactory = (options: PriceProducerFactoryOptions) => AsyncGenerator; +export interface PriceProducerFactoryOptions { + priceFeedId: string; + signal?: AbortSignal; + logger?: Logger; +} + +export type Logger = Pick; + +// Hermes API response with VAA binary data +export interface HermesResponse { + binary: { + // Base64 encoded VAA data array + data: string[]; + }; + parsed: PythPriceData[]; +} diff --git a/tsconfig.json b/tsconfig.json index 0ebd8de..de6cd39 100644 --- a/tsconfig.json +++ b/tsconfig.json @@ -2,7 +2,7 @@ "compilerOptions": { "target": "ES2022", "module": "esnext", - "lib": ["ES2022"], + "lib": ["ESNext"], "outDir": "./dist", "rootDir": "./src", "strict": true,