Skip to content
Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion cdp/cdp.py
Original file line number Diff line number Diff line change
Expand Up @@ -118,7 +118,7 @@ def configure_from_json(
"""
with open(os.path.expanduser(file_path)) as file:
data = json.load(file)
api_key_name = data.get("name")
api_key_name = data.get("name") or data.get("id")
private_key = data.get("privateKey")
if not api_key_name:
raise InvalidConfigurationError("Invalid JSON format: Missing 'api_key_name'")
Expand Down
43 changes: 32 additions & 11 deletions cdp/cdp_api_client.py
Original file line number Diff line number Diff line change
@@ -1,10 +1,11 @@
import base64
import random
import time
from urllib.parse import urlparse

import jwt
from cryptography.hazmat.primitives import serialization
from cryptography.hazmat.primitives.asymmetric import ec
from cryptography.hazmat.primitives.asymmetric import ec, ed25519
from urllib3.util import Retry

from cdp import __version__
Expand Down Expand Up @@ -172,17 +173,37 @@ def _build_jwt(self, url: str, method: str = "GET") -> str:
str: The JWT for the given API endpoint URL.

"""
private_key_obj = None
key_data = self.private_key.encode()
# Business change: Support both ECDSA and Ed25519 keys.
try:
private_key = serialization.load_pem_private_key(
self.private_key.encode(), password=None
)
if not isinstance(private_key, ec.EllipticCurvePrivateKey):
raise InvalidAPIKeyFormatError("Invalid key type")
except Exception as e:
raise InvalidAPIKeyFormatError("Could not parse the private key") from e
# Try loading as a PEM-encoded key (typically for ECDSA keys).
private_key_obj = serialization.load_pem_private_key(key_data, password=None)
except Exception:
# If PEM loading fails, assume the key is provided as base64-encoded raw bytes (Ed25519).
try:
decoded_key = base64.b64decode(self.private_key)
if len(decoded_key) == 32:
private_key_obj = ed25519.Ed25519PrivateKey.from_private_bytes(decoded_key)
elif len(decoded_key) == 64:
private_key_obj = ed25519.Ed25519PrivateKey.from_private_bytes(decoded_key[:32])
else:
raise InvalidAPIKeyFormatError(
"Ed25519 private key must be 32 or 64 bytes after base64 decoding"
)
except Exception as e2:
raise InvalidAPIKeyFormatError("Could not parse the private key") from e2

# Determine signing algorithm based on the key type.
if isinstance(private_key_obj, ec.EllipticCurvePrivateKey):
alg = "ES256"
elif isinstance(private_key_obj, ed25519.Ed25519PrivateKey):
alg = "EdDSA"
else:
raise InvalidAPIKeyFormatError("Unsupported key type")

header = {
"alg": "ES256",
"alg": alg,
"kid": self.api_key,
"typ": "JWT",
"nonce": self._nonce(),
Expand All @@ -195,12 +216,12 @@ def _build_jwt(self, url: str, method: str = "GET") -> str:
"iss": "cdp",
"aud": ["cdp_service"],
"nbf": int(time.time()),
"exp": int(time.time()) + 60, # +1 minute
"exp": int(time.time()) + 60, # Token valid for 1 minute
"uris": [uri],
}

try:
return jwt.encode(claims, private_key, algorithm="ES256", headers=header)
return jwt.encode(claims, private_key_obj, algorithm=alg, headers=header)
except Exception as e:
print(f"Error during JWT signing: {e!s}")
raise InvalidAPIKeyFormatError("Could not sign the JWT") from e
Expand Down
40 changes: 32 additions & 8 deletions cdp/wallet.py
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
import base64
import builtins
import hashlib
import json
Expand All @@ -12,7 +13,7 @@
from bip_utils import Bip32Slip10Secp256k1, Bip39MnemonicValidator, Bip39SeedGenerator
from Crypto.Cipher import AES
from cryptography.hazmat.primitives import serialization
from cryptography.hazmat.primitives.asymmetric import ec
from cryptography.hazmat.primitives.asymmetric import ec, ed25519
from eth_account import Account

from cdp.address import Address
Expand Down Expand Up @@ -692,13 +693,36 @@ def _encryption_key(self) -> bytes:
bytes: The generated encryption key.

"""
private_key = serialization.load_pem_private_key(Cdp.private_key.encode(), password=None)

public_key = private_key.public_key()

shared_secret = private_key.exchange(ec.ECDH(), public_key)

return hashlib.sha256(shared_secret).digest()
try:
key_obj = serialization.load_pem_private_key(Cdp.private_key.encode(), password=None)
except Exception:
# If PEM loading fails, assume the key is provided as a base64-encoded Ed25519 key.
try:
decoded = base64.b64decode(Cdp.private_key)
if len(decoded) == 32:
key_obj = ed25519.Ed25519PrivateKey.from_private_bytes(decoded)
elif len(decoded) == 64:
key_obj = ed25519.Ed25519PrivateKey.from_private_bytes(decoded[:32])
else:
raise ValueError("Invalid Ed25519 key length")
except Exception as e2:
raise ValueError("Could not parse the private key") from e2

# For ECDSA keys, perform an ECDH exchange with its own public key.
if isinstance(key_obj, ec.EllipticCurvePrivateKey):
public_key = key_obj.public_key()
shared_secret = key_obj.exchange(ec.ECDH(), public_key)
return hashlib.sha256(shared_secret).digest()
# For Ed25519 keys, derive the encryption key by hashing the raw private key bytes.
elif isinstance(key_obj, ed25519.Ed25519PrivateKey):
raw_bytes = key_obj.private_bytes(
encoding=serialization.Encoding.Raw,
format=serialization.PrivateFormat.Raw,
encryption_algorithm=serialization.NoEncryption(),
)
return hashlib.sha256(raw_bytes).digest()
else:
raise ValueError("Unsupported key type for encryption key derivation")

def _existing_seeds(self, file_path: str) -> dict[str, Any]:
"""Load existing seeds from a file.
Expand Down
Loading