diff --git a/cashu/wallet/cli/cli.py b/cashu/wallet/cli/cli.py index c42df8bb0..18fa65250 100644 --- a/cashu/wallet/cli/cli.py +++ b/cashu/wallet/cli/cli.py @@ -297,7 +297,8 @@ async def pay( print(f"Amount: {wallet.unit.str(pr.a)} ({pr.a} {pr.u})") if pr.m and wallet.url not in pr.m: - print(f"Error: Current mint {wallet.url} is not accepted by the receiver.") + print( + f"Error: Current mint {wallet.url} is not accepted by the receiver.") print(f"Accepted mints: {pr.m}") return @@ -364,7 +365,8 @@ async def pay( if post_transports: transport = post_transports[0] url = transport.a - print(f"Sending token via POST to {url}...", end="", flush=True) + print( + f"Sending token via POST to {url}...", end="", flush=True) token_obj = deserialize_token_from_string(token) assert isinstance( @@ -509,12 +511,13 @@ async def invoice( wallet: Wallet = ctx.obj["WALLET"] await wallet.load_mint() await print_balance(ctx) - amount = int(amount * 100) if wallet.unit in [Unit.usd, Unit.eur] else int(amount) + amount = int( + amount * 100) if wallet.unit in [Unit.usd, Unit.eur] else int(amount) print(f"Requesting invoice for {wallet.unit.str(amount)}.") # in case the user wants a specific split, we create a list of amounts optional_split = None if split: - assert amount % split == 0, "split must be divisor or amount" + assert amount % split == 0, "split must be divisor of amount" assert amount >= split, "split must smaller or equal amount" n_splits = amount // split optional_split = [split] * n_splits @@ -672,7 +675,8 @@ async def swap(ctx: Context): if incoming_wallet.url == outgoing_wallet.url: raise Exception("mints for swap have to be different") - amount = int(input(f"Enter amount to swap in {incoming_wallet.unit.name}: ")) + amount = int( + input(f"Enter amount to swap in {incoming_wallet.unit.name}: ")) assert amount > 0, "amount is not positive" # request invoice from incoming mint @@ -731,7 +735,8 @@ async def balance(ctx: Context, verbose): print("") for i, (k, v) in enumerate(unit_balances.items()): unit = k - print(f"Unit {i+1} ({unit}) - Balance: {unit.str(int(v['available']))}") + print( + f"Unit {i+1} ({unit}) - Balance: {unit.str(int(v['available']))}") print("") if verbose: # show balances per keyset @@ -832,7 +837,8 @@ async def send_command( force_swap: bool, ): wallet: Wallet = ctx.obj["WALLET"] - amount = int(amount * 100) if wallet.unit in [Unit.usd, Unit.eur] else int(amount) + amount = int( + amount * 100) if wallet.unit in [Unit.usd, Unit.eur] else int(amount) await send( wallet, amount=amount, @@ -1003,7 +1009,8 @@ async def pending(ctx: Context, legacy, number: int, offset: int): reserved_proofs = await get_reserved_proofs(wallet.db) if len(reserved_proofs): print("--------------------------\n") - sorted_proofs = sorted(reserved_proofs, key=itemgetter("send_id"), reverse=True) # type: ignore + sorted_proofs = sorted(reserved_proofs, key=itemgetter( + "send_id"), reverse=True) # type: ignore if number: number += offset for i, (key, value) in islice( @@ -1045,6 +1052,105 @@ async def pending(ctx: Context, legacy, number: int, offset: int): print("To receive all pending tokens use: cashu receive -a") +@cli.command("proofs", help="List raw proofs in wallet.") +@click.option( + "--all", "-a", default=False, is_flag=True, help="Include reserved proofs." +) +@click.option( + "--no-dleq", default=False, is_flag=True, help="Do not include DLEQ proofs." +) +@click.option( + "--indent", + "-i", + default=2, + help="Number of spaces to indent JSON with.", + type=int, +) +@click.option( + "--keyset", + "-k", + default=None, + help="Filter by keyset ID.", + type=str, +) +@click.option( + "--order", + "-o", + default=None, + help="Sort the proofs by amount in ascending (asc) or descending (desc) order.", + type=str, +) +@click.pass_context +@coro +async def proofs( + ctx: Context, all: bool, no_dleq: bool, indent: int, keyset: str, order: str +): + wallet: Wallet = ctx.obj["WALLET"] + await wallet.load_proofs() + + # Get proofs based on --all flag + if all: + # Include both available and reserved proofs + proofs = wallet.proofs + else: + # Only include available (non-reserved) proofs + proofs = [p for p in wallet.proofs if not p.reserved] + + # Filter by keyset if specified + if keyset: + proofs = [p for p in proofs if p.id == keyset] + + if not proofs: + if keyset: + print(f"No proofs found for keyset: {keyset}") + else: + print("No proofs found.") + return + + # Sort the proofs if ordering is specified. + if order is not None: + if order == "asc": + proofs = sorted(proofs, key=lambda p: p.amount) + elif order == "desc": + proofs = sorted(proofs, key=lambda p: p.amount, reverse=True) + else: + print( + f"Unidentified order argument: {order}. Valid arguments are 'asc' and 'desc'." + ) + return + + # Convert proofs to dictionary format + include_dleq = not no_dleq + + proofs_dict = [] + for proof in proofs: + proof_dict = { + "id": proof.id, + "amount": proof.amount, + "secret": proof.secret, + "C": proof.C, + } + + if include_dleq and proof.dleq: + proof_dict["dleq"] = { + "e": proof.dleq.e, + "s": proof.dleq.s, + "r": proof.dleq.r, + } + + if proof.witness: + proof_dict["witness"] = proof.witness + + proofs_dict.append(proof_dict) + + # Print as JSON + proofs_json = json.dumps( + proofs_dict, + indent=indent, + ) + print(proofs_json) + + @cli.group(cls=NaturalOrderGroup) def lock(): """Generate receiving locks.""" @@ -1317,7 +1423,8 @@ async def info(ctx: Context, mint: bool, mnemonic: bool, reload: bool): if mint_info: print(f" - Mint name: {mint_info['name']}") if mint_info.get("description"): - print(f" - Description: {mint_info['description']}") + print( + f" - Description: {mint_info['description']}") if mint_info.get("description_long"): print( f" - Long description: {mint_info['description_long']}" @@ -1329,7 +1436,8 @@ async def info(ctx: Context, mint: bool, mnemonic: bool, reload: bool): if mint_info.get("version"): print(f" - Version: {mint_info['version']}") if mint_info.get("motd"): - print(f" - Message of the day: {mint_info['motd']}") + print( + f" - Message of the day: {mint_info['motd']}") if mint_info.get("time"): print(f" - Server time: {mint_info['time']}") if mint_info.get("nuts"): @@ -1473,7 +1581,8 @@ async def auth(ctx: Context, mint: bool, force: bool, password: bool): if mint: new_proofs = await auth_wallet.mint_blind_auth() - print(f"Minted {auth_wallet.unit.str(sum_proofs(new_proofs))} auth tokens.") + print( + f"Minted {auth_wallet.unit.str(sum_proofs(new_proofs))} auth tokens.") @cli.group(cls=NaturalOrderGroup) diff --git a/tests/wallet/test_wallet_cli.py b/tests/wallet/test_wallet_cli.py index c71611006..81ca22f68 100644 --- a/tests/wallet/test_wallet_cli.py +++ b/tests/wallet/test_wallet_cli.py @@ -685,3 +685,213 @@ def test_send_with_lock_and_refund(mint, cli_prefix): assert "cashuB" in token_str, "output does not have a token" token = TokenV4.deserialize(token_str).to_tokenv3() assert fake_refund_pubkey in token.token[0].proofs[0].secret + + +def test_proofs_basic(cli_prefix): + """Test basic proofs command functionality""" + runner = CliRunner(mix_stderr=False) # Separate stdout/stderr, as we want to verify only stdout + + # First create some tokens like other tests do + result = runner.invoke(cli, [*cli_prefix, "invoice", "64"]) + assert result.exception is None + + # Verify wallet has balance + wallet = asyncio.run(init_wallet()) + assert wallet.available_balance >= 64 + + # Now test the proofs command + result = runner.invoke(cli, [*cli_prefix, "proofs"]) + assert result.exception is None + assert result.exit_code == 0 + + # JSON output should be clean in stdout (debug logs go to stderr) + json_output = result.stdout.strip() + + # Should be valid JSON + import json + proofs = json.loads(json_output) + assert isinstance(proofs, list) + assert len(proofs) > 0, "Should have proofs after minting tokens" + print(f"Found {len(proofs)} proofs") + + # Each proof should have the expected fields + for proof in proofs: + assert sorted(proof.keys()) == ['C', 'amount', 'dleq', 'id', 'secret'] + + +def test_proofs_json_structure(cli_prefix): + """Test that proofs have correct JSON structure""" + runner = CliRunner(mix_stderr=False) + + # First create some tokens + result = runner.invoke(cli, [*cli_prefix, "invoice", "64"]) + assert result.exception is None + + # Verify wallet has balance + wallet = asyncio.run(init_wallet()) + assert wallet.available_balance >= 64 + + # Test proofs command + result = runner.invoke(cli, [*cli_prefix, "proofs"]) + assert result.exception is None + assert result.exit_code == 0 + + # Parse JSON from stdout + import json + + proofs = json.loads(result.stdout.strip()) + assert len(proofs) > 0, "Should have proofs to test structure" + + for proof in proofs: + # Check field types, the required fields according to NUT-00 spec + assert isinstance(proof["id"], str), "'id' should be string" + assert isinstance(proof["amount"], int), "'amount' should be integer" + assert isinstance(proof["secret"], str), "'secret' should be string" + assert isinstance(proof["C"], str), "'C' should be string" + assert "dleq" in proof.keys() # will not be present if '--no-dleq' is passed + + +def test_proofs_with_no_dleq_flag(cli_prefix): + """Test --no-dleq flag excludes DLEQ proofs""" + runner = CliRunner(mix_stderr=False) + + # First create some tokens + result = runner.invoke(cli, [*cli_prefix, "invoice", "64"]) + assert result.exception is None + + # Verify wallet has balance + wallet = asyncio.run(init_wallet()) + assert wallet.available_balance >= 64 + + # Get proofs with DLEQ + result_with_dleq = runner.invoke(cli, [*cli_prefix, "proofs"]) + assert result_with_dleq.exception is None + + # Get proofs without DLEQ + result_no_dleq = runner.invoke(cli, [*cli_prefix, "proofs", "--no-dleq"]) + assert result_no_dleq.exception is None + assert result_no_dleq.exit_code == 0 + + # Parse JSON from both outputs + import json + + proofs_with_dleq = json.loads(result_with_dleq.stdout.strip()) + proofs_no_dleq = json.loads(result_no_dleq.stdout.strip()) + + # Should have same number of proofs + assert len(proofs_with_dleq) == len(proofs_no_dleq) + assert len(proofs_with_dleq) > 0, "Should have proofs to test" + + # No DLEQ version should not have dleq field + for proof in proofs_no_dleq: + # Check field types, the required fields according to NUT-00 spec + assert isinstance(proof["id"], str), "'id' should be string" + assert isinstance(proof["amount"], int), "'amount' should be integer" + assert isinstance(proof["secret"], str), "'secret' should be string" + assert isinstance(proof["C"], str), "'C' should be string" + assert ( + "dleq" not in proof + ), "proof should not contain 'dleq' field with --no-dleq" + + +def test_proofs_with_keyset_filter(cli_prefix): + """Test --keyset flag filters proofs by keyset ID""" + runner = CliRunner(mix_stderr=False) + + # First create some tokens + result = runner.invoke(cli, [*cli_prefix, "invoice", "64"]) + assert result.exception is None + + # Verify wallet has balance + wallet = asyncio.run(init_wallet()) + assert wallet.available_balance >= 64 + + # Get all proofs to find a keyset ID + result_all = runner.invoke(cli, [*cli_prefix, "proofs"]) + assert result_all.exception is None + + import json + + all_proofs = json.loads(result_all.stdout.strip()) + assert len(all_proofs) > 0, "Should have proofs to test keyset filtering" + + # Get a keyset ID from first proof + test_keyset = all_proofs[0]["id"] + + # Filter by that keyset + result_filtered = runner.invoke( + cli, [*cli_prefix, "proofs", "--keyset", test_keyset] + ) + assert result_filtered.exception is None + assert result_filtered.exit_code == 0 + + filtered_proofs = json.loads(result_filtered.stdout.strip()) + assert len(filtered_proofs) > 0 + + # All filtered proofs should have the same keyset ID + for proof in filtered_proofs: + assert ( + proof["id"] == test_keyset + ), f"proof has wrong keyset ID: {proof['id']} != {test_keyset}" + + # Filter with a non-existent keyset, to make sure nothing is returned + import secrets + + nonexistent_keyset = "00" + secrets.token_hex(7) # 16 hex chars (8 bytes) + result_filtered_again = runner.invoke( + cli, [*cli_prefix, "proofs", "--keyset", nonexistent_keyset] + ) + assert result_filtered_again.exception is None + assert result_filtered_again.exit_code == 0 + assert "No proofs found for keyset:" in result_filtered_again.stdout + + +def test_proofs_invalid_keyset(cli_prefix): + """Test --keyset with non-existent keyset ID""" + runner = CliRunner(mix_stderr=False) + result = runner.invoke(cli, [*cli_prefix, "proofs", "--keyset", "nonexistent"]) + + assert result.exception is None + assert result.exit_code == 0 + assert "No proofs found for keyset: nonexistent" in result.stdout + + +def test_proofs_with_all_flag(cli_prefix): + """Test --all flag includes reserved proofs""" + runner = CliRunner(mix_stderr=False) + + # Create some tokens first so we have proofs to list + result = runner.invoke(cli, [*cli_prefix, "invoice", "64"]) + assert result.exit_code == 0 + # Send some, in order to 'reserve' the tokens such that they are only included if --all is passed + result = runner.invoke(cli, [*cli_prefix, "send", "12"]) + assert result.exit_code == 0 + + # Get available proofs (default) + result_available = runner.invoke(cli, [*cli_prefix, "proofs", "--no-dleq"]) + assert result_available.exception is None + + # Get all proofs (including reserved) + result_all = runner.invoke(cli, [*cli_prefix, "proofs", "--all", "--no-dleq"]) + assert result_all.exception is None + assert result_all.exit_code == 0 + + import json + + available_proofs = json.loads(result_available.stdout.strip()) + all_proofs = json.loads(result_all.stdout.strip()) + + # All proofs should include at least the same number as available proofs + assert len(all_proofs) > len( + available_proofs + ), "--all should have more proofs, as it includes the reserved proofs" + + print(f"Available proofs: {len(available_proofs)}, All proofs: {len(all_proofs)}") + + # All proofs in available should be in all_proofs + available_secrets = {proof["secret"] for proof in available_proofs} + all_secrets = {proof["secret"] for proof in all_proofs} + + assert available_secrets.issubset( + all_secrets + ), "all available proofs should be included in --all"