From 94489af476725767b5c196000b329d9e4e448605 Mon Sep 17 00:00:00 2001 From: v0l Date: Tue, 16 Jun 2026 10:32:48 +0100 Subject: [PATCH] feat: add lnvps_agent support bot with email (IMAP IDLE) and Nostr kind1 channels AI-powered support agent using an OpenAI-compatible backend with tool-calling capabilities scoped per customer (list VMs, payments, history, extend/refund/delete). Support channels: - Email: IMAP IDLE for push-based inbox monitoring, SMTP replies - Kind1: real-time Nostr mention monitoring via nostr-sdk subscriptions, NIP-10 reply threading Shared nsec key used for NIP-98 API auth and Nostr channel operations. Conversation history persisted as JSON with LLM-powered compaction. --- .gitignore | 4 + Cargo.lock | 393 +++++++++++++++- Cargo.toml | 1 + lnvps_agent/Cargo.toml | 36 ++ lnvps_agent/Dockerfile | 36 ++ lnvps_agent/settings.example.yaml | 54 +++ lnvps_agent/src/agent.rs | 724 ++++++++++++++++++++++++++++++ lnvps_agent/src/api_client.rs | 434 ++++++++++++++++++ lnvps_agent/src/channel/email.rs | 558 +++++++++++++++++++++++ lnvps_agent/src/channel/kind1.rs | 265 +++++++++++ lnvps_agent/src/channel/mod.rs | 52 +++ lnvps_agent/src/conversation.rs | 319 +++++++++++++ lnvps_agent/src/lib.rs | 7 + lnvps_agent/src/main.rs | 61 +++ lnvps_agent/src/nip98.rs | 41 ++ lnvps_agent/src/settings.rs | 148 ++++++ lnvps_agent/src/tools/mod.rs | 179 ++++++++ 17 files changed, 3304 insertions(+), 8 deletions(-) create mode 100644 lnvps_agent/Cargo.toml create mode 100644 lnvps_agent/Dockerfile create mode 100644 lnvps_agent/settings.example.yaml create mode 100644 lnvps_agent/src/agent.rs create mode 100644 lnvps_agent/src/api_client.rs create mode 100644 lnvps_agent/src/channel/email.rs create mode 100644 lnvps_agent/src/channel/kind1.rs create mode 100644 lnvps_agent/src/channel/mod.rs create mode 100644 lnvps_agent/src/conversation.rs create mode 100644 lnvps_agent/src/lib.rs create mode 100644 lnvps_agent/src/main.rs create mode 100644 lnvps_agent/src/nip98.rs create mode 100644 lnvps_agent/src/settings.rs create mode 100644 lnvps_agent/src/tools/mod.rs diff --git a/.gitignore b/.gitignore index 6701a8b8..76a5b0d6 100644 --- a/.gitignore +++ b/.gitignore @@ -3,3 +3,7 @@ *.config.yaml **/*.log *.key + +# Agent secrets and runtime data +lnvps_agent/settings.yaml +lnvps_agent/conversation_history/ diff --git a/Cargo.lock b/Cargo.lock index aec65cfb..82550c18 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -2,6 +2,12 @@ # It is not intended for manual editing. version = 4 +[[package]] +name = "adler2" +version = "2.0.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "320119579fcad9c21884f5c4861d16174d0e06250625266f50fe6898340abefa" + [[package]] name = "aead" version = "0.5.2" @@ -173,12 +179,107 @@ version = "0.7.2" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "435a87a52755b8f27fcf321ac4f04b2802e337c8c4872923137471ec39c37532" dependencies = [ - "event-listener", + "event-listener 5.4.1", + "event-listener-strategy", + "futures-core", + "pin-project-lite", +] + +[[package]] +name = "async-channel" +version = "1.9.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "81953c529336010edd6d8e358f886d9581267795c61b19475b71314bffa46d35" +dependencies = [ + "concurrent-queue", + "event-listener 2.5.3", + "futures-core", +] + +[[package]] +name = "async-channel" +version = "2.5.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "924ed96dd52d1b75e9c1a3e6275715fd320f5f9439fb5a4a11fa51f4221158d2" +dependencies = [ + "concurrent-queue", "event-listener-strategy", "futures-core", "pin-project-lite", ] +[[package]] +name = "async-compression" +version = "0.4.42" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "e79b3f8a79cccc2898f31920fc69f304859b3bd567490f75ebf51ae1c792a9ac" +dependencies = [ + "compression-codecs", + "compression-core", + "pin-project-lite", + "tokio", +] + +[[package]] +name = "async-imap" +version = "0.11.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "a78dceaba06f029d8f4d7df20addd4b7370a30206e3926267ecda2915b0f3f66" +dependencies = [ + "async-channel 2.5.0", + "async-compression", + "base64 0.22.1", + "bytes", + "chrono", + "futures", + "imap-proto", + "log 0.4.29", + "nom 7.1.3", + "pin-project", + "pin-utils", + "self_cell", + "stop-token", + "thiserror 1.0.69", + "tokio", +] + +[[package]] +name = "async-openai" +version = "0.28.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "4df839a6643e1e3248733b01f229dc4f462d7256f808bbaf04cac40739b345c2" +dependencies = [ + "async-openai-macros", + "backoff", + "base64 0.22.1", + "bytes", + "derive_builder", + "eventsource-stream", + "futures", + "rand 0.8.5", + "reqwest 0.12.28", + "reqwest-eventsource", + "secrecy", + "serde", + "serde_json", + "thiserror 2.0.18", + "tokio", + "tokio-stream", + "tokio-util", + "tracing", +] + +[[package]] +name = "async-openai-macros" +version = "0.1.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "81872a8e595e8ceceab71c6ba1f9078e313b452a1e31934e6763ef5d308705e4" +dependencies = [ + "proc-macro2", + "quote", + "syn", +] + [[package]] name = "async-stream" version = "0.3.6" @@ -359,6 +460,20 @@ dependencies = [ "syn", ] +[[package]] +name = "backoff" +version = "0.4.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "b62ddb9cb1ec0a098ad4bbf9344d0713fa193ae1a80af55febcff2627b6a00c1" +dependencies = [ + "futures-core", + "getrandom 0.2.17", + "instant", + "pin-project-lite", + "rand 0.8.5", + "tokio", +] + [[package]] name = "backon" version = "1.6.0" @@ -693,6 +808,22 @@ dependencies = [ "tokio-util", ] +[[package]] +name = "compression-codecs" +version = "0.4.38" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "ce2548391e9c1929c21bf6aa2680af86fe4c1b33e6cea9ac1cfeec0bd11218cf" +dependencies = [ + "compression-core", + "flate2", +] + +[[package]] +name = "compression-core" +version = "0.4.32" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "cc14f565cf027a105f7a44ccf9e5b424348421a1d8952a8fc9d499d313107789" + [[package]] name = "concurrent-queue" version = "2.5.0" @@ -813,6 +944,15 @@ version = "0.4.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "338089f42c427b86394a5ee60ff321da23a5c89c9d89514c829687b26359fcff" +[[package]] +name = "crc32fast" +version = "1.5.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "9481c1c90cbf2ac953f07c8d4a58aa3945c425b7185c9154d67a65e4230da511" +dependencies = [ + "cfg-if", +] + [[package]] name = "critical-section" version = "1.2.0" @@ -960,6 +1100,37 @@ dependencies = [ "zeroize", ] +[[package]] +name = "derive_builder" +version = "0.20.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "507dfb09ea8b7fa618fcf76e953f4f5e192547945816d5358edffe39f6f94947" +dependencies = [ + "derive_builder_macro", +] + +[[package]] +name = "derive_builder_core" +version = "0.20.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "2d5bcf7b024d6835cfb3d473887cd966994907effbe9227e8c8219824d06c4e8" +dependencies = [ + "darling", + "proc-macro2", + "quote", + "syn", +] + +[[package]] +name = "derive_builder_macro" +version = "0.20.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "ab63b0e2bf4d5928aff72e83a7dace85d7bba5fe12dcc3c5a572d78caffd3f3c" +dependencies = [ + "derive_builder_core", + "syn", +] + [[package]] name = "derive_more" version = "2.1.1" @@ -1206,6 +1377,12 @@ dependencies = [ "windows-sys 0.48.0", ] +[[package]] +name = "event-listener" +version = "2.5.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "0206175f82b8d6bf6652ff7d71a1e27fd2e4efde587fd368662814d6ec1d9ce0" + [[package]] name = "event-listener" version = "5.4.1" @@ -1223,7 +1400,18 @@ version = "0.5.4" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "8be9f3dfaaffdae2972880079a491a1a8bb7cbed0b8dd7a347f668b4150a3b93" dependencies = [ - "event-listener", + "event-listener 5.4.1", + "pin-project-lite", +] + +[[package]] +name = "eventsource-stream" +version = "0.2.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "74fef4569247a5f429d9156b9d0a2599914385dd189c539334c625d8099d90ab" +dependencies = [ + "futures-core", + "nom 7.1.3", "pin-project-lite", ] @@ -1274,6 +1462,16 @@ version = "0.5.7" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "1d674e81391d1e1ab681a28d99df07927c6d4aa5b027d7da16ba32d1d21ecd99" +[[package]] +name = "flate2" +version = "1.1.9" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "843fba2746e448b37e26a819579957415c8cef339bf08564fe8b7ddbd959573c" +dependencies = [ + "crc32fast", + "miniz_oxide", +] + [[package]] name = "flume" version = "0.11.1" @@ -1409,6 +1607,12 @@ version = "0.3.31" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "f90f7dce0722e95104fcb095585910c0977252f286e354b5e3bd38902cd99988" +[[package]] +name = "futures-timer" +version = "3.0.4" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "af43fadb8a98512d547e37b4e92e0ced13e205c061b87b4623eff01d918d6968" + [[package]] name = "futures-util" version = "0.3.31" @@ -1995,6 +2199,15 @@ dependencies = [ "icu_properties", ] +[[package]] +name = "imap-proto" +version = "0.16.7" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "25f6af35c6a517aea5c72314abe90134980d2ae6a763809b50c208b3e429d71f" +dependencies = [ + "nom 7.1.3", +] + [[package]] name = "indexmap" version = "2.13.0" @@ -2361,7 +2574,7 @@ dependencies = [ "idna", "mime", "native-tls", - "nom", + "nom 8.0.0", "percent-encoding", "quoted_printable", "rustls", @@ -2482,6 +2695,32 @@ dependencies = [ "url", ] +[[package]] +name = "lnvps_agent" +version = "0.2.0" +dependencies = [ + "anyhow", + "async-imap", + "async-openai", + "async-trait", + "base64 0.22.1", + "chrono", + "config", + "env_logger", + "futures", + "lettre", + "log 0.4.29", + "native-tls", + "nostr 0.44.2", + "nostr-sdk", + "reqwest 0.13.2", + "serde", + "serde_json", + "tempfile", + "tokio", + "tokio-native-tls", +] + [[package]] name = "lnvps_api" version = "0.2.0" @@ -2515,7 +2754,7 @@ dependencies = [ "payments-rs", "quick-xml", "rand 0.9.2", - "reqwest", + "reqwest 0.13.2", "serde", "serde_json", "ssh-key", @@ -2576,7 +2815,7 @@ dependencies = [ "payments-rs", "rand 0.9.2", "redis", - "reqwest", + "reqwest 0.13.2", "serde", "serde_json", "tokio", @@ -2615,7 +2854,7 @@ dependencies = [ "chrono", "hex", "nostr 0.44.2", - "reqwest", + "reqwest 0.13.2", "serde", "serde_json", "sqlx", @@ -2739,6 +2978,32 @@ version = "0.3.17" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "6877bb514081ee2a7ff5ef9de3281f14a4dd4bceac4c09388074a6b5df8a139a" +[[package]] +name = "mime_guess" +version = "2.0.5" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "f7c44f8e672c00fe5308fa235f821cb4198414e1c77935c1ab6948d3fd78550e" +dependencies = [ + "mime", + "unicase", +] + +[[package]] +name = "minimal-lexical" +version = "0.2.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "68354c5c6bd36d73ff3feceb05efa59b6acb7626617f4962be322a825e61f79a" + +[[package]] +name = "miniz_oxide" +version = "0.8.9" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "1fa76a2c86f704bdb222d66965fb3d63269ce38518b83cb0575fca855ebb6316" +dependencies = [ + "adler2", + "simd-adler32", +] + [[package]] name = "mio" version = "1.1.1" @@ -2806,6 +3071,16 @@ version = "0.5.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "f0efe882e02d206d8d279c20eb40e03baf7cb5136a1476dc084a324fbc3ec42d" +[[package]] +name = "nom" +version = "7.1.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "d273983c5a657a70a3e8f2a01329822f3b8c8172b73826411a55751e404a0a4a" +dependencies = [ + "memchr", + "minimal-lexical", +] + [[package]] name = "nom" version = "8.0.0" @@ -3215,7 +3490,7 @@ dependencies = [ "hmac", "lightning-invoice", "log 0.4.29", - "reqwest", + "reqwest 0.13.2", "rustls", "serde", "serde_html_form", @@ -3772,6 +4047,48 @@ version = "0.8.9" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "a96887878f22d7bad8a3b6dc5b7440e0ada9a245242924394987b21cf2210a4c" +[[package]] +name = "reqwest" +version = "0.12.28" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "eddd3ca559203180a307f12d114c268abf583f59b03cb906fd0b3ff8646c1147" +dependencies = [ + "base64 0.22.1", + "bytes", + "futures-core", + "futures-util", + "http", + "http-body", + "http-body-util", + "hyper", + "hyper-rustls", + "hyper-util", + "js-sys", + "log 0.4.29", + "mime_guess", + "percent-encoding", + "pin-project-lite", + "quinn", + "rustls", + "rustls-native-certs 0.8.3", + "rustls-pki-types", + "serde", + "serde_json", + "serde_urlencoded", + "sync_wrapper", + "tokio", + "tokio-rustls", + "tokio-util", + "tower", + "tower-http", + "tower-service", + "url", + "wasm-bindgen", + "wasm-bindgen-futures", + "wasm-streams", + "web-sys", +] + [[package]] name = "reqwest" version = "0.13.2" @@ -3812,6 +4129,22 @@ dependencies = [ "web-sys", ] +[[package]] +name = "reqwest-eventsource" +version = "0.6.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "632c55746dbb44275691640e7b40c907c16a2dc1a5842aa98aaec90da6ec6bde" +dependencies = [ + "eventsource-stream", + "futures-core", + "futures-timer", + "mime", + "nom 7.1.3", + "pin-project-lite", + "reqwest 0.12.28", + "thiserror 1.0.69", +] + [[package]] name = "resolv-conf" version = "0.7.6" @@ -4145,6 +4478,7 @@ version = "0.10.3" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "e891af845473308773346dc847b2c23ee78fe442e0472ac50e22a18a93d3ae5a" dependencies = [ + "serde", "zeroize", ] @@ -4184,6 +4518,12 @@ dependencies = [ "libc", ] +[[package]] +name = "self_cell" +version = "1.2.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "b12e76d157a900eb52e81bc6e9f3069344290341720e9178cde2407113ac8d89" + [[package]] name = "semver" version = "1.0.27" @@ -4378,6 +4718,12 @@ dependencies = [ "rand_core 0.6.4", ] +[[package]] +name = "simd-adler32" +version = "0.3.9" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "703d5c7ef118737c72f1af64ad2f6f8c5e1921f818cdcb97b8fe6fc69bf66214" + [[package]] name = "slab" version = "0.4.12" @@ -4457,7 +4803,7 @@ dependencies = [ "crc", "crossbeam-queue", "either", - "event-listener", + "event-listener 5.4.1", "futures-core", "futures-intrusive", "futures-io", @@ -4696,6 +5042,18 @@ dependencies = [ "windows-sys 0.59.0", ] +[[package]] +name = "stop-token" +version = "0.7.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "af91f480ee899ab2d9f8435bfdfc14d08a5754bd9d3fef1f1a1c23336aad6c8b" +dependencies = [ + "async-channel 1.9.0", + "cfg-if", + "futures-core", + "pin-project-lite", +] + [[package]] name = "stringprep" version = "0.1.5" @@ -5208,6 +5566,12 @@ version = "0.1.7" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "2896d95c02a80c6d6a5d6e953d479f5ddf2dfdb6a244441010e373ac0fb88971" +[[package]] +name = "unicase" +version = "2.9.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "dbc4bc3a9f746d862c45cb89d705aa10f187bb96c76001afab07a0d35ce60142" + [[package]] name = "unicode-bidi" version = "0.3.18" @@ -5479,6 +5843,19 @@ dependencies = [ "wasmparser", ] +[[package]] +name = "wasm-streams" +version = "0.4.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "15053d8d85c7eccdbefef60f06769760a563c7f0a9d6902a13d35c7800b0ad65" +dependencies = [ + "futures-util", + "js-sys", + "wasm-bindgen", + "wasm-bindgen-futures", + "web-sys", +] + [[package]] name = "wasmparser" version = "0.244.0" diff --git a/Cargo.toml b/Cargo.toml index 815a9926..51a7e612 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -10,6 +10,7 @@ members = [ "lnvps_health", "try-procedure", "lnvps_e2e", + "lnvps_agent", ] [workspace.package] diff --git a/lnvps_agent/Cargo.toml b/lnvps_agent/Cargo.toml new file mode 100644 index 00000000..53057c33 --- /dev/null +++ b/lnvps_agent/Cargo.toml @@ -0,0 +1,36 @@ +[package] +name = "lnvps_agent" +version.workspace = true +edition.workspace = true +description = "AI support agent for LNVPS — processes support requests using an async-openai agent with API-calling tools" + +[[bin]] +name = "lnvps_agent" +path = "src/main.rs" + +[dependencies] +tokio = { workspace = true, features = ["signal", "time"] } +anyhow.workspace = true +log.workspace = true +env_logger.workspace = true +serde = { workspace = true } +serde_json.workspace = true +config = { workspace = true } +async-trait.workspace = true +reqwest = { workspace = true, features = ["json"] } +futures = { workspace = true } + +async-openai = "0.28" +chrono = { version = "0.4", features = ["serde"] } +nostr = { version = "0.44", default-features = false, features = ["std"] } +nostr-sdk = { rev = "c69c4ae2bf1a5f340aba70c20d09323f5309a4e2", git = "https://github.com/rust-nostr/nostr", default-features = false } +base64 = "0.22" + +# Email support channel (IMAP poll + SMTP reply) +lettre = { version = "0.11", default-features = false, features = ["tokio1-native-tls", "smtp-transport", "builder"] } +async-imap = { version = "0.11", default-features = false, features = ["runtime-tokio"] } +tokio-native-tls = "0.3" +native-tls = "0.2" + +[dev-dependencies] +tempfile = "3" diff --git a/lnvps_agent/Dockerfile b/lnvps_agent/Dockerfile new file mode 100644 index 00000000..f57c84ac --- /dev/null +++ b/lnvps_agent/Dockerfile @@ -0,0 +1,36 @@ +# Build stage +FROM rust:1.94-slim-bookworm AS builder + +RUN apt-get update && apt-get install -y pkg-config libssl-dev && rm -rf /var/lib/apt/lists/* + +WORKDIR /app + +# Copy workspace manifests +COPY Cargo.toml Cargo.lock ./ +COPY lnvps_agent/Cargo.toml lnvps_agent/ +COPY lnvps_api_common/Cargo.toml lnvps_api_common/ + +# Build dependencies (cached layer) +RUN mkdir -p lnvps_agent/src && \ + echo 'fn main() {}' > lnvps_agent/src/main.rs && \ + mkdir -p lnvps_api_common/src && \ + echo 'fn main() {}' > lnvps_api_common/src/lib.rs && \ + cargo build --release -p lnvps_agent && \ + rm -rf lnvps_agent/src lnvps_api_common/src + +# Copy source and rebuild +COPY lnvps_agent/src lnvps_agent/src +COPY lnvps_api_common/src lnvps_api_common/src +RUN cargo build --release -p lnvps_agent + +# Runtime stage +FROM debian:bookworm-slim + +RUN apt-get update && apt-get install -y ca-certificates && rm -rf /var/lib/apt/lists/* + +COPY --from=builder /app/target/release/lnvps_agent /usr/local/bin/lnvps_agent + +# Default settings path (mount your settings.yaml at /etc/lnvps-agent/settings.yaml) +ENV LNVPS_AGENT_CONFIG=/etc/lnvps-agent/settings.yaml + +ENTRYPOINT ["/usr/local/bin/lnvps_agent"] diff --git a/lnvps_agent/settings.example.yaml b/lnvps_agent/settings.example.yaml new file mode 100644 index 00000000..68f2ee3e --- /dev/null +++ b/lnvps_agent/settings.example.yaml @@ -0,0 +1,54 @@ +# LNVPS Support Agent Configuration + +listen: 0.0.0.0:8080 + +# Base URLs for the LNVPS APIs +admin-api-url: https://admin.lnvps.io +user-api-url: https://api.lnvps.io + +# Nsec key for signing NIP-98 auth events and Nostr channel operations (bech32 nsec1...) +nsec: nsec1... + +# OpenAI-compatible API configuration (works with Ollama, LM Studio, etc.) +openai: + # For Ollama: http://localhost:11434/v1 + # For OpenAI: https://api.openai.com/v1 + base-url: http://localhost:11434/v1 + # API key — not needed for Ollama, required for OpenAI + # api-key: sk-... + # Model to use (e.g., llama3.2, qwen2.5, gpt-4o) + model: llama3.2 + # Max completion tokens + max-tokens: 2048 + +# Optional: custom system prompt override +# system-prompt: | +# You are the LNVPS support agent... + +# Email support channel (optional) +# Uses IMAP IDLE for push-based email notifications +# email: +# imap-server: imap.gmail.com:993 +# imap-username: support@lnvps.io +# imap-password: your-app-password +# imap-mailbox: INBOX +# smtp-server: smtp.gmail.com:587 +# smtp-username: support@lnvps.io +# smtp-password: your-app-password +# smtp-from: support@lnvps.io +# smtp-from-name: LNVPS Support + +# Kind 1 Nostr channel (optional) +# Replies to mentions on nostr kind 1 events in real-time +# kind1: +# relays: +# - wss://nos.lol +# - wss://relay.damus.io +# - wss://relay.primal.net +# # Optional: specific pubkey(s) whose mentions trigger replies +# # Defaults to the bot's own pubkey (derived from the shared nsec above) +# # mention-pubkeys: +# # - "hex_pubkey_here" + +# Path to conversation history storage directory +# conversation-history-path: ./conversation_history diff --git a/lnvps_agent/src/agent.rs b/lnvps_agent/src/agent.rs new file mode 100644 index 00000000..77d03946 --- /dev/null +++ b/lnvps_agent/src/agent.rs @@ -0,0 +1,724 @@ +use anyhow::{Result, anyhow, bail}; +use async_trait::async_trait; +use std::collections::HashMap; +use std::sync::Arc; + +use crate::api_client::ApiClient; +use crate::conversation::{ConversationEntry, ConversationStore, SenderConversation}; +use crate::settings::Settings; + +/// Executes tool calls by invoking the LNVPS APIs. +/// Each instance is scoped to a single user — all tools operate +/// within that user's context without taking user identifiers. +#[async_trait] +pub trait ToolExecutor: Send + Sync { + async fn execute(&self, name: &str, arguments: &str) -> Result; +} + +/// The actual tool executor backed by the API client, scoped to one user. +pub struct LnvpsToolExecutor { + api: Arc, + user_id: u64, +} + +impl LnvpsToolExecutor { + pub fn new(api: Arc, user_id: u64) -> Self { + Self { api, user_id } + } + + async fn check_vm_ownership(&self, vm_id: u64) -> Result<()> { + let vm = self.api.admin_get_vm(vm_id).await?; + let owner = vm["user_id"] + .as_u64() + .ok_or_else(|| anyhow!("VM {} has no user_id field", vm_id))?; + if owner != self.user_id { + bail!( + "VM {} does not belong to the current user (owner is {})", + vm_id, + owner + ); + } + Ok(()) + } +} + +#[async_trait] +impl ToolExecutor for LnvpsToolExecutor { + async fn execute(&self, name: &str, arguments: &str) -> Result { + let args: HashMap = + serde_json::from_str(arguments).unwrap_or_default(); + + let uid = self.user_id; + + match name { + "get_my_account" => { + let user = self.api.admin_get_user(uid).await?; + Ok(serde_json::to_string_pretty(&user)?) + } + + "list_my_vms" => { + let vms = self.api.admin_list_vms(Some(uid), None).await?; + Ok(serde_json::to_string_pretty(&vms)?) + } + + "get_vm_details" => { + let vm_id = args + .get("vm_id") + .and_then(|v| v.as_u64()) + .ok_or_else(|| anyhow!("vm_id required"))?; + self.check_vm_ownership(vm_id).await?; + let vm = self.api.admin_get_vm(vm_id).await?; + Ok(serde_json::to_string_pretty(&vm)?) + } + + "list_vm_payments" => { + let vm_id = args + .get("vm_id") + .and_then(|v| v.as_u64()) + .ok_or_else(|| anyhow!("vm_id required"))?; + self.check_vm_ownership(vm_id).await?; + let payments = self.api.admin_list_vm_payments(vm_id).await?; + Ok(serde_json::to_string_pretty(&payments)?) + } + + "list_vm_history" => { + let vm_id = args + .get("vm_id") + .and_then(|v| v.as_u64()) + .ok_or_else(|| anyhow!("vm_id required"))?; + self.check_vm_ownership(vm_id).await?; + let history = self.api.admin_list_vm_history(vm_id).await?; + Ok(serde_json::to_string_pretty(&history)?) + } + + "extend_vm" => { + let vm_id = args + .get("vm_id") + .and_then(|v| v.as_u64()) + .ok_or_else(|| anyhow!("vm_id required"))?; + self.check_vm_ownership(vm_id).await?; + let days = args + .get("days") + .and_then(|v| v.as_u64()) + .ok_or_else(|| anyhow!("days required"))?; + let rsp = self.api.admin_extend_vm(vm_id, days).await?; + Ok(serde_json::to_string_pretty(&rsp)?) + } + + "refund_vm" => { + let vm_id = args + .get("vm_id") + .and_then(|v| v.as_u64()) + .ok_or_else(|| anyhow!("vm_id required"))?; + self.check_vm_ownership(vm_id).await?; + let rsp = self.api.admin_refund_vm(vm_id).await?; + Ok(serde_json::to_string_pretty(&rsp)?) + } + + "delete_vm" => { + let vm_id = args + .get("vm_id") + .and_then(|v| v.as_u64()) + .ok_or_else(|| anyhow!("vm_id required"))?; + self.check_vm_ownership(vm_id).await?; + let rsp = self.api.admin_delete_vm(vm_id).await?; + Ok(serde_json::to_string_pretty(&rsp)?) + } + + "list_regions" => { + let regions = self.api.admin_list_regions().await?; + Ok(serde_json::to_string_pretty(®ions)?) + } + + _ => bail!("Unknown tool: {}", name), + } + } +} + +/// Public tool executor for non-customer requests. +/// Only exposes read-only endpoints that don't require authentication. +pub struct PublicToolExecutor { + api: Arc, +} + +impl PublicToolExecutor { + pub fn new(api: Arc) -> Self { + Self { api } + } +} + +#[async_trait] +impl ToolExecutor for PublicToolExecutor { + async fn execute(&self, name: &str, _arguments: &str) -> Result { + match name { + "list_regions" => { + let regions = self.api.admin_list_regions().await?; + Ok(serde_json::to_string_pretty(®ions)?) + } + "list_templates" => { + let templates = self.api.admin_list_templates().await?; + Ok(serde_json::to_string_pretty(&templates)?) + } + "list_os_images" => { + let images = self.api.admin_list_os_images().await?; + Ok(serde_json::to_string_pretty(&images)?) + } + _ => bail!("Unknown tool: {}", name), + } + } +} + +fn general_system_message() -> String { + r#"You are the LNVPS support agent, helping potential customers and the general public with +questions about LNVPS VPS hosting services. + +The sender has not been identified as an existing LNVPS customer, but you have access to +the following tools to help answer their questions: +- list_regions — see all available hosting regions +- list_templates — see all available VM plans with specs and pricing +- list_os_images — see all available operating system images + +Use these tools to give accurate, up-to-date answers about pricing, available plans, +regions, and OS options. Never guess or fabricate data. + +If the person is an existing customer and needs account-specific help, ask them to +send their support request from the email address registered on their LNVPS account, +or include their nostr pubkey (64 hex characters) in the email so you can look up +their account. + +Be friendly, professional, and concise."# + .to_string() +} + +/// The AI support agent that handles a support conversation +pub struct SupportAgent { + api: Arc, + settings: Settings, + store: Arc, + /// Maximum conversation exchanges to retain per sender (before compaction). + max_history: usize, +} + +impl SupportAgent { + pub fn new(api: Arc, settings: Settings, store: Arc) -> Self { + Self { + api, + settings, + store, + max_history: 10, + } + } + + fn openai_client(&self) -> async_openai::Client { + use async_openai::Client; + use async_openai::config::OpenAIConfig; + let mut config = OpenAIConfig::new().with_api_base(&self.settings.openai.base_url); + if let Some(ref key) = self.settings.openai.api_key { + config = config.with_api_key(key); + } + Client::with_config(config) + } + + fn system_message(&self, user_pubkey: &str, account: &serde_json::Value) -> String { + let account_pretty = serde_json::to_string_pretty(account).unwrap_or_default(); + + self.settings.system_prompt.clone().unwrap_or_else(|| { + format!( + r#"You are the LNVPS support agent. You help customers with their VPS hosting +accounts, virtual machines, payments, and billing. + +Current user context: +- Nostr pubkey: {user_pubkey} +- Account info: {account_pretty} + +All your tools are automatically scoped to this user — you do NOT need to pass +pubkey or user_id. Just call get_my_account or list_my_vms directly to see +their data. You can manage only this user's VMs and account. + +Guidelines: +1. Be friendly and professional. The user may be frustrated — be empathetic. +2. Use list_my_vms first to see what VMs the user has, then get_vm_details + for specifics. +3. Check list_vm_payments to understand billing issues, and list_vm_history + for activity logs. +4. Be VERY careful with destructive actions (refund, delete, extend_vm). + Always confirm verbally with the user before executing them — tell them + exactly what will happen and ask for explicit confirmation. +5. If you don't have enough info, ask the customer for more details. +6. When presenting payment data, always include amounts, currencies, dates, + and paid/unpaid status. +7. If a VM is expired, check payment history to see what happened. +8. For connectivity issues, check VM details for IP assignments. +9. NEVER fabricate data. Only report what your tools actually return. +10. If a tool call fails, explain the error honestly and suggest next steps. + +LNVPS product info: +- VMs are provisioned on Proxmox and LibVirt hypervisors +- Payments via Lightning Network (Bitcoin) or fiat (Revolut, Stripe, PayPal) +- VMs auto-expire if not renewed +- Customers can manage SSH keys, upgrade specs, reinstall OS images, and + access console via WebSocket +- Custom VM templates are available in regions that support them"# + ) + }) + } + + /// Build the messages vec with system prompt, summary block, and history entries. + async fn build_messages_with_history( + &self, + messages: &mut Vec, + sender_id: &str, + system_prompt: String, + ) { + use async_openai::types::{ + ChatCompletionRequestAssistantMessage, + ChatCompletionRequestUserMessageArgs, + ChatCompletionRequestSystemMessageArgs, + }; + + let conv = self.store.load(sender_id).await; + + // Inject the summary as a memory block before the main system prompt + let full_system = if let Some(ref summary) = conv.summary { + format!( + r#"{system_prompt} + + +{summary} + + +The above is your accumulated knowledge from all prior conversations with this sender. +Use it to provide continuity — reference past issues, remember what was tried, and +avoid repeating yourself."# + ) + } else { + system_prompt + }; + + messages.push( + ChatCompletionRequestSystemMessageArgs::default() + .content(full_system) + .build() + .unwrap() + .into(), + ); + + // Inject recent raw exchanges after the system prompt + for entry in &conv.entries { + messages.push( + ChatCompletionRequestUserMessageArgs::default() + .content(entry.user_message.clone()) + .build() + .unwrap() + .into(), + ); + messages.push( + ChatCompletionRequestAssistantMessage { + content: Some( + async_openai::types::ChatCompletionRequestAssistantMessageContent::Text( + entry.agent_response.clone(), + ), + ), + ..Default::default() + } + .into(), + ); + } + } + + /// Record a conversation exchange for a sender. + async fn record_exchange(&self, sender_id: &str, user_msg: &str, agent_resp: &str) { + let entry = ConversationEntry { + user_message: user_msg.to_string(), + agent_response: agent_resp.to_string(), + timestamp: chrono::Utc::now().timestamp(), + }; + if let Err(e) = self.store.append(sender_id, entry).await { + log::error!("Failed to record conversation for {}: {}", sender_id, e); + return; + } + + // Auto-compact when entries exceed max_history + let conv = self.store.load(sender_id).await; + if conv.entries.len() > self.max_history { + log::info!( + "Conversation for {} has {} entries, triggering compaction", + sender_id, + conv.entries.len() + ); + if let Err(e) = self.compact(sender_id).await { + log::error!("Failed to compact conversation for {}: {}", sender_id, e); + } + } + } + + /// Compact the conversation history for a sender using the LLM. + /// + /// Summarises all raw entries into a persistent `` block that + /// is injected into the system prompt on future requests. Clears the + /// raw entries after compaction so only the summary carries forward. + pub async fn compact(&self, sender_id: &str) -> Result<()> { + use async_openai::types::{ + ChatCompletionRequestMessage, + ChatCompletionRequestSystemMessageArgs, + ChatCompletionRequestUserMessageArgs, + CreateChatCompletionRequestArgs, + }; + + let conv = self.store.load(sender_id).await; + + if conv.entries.is_empty() { + log::info!("No entries to compact for {}", sender_id); + return Ok(()); + } + + // Build the conversation transcript for summarisation + let mut transcript = String::new(); + if let Some(ref existing) = conv.summary { + transcript.push_str("Existing summary (incorporate into your updated summary):\n"); + transcript.push_str(existing); + transcript.push_str("\n\nNew exchanges to fold in:\n"); + } + for entry in &conv.entries { + transcript.push_str(&format!("User: {}\nAgent: {}\n\n", entry.user_message, entry.agent_response)); + } + + let client = self.openai_client(); + + let messages: Vec = vec![ + ChatCompletionRequestSystemMessageArgs::default() + .content( + r#"You are a conversation summariser for a support agent. +Your job is to produce a concise but complete memory block that will be injected +into the agent's system prompt so it remembers everything important about this +sender's support history. + +When writing the summary: +- Preserve ALL concrete facts: VM IDs, IPs, hostnames, region names, dates, + error messages, what was tried and whether it worked, outstanding issues, + payment amounts and statuses, refund decisions, and any explicit user + preferences. +- Note anything the agent should remember to do or NOT do with this sender + (e.g. "always explain pricing before extending", "user is non-technical"). +- If a prior issue was resolved, say so briefly so the agent doesn't re-open it. +- If an issue is still open, make that very clear. +- Write in third person ("The customer", "The user"). +- Keep it under 800 words. +- Output ONLY the summary text — no markdown fences, no preamble."# + ) + .build() + .unwrap() + .into(), + ChatCompletionRequestUserMessageArgs::default() + .content(transcript) + .build() + .unwrap() + .into(), + ]; + + let request = CreateChatCompletionRequestArgs::default() + .model(&self.settings.openai.model) + .max_completion_tokens(1024u32) + .messages(messages) + .build()?; + + let response = client.chat().create(request).await?; + let summary = response.choices[0] + .message + .content + .clone() + .ok_or_else(|| anyhow!("LLM returned empty summary"))?; + + log::info!( + "Compacted conversation for {}: {} entries -> {} chars summary", + sender_id, + conv.entries.len(), + summary.len() + ); + + // Save summary and clear raw entries + self.store.save( + sender_id, + SenderConversation { + summary: Some(summary), + entries: vec![], + }, + ) + .await + } + + pub async fn process_request( + &self, + sender_id: &str, + user_pubkey: Option<&str>, + user_message: &str, + channel_prompt: &str, + ) -> Result { + use async_openai::types::{ + ChatCompletionRequestAssistantMessage, ChatCompletionRequestMessage, + ChatCompletionRequestToolMessageArgs, + ChatCompletionRequestUserMessageArgs, ChatCompletionTool, ChatCompletionToolType, + CreateChatCompletionRequestArgs, + }; + + let client = self.openai_client(); + + // --- General question (no known user) --- + let Some(pubkey) = user_pubkey else { + let system = if channel_prompt.is_empty() { + general_system_message() + } else { + format!("{}\n\n{}", general_system_message(), channel_prompt) + }; + + let tools: Vec = super::tools::public_tools() + .into_iter() + .map(|f| ChatCompletionTool { + function: f, + r#type: ChatCompletionToolType::Function, + }) + .collect(); + + let mut messages: Vec = Vec::new(); + self.build_messages_with_history( + &mut messages, + sender_id, + system, + ) + .await; + + messages.push( + ChatCompletionRequestUserMessageArgs::default() + .content(user_message.to_string()) + .build() + .unwrap() + .into(), + ); + + let executor = Arc::new(PublicToolExecutor::new(self.api.clone())); + let max_iterations = 5; + + for _ in 0..max_iterations { + let request = CreateChatCompletionRequestArgs::default() + .model(&self.settings.openai.model) + .max_completion_tokens(self.settings.openai.max_tokens.unwrap_or(2048)) + .messages(messages.clone()) + .tools(tools.clone()) + .build()?; + + let response = client.chat().create(request).await?; + let choice = &response.choices[0]; + + if let Some(ref tool_calls) = choice.message.tool_calls + && !tool_calls.is_empty() + { + let assistant_tool_calls: Vec< + async_openai::types::ChatCompletionMessageToolCall, + > = tool_calls.to_vec(); + + messages.push( + ChatCompletionRequestAssistantMessage { + content: None, + tool_calls: Some(assistant_tool_calls.clone()), + ..Default::default() + } + .into(), + ); + + for tc in tool_calls { + let name = tc.function.name.clone(); + let args = tc.function.arguments.clone(); + let call_id = tc.id.clone(); + + log::info!("Executing public tool: {} with args: {}", name, args); + + let result = match executor.execute(&name, &args).await { + Ok(content) => content, + Err(e) => format!("Error: {}", e), + }; + + log::info!("Tool {} result: {}", name, &result[..result.len().min(200)]); + + messages.push( + ChatCompletionRequestToolMessageArgs::default() + .tool_call_id(call_id) + .content(result) + .build() + .unwrap() + .into(), + ); + } + continue; + } + + let content = choice + .message + .content + .clone() + .unwrap_or_else(|| "I'm sorry, I couldn't generate a response.".to_string()); + + self.record_exchange(sender_id, user_message, &content).await; + return Ok(content); + } + + let fallback = "I wasn't able to generate a complete response. Could you try rephrasing your question?".to_string(); + self.record_exchange(sender_id, user_message, &fallback).await; + return Ok(fallback); + }; + + // --- Known user: resolve, create scoped executor with tools --- + let user = self + .api + .admin_find_user_by_pubkey(pubkey) + .await? + .ok_or_else(|| anyhow!("No user found with pubkey: {}", pubkey))?; + + let user_id = user["id"] + .as_u64() + .ok_or_else(|| anyhow!("User record missing 'id' field"))?; + + let account = self.api.admin_get_user(user_id).await?; + let executor = Arc::new(LnvpsToolExecutor::new(self.api.clone(), user_id)); + + let tools: Vec = super::tools::support_tools() + .into_iter() + .map(|f| ChatCompletionTool { + function: f, + r#type: ChatCompletionToolType::Function, + }) + .collect(); + + let mut messages: Vec = Vec::new(); + let sys = if channel_prompt.is_empty() { + self.system_message(pubkey, &account) + } else { + format!("{}\n\n{}", self.system_message(pubkey, &account), channel_prompt) + }; + self.build_messages_with_history( + &mut messages, + sender_id, + sys, + ) + .await; + + messages.push( + ChatCompletionRequestUserMessageArgs::default() + .content(user_message.to_string()) + .build() + .unwrap() + .into(), + ); + + let max_iterations = 10; + + for _ in 0..max_iterations { + let request = CreateChatCompletionRequestArgs::default() + .model(&self.settings.openai.model) + .max_completion_tokens(self.settings.openai.max_tokens.unwrap_or(2048)) + .messages(messages.clone()) + .tools(tools.clone()) + .build()?; + + let response = client.chat().create(request).await?; + let choice = &response.choices[0]; + + if let Some(ref tool_calls) = choice.message.tool_calls + && !tool_calls.is_empty() + { + let assistant_tool_calls: Vec< + async_openai::types::ChatCompletionMessageToolCall, + > = tool_calls.to_vec(); + + messages.push( + ChatCompletionRequestAssistantMessage { + content: None, + tool_calls: Some(assistant_tool_calls.clone()), + ..Default::default() + } + .into(), + ); + + for tc in tool_calls { + let name = tc.function.name.clone(); + let args = tc.function.arguments.clone(); + let call_id = tc.id.clone(); + + log::info!("Executing tool: {} with args: {}", name, args); + + let result = match executor.execute(&name, &args).await { + Ok(content) => content, + Err(e) => format!("Error: {}", e), + }; + + log::info!("Tool {} result: {}", name, &result[..result.len().min(200)]); + + messages.push( + ChatCompletionRequestToolMessageArgs::default() + .tool_call_id(call_id) + .content(result) + .build() + .unwrap() + .into(), + ); + } + continue; + } + + let content = choice + .message + .content + .clone() + .unwrap_or_else(|| "I processed your request but have no further response.".to_string()); + + self.record_exchange(sender_id, user_message, &content).await; + return Ok(content); + } + + Ok("I've checked everything I can but the issue may need more investigation. Please open a manual support ticket.".to_string()) + } + + pub async fn run_loop(&self, channel: Box) { + use crate::channel::SupportReply; + + let channel_prompt = channel.channel_prompt().to_string(); + + while let Some(req) = channel.next_request().await { + let pubkey_display = req.pubkey.as_deref().unwrap_or("(general)"); + log::info!( + "Processing request from {} (sender={}): {}", + pubkey_display, + req.sender_id, + &req.message[..req.message.len().min(100)] + ); + + let reply_ctx = req.channel_context.clone(); + let response = match self + .process_request(&req.sender_id, req.pubkey.as_deref(), &req.message, &channel_prompt) + .await + { + Ok(text) => text, + Err(e) => { + log::error!("Agent error: {}", e); + format!( + "I encountered an error processing your request. Please try again later. ({})", + e + ) + } + }; + + log::info!("Response: {}", &response[..response.len().min(200)]); + + if let Err(e) = channel + .send_reply(SupportReply { + response, + channel_context: reply_ctx, + }) + .await + { + log::error!("Failed to send reply: {}", e); + } + } + + log::info!("Support channel closed, agent exiting."); + } +} diff --git a/lnvps_agent/src/api_client.rs b/lnvps_agent/src/api_client.rs new file mode 100644 index 00000000..6d73e3b0 --- /dev/null +++ b/lnvps_agent/src/api_client.rs @@ -0,0 +1,434 @@ +use anyhow::{Context, Result}; +use reqwest::Client; +use serde::Deserialize; + +use crate::nip98::Nip98Signer; +use crate::settings::Settings; + +/// HTTP client for calling the LNVPS admin and user APIs. +/// +/// Generates fresh NIP-98 auth tokens from an nsec key on every request. +pub struct ApiClient { + client: Client, + admin_api_url: String, + user_api_url: String, + signer: Nip98Signer, +} + +impl ApiClient { + pub fn new(settings: &Settings) -> Result { + let signer = Nip98Signer::from_nsec(&settings.nsec)?; + Ok(Self { + client: Client::new(), + admin_api_url: settings.admin_api_url.trim_end_matches('/').to_string(), + user_api_url: settings.user_api_url.trim_end_matches('/').to_string(), + signer, + }) + } + + /// Generate a fresh NIP-98 auth header value for the given URL path and method. + fn auth_header(&self, path: &str, method: &str) -> Result { + let full_url = format!("{}{}", self.admin_api_url, path); + let token = self.signer.sign_auth_token(&full_url, method)?; + Ok(format!("Nostr {}", token)) + } + + // ── Admin API calls ────────────────────────────────────────────── + + /// List all VMs, optionally filtered by user_id + pub async fn admin_list_vms( + &self, + user_id: Option, + include_deleted: Option, + ) -> Result> { + let path = "/api/admin/v1/vms"; + let mut url = format!("{}{}", self.admin_api_url, path); + let mut params = Vec::new(); + if let Some(uid) = user_id { + params.push(("user_id", uid.to_string())); + } + if let Some(d) = include_deleted { + params.push(("include_deleted", d.to_string())); + } + if !params.is_empty() { + let qs: Vec = params.iter().map(|(k, v)| format!("{k}={v}")).collect(); + url.push('?'); + url.push_str(&qs.join("&")); + } + + let rsp: AdminResponseWrapper> = self + .client + .get(&url) + .header("Authorization", self.auth_header(path, "GET")?) + .send() + .await + .context("admin_list_vms request failed")? + .json() + .await + .context("admin_list_vms parse failed")?; + + Ok(rsp.data.unwrap_or_default()) + } + + /// Get a specific VM by id + pub async fn admin_get_vm(&self, vm_id: u64) -> Result { + let path = format!("/api/admin/v1/vms/{}", vm_id); + let url = format!("{}{}", self.admin_api_url, path); + + let rsp: AdminResponseWrapper = self + .client + .get(&url) + .header("Authorization", self.auth_header(&path, "GET")?) + .send() + .await + .context("admin_get_vm request failed")? + .json() + .await + .context("admin_get_vm parse failed")?; + + rsp.data.context("No VM data in response") + } + + /// List a VM's payment history + pub async fn admin_list_vm_payments(&self, vm_id: u64) -> Result> { + let path = format!("/api/admin/v1/vms/{}/payments", vm_id); + let url = format!("{}{}", self.admin_api_url, path); + + let rsp: AdminResponseWrapper> = self + .client + .get(&url) + .header("Authorization", self.auth_header(&path, "GET")?) + .send() + .await + .context("admin_list_vm_payments request failed")? + .json() + .await + .context("admin_list_vm_payments parse failed")?; + + Ok(rsp.data.unwrap_or_default()) + } + + /// Get a user's info by id + pub async fn admin_get_user(&self, user_id: u64) -> Result { + let path = format!("/api/admin/v1/users/{}", user_id); + let url = format!("{}{}", self.admin_api_url, path); + + let rsp: AdminResponseWrapper = self + .client + .get(&url) + .header("Authorization", self.auth_header(&path, "GET")?) + .send() + .await + .context("admin_get_user request failed")? + .json() + .await + .context("admin_get_user parse failed")?; + + rsp.data.context("No user data in response") + } + + /// List a VM's history + pub async fn admin_list_vm_history(&self, vm_id: u64) -> Result> { + let path = format!("/api/admin/v1/vms/{}/history", vm_id); + let url = format!("{}{}", self.admin_api_url, path); + + let rsp: AdminResponseWrapper> = self + .client + .get(&url) + .header("Authorization", self.auth_header(&path, "GET")?) + .send() + .await + .context("admin_list_vm_history request failed")? + .json() + .await + .context("admin_list_vm_history parse failed")?; + + Ok(rsp.data.unwrap_or_default()) + } + + /// List all users, paginating through all results (100 per page). + pub async fn admin_list_users(&self) -> Result> { + let mut all_users = Vec::new(); + let mut offset: u64 = 0; + let limit: u64 = 100; + + loop { + let path = format!("/api/admin/v1/users?limit={}&offset={}", limit, offset); + let url = format!("{}{}", self.admin_api_url, path); + + let rsp: AdminPaginatedResponse> = self + .client + .get(&url) + .header("Authorization", self.auth_header("/api/admin/v1/users", "GET")?) + .send() + .await + .context("admin_list_users request failed")? + .json() + .await + .context("admin_list_users parse failed")?; + + let page = rsp.data.unwrap_or_default(); + let total = rsp.total.unwrap_or(0); + let page_len = page.len() as u64; + all_users.extend(page); + + if all_users.len() as u64 >= total || page_len < limit { + break; + } + offset += limit; + } + + Ok(all_users) + } + + /// Lookup a user by pubkey hex using the API search parameter. + pub async fn admin_find_user_by_pubkey( + &self, + pubkey: &str, + ) -> Result> { + let path = format!("/api/admin/v1/users?search={}", pubkey); + let url = format!("{}{}", self.admin_api_url, path); + + let rsp: AdminPaginatedResponse> = self + .client + .get(&url) + .header("Authorization", self.auth_header("/api/admin/v1/users", "GET")?) + .send() + .await + .context("admin_find_user_by_pubkey request failed")? + .json() + .await + .context("admin_find_user_by_pubkey parse failed")?; + + let users = rsp.data.unwrap_or_default(); + // search returns prefix matches, so filter for exact pubkey + Ok(users.into_iter().find(|u| { + u.get("pubkey") + .and_then(|v| v.as_str()) + .map(|p| p == pubkey) + .unwrap_or(false) + })) + } + + /// Lookup a user by email address via the indexed email_hash column. + pub async fn admin_find_user_by_email( + &self, + email: &str, + ) -> Result> { + let path = format!("/api/admin/v1/users/by-email?email={}", email); + let url = format!("{}{}", self.admin_api_url, path); + + let rsp: serde_json::Value = self + .client + .get(&url) + .header("Authorization", self.auth_header("/api/admin/v1/users/by-email", "GET")?) + .send() + .await + .context("admin_find_user_by_email request failed")? + .json() + .await + .context("admin_find_user_by_email parse failed")?; + + if rsp.get("error").is_some() || rsp.get("data").and_then(|v| v.as_object()).is_none() { + return Ok(None); + } + + Ok(rsp.get("data").cloned()) + } + + /// Refund a VM payment + pub async fn admin_refund_vm(&self, vm_id: u64) -> Result { + let path = format!("/api/admin/v1/vms/{}/refund", vm_id); + let url = format!("{}{}", self.admin_api_url, path); + + let rsp: AdminResponseWrapper = self + .client + .post(&url) + .header("Authorization", self.auth_header(&path, "POST")?) + .header("Content-Type", "application/json") + .body("{}") + .send() + .await + .context("admin_refund_vm request failed")? + .json() + .await + .context("admin_refund_vm parse failed")?; + + rsp.data.context("No refund data in response") + } + + /// Extend a VM + pub async fn admin_extend_vm(&self, vm_id: u64, days: u64) -> Result { + let path = format!("/api/admin/v1/vms/{}/extend", vm_id); + let url = format!("{}{}", self.admin_api_url, path); + + let body = serde_json::json!({"days": days}); + + let rsp: AdminResponseWrapper = self + .client + .put(&url) + .header("Authorization", self.auth_header(&path, "PUT")?) + .header("Content-Type", "application/json") + .json(&body) + .send() + .await + .context("admin_extend_vm request failed")? + .json() + .await + .context("admin_extend_vm parse failed")?; + + rsp.data.context("No extend data in response") + } + + /// Delete a VM + pub async fn admin_delete_vm(&self, vm_id: u64) -> Result { + let path = format!("/api/admin/v1/vms/{}", vm_id); + let url = format!("{}{}", self.admin_api_url, path); + + let rsp: AdminResponseWrapper = self + .client + .delete(&url) + .header("Authorization", self.auth_header(&path, "DELETE")?) + .send() + .await + .context("admin_delete_vm request failed")? + .json() + .await + .context("admin_delete_vm parse failed")?; + + rsp.data.context("No delete data in response") + } + + /// Get the regions + pub async fn admin_list_regions(&self) -> Result> { + let path = "/api/admin/v1/regions"; + let url = format!("{}{}", self.admin_api_url, path); + + let rsp: AdminResponseWrapper> = self + .client + .get(&url) + .header("Authorization", self.auth_header(path, "GET")?) + .send() + .await + .context("admin_list_regions request failed")? + .json() + .await + .context("admin_list_regions parse failed")?; + + Ok(rsp.data.unwrap_or_default()) + } + + /// List all VM templates (name, specs, pricing, region) + pub async fn admin_list_templates(&self) -> Result> { + let path = "/api/admin/v1/vm_templates"; + let url = format!("{}{}", self.admin_api_url, path); + + let rsp: AdminResponseWrapper> = self + .client + .get(&url) + .header("Authorization", self.auth_header(path, "GET")?) + .send() + .await + .context("admin_list_templates request failed")? + .json() + .await + .context("admin_list_templates parse failed")?; + + Ok(rsp.data.unwrap_or_default()) + } + + /// List all OS images available for provisioning + pub async fn admin_list_os_images(&self) -> Result> { + let path = "/api/admin/v1/vm_os_images"; + let url = format!("{}{}", self.admin_api_url, path); + + let rsp: AdminResponseWrapper> = self + .client + .get(&url) + .header("Authorization", self.auth_header(path, "GET")?) + .send() + .await + .context("admin_list_os_images request failed")? + .json() + .await + .context("admin_list_os_images parse failed")?; + + Ok(rsp.data.unwrap_or_default()) + } + + // ── User API calls (for user-scoped lookups) ─────────────────── + + /// List user's VMs (user API, not admin) + /// Note: requires the user's Nip98 auth token, which comes from the support channel + pub async fn user_list_vms(&self, auth_token: &str) -> Result> { + let path = "/api/v1/vm"; + let url = format!("{}{}", self.user_api_url, path); + + let rsp: ApiResponseWrapper> = self + .client + .get(&url) + .header("Authorization", format!("Nostr {}", auth_token)) + .send() + .await + .context("user_list_vms request failed")? + .json() + .await + .context("user_list_vms parse failed")?; + + Ok(rsp.data.unwrap_or_default()) + } + + /// Get user's account info + pub async fn user_get_account(&self, auth_token: &str) -> Result { + let path = "/api/v1/account"; + let url = format!("{}{}", self.user_api_url, path); + + let rsp: ApiResponseWrapper = self + .client + .get(&url) + .header("Authorization", format!("Nostr {}", auth_token)) + .send() + .await + .context("user_get_account request failed")? + .json() + .await + .context("user_get_account parse failed")?; + + rsp.data.context("No account data") + } +} + +// ── API response wrappers ─────────────────────────────────────────── + +#[derive(Debug, Deserialize)] +struct AdminResponseWrapper { + #[serde(default)] + data: Option, + #[serde(default)] + #[allow(dead_code)] + error: Option, +} + +/// Paginated admin API response wrapper. +#[derive(Debug, Deserialize)] +struct AdminPaginatedResponse { + #[serde(default)] + data: Option, + #[serde(default)] + total: Option, + #[serde(default)] + #[allow(dead_code)] + limit: Option, + #[serde(default)] + #[allow(dead_code)] + offset: Option, +} + +#[derive(Debug, Deserialize)] +struct ApiResponseWrapper { + #[serde(default)] + data: Option, + #[serde(default)] + #[allow(dead_code)] + error: Option, +} diff --git a/lnvps_agent/src/channel/email.rs b/lnvps_agent/src/channel/email.rs new file mode 100644 index 00000000..a4931911 --- /dev/null +++ b/lnvps_agent/src/channel/email.rs @@ -0,0 +1,558 @@ +use std::collections::HashSet; +use std::sync::Arc; + +use anyhow::{Context, Result}; +use async_trait::async_trait; +use futures::TryStreamExt; +use tokio::net::TcpStream; +use tokio::sync::mpsc; + +use lettre::message::Mailbox; +use lettre::message::header::ContentType; +use lettre::transport::smtp::authentication::Credentials; +use lettre::{AsyncSmtpTransport, AsyncTransport, Message, Tokio1Executor}; + +use crate::api_client::ApiClient; +use crate::channel::{IncomingSupportRequest, SupportChannel, SupportReply}; +use crate::settings::EmailConfig; + +/// Email support channel that uses IMAP IDLE for push-based email notifications. +/// +/// Keeps a persistent IMAP connection open and uses the IDLE command (RFC 2177) +/// to receive instant notifications when new messages arrive. Falls back to +/// reconnecting on errors. +/// +/// Users are identified by their `From` email address, which is looked up +/// against the LNVPS admin API to resolve to a pubkey. +pub struct EmailSupportChannel { + config: EmailConfig, + /// Receive end of the channel fed by the IDLE loop. + rx: tokio::sync::Mutex>, +} + +impl EmailSupportChannel { + pub fn new(config: EmailConfig, api: Arc) -> Self { + let (tx, rx) = mpsc::channel::(256); + + let cfg = config.clone(); + let api_clone = api.clone(); + + // Spawn a background task that maintains a persistent IMAP connection + // and uses IDLE to wait for new messages. + tokio::spawn(async move { + run_idle_loop(cfg, api_clone, tx).await; + }); + + Self { + config, + rx: tokio::sync::Mutex::new(rx), + } + } + + async fn send_smtp_reply( + &self, + to: &str, + subject: &str, + body: &str, + in_reply_to: &str, + references: &str, + ) -> Result<()> { + let from: Mailbox = if let Some(ref name) = self.config.smtp_from_name { + format!("{} <{}>", name, self.config.smtp_from) + .parse() + .context("Invalid from address")? + } else { + self.config + .smtp_from + .parse() + .context("Invalid from address")? + }; + + let to_addr: Mailbox = to.parse().context("Invalid to address")?; + + let mut builder = Message::builder() + .from(from) + .to(to_addr) + .subject(subject) + .header(ContentType::TEXT_PLAIN); + + // Set threading headers for proper email client grouping + if !in_reply_to.is_empty() { + builder = builder.in_reply_to(in_reply_to.to_string()); + } + if !references.is_empty() { + builder = builder.references(references.to_string()); + } + + let email = builder + .body(body.to_string()) + .context("Failed to build email message")?; + + // Parse host (strip :port if present — starttls_relay needs bare hostname) + let (host, port) = parse_host_port(&self.config.smtp_server, 587)?; + + log::info!("Connecting SMTP to {}:{}...", host, port); + let mailer = AsyncSmtpTransport::::starttls_relay(&host) + .context("Failed to create SMTP transport")? + .port(port) + .credentials(Credentials::new( + self.config.smtp_username.clone(), + self.config.smtp_password.clone(), + )) + .timeout(Some(std::time::Duration::from_secs(30))) + .build(); + + log::info!("Sending email to {} (subject: {})...", to, subject); + mailer + .send(email) + .await + .context("SMTP send failed")?; + + log::info!("SMTP send successful"); + + Ok(()) + } +} + +#[async_trait] +impl SupportChannel for EmailSupportChannel { + fn channel_prompt(&self) -> &str { + r#"Format your responses like a professional email reply: +- Use a polite greeting (e.g. "Hello", "Hi") +- Use proper paragraphs with blank lines between them +- Do NOT use markdown formatting (no **, ##, -, bullet lists with asterisks) +- Use plain text only — email clients render plain text, not markdown +- For lists, use numbered items (1. 2. 3.) on separate lines +- End with a professional sign-off (e.g. "Best regards, LNVPS Support") +- Keep the tone professional but approachable +- Do NOT include a subject line or "From" header in the body — those are set in the email envelope +- If quoting the customer's question, use "> " prefix for the quote block"# + } + + async fn next_request(&self) -> Option { + self.rx.lock().await.recv().await + } + + async fn send_reply(&self, reply: SupportReply) -> Result<()> { + let ctx: serde_json::Value = reply + .channel_context + .as_deref() + .and_then(|s| serde_json::from_str(s).ok()) + .unwrap_or_default(); + + let from_email = ctx + .get("from_email") + .and_then(|v| v.as_str()) + .unwrap_or("unknown"); + + let original_subject = ctx + .get("subject") + .and_then(|v| v.as_str()) + .unwrap_or("Support request"); + + let in_reply_to = ctx + .get("message_id") + .and_then(|v| v.as_str()) + .unwrap_or(""); + + let references = ctx + .get("references") + .and_then(|v| v.as_str()) + .unwrap_or(""); + + let re_subject = if original_subject.starts_with("Re: ") { + original_subject.to_string() + } else { + format!("Re: {}", original_subject) + }; + + self.send_smtp_reply(from_email, &re_subject, &reply.response, in_reply_to, references) + .await + } +} + +// ── IMAP IDLE loop ────────────────────────────────────────────────── + +/// Persistent IMAP connection loop using IDLE for push notifications. +/// +/// Connects, logs in, selects the mailbox, fetches any unseen messages, +/// then enters IDLE. When IDLE signals new mail, fetches and processes +/// the new messages, then re-enters IDLE. On any error, reconnects +/// after a short delay. +async fn run_idle_loop( + config: EmailConfig, + api: Arc, + tx: mpsc::Sender, +) { + let mut seen = HashSet::::new(); + + loop { + match idle_session(&config, &api, &tx, &mut seen).await { + Ok(()) => { + // idle_session returned normally — reconnect + log::info!("IMAP IDLE session ended, reconnecting..."); + } + Err(e) => { + log::warn!("IMAP IDLE error (reconnecting in 30s): {:#}", e); + tokio::time::sleep(std::time::Duration::from_secs(30)).await; + } + } + } +} + +/// Run a single IMAP session: connect, login, select, process unseen, IDLE loop. +async fn idle_session( + config: &EmailConfig, + api: &Arc, + tx: &mpsc::Sender, + seen: &mut HashSet, +) -> Result<()> { + use tokio::time::{timeout, Duration}; + + let t = Duration::from_secs(30); + let (host, _port) = parse_host_port(&config.imap_server, 993)?; + + log::info!("IMAP connecting to {}...", config.imap_server); + let tcp = timeout(t, TcpStream::connect(&config.imap_server)) + .await + .context("TCP connect timed out")? + .context("TCP connect failed")?; + + let tls_connector = native_tls::TlsConnector::builder() + .build() + .context("Failed to build TLS connector")?; + let tls_connector = tokio_native_tls::TlsConnector::from(tls_connector); + let tls_stream = timeout(t, tls_connector.connect(&host, tcp)) + .await + .context("TLS handshake timed out")? + .context("TLS handshake failed")?; + + let client = async_imap::Client::new(tls_stream); + let mut session = timeout(t, client.login(&config.imap_username, &config.imap_password)) + .await + .context("IMAP login timed out")? + .map_err(|(e, _)| anyhow::anyhow!("IMAP login failed: {}", e))?; + log::info!("IMAP logged in as {}", config.imap_username); + + let mailbox = config.imap_mailbox.as_deref().unwrap_or("INBOX"); + timeout(t, session.select(mailbox)) + .await + .context("IMAP select timed out")? + .context("Failed to select IMAP mailbox")?; + log::info!("IMAP selected '{}', fetching unseen...", mailbox); + + // Process any existing unseen messages + fetch_and_process(&mut session, api, tx, seen).await?; + + // IDLE loop: wait for new mail, then fetch + loop { + log::debug!("Entering IDLE..."); + let mut handle = session.idle(); + timeout(t, handle.init()) + .await + .context("IDLE init timed out")? + .context("IDLE init failed")?; + + // Wait for new data (IDLE auto-reconnects every 29 min per RFC 2177) + let (wait_fut, _stop) = handle.wait(); + let idle_result = timeout(Duration::from_secs(29 * 60), wait_fut) + .await; + + // Exit IDLE and get the session back + session = timeout(t, handle.done()) + .await + .context("IDLE done timed out")? + .context("IDLE done failed")?; + + match idle_result { + Ok(Ok(_resp)) => { + log::debug!("IDLE returned new data, fetching..."); + fetch_and_process(&mut session, api, tx, seen).await?; + } + Ok(Err(e)) => { + // IDLE error — reconnect + return Err(anyhow::anyhow!("IDLE wait error: {}", e)); + } + Err(_) => { + // Timeout — re-enter IDLE (keepalive) + log::debug!("IDLE timeout, re-entering..."); + } + } + } +} + +/// Search for unseen messages, process them, and send to the channel. +async fn fetch_and_process( + session: &mut async_imap::Session>, + api: &Arc, + tx: &mpsc::Sender, + seen: &mut HashSet, +) -> Result<()> { + use tokio::time::{timeout, Duration}; + + let t = Duration::from_secs(15); + + let unseen: Vec = timeout(t, session.search("UNSEEN")) + .await + .context("IMAP search timed out")? + .context("IMAP search failed")? + .into_iter() + .map(|seq: async_imap::types::Seq| seq.to_string()) + .collect(); + + if unseen.is_empty() { + return Ok(()); + } + + log::info!("Found {} unseen messages", unseen.len()); + let fetch_range = unseen.join(","); + let messages: Vec = timeout(t, session + .fetch(&fetch_range, "(FLAGS UID RFC822)")) + .await + .context("IMAP fetch timed out")? + .context("IMAP fetch failed")? + .try_collect() + .await + .context("Failed to collect fetch results")?; + + for fetch in &messages { + let Some(uid) = fetch.uid else { continue }; + let Some(body) = fetch.body() else { continue }; + let raw = match std::str::from_utf8(body) { + Ok(s) => s.to_string(), + Err(_) => continue, + }; + + let uid_str = uid.to_string(); + if seen.contains(&uid_str) { + continue; + } + seen.insert(uid_str.clone()); + + // Extract threading headers + let from_email = extract_header(&raw, "from") + .and_then(|v| extract_email_addr(&v)); + let subject = extract_header(&raw, "subject").unwrap_or_default(); + let message_id = extract_header(&raw, "message-id").unwrap_or_default(); + let in_reply_to = extract_header(&raw, "in-reply-to").unwrap_or_default(); + let references = extract_header(&raw, "references").unwrap_or_default(); + + let Some(ref from_email) = from_email else { + log::warn!("Message UID {} has no From address, skipping", uid); + session.store(uid_str, "+FLAGS (\\Seen)").await.ok(); + continue; + }; + + // Resolve the user via the admin API + let pubkey = match api.admin_find_user_by_email(from_email).await { + Ok(Some(u)) => u.get("pubkey").and_then(|v| v.as_str()).map(|s| s.to_string()), + Ok(None) => { + log::info!("No LNVPS user for {} (UID {}) — general question", from_email, uid); + None + } + Err(e) => { + log::error!("API error looking up {}: {} — general", from_email, e); + None + } + }; + + // Extract body text + let body_text = raw + .find("\r\n\r\n") + .map(|p| &raw[p + 4..]) + .or_else(|| raw.find("\n\n").map(|p| &raw[p + 2..])) + .unwrap_or("") + .trim() + .to_string(); + + let message = if !subject.is_empty() && !body_text.starts_with(&subject) { + format!("{}\n\n{}", subject, body_text) + } else { + body_text + }; + + log::info!("Email {} -> {} (UID {})", from_email, pubkey.as_deref().unwrap_or("general"), uid); + + let reply_references = build_reply_references(&references, &message_id); + + let req = IncomingSupportRequest { + pubkey, + sender_id: from_email.clone(), + message: message.trim().to_string(), + channel_context: Some( + serde_json::json!({ + "uid": uid, + "from_email": from_email, + "subject": subject, + "message_id": message_id, + "in_reply_to": in_reply_to, + "references": reply_references, + }) + .to_string(), + ), + }; + + if tx.send(req).await.is_err() { + // Channel closed — agent is shutting down + return Ok(()); + } + + session.store(uid_str, "+FLAGS (\\Seen)").await.ok(); + } + + Ok(()) +} + +// ── Minimal RFC822 header parser ───────────────────────────────────── + +/// Extract the first value of an RFC822 header (case-insensitive). +fn extract_header(raw: &str, name: &str) -> Option { + let name_lower = name.to_lowercase(); + for line in raw.lines() { + // End of headers + if line.is_empty() { + break; + } + // Continuation line (starts with whitespace) + if line.starts_with(' ') || line.starts_with('\t') { + continue; + } + if let Some(colon) = line.find(':') { + let key = line[..colon].trim().to_lowercase(); + if key == name_lower { + return Some(line[colon + 1..].trim().to_string()); + } + } + } + None +} + +/// Extract a bare email address from "Name " or "". +fn extract_email_addr(raw: &str) -> Option { + if let Some(open) = raw.find('<') + && let Some(close) = raw.rfind('>') + { + return Some(raw[open + 1..close].trim().to_string()); + } + let trimmed = raw.trim(); + if trimmed.contains('@') { + return Some(trimmed.to_string()); + } + None +} + +/// Parse "host:port" into (host, port). +fn parse_host_port(s: &str, default_port: u16) -> Result<(String, u16)> { + if let Some(pos) = s.rfind(':') { + let port: u16 = s[pos + 1..] + .parse() + .context("Invalid port number in server address")?; + Ok((s[..pos].to_string(), port)) + } else { + Ok((s.to_string(), default_port)) + } +} + +/// Build the References header for a reply by appending the original Message-ID +/// to the existing References chain. +/// +/// RFC 2822 says References should contain the full chain of Message-IDs from +/// root to immediate parent, so email clients can reconstruct the thread tree. +fn build_reply_references(existing: &str, incoming_message_id: &str) -> String { + let incoming = incoming_message_id.trim(); + if incoming.is_empty() { + return existing.trim().to_string(); + } + + let existing = existing.trim(); + if existing.is_empty() { + return incoming.to_string(); + } + + // Avoid duplicating if the ID is already in the chain + if existing.contains(incoming) { + return existing.to_string(); + } + + format!("{} {}", existing, incoming) +} + +#[cfg(test)] +mod tests { + use super::*; + + #[test] + fn build_reply_references_empty_existing() { + assert_eq!( + build_reply_references("", ""), + "" + ); + } + + #[test] + fn build_reply_references_empty_incoming() { + assert_eq!( + build_reply_references("", ""), + "" + ); + } + + #[test] + fn build_reply_references_appends() { + assert_eq!( + build_reply_references(" ", ""), + " " + ); + } + + #[test] + fn build_reply_references_deduplicates() { + let existing = " "; + assert_eq!( + build_reply_references(existing, ""), + " " + ); + } + + #[test] + fn extract_header_finds_subject() { + let raw = "From: user@example.com\nSubject: My VM is down\nMessage-ID: \n\nBody"; + assert_eq!(extract_header(raw, "subject"), Some("My VM is down".to_string())); + } + + #[test] + fn extract_header_case_insensitive() { + let raw = "message-id: \n\nBody"; + assert_eq!(extract_header(raw, "Message-ID"), Some("".to_string())); + } + + #[test] + fn extract_email_addr_with_name() { + assert_eq!( + extract_email_addr("John Doe "), + Some("john@example.com".to_string()) + ); + } + + #[test] + fn extract_email_addr_bare() { + assert_eq!( + extract_email_addr("john@example.com"), + Some("john@example.com".to_string()) + ); + } + + #[test] + fn parse_host_port_with_port() { + let (host, port) = parse_host_port("smtp.example.com:465", 587).unwrap(); + assert_eq!(host, "smtp.example.com"); + assert_eq!(port, 465); + } + + #[test] + fn parse_host_port_default_port() { + let (host, port) = parse_host_port("smtp.example.com", 587).unwrap(); + assert_eq!(host, "smtp.example.com"); + assert_eq!(port, 587); + } +} diff --git a/lnvps_agent/src/channel/kind1.rs b/lnvps_agent/src/channel/kind1.rs new file mode 100644 index 00000000..f9625e63 --- /dev/null +++ b/lnvps_agent/src/channel/kind1.rs @@ -0,0 +1,265 @@ +use std::collections::HashSet; +use std::sync::Arc; + +use anyhow::{Context, Result}; +use async_trait::async_trait; +use nostr_sdk::prelude::*; +use tokio::sync::mpsc; + +use crate::api_client::ApiClient; +use crate::channel::{IncomingSupportRequest, SupportChannel, SupportReply}; +use crate::settings::Kind1Config; + +/// Kind 1 Nostr support channel. +/// +/// Connects to relays, subscribes for kind 1 events that mention the bot's +/// pubkey, and receives them in real-time via `handle_notifications`. +/// Replies are published as NIP-10 kind 1 replies. +pub struct Kind1SupportChannel { + client: Client, + /// Receive end of the channel fed by the notification handler. + rx: tokio::sync::Mutex>, +} + +impl Kind1SupportChannel { + pub async fn new(config: Kind1Config, nsec: &str, api: Arc) -> Result { + let keys = Keys::parse(nsec).context("Invalid nsec key for kind1 channel")?; + let bot_pubkey = keys.public_key(); + + if config.relays.is_empty() { + anyhow::bail!("kind1.relays must contain at least one relay URL"); + } + + let mention_pubkeys = match &config.mention_pubkeys { + Some(pk_hexes) => pk_hexes + .iter() + .map(|h| PublicKey::from_hex(h).context("Invalid mention pubkey hex")) + .collect::>>()?, + None => vec![bot_pubkey], + }; + + let opts = ClientOptions::new() + .automatic_authentication(false); + + let client = Client::builder() + .signer(keys) + .opts(opts) + .build(); + + // Connect to relays + for relay in &config.relays { + client + .add_relay(relay) + .await + .with_context(|| format!("Failed to add relay: {}", relay))?; + } + client.connect().await; + log::info!("Kind1 channel connected to {} relays", config.relays.len()); + + // Subscribe to kind 1 events that mention any of our monitored pubkeys + let filter = Filter::new() + .kind(Kind::TextNote) + .pubkeys(mention_pubkeys.clone()) + .since(Timestamp::now()); + + client.subscribe(filter, None).await?; + log::info!( + "Kind1 channel subscribed to mentions of {} pubkey(s)", + mention_pubkeys.len() + ); + + // Clone client for the notification handler + let client_clone = client.clone(); + + // Spawn notification handler that pushes incoming events into an mpsc channel + let (tx, rx) = mpsc::channel::(256); + let handler_bot = bot_pubkey; + let handler_mentions = mention_pubkeys.clone(); + let handler_api = api.clone(); + + tokio::spawn(async move { + let seen = Arc::new(std::sync::Mutex::new(HashSet::::new())); + + let result = client_clone + .handle_notifications(|notification| { + let tx = tx.clone(); + let handler_mentions = handler_mentions.clone(); + let handler_api = handler_api.clone(); + let seen = seen.clone(); + + async move { + match notification { + RelayPoolNotification::Event { event, .. } => { + // Skip our own events + if event.pubkey == handler_bot { + return Ok(false); + } + + // Skip already seen + { + let mut seen = seen.lock().unwrap(); + if seen.contains(&event.id) { + return Ok(false); + } + seen.insert(event.id); + } + + // Verify the event has a p-tag for one of our monitored pubkeys + let mentions_us = event.tags.iter().any(|tag| { + if let Some(TagStandard::PublicKey { + public_key, + uppercase: false, + .. + }) = tag.as_standardized() + { + handler_mentions.contains(public_key) + } else { + false + } + }); + + if !mentions_us { + return Ok(false); + } + + let author_hex = event.pubkey.to_string(); + + // Look up whether the author is an LNVPS customer + let pubkey = match handler_api + .admin_find_user_by_pubkey(&author_hex) + .await + { + Ok(Some(_)) => Some(author_hex.clone()), + Ok(None) => { + log::info!( + "Kind1 mention from {} is not an LNVPS user — general", + &author_hex[..16.min(author_hex.len())] + ); + None + } + Err(e) => { + log::error!( + "API error looking up {}: {}", + &author_hex[..16.min(author_hex.len())], + e + ); + None + } + }; + + log::info!( + "Kind1 mention from {} (event {}): {}", + &author_hex[..16.min(author_hex.len())], + event.id, + &event.content[..event.content.len().min(100)] + ); + + let req = IncomingSupportRequest { + pubkey, + sender_id: author_hex, + message: event.content.clone(), + channel_context: Some( + serde_json::json!({ + "event_id": event.id.to_hex(), + "event_json": event.as_json(), + }) + .to_string(), + ), + }; + + let _ = tx.send(req).await; + + Ok(false) // keep listening + } + _ => Ok(false), + } + } + }) + .await; + + if let Err(e) = result { + log::error!("Kind1 notification handler exited: {}", e); + } + }); + + Ok(Self { + client, + rx: tokio::sync::Mutex::new(rx), + }) + } +} + +#[async_trait] +impl SupportChannel for Kind1SupportChannel { + fn channel_prompt(&self) -> &str { + r#"Format your responses for a Nostr kind 1 post: +- Keep it SHORT — Nostr kind 1 events should be concise (under ~500 chars is ideal, max ~2000) +- You may use Nostr-style formatting: **bold**, _italic_, and `code` +- Be friendly and direct — social media tone, not corporate email +- Include relevant links if helpful (e.g. https://lnvps.net) +- Do NOT sign off with "Best regards" or similar — this is a public social media reply +- Do NOT include "Re:" or subject lines +- Remember: your reply will be PUBLIC on Nostr — be professional and helpful +- Use emoji sparingly if it fits the context"# + } + + async fn next_request(&self) -> Option { + self.rx.lock().await.recv().await + } + + async fn send_reply(&self, reply: SupportReply) -> Result<()> { + let ctx: serde_json::Value = reply + .channel_context + .as_deref() + .and_then(|s| serde_json::from_str(s).ok()) + .unwrap_or_default(); + + let event_id_hex = ctx + .get("event_id") + .and_then(|v| v.as_str()) + .context("Missing event_id in channel context")?; + + let event_json = ctx + .get("event_json") + .and_then(|v| v.as_str()) + .context("Missing event_json in channel context")?; + + let original_event = + Event::from_json(event_json).context("Failed to parse original event JSON")?; + + // Build a NIP-10 text note reply + let builder = EventBuilder::text_note_reply( + &reply.response, + &original_event, + None::<&Event>, // root = same as reply_to (top-level reply) + None::, // no specific relay URL + ); + + let output = self + .client + .send_event_builder(builder) + .await + .context("Failed to publish kind 1 reply")?; + + log::info!( + "Kind1 reply published for event {}: {}", + event_id_hex, + output.val + ); + + Ok(()) + } +} + +#[cfg(test)] +mod tests { + use super::*; + + #[test] + fn parse_nsec_and_derive_pubkey() { + let keys = Keys::generate(); + let nsec = keys.secret_key().to_bech32().unwrap(); + let parsed = Keys::parse(&nsec).unwrap(); + assert_eq!(parsed.public_key(), keys.public_key()); + } +} diff --git a/lnvps_agent/src/channel/mod.rs b/lnvps_agent/src/channel/mod.rs new file mode 100644 index 00000000..59a9830c --- /dev/null +++ b/lnvps_agent/src/channel/mod.rs @@ -0,0 +1,52 @@ +pub mod email; +pub mod kind1; + +use anyhow::Result; +use async_trait::async_trait; + +/// An incoming support request from a customer. +#[derive(Clone, Debug)] +pub struct IncomingSupportRequest { + /// The customer's nostr pubkey in hex format (64 chars). + /// `None` for general questions from unknown senders. + pub pubkey: Option, + /// Stable identifier for this sender across requests. + /// For email channels this is the From email address. + pub sender_id: String, + /// The customer's message. + pub message: String, + /// Opaque channel-specific identifier — the channel implementation + /// can stash whatever it needs here to route the reply later. + pub channel_context: Option, +} + +/// The reply produced by the agent for delivery back through the channel. +#[derive(Clone, Debug)] +pub struct SupportReply { + /// The agent's text response. + pub response: String, + /// The original channel context so the channel knows where to route the reply. + pub channel_context: Option, +} + +/// A channel over which support requests arrive and replies are delivered. +/// +/// Implementations might poll a database table, listen on a Nostr relay, +/// read from a message queue, or monitor an IMAP inbox. +#[async_trait] +pub trait SupportChannel: Send + Sync { + /// Wait for the next inbound support request. + /// Blocks until a request is available, or returns `None` if the channel + /// has been shut down. + async fn next_request(&self) -> Option; + + /// Deliver a reply back to the customer through this channel. + async fn send_reply(&self, reply: SupportReply) -> Result<()>; + + /// Additional channel-specific instructions appended to the system prompt. + /// E.g. email channels might request plain-text formatting and sign-offs; + /// a Nostr channel might want short messages with emoji. + fn channel_prompt(&self) -> &str { + "" + } +} diff --git a/lnvps_agent/src/conversation.rs b/lnvps_agent/src/conversation.rs new file mode 100644 index 00000000..542b19b3 --- /dev/null +++ b/lnvps_agent/src/conversation.rs @@ -0,0 +1,319 @@ +use anyhow::Result; +use async_trait::async_trait; +use serde::{Deserialize, Serialize}; +use std::collections::HashMap; +use std::path::PathBuf; +use tokio::sync::RwLock; + +/// A single exchange between the user and the agent. +#[derive(Clone, Debug, Serialize, Deserialize)] +pub struct ConversationEntry { + /// The sender's message. + pub user_message: String, + /// The agent's final response (tool-call chains are not stored). + pub agent_response: String, + /// Unix timestamp (seconds) when the exchange happened. + pub timestamp: i64, +} + +/// Full conversation state for a single sender. +/// +/// When history is compacted, `summary` contains a condensed +/// narrative of all prior exchanges and `entries` is reset to empty. +/// New exchanges after compaction accumulate in `entries` until the +/// next compaction. +#[derive(Clone, Debug, Default, Serialize, Deserialize)] +pub struct SenderConversation { + /// LLM-generated summary of all compacted exchanges. + pub summary: Option, + /// Raw exchanges that haven't been compacted yet. + pub entries: Vec, +} + +/// Trait for persistent conversation storage. +#[async_trait] +pub trait ConversationStore: Send + Sync { + /// Load full conversation state for a sender. + async fn load(&self, sender_id: &str) -> SenderConversation; + + /// Append an entry for a sender. + async fn append(&self, sender_id: &str, entry: ConversationEntry) -> Result<()>; + + /// Truncate entries (not summary) to the most recent `keep` entries. + async fn trim(&self, sender_id: &str, keep: usize) -> Result<()>; + + /// Replace the entire conversation state for a sender (used after compaction). + async fn save(&self, sender_id: &str, conversation: SenderConversation) -> Result<()>; +} + +/// Normalize a sender_id into a cache key / filename. +/// Lowercases and replaces non-alphanumeric chars so that +/// `Kieran@Harkin.me` and `kieran@harkin.me` and `kieran_harkin.me` +/// all map to the same key. +fn normalize_key(sender_id: &str) -> String { + sender_id + .to_lowercase() + .chars() + .map(|c| if c.is_ascii_alphanumeric() { c } else { '_' }) + .collect() +} + +/// JSON-file-backed conversation store. +/// +/// Each sender gets a file at `/.json`. +pub struct JsonFileStore { + root: PathBuf, + /// In-memory cache, periodically flushed to disk. + /// Keys are always normalized via `normalize_key`. + cache: RwLock>, +} + +impl JsonFileStore { + pub async fn new(root: PathBuf) -> Result { + tokio::fs::create_dir_all(&root).await?; + + let mut cache = HashMap::new(); + let mut entries = tokio::fs::read_dir(&root).await?; + while let Some(entry) = entries.next_entry().await? { + let path = entry.path(); + if path.extension().is_some_and(|e| e == "json") { + let key = path + .file_stem() + .and_then(|s| s.to_str()) + .unwrap_or_default() + .to_string(); + match tokio::fs::read_to_string(&path).await { + Ok(data) => { + // Support both legacy format (Vec) and new SenderConversation + if let Ok(conv) = serde_json::from_str::(&data) { + log::info!( + "Loaded history for {}: summary={}, {} entries", + key, + conv.summary.is_some(), + conv.entries.len() + ); + cache.insert(key, conv); + } else if let Ok(legacy) = serde_json::from_str::>(&data) + { + log::info!( + "Loaded legacy history for {}: {} entries", + key, + legacy.len() + ); + cache.insert( + key, + SenderConversation { + summary: None, + entries: legacy, + }, + ); + } else { + log::warn!("Failed to parse history for {}", key); + } + } + Err(e) => { + log::warn!("Failed to read history for {}: {}", key, e); + } + } + } + } + + Ok(Self { + root, + cache: RwLock::new(cache), + }) + } + + async fn flush(&self, key: &str, conv: &SenderConversation) -> Result<()> { + let path = self.root.join(format!("{}.json", key)); + let json = serde_json::to_string_pretty(conv)?; + tokio::fs::write(&path, json).await?; + Ok(()) + } +} + +#[async_trait] +impl ConversationStore for JsonFileStore { + async fn load(&self, sender_id: &str) -> SenderConversation { + let key = normalize_key(sender_id); + let cache = self.cache.read().await; + cache.get(&key).cloned().unwrap_or_default() + } + + async fn append(&self, sender_id: &str, entry: ConversationEntry) -> Result<()> { + let key = normalize_key(sender_id); + let mut cache = self.cache.write().await; + let conv = cache.entry(key.clone()).or_default(); + conv.entries.push(entry); + let snapshot = conv.clone(); + drop(cache); + + self.flush(&key, &snapshot).await + } + + async fn trim(&self, sender_id: &str, keep: usize) -> Result<()> { + let key = normalize_key(sender_id); + let mut cache = self.cache.write().await; + let Some(conv) = cache.get_mut(&key) else { + return Ok(()); + }; + if conv.entries.len() <= keep { + return Ok(()); + } + let drain = conv.entries.len() - keep; + conv.entries.drain(0..drain); + let snapshot = conv.clone(); + drop(cache); + + self.flush(&key, &snapshot).await + } + + async fn save(&self, sender_id: &str, conversation: SenderConversation) -> Result<()> { + let key = normalize_key(sender_id); + let mut cache = self.cache.write().await; + cache.insert(key.clone(), conversation.clone()); + drop(cache); + + self.flush(&key, &conversation).await + } +} + +#[cfg(test)] +mod tests { + use super::*; + use tempfile::TempDir; + + fn entry(user: &str, agent: &str) -> ConversationEntry { + ConversationEntry { + user_message: user.to_string(), + agent_response: agent.to_string(), + timestamp: 1700000000, + } + } + + #[tokio::test] + async fn append_and_load() { + let dir = TempDir::new().unwrap(); + let store = JsonFileStore::new(dir.path().to_path_buf()).await.unwrap(); + + store.append("alice@example.com", entry("hello", "hi there")).await.unwrap(); + store.append("alice@example.com", entry("vm status?", "running")).await.unwrap(); + + let conv = store.load("alice@example.com").await; + assert_eq!(conv.entries.len(), 2); + assert!(conv.summary.is_none()); + assert_eq!(conv.entries[0].user_message, "hello"); + } + + #[tokio::test] + async fn empty_load_returns_default() { + let dir = TempDir::new().unwrap(); + let store = JsonFileStore::new(dir.path().to_path_buf()).await.unwrap(); + + let conv = store.load("nobody@example.com").await; + assert!(conv.entries.is_empty()); + assert!(conv.summary.is_none()); + } + + #[tokio::test] + async fn trim_removes_oldest() { + let dir = TempDir::new().unwrap(); + let store = JsonFileStore::new(dir.path().to_path_buf()).await.unwrap(); + + for i in 0..5 { + store.append("bob", entry(&format!("msg{}", i), &format!("resp{}", i))).await.unwrap(); + } + + store.trim("bob", 2).await.unwrap(); + + let conv = store.load("bob").await; + assert_eq!(conv.entries.len(), 2); + assert_eq!(conv.entries[0].user_message, "msg3"); + assert_eq!(conv.entries[1].user_message, "msg4"); + } + + #[tokio::test] + async fn save_and_load_with_summary() { + let dir = TempDir::new().unwrap(); + let store = JsonFileStore::new(dir.path().to_path_buf()).await.unwrap(); + + store.append("carol", entry("msg1", "resp1")).await.unwrap(); + store.append("carol", entry("msg2", "resp2")).await.unwrap(); + + // Compact: save summary, clear entries + let conv = SenderConversation { + summary: Some("Carol asked about VM status. She has a running VM on Proxmox.".to_string()), + entries: vec![], + }; + store.save("carol", conv).await.unwrap(); + + let loaded = store.load("carol").await; + assert_eq!(loaded.summary.unwrap(), "Carol asked about VM status. She has a running VM on Proxmox."); + assert!(loaded.entries.is_empty()); + + // New exchange after compaction + store.append("carol", entry("how do I extend?", "call extend_vm")).await.unwrap(); + let loaded = store.load("carol").await; + assert_eq!(loaded.entries.len(), 1); + } + + #[tokio::test] + async fn persists_across_sessions() { + let dir = TempDir::new().unwrap(); + let path = dir.path().to_path_buf(); + + let store1 = JsonFileStore::new(path.clone()).await.unwrap(); + store1.append("dave", entry("hello", "hi")).await.unwrap(); + + let store2 = JsonFileStore::new(path).await.unwrap(); + let conv = store2.load("dave").await; + assert_eq!(conv.entries.len(), 1); + assert_eq!(conv.entries[0].user_message, "hello"); + } + + #[tokio::test] + async fn legacy_format_loads() { + let dir = TempDir::new().unwrap(); + let path = dir.path().to_path_buf(); + + // Write raw Vec (legacy format) + let legacy: Vec = vec![entry("msg", "resp")]; + let file = path.join("legacy_user.json"); + let _ = tokio::fs::create_dir_all(&path).await; + let _ = tokio::fs::write(&file, serde_json::to_string(&legacy).unwrap()).await; + + let store = JsonFileStore::new(path).await.unwrap(); + let conv = store.load("legacy_user").await; + assert_eq!(conv.entries.len(), 1); + assert!(conv.summary.is_none()); + } + + #[tokio::test] + async fn email_case_insensitive() { + let dir = TempDir::new().unwrap(); + let store = JsonFileStore::new(dir.path().to_path_buf()).await.unwrap(); + + store.append("Kieran@Harkin.me", entry("msg1", "resp1")).await.unwrap(); + + // Same email, different case — should find the same data + let conv = store.load("kieran@harkin.me").await; + assert_eq!(conv.entries.len(), 1); + assert_eq!(conv.entries[0].user_message, "msg1"); + + // Append under lowercase key + store.append("kieran@harkin.me", entry("msg2", "resp2")).await.unwrap(); + + // Check under original case — should see both + let conv = store.load("Kieran@Harkin.me").await; + assert_eq!(conv.entries.len(), 2); + } + + #[test] + fn normalize_key_works() { + assert_eq!(normalize_key("kieran@harkin.me"), "kieran_harkin_me"); + assert_eq!(normalize_key("Kieran@Harkin.me"), "kieran_harkin_me"); + assert_eq!(normalize_key("KIERAN@HARKIN.ME"), "kieran_harkin_me"); + assert_eq!(normalize_key("bob"), "bob"); + assert_eq!(normalize_key("user+tag@example.com"), "user_tag_example_com"); + } +} diff --git a/lnvps_agent/src/lib.rs b/lnvps_agent/src/lib.rs new file mode 100644 index 00000000..952c11f2 --- /dev/null +++ b/lnvps_agent/src/lib.rs @@ -0,0 +1,7 @@ +pub mod agent; +pub mod api_client; +pub mod channel; +pub mod conversation; +pub mod nip98; +pub mod settings; +pub mod tools; diff --git a/lnvps_agent/src/main.rs b/lnvps_agent/src/main.rs new file mode 100644 index 00000000..0d1645b6 --- /dev/null +++ b/lnvps_agent/src/main.rs @@ -0,0 +1,61 @@ +use anyhow::Result; +use log::info; +use std::path::PathBuf; +use std::sync::Arc; + +use lnvps_agent::agent::SupportAgent; +use lnvps_agent::api_client::ApiClient; +use lnvps_agent::conversation::JsonFileStore; +use lnvps_agent::settings::Settings; + +#[tokio::main] +async fn main() -> Result<()> { + env_logger::init_from_env( + env_logger::Env::default().filter_or(env_logger::DEFAULT_FILTER_ENV, "info"), + ); + + let config_path = std::env::var("LNVPS_AGENT_CONFIG").ok().map(PathBuf::from); + + let settings = Settings::load(config_path)?; + info!("LNVPS support agent starting..."); + info!("Admin API URL: {}", settings.admin_api_url); + info!("OpenAI URL: {}", settings.openai.base_url); + info!("Model: {}", settings.openai.model); + + let history_path = settings + .conversation_history_path + .clone() + .unwrap_or_else(|| PathBuf::from("conversation_history")); + info!("Conversation history: {}", history_path.display()); + + let store = Arc::new(JsonFileStore::new(history_path).await?); + let api_client = Arc::new(ApiClient::new(&settings)?); + let agent = SupportAgent::new(api_client.clone(), settings.clone(), store); + + if let Some(ref kind1_cfg) = settings.kind1 { + info!( + "Starting kind1 Nostr support channel: relays={:?}, mentions={:?}", + kind1_cfg.relays, kind1_cfg.mention_pubkeys + ); + let channel = Box::new(lnvps_agent::channel::kind1::Kind1SupportChannel::new( + kind1_cfg.clone(), + &settings.nsec, + api_client.clone(), + ).await?); + agent.run_loop(channel).await; + } else if let Some(ref email_cfg) = settings.email { + info!( + "Starting email support channel: {} / {}", + email_cfg.imap_server, email_cfg.imap_username + ); + let channel = Box::new(lnvps_agent::channel::email::EmailSupportChannel::new( + email_cfg.clone(), + api_client.clone(), + )); + agent.run_loop(channel).await; + } else { + info!("No support channel configured — exiting."); + } + + Ok(()) +} diff --git a/lnvps_agent/src/nip98.rs b/lnvps_agent/src/nip98.rs new file mode 100644 index 00000000..73797637 --- /dev/null +++ b/lnvps_agent/src/nip98.rs @@ -0,0 +1,41 @@ +use anyhow::{Context, Result}; +use base64::Engine; +use base64::engine::general_purpose::STANDARD as BASE64; +use nostr::prelude::*; + +/// Generates fresh NIP-98 auth tokens from an nsec key. +pub struct Nip98Signer { + keys: Keys, +} + +impl Nip98Signer { + pub fn from_nsec(nsec: &str) -> Result { + let sk = SecretKey::from_bech32(nsec).context("Invalid nsec key")?; + Ok(Self { + keys: Keys::new(sk), + }) + } + + /// Create a signed NIP-98 HTTP auth event (Kind 27235) for the given URL + /// and method, then base64-encode the entire event JSON for the + /// `Authorization: Nostr ` header. + pub fn sign_auth_token(&self, url: &str, method: &str) -> Result { + let url_tag = Tag::custom( + TagKind::Custom(std::borrow::Cow::Borrowed("u")), + vec![url.to_string()], + ); + let method_tag = Tag::custom( + TagKind::Custom(std::borrow::Cow::Borrowed("method")), + vec![method.to_uppercase()], + ); + + let event = EventBuilder::new(Kind::HttpAuth, "") + .tag(url_tag) + .tag(method_tag) + .sign_with_keys(&self.keys) + .context("Failed to sign NIP-98 event")?; + + let json = event.as_json(); + Ok(BASE64.encode(json.as_bytes())) + } +} diff --git a/lnvps_agent/src/settings.rs b/lnvps_agent/src/settings.rs new file mode 100644 index 00000000..4f942638 --- /dev/null +++ b/lnvps_agent/src/settings.rs @@ -0,0 +1,148 @@ +use anyhow::{Result, anyhow}; +use config::Config; +use serde::{Deserialize, Serialize}; +use std::path::PathBuf; + +#[derive(Debug, Clone, Deserialize, Serialize)] +#[serde(rename_all = "kebab-case")] +pub struct Settings { + /// Listen address for the agent HTTP server + pub listen: Option, + + /// Base URL of the LNVPS admin API + pub admin_api_url: String, + + /// Base URL of the LNVPS user API + pub user_api_url: String, + + /// Nsec key (bech32 `nsec1...`) used to sign NIP-98 auth events and for Nostr channel operations. + /// Fresh tokens are generated per API call — no stale pre-encoded event needed. + pub nsec: String, + + /// OpenAI-compatible API configuration + pub openai: OpenAiConfig, + + /// Support agent system prompt (optional override) + pub system_prompt: Option, + + /// Email channel configuration (IMAP polling + SMTP replies) + pub email: Option, + + /// Kind 1 Nostr channel configuration (mention-based support via kind 1 replies) + pub kind1: Option, + + /// Path to conversation history storage directory + pub conversation_history_path: Option, +} + +#[derive(Debug, Clone, Deserialize, Serialize)] +#[serde(rename_all = "kebab-case")] +pub struct EmailConfig { + /// IMAP server host:port (e.g., "imap.gmail.com:993") + pub imap_server: String, + /// IMAP username / login + pub imap_username: String, + /// IMAP password or app-specific password + pub imap_password: String, + /// IMAP mailbox to watch (e.g., "INBOX") + pub imap_mailbox: Option, + + /// SMTP server host:port (e.g., "smtp.gmail.com:587") + pub smtp_server: String, + /// SMTP username + pub smtp_username: String, + /// SMTP password + pub smtp_password: String, + /// From address for replies (e.g., "support@lnvps.io") + pub smtp_from: String, + /// Custom from name (e.g., "LNVPS Support") + pub smtp_from_name: Option, +} + +#[derive(Debug, Clone, Deserialize, Serialize)] +#[serde(rename_all = "kebab-case")] +pub struct Kind1Config { + /// Nostr relays to connect to (e.g., ["wss://relay.damus.io"]) + pub relays: Vec, + + /// Hex pubkey(s) of accounts whose mentions trigger support responses. + /// When set, only mentions of these pubkeys are processed. + /// If empty/omitted, mentions of the bot's own pubkey (derived from the + /// top-level nsec) are used. + pub mention_pubkeys: Option>, + + /// Poll interval in seconds between checking for new mentions + pub poll_interval_secs: Option, +} + +#[derive(Debug, Clone, Deserialize, Serialize)] +#[serde(rename_all = "kebab-case")] +pub struct OpenAiConfig { + /// Base URL of the OpenAI-compatible API (e.g., http://localhost:11434/v1 for Ollama) + pub base_url: String, + + /// API key (not needed for Ollama but required by some providers) + pub api_key: Option, + + /// Model name to use (e.g., "llama3.2", "gpt-4o") + pub model: String, + + /// Max tokens for the response + pub max_tokens: Option, +} + +impl Settings { + pub fn load(path: Option) -> Result { + let builder = Config::builder(); + + // Default configuration + let builder = builder + .set_default("listen", "0.0.0.0:8080")? + .set_default("openai.max_tokens", 2048u32)?; + + #[cfg(debug_assertions)] + let builder = { + let default_path = std::env::current_dir()?.join("settings.yaml"); + if default_path.exists() { + builder.add_source(config::File::from(default_path).required(false)) + } else { + builder + } + }; + + // Load from explicit path + let builder = if let Some(p) = path { + builder.add_source(config::File::from(p).required(true)) + } else { + builder + }; + + let config = builder + .add_source( + config::Environment::with_prefix("LNVPS_AGENT") + .separator("__") + .try_parsing(true), + ) + .build()?; + + let settings: Settings = config.try_deserialize()?; + settings.validate()?; + Ok(settings) + } + + fn validate(&self) -> Result<()> { + if self.admin_api_url.is_empty() { + return Err(anyhow!("admin_api_url must not be empty")); + } + if self.nsec.is_empty() { + return Err(anyhow!("nsec must not be empty")); + } + if self.openai.base_url.is_empty() { + return Err(anyhow!("openai.base_url must not be empty")); + } + if self.openai.model.is_empty() { + return Err(anyhow!("openai.model must not be empty")); + } + Ok(()) + } +} diff --git a/lnvps_agent/src/tools/mod.rs b/lnvps_agent/src/tools/mod.rs new file mode 100644 index 00000000..760107c6 --- /dev/null +++ b/lnvps_agent/src/tools/mod.rs @@ -0,0 +1,179 @@ +use async_openai::types::FunctionObject; +use serde_json::json; + +/// All tools the support agent has access to, defined as OpenAI function specs. +/// These are user-scoped — no tool accepts a pubkey or user_id parameter; +/// the executor is already bound to the user identified by the support channel. +pub fn support_tools() -> Vec { + vec![ + tool( + "get_my_account", + "Get the current user's account information: billing details, contact preferences, email verification status, and NWC auto-renewal status.", + json!({ + "type": "object", + "properties": {} + }), + ), + tool( + "list_my_vms", + "List all VMs belonging to the current user. Shows VM IDs, names, status, specs, IPs, expiry dates, and region info.", + json!({ + "type": "object", + "properties": {} + }), + ), + tool( + "get_vm_details", + "Get detailed information about a specific VM owned by the current user. Includes host, region, IP assignments, full specs, payment status, and exact expiry date.", + json!({ + "type": "object", + "properties": { + "vm_id": { + "type": "integer", + "description": "The numeric VM ID" + } + }, + "required": ["vm_id"] + }), + ), + tool( + "list_vm_payments", + "List all payments for a specific VM owned by the current user. Shows amounts, currencies, paid/unpaid status, dates, and payment methods.", + json!({ + "type": "object", + "properties": { + "vm_id": { + "type": "integer", + "description": "The numeric VM ID" + } + }, + "required": ["vm_id"] + }), + ), + tool( + "list_vm_history", + "List the activity history for a specific VM. Shows creation, start/stop events, reinstallations, upgrades, and configuration changes with timestamps.", + json!({ + "type": "object", + "properties": { + "vm_id": { + "type": "integer", + "description": "The numeric VM ID" + } + }, + "required": ["vm_id"] + }), + ), + tool( + "extend_vm", + "Extend (renew) a VM owned by the current user for a certain number of days. Use this when a customer asks for extra time or a manual renewal.", + json!({ + "type": "object", + "properties": { + "vm_id": { + "type": "integer", + "description": "The numeric VM ID to extend" + }, + "days": { + "type": "integer", + "description": "Number of days to extend the VM for" + } + }, + "required": ["vm_id", "days"] + }), + ), + tool( + "refund_vm", + "Process a refund for a VM. This is irreversible — always confirm with the user before executing. Only works on VMs owned by the current user.", + json!({ + "type": "object", + "properties": { + "vm_id": { + "type": "integer", + "description": "The numeric VM ID to refund" + } + }, + "required": ["vm_id"] + }), + ), + tool( + "delete_vm", + "Delete a VM owned by the current user. Use this only when explicitly requested and after confirming with the customer.", + json!({ + "type": "object", + "properties": { + "vm_id": { + "type": "integer", + "description": "The numeric VM ID to delete" + } + }, + "required": ["vm_id"] + }), + ), + tool( + "list_regions", + "List all available hosting regions with their names and IDs. Use this to answer questions about where VMs can be provisioned or where an existing VM is located.", + json!({ + "type": "object", + "properties": {} + }), + ), + tool( + "list_templates", + "List all available VM templates with specifications and pricing. Shows CPU, memory, storage, pricing plans, and which region each template belongs to. Use this to answer questions about available plans and pricing.", + json!({ + "type": "object", + "properties": {} + }), + ), + tool( + "list_os_images", + "List all available operating system images that can be installed on VMs. Shows image names, versions, OS types, and supported platforms.", + json!({ + "type": "object", + "properties": {} + }), + ), + ] +} + +/// Tools available to non-customer/general support requests. +/// Subset of support_tools that don't require an authenticated user. +pub fn public_tools() -> Vec { + vec![ + tool( + "list_regions", + "List all available hosting regions with their names and IDs. Use this to answer questions about where VMs can be provisioned.", + json!({ + "type": "object", + "properties": {} + }), + ), + tool( + "list_templates", + "List all available VM templates with specifications and pricing. Shows CPU, memory, storage, pricing plans, and which region each template belongs to. Use this to answer questions about available plans and pricing.", + json!({ + "type": "object", + "properties": {} + }), + ), + tool( + "list_os_images", + "List all available operating system images that can be installed on VMs. Shows image names, versions, OS types, and supported platforms.", + json!({ + "type": "object", + "properties": {} + }), + ), + ] +} + +fn tool(name: &str, description: &str, parameters: serde_json::Value) -> FunctionObject { + use async_openai::types::FunctionObjectArgs; + FunctionObjectArgs::default() + .name(name) + .description(description) + .parameters(parameters) + .build() + .expect("valid tool definition") +}