Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions cashu/core/base.py
Original file line number Diff line number Diff line change
Expand Up @@ -440,6 +440,7 @@ def from_row(cls, row: Row):
paid_time = int(row["paid_time"]) if row["paid_time"] else None
issued_time = int(row["issued_time"]) if "issued_time" in row.keys() and row["issued_time"] else None
last_checked = int(row["last_checked"]) if "last_checked" in row.keys() and row["last_checked"] else None
expiry = int(row["expiry"]) if "expiry" in row.keys() and row["expiry"] else None
except Exception:
# POSTGRES: row is datetime.datetime
created_time = (
Expand All @@ -452,6 +453,9 @@ def from_row(cls, row: Row):
last_checked = (
int(row["last_checked"].timestamp()) if "last_checked" in row.keys() and row["last_checked"] else None
)
expiry = (
int(row["expiry"].timestamp()) if "expiry" in row.keys() and row["expiry"] else None
)
return cls(
quote=row["quote"],
method=row["method"],
Expand All @@ -464,6 +468,7 @@ def from_row(cls, row: Row):
paid_time=paid_time,
issued_time=issued_time,
last_checked=last_checked,
expiry=expiry,
pubkey=row["pubkey"] if "pubkey" in row.keys() else None,
privkey=row["privkey"] if "privkey" in row.keys() else None,
)
Expand Down
10 changes: 10 additions & 0 deletions cashu/core/settings.py
Original file line number Diff line number Diff line change
Expand Up @@ -220,6 +220,16 @@ class MintLimits(MintSettings):
title="Websocket read timeout",
description="Timeout for reading from a websocket.",
)
mint_websocket_quote_expiry_check_interval: int = Field(
default=30,
gt=0,
title="Websocket quote expiry check interval",
description=(
"Interval in seconds at which the mint checks whether a websocket's"
" subscribed bolt11 mint quotes have expired while still unpaid, in"
" order to close idle connections that will never receive an update."
),
)


class FakeWalletSettings(MintSettings):
Expand Down
9 changes: 7 additions & 2 deletions cashu/mint/crud.py
Original file line number Diff line number Diff line change
Expand Up @@ -610,8 +610,8 @@ async def store_mint_quote(
await (conn or db).execute(
f"""
INSERT INTO {db.table_with_schema("mint_quotes")}
(quote, method, request, checking_id, unit, amount, state, created_time, paid_time, issued_time, last_checked, pubkey)
VALUES (:quote, :method, :request, :checking_id, :unit, :amount, :state, :created_time, :paid_time, :issued_time, :last_checked, :pubkey)
(quote, method, request, checking_id, unit, amount, state, created_time, paid_time, issued_time, last_checked, pubkey, expiry)
VALUES (:quote, :method, :request, :checking_id, :unit, :amount, :state, :created_time, :paid_time, :issued_time, :last_checked, :pubkey, :expiry)
""",
{
"quote": quote.quote,
Expand Down Expand Up @@ -640,6 +640,11 @@ async def store_mint_quote(
if quote.last_checked
else None,
"pubkey": quote.pubkey or "",
"expiry": db.to_timestamp(
db.timestamp_from_seconds(quote.expiry) or ""
)
if quote.expiry
else None,
},
)

Expand Down
70 changes: 70 additions & 0 deletions cashu/mint/events/client.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,7 @@
import asyncio
import contextlib
import json
import time
from typing import List, Union

from fastapi import WebSocket, WebSocketDisconnect
Expand Down Expand Up @@ -46,6 +48,19 @@ def __init__(self, websocket: WebSocket, db: Database, crud: LedgerCrud):
async def start(self):
await self.websocket.accept()

# Close connections that only subscribe to mint quotes which expire
# unpaid, so a client cannot hold the socket open until the read timeout.
expiry_monitor_task = asyncio.create_task(
self._monitor_expired_mint_quote_subscriptions()
)
try:
await self._receive_loop()
finally:
expiry_monitor_task.cancel()
with contextlib.suppress(asyncio.CancelledError):
await expiry_monitor_task

async def _receive_loop(self):
while True:
message = await asyncio.wait_for(
self.websocket.receive(),
Expand Down Expand Up @@ -174,6 +189,61 @@ async def _send_msg(
logger.debug(f"Sending websocket message: {data.model_dump_json()}")
await self.websocket.send_text(data.model_dump_json())

async def _monitor_expired_mint_quote_subscriptions(self) -> None:
"""Close the websocket once every subscribed bolt11 mint quote is in a
terminal state.

A subscription to a mint quote can only produce a finite set of state
transitions. Once all subscribed quotes are either paid, or have expired
while unpaid, no more useful events are expected. Proactively closing
such connections frees server resources and reflects that the
subscription is dead.
"""
interval = settings.mint_websocket_quote_expiry_check_interval
while True:
await asyncio.sleep(interval)
quote_filters = list(
self.subscriptions.get(
JSONRPCSubscriptionKinds.BOLT11_MINT_QUOTE, {}
)
)
if not quote_filters:
continue

now = int(time.time())
all_terminal = True
async with self.db_read.db.connect() as conn:
for quote_id in quote_filters:
mint_quote = await self.db_read.crud.get_mint_quote(
quote_id=quote_id, db=self.db_read.db, conn=conn
)
# A quote is terminal when it is paid, or when it is
# unpaid and has passed its expiry.
terminal = bool(
mint_quote
and (
mint_quote.paid
or (
mint_quote.unpaid
and mint_quote.expiry
and mint_quote.expiry <= now
)
)
)
if not terminal:
all_terminal = False
break

if all_terminal:
logger.info(
"Closing websocket: all subscribed mint quotes are terminal"
)
with contextlib.suppress(Exception):
await self.websocket.close(
code=1000, reason="mint quote subscription terminal"
)
return

def add_subscription(
self,
kind: JSONRPCSubscriptionKinds,
Expand Down
10 changes: 10 additions & 0 deletions cashu/mint/migrations.py
Original file line number Diff line number Diff line change
Expand Up @@ -1261,3 +1261,13 @@ async def m035_add_last_checked_to_mint_quotes(db: Database):
ADD COLUMN last_checked TIMESTAMP NULL
"""
)


async def m036_add_expiry_to_mint_quotes(db: Database):
"""
Add expiry column to mint_quotes table (melt_quotes already has expiry).
"""
async with db.connect() as conn:
await conn.execute(
f"ALTER TABLE {db.table_with_schema('mint_quotes')} ADD COLUMN expiry TIMESTAMP"
)
2 changes: 2 additions & 0 deletions tests/mint/test_mint.py
Original file line number Diff line number Diff line change
Expand Up @@ -376,6 +376,8 @@ async def test_mint_quote_ttl_setting_overrides_invoice_expiry(ledger: Ledger):
after = int(time.time())
assert quote.expiry is not None
assert before + ttl <= quote.expiry <= after + ttl
loaded = await ledger.get_mint_quote(quote.quote)
assert loaded.expiry == quote.expiry
finally:
settings.mint_quote_ttl = None

Expand Down
121 changes: 119 additions & 2 deletions tests/mint/test_mint_websocket_protocol.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,6 @@
import json
import time
from contextlib import asynccontextmanager
from types import SimpleNamespace
from typing import Any, cast

Expand Down Expand Up @@ -28,6 +30,9 @@ def __init__(self, messages: list[dict[str, Any]] | None = None):
self.messages = messages or []
self.sent: list[str] = []
self.accepted = False
self.closed = False
self.close_code: int | None = None
self.close_reason: str | None = None

async def accept(self):
self.accepted = True
Expand All @@ -40,6 +45,11 @@ async def receive(self):
async def send_text(self, data: str):
self.sent.append(data)

async def close(self, code: int = 1000, reason: str | None = None):
self.closed = True
self.close_code = code
self.close_reason = reason


def _client_manager(websocket: FakeWebSocket) -> LedgerEventClientManager:
manager = LedgerEventClientManager(
Expand Down Expand Up @@ -172,8 +182,6 @@ async def test_init_subscription_sends_initial_snapshots():
)
proof_state = ProofState(Y="Y1", state=ProofSpentState.unspent)

from contextlib import asynccontextmanager

@asynccontextmanager
async def mock_connect():
yield object()
Expand Down Expand Up @@ -312,3 +320,112 @@ def test_remove_subscription_removes_all_filter_entries_for_subid(monkeypatch):
assert kind_map["quote-1"] == []
assert kind_map["quote-2"] == []
assert kind_map["quote-3"] == []


class _StopMonitor(Exception):
"""Sentinel used to break out of the monitor's infinite loop in tests."""


def _manager_with_mint_quote(
websocket: FakeWebSocket, mint_quote: MintQuote | None
) -> LedgerEventClientManager:
manager = _client_manager(websocket)
manager.subscriptions = {
JSONRPCSubscriptionKinds.BOLT11_MINT_QUOTE: {"quote-1": ["sub-1"]}
}

@asynccontextmanager
async def mock_connect():
yield object()

async def get_mint_quote(quote_id, db, conn=None):
return mint_quote if quote_id == "quote-1" else None

manager.db_read = cast(
Any,
SimpleNamespace(
db=SimpleNamespace(connect=mock_connect),
crud=SimpleNamespace(get_mint_quote=get_mint_quote, get_melt_quote=None),
get_proofs_states=None,
),
)
return manager


@pytest.mark.asyncio
@pytest.mark.parametrize(
"state, expiry",
[
(MintQuoteState.unpaid, lambda: int(time.time()) - 10),
(MintQuoteState.paid, lambda: int(time.time()) - 10),
],
)
async def test_monitor_closes_websocket_when_mint_quote_terminal(
monkeypatch, state, expiry
):
websocket = FakeWebSocket()
terminal_quote = MintQuote(
quote="quote-1",
method="bolt11",
request="lnbc1",
checking_id="check",
unit="sat",
amount=1,
state=state,
expiry=expiry(),
)
manager = _manager_with_mint_quote(websocket, terminal_quote)

async def no_sleep(_):
return None

monkeypatch.setattr("cashu.mint.events.client.asyncio.sleep", no_sleep)

await manager._monitor_expired_mint_quote_subscriptions()

assert websocket.closed is True
assert websocket.close_code == 1000
assert websocket.close_reason == "mint quote subscription terminal"


@pytest.mark.asyncio
@pytest.mark.parametrize(
"state, expiry",
[
# unpaid but not yet expired -> keep open
(MintQuoteState.unpaid, lambda: int(time.time()) + 3600),
# unpaid with no expiry data -> keep open
(MintQuoteState.unpaid, lambda: None),
],
)
async def test_monitor_keeps_websocket_open_when_not_expired_unpaid(
monkeypatch, state, expiry
):
websocket = FakeWebSocket()
mint_quote = MintQuote(
quote="quote-1",
method="bolt11",
request="lnbc1",
checking_id="check",
unit="sat",
amount=1,
state=state,
expiry=expiry(),
)
manager = _manager_with_mint_quote(websocket, mint_quote)

calls = {"n": 0}

async def sleep_stub(_):
calls["n"] += 1
# allow exactly one full check iteration, then break the loop
if calls["n"] >= 2:
raise _StopMonitor()
return None

monkeypatch.setattr("cashu.mint.events.client.asyncio.sleep", sleep_stub)

with pytest.raises(_StopMonitor):
await manager._monitor_expired_mint_quote_subscriptions()

assert websocket.closed is False
5 changes: 2 additions & 3 deletions tests/test_mint_rpc_cli.py
Original file line number Diff line number Diff line change
Expand Up @@ -158,16 +158,15 @@ async def test_get_melt_quote(cli_prefix):

@pytest.mark.asyncio
async def test_get_quote_ttl_mint_quote(cli_prefix):
"""Mint quotes do not persist expiry, so GetQuoteTtl should return NOT_FOUND."""
"""Mint quotes persist expiry; GetQuoteTtl should return the expiry timestamp."""
wallet = await init_wallet()
mint_quote = await wallet.request_mint(100)
await asyncio.sleep(1)
runner = CliRunner()
# Use -- to prevent Click from interpreting quote_id (e.g. -JmE...) as options
result = runner.invoke(cli, [*cli_prefix, "get", "quote-ttl", "--", mint_quote.quote])
assert result.exception is None
# Mint quotes don't store expiry; the handler returns NOT_FOUND
assert "Error:" in result.output
assert "Quote expiry:" in result.output

@pytest.mark.asyncio
@pytest.mark.skipif(
Expand Down
Loading