Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 2 additions & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -175,11 +175,12 @@ akash query wasm contract-state smart $CONTRACT_ADDRESS '{"get_config":{}}'
| `WALLET_SECRET` | Yes | - | Either `privateKey:<private key in hex format>` or `mnemonic:<12/24 words>` |
| `HERMES_ENDPOINT` | No | `https://hermes.pyth.network` | Pyth Hermes API |
| `PRICE_DEVIATION_TOLERANCE` | No | 0 | absolute or percentage value for price deviations which should be ignored (e.g., `100` or `10%`) |
| `UPDATE_INTERVAL_MS` | No | `300000` | Update interval (5 min) |
| `UPDATE_INTERVAL_MS` | No | `5000` | Update interval (default 5 sec) |
| `GAS_PRICE` | No | `0.025uakt` | Gas price |
| `DENOM` | No | `uakt` | Token denomination |
| `HEALTHCHECK_PORT` | No | 3000 | healthcheck server port |
| `OTEL_RESOURCE_ATTRIBUTES` | No | <empty> | additional attributes attached to all metrics (e.g., `service.name=hermes,service.version=1.1.0,deployment.environment=production`) |
| `SMART_CONTRACT_CONFIG_CACHE_TTL_MS` | No | 3600000 (1h) | smart contract config cache ttl in milliseconds |

### Instrumentation

Expand Down
16 changes: 8 additions & 8 deletions src/cli-commands/command-config.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -64,32 +64,32 @@ describe("parseConfig", () => {
expect((result as Extract<typeof result, { ok: true }>).value.rpcEndpoint).toBe("https://custom-rpc:443");
});

it("passes HERMES_ENDPOINT to config", () => {
it("accepts HERMES_ENDPOINT and produces priceProducerFactory", () => {
const result = parseConfig(validEnv({ HERMES_ENDPOINT: "https://hermes.example.com" }));

expect(result.ok).toBe(true);
expect((result as Extract<typeof result, { ok: true }>).value.hermesEndpoint).toBe("https://hermes.example.com");
expect((result as Extract<typeof result, { ok: true }>).value.priceProducerFactory).toBeTypeOf("function");
});

it("uses default hermesEndpoint when HERMES_ENDPOINT is not provided", () => {
it("uses default HERMES_ENDPOINT and produces priceProducerFactory", () => {
const result = parseConfig(validEnv());

expect(result.ok).toBe(true);
expect((result as Extract<typeof result, { ok: true }>).value.hermesEndpoint).toBe("https://hermes.pyth.network");
expect((result as Extract<typeof result, { ok: true }>).value.priceProducerFactory).toBeTypeOf("function");
});

it("parses UPDATE_INTERVAL_MS as integer", () => {
it("accepts UPDATE_INTERVAL_MS and produces priceProducerFactory", () => {
const result = parseConfig(validEnv({ UPDATE_INTERVAL_MS: "5000" }));

expect(result.ok).toBe(true);
expect((result as Extract<typeof result, { ok: true }>).value.updateIntervalMs).toBe(5000);
expect((result as Extract<typeof result, { ok: true }>).value.priceProducerFactory).toBeTypeOf("function");
});

it("uses default updateIntervalMs when UPDATE_INTERVAL_MS is not provided", () => {
it("uses default UPDATE_INTERVAL_MS and produces priceProducerFactory", () => {
const result = parseConfig(validEnv());

expect(result.ok).toBe(true);
expect((result as Extract<typeof result, { ok: true }>).value.updateIntervalMs).toBe(5 * 60 * 1000);
expect((result as Extract<typeof result, { ok: true }>).value.priceProducerFactory).toBeTypeOf("function");
});

it("returns error when UPDATE_INTERVAL_MS is not a valid integer", () => {
Expand Down
21 changes: 17 additions & 4 deletions src/cli-commands/command-config.ts
Original file line number Diff line number Diff line change
@@ -1,11 +1,14 @@
import { z } from "zod";
import { HermesClient, type HermesConfig } from "../hermes-client.ts";
import { validateContractAddress, validateWalletSecret } from "../validation.ts";
import type { PriceProducerFactoryOptions } from "../types.ts";
import { pollPriceStream } from "../price-stream/polling-price-stream.ts";

export interface CommandConfig extends HermesConfig {
createHermesClient: (config: HermesConfig) => Promise<HermesClient>;
signal: AbortSignal;
healthcheckPort: number;
rawConfig: z.infer<typeof configSchema>;
}

const configSchema = z.object({
Expand Down Expand Up @@ -38,11 +41,12 @@ const configSchema = z.object({
});
}
}).optional(),
UPDATE_INTERVAL_MS: z.coerce.number().int().min(1000).positive().default(5 * 60 * 1000), // Default to 5 minutes
UPDATE_INTERVAL_MS: z.coerce.number().int().nonnegative().default(5 * 1000), // Default to 5 seconds
HEALTHCHECK_PORT: z.coerce.number().int().min(1).max(65535).default(3000),
GAS_PRICE: z.string().regex(/^(\d+)(\.\d+)?uakt$/, { message: 'GAS_PRICE must be a valid number with unit (e.g., "0.025uakt")' }).default("0.025uakt"),
DENOM: z.string().default("uakt"),
NODE_ENV: z.enum(["development", "production"]).optional(),
SMART_CONTRACT_CONFIG_CACHE_TTL_MS: z.coerce.number().int().min(1000).positive().default(60 * 60 * 1000),
});

type ParsedConfig = Omit<CommandConfig, "signal" | "logger">;
Expand All @@ -54,17 +58,26 @@ export function parseConfig(config: Record<string, string | undefined>): ParseCo
return { ok: false, error: z.prettifyError(result.error) };
}

const unsafeAllowInsecureEndpoints = result.data.NODE_ENV === "development"; // Enforce secure endpoints in production
const parsedConfig: ParsedConfig = {
unsafeAllowInsecureEndpoints: result.data.NODE_ENV === "development", // Enforce secure endpoints in production
rawConfig: result.data,
unsafeAllowInsecureEndpoints,
rpcEndpoint: result.data.RPC_ENDPOINT,
hermesEndpoint: result.data.HERMES_ENDPOINT,
contractAddress: result.data.CONTRACT_ADDRESS,
walletSecret: result.data.WALLET_SECRET,
updateIntervalMs: result.data.UPDATE_INTERVAL_MS,
healthcheckPort: result.data.HEALTHCHECK_PORT,
gasPrice: result.data.GAS_PRICE,
denom: result.data.DENOM,
priceDeviationTolerance: result.data.PRICE_DEVIATION_TOLERANCE,
smartContractConfigCacheTTLMs: result.data.SMART_CONTRACT_CONFIG_CACHE_TTL_MS,
priceProducerFactory(options: PriceProducerFactoryOptions) {
return pollPriceStream({
...options,
unsafeAllowInsecureEndpoints,
baseUrl: result.data.HERMES_ENDPOINT,
pollingIntervalMs: result.data.UPDATE_INTERVAL_MS,
});
},
createHermesClient: (cfg: HermesConfig) => HermesClient.connect(cfg),
};

Expand Down
26 changes: 18 additions & 8 deletions src/cli-commands/daemon-command.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -40,13 +40,13 @@ describe("daemonCommand", () => {
const promise = daemonCommand(config);
await waitForServer(logger);

const reponse = await fetch(`http://localhost:${config.healthcheckPort}/health`);
const port = getServerPort(logger);
const reponse = await fetch(`http://localhost:${port}/health`);
expect(reponse.status).toBe(200);
expect(client.getStatus).toHaveBeenCalled();
expect(logger.log).toHaveBeenCalledWith(
`Health check endpoint available at http://localhost:${config.healthcheckPort}/health`,
expect.stringMatching(/Health check endpoint available at http:\/\/localhost:\d+\/health/),
);
expect(logger.log).toHaveBeenCalledWith("Daemon started. Press Ctrl+C to stop.\n");

abortController.abort();
await promise;
Expand All @@ -57,7 +57,8 @@ describe("daemonCommand", () => {
const promise = daemonCommand(config);
await waitForServer(logger);

const response = await fetch(`http://localhost:${config.healthcheckPort}/invalid`);
const port = getServerPort(logger);
const response = await fetch(`http://localhost:${port}/invalid`);
expect(response.status).toBe(404);

abortController.abort();
Expand All @@ -74,7 +75,6 @@ describe("daemonCommand", () => {

expect(logger.log).toHaveBeenCalledWith("\n\nShutting down daemon...");
expect(logger.log).toHaveBeenCalledWith("\nStopping health check server...");
expect(logger.log).toHaveBeenCalledWith("Health check server stopped");
});

it("stops server immediately if signal is already aborted on startup", async () => {
Expand All @@ -91,21 +91,31 @@ describe("daemonCommand", () => {
});
}

function getServerPort(logger: Console): number {
const calls = (logger.log as ReturnType<typeof vi.fn>).mock.calls;
const call = calls.find((c: unknown[]) => typeof c[0] === "string" && /localhost:\d+/.test(c[0] as string));
const match = (call![0] as string).match(/localhost:(\d+)/);
return parseInt(match![1], 10);
}

let testAbortController: AbortController | null = null;
function setup() {
const client = mock<HermesClient>();
client.getStatus.mockReturnValue({ isRunning: true, contractAddress: "", priceFeedId: "", address: "" });
client.getStatus.mockResolvedValue({ isRunning: true, contractAddress: "", priceFeedId: "", address: "" });
const logger = mock<Console>();
const abortController = new AbortController();
testAbortController = abortController;
const config: CommandConfig = {
rpcEndpoint: "https://rpc.akashnet.net:443",
contractAddress: "akash1qypqxpq9qcrsszg2pvxq6rs0zqg3yyc5lzv7xu",
mnemonic: "abandon abandon abandon abandon abandon abandon abandon abandon abandon abandon abandon about",
walletSecret: { type: "mnemonic", value: "abandon abandon abandon abandon abandon abandon abandon abandon abandon abandon abandon about" },
priceProducerFactory: vi.fn(async function* () {}) as unknown as CommandConfig["priceProducerFactory"],
logger,
signal: abortController.signal,
healthcheckPort: 3001,
healthcheckPort: 0,
createHermesClient: vi.fn(() => Promise.resolve(client)),
smartContractConfigCacheTTLMs: 0,
rawConfig: {} as CommandConfig["rawConfig"],
};
return { config, client, logger, abortController };
}
Expand Down
53 changes: 23 additions & 30 deletions src/cli-commands/daemon-command.ts
Original file line number Diff line number Diff line change
@@ -1,18 +1,26 @@
import http from "node:http";
import { once } from "node:events";
import type { AddressInfo } from "node:net";
import { prometheusExporter } from "../instrumentation/prometheus-exporter.ts";
import type { CommandConfig } from "./command-config.ts";

export async function daemonCommand(config: CommandConfig): Promise<void> {
if (config.signal.aborted) return;

config.logger?.log("Starting daemon mode...\n");

const client = await config.createHermesClient(config);
const server = http.createServer((req, res) => {
if (req.method === "GET" && req.url === "/health") {
const status = client.getStatus();
res.writeHead(200, { "Content-Type": "application/json" });
res.end(JSON.stringify(status));
client.getStatus()
.then((status) => {
res.writeHead(200, { "Content-Type": "application/json" });
res.end(JSON.stringify(status));
})
.catch((error) => {
config.logger?.log(`Error fetching health status: ${error.message}`);
res.writeHead(500, { "Content-Type": "application/json" });
res.end();
});
} else if (req.method === "GET" && req.url === "/metrics") {
prometheusExporter.getMetricsRequestHandler(req, res);
} else {
Expand All @@ -23,33 +31,18 @@ export async function daemonCommand(config: CommandConfig): Promise<void> {
const abort = () => {
config.logger?.log("\n\nShutting down daemon...");
config.logger?.log("\nStopping health check server...");
return new Promise<void>((resolve) => {
server.close((err) => {
if (err) {
config.logger?.log(`Error stopping health check server: ${err.message}`);
}
resolve();
config.logger?.log("Health check server stopped");
});
});
};
config.signal.addEventListener("abort", abort, { once: true });
await client.start({ signal: config.signal });
await new Promise<void>((resolve, reject) => {
if (config.signal.aborted) return resolve();
server.once("error", reject);
server.listen(config.healthcheckPort, () => {
resolve();
server.off("error", reject);
if (!config.signal.aborted) {
await Promise.all([
client.start({ signal: config.signal }),
new Promise<void>((resolve, reject) => {
server.once("error", reject);
server.listen({ port: config.healthcheckPort, signal: config.signal }, () => {
config.logger?.log(`Health check endpoint available at http://localhost:${(server.address() as AddressInfo).port}/health`);
}
});
});
if (config.signal.aborted && server.listening) {
await abort();
} else if (server.listening) {
config.logger?.log("Daemon started. Press Ctrl+C to stop.\n");
await once(server, "close");
}
server.off("error", reject);
server.once("close", resolve);
});
}),
]);
config.signal.removeEventListener("abort", abort);
}
21 changes: 10 additions & 11 deletions src/cli-commands/status-command.test.ts
Original file line number Diff line number Diff line change
@@ -1,4 +1,3 @@
import EventEmitter from "node:events";
import { describe, it, expect, vi } from "vitest";
import { mock } from "vitest-mock-extended";
import type { HermesClient } from "../hermes-client.ts";
Expand All @@ -8,22 +7,22 @@ import { statusCommand } from "./status-command.ts";
function setup() {
const client = mock<HermesClient>();
const logger = mock<Console>();
const config: CommandConfig = {
const config = {
rpcEndpoint: "https://rpc.akashnet.net:443",
contractAddress: "akash1qypqxpq9qcrsszg2pvxq6rs0zqg3yyc5lzv7xu",
mnemonic: "abandon abandon abandon abandon abandon abandon abandon abandon abandon abandon abandon about",
hermesEndpoint: "https://hermes.pyth.network",
rawConfig: {
HERMES_ENDPOINT: "https://hermes.pyth.network",
},
logger,
process: new EventEmitter(),
createHermesClient: vi.fn(() => Promise.resolve(client)),
};
} as unknown as CommandConfig;
return { config, client, logger };
}

describe("statusCommand", () => {
it("displays client status information", async () => {
const { config, client, logger } = setup();
client.getStatus.mockReturnValueOnce({
client.getStatus.mockResolvedValueOnce({
address: "akash1sender",
contractAddress: "akash1contract",
priceFeedId: "feed-123",
Expand All @@ -41,7 +40,7 @@ describe("statusCommand", () => {

it("displays running status as yes when client is running", async () => {
const { config, client, logger } = setup();
client.getStatus.mockReturnValueOnce({
client.getStatus.mockResolvedValueOnce({
address: "akash1sender",
contractAddress: "akash1contract",
priceFeedId: "feed-123",
Expand All @@ -55,7 +54,7 @@ describe("statusCommand", () => {

it("displays RPC and Hermes endpoints from config", async () => {
const { config, client, logger } = setup();
client.getStatus.mockReturnValueOnce({
client.getStatus.mockResolvedValueOnce({
address: "akash1sender",
contractAddress: "akash1contract",
priceFeedId: "feed-123",
Expand All @@ -70,8 +69,8 @@ describe("statusCommand", () => {

it("uses default Hermes endpoint when not configured", async () => {
const { config, client, logger } = setup();
delete config.hermesEndpoint;
client.getStatus.mockReturnValueOnce({
(config.rawConfig as Record<string, unknown>).HERMES_ENDPOINT = "https://hermes.pyth.network";
client.getStatus.mockResolvedValueOnce({
address: "akash1sender",
contractAddress: "akash1contract",
priceFeedId: "feed-123",
Expand Down
4 changes: 2 additions & 2 deletions src/cli-commands/status-command.ts
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ export async function statusCommand(config: CommandConfig): Promise<void> {
config.logger?.log("Contract Status...\n");

const client = await config.createHermesClient(config);
const status = client.getStatus();
const status = await client.getStatus();

config.logger?.log("Client Status:");
config.logger?.log("─────────────────────────────");
Expand All @@ -13,5 +13,5 @@ export async function statusCommand(config: CommandConfig): Promise<void> {
config.logger?.log(`Price Feed ID: ${status.priceFeedId}`);
config.logger?.log(`Running: ${status.isRunning ? "yes" : "no"}`);
config.logger?.log(`RPC Endpoint: ${config.rpcEndpoint}`);
config.logger?.log(`Hermes Endpoint: ${config.hermesEndpoint || "https://hermes.pyth.network"}`);
config.logger?.log(`Hermes Endpoint: ${config.rawConfig.HERMES_ENDPOINT}`);
}
36 changes: 31 additions & 5 deletions src/cli-commands/update-command.test.ts
Original file line number Diff line number Diff line change
@@ -1,19 +1,45 @@
import EventEmitter from "node:events";
import { describe, it, expect, vi } from "vitest";
import { describe, expect, it, vi } from "vitest";
import { mock } from "vitest-mock-extended";
import type { HermesClient } from "../hermes-client.ts";
import type { PriceUpdate } from "../types.ts";
import type { CommandConfig } from "./command-config.ts";
import { updateCommand } from "./update-command.ts";

const fakePriceUpdate: PriceUpdate = {
priceData: {
id: "test-feed-id",
price: { price: "100", conf: "1", expo: -8, publish_time: 1000 },
ema_price: { price: "100", conf: "1", expo: -8, publish_time: 1000 },
},
vaa: "dGVzdC12YWE=",
};

async function* fakePriceProducer(): AsyncGenerator<PriceUpdate, void, unknown> {
yield fakePriceUpdate;
}

function setup() {
const client = mock<HermesClient>();
client.queryConfig.mockResolvedValue({
admin: "akash1admin",
wormhole_contract: "akash1wormhole",
update_fee: "1",
price_feed_id: "test-feed-id",
default_denom: "uakt",
default_base_denom: "uakt",
data_sources: [],
});
const logger = mock<Console>();
const config: CommandConfig = {
rpcEndpoint: "https://rpc.akashnet.net:443",
contractAddress: "akash1qypqxpq9qcrsszg2pvxq6rs0zqg3yyc5lzv7xu",
mnemonic: "abandon abandon abandon abandon abandon abandon abandon abandon abandon abandon abandon about",
walletSecret: { type: "mnemonic", value: "abandon abandon abandon abandon abandon abandon abandon abandon abandon abandon abandon about" },
logger,
process: new EventEmitter(),
signal: AbortSignal.abort(),
healthcheckPort: 3000,
rawConfig: {} as CommandConfig["rawConfig"],
smartContractConfigCacheTTLMs: 60000,
priceProducerFactory: vi.fn(() => fakePriceProducer()),
createHermesClient: vi.fn(() => Promise.resolve(client)),
};
return { config, client, logger };
Expand All @@ -33,7 +59,7 @@ describe("updateCommand", () => {
await updateCommand(config);

expect(config.createHermesClient).toHaveBeenCalledWith(config);
expect(client.updatePrice).toHaveBeenCalledOnce();
expect(client.updatePrice).toHaveBeenCalledWith(fakePriceUpdate);
});

it("propagates errors from updatePrice", async () => {
Expand Down
Loading
Loading