Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions crypto/enums/constants.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
from enum import Enum

class Constants(Enum):
ETHEREUM_RECOVERY_ID_OFFSET = 27
EIP_1559_PREFIX = '02'
20 changes: 17 additions & 3 deletions crypto/identity/private_key.py
Original file line number Diff line number Diff line change
@@ -1,8 +1,9 @@
from binascii import hexlify
from hashlib import sha256

from coincurve import PrivateKey as PvtKey

from crypto.enums.constants import Constants

class PrivateKey(object):
def __init__(self, private_key: str):
self.private_key = PvtKey.from_hex(private_key)
Expand All @@ -18,8 +19,21 @@ def sign(self, message: bytes) -> bytes:
bytes: signature of the signed message
"""
signature = self.private_key.sign(message)

return hexlify(signature).decode()

return hexlify(signature)

def sign_compact(self, message: bytes) -> bytes:
"""Sign a message with this private key object

Args:
message (bytes): bytes data you want to sign

Returns:
bytes: signature of the signed message
"""
der = self.private_key.sign_recoverable(message)

return bytes([der[64] + Constants.ETHEREUM_RECOVERY_ID_OFFSET.value]) + der[0:64]

def to_hex(self):
"""Returns a private key in hex format
Expand Down
108 changes: 43 additions & 65 deletions crypto/transactions/deserializer.py
Original file line number Diff line number Diff line change
@@ -1,19 +1,18 @@
import re
from binascii import unhexlify
from typing import Optional
from crypto.enums.constants import Constants
from crypto.transactions.types.abstract_transaction import AbstractTransaction
from crypto.transactions.types.transfer import Transfer
from crypto.transactions.types.evm_call import EvmCall
from crypto.transactions.types.vote import Vote
from crypto.transactions.types.unvote import Unvote
from crypto.transactions.types.validator_registration import ValidatorRegistration
from crypto.transactions.types.validator_resignation import ValidatorResignation
from binascii import unhexlify, hexlify

from binary.unsigned_integer.reader import (
read_bit8,
read_bit32,
read_bit64,
)
from crypto.enums.abi_function import AbiFunction
from crypto.utils.abi_decoder import AbiDecoder
from crypto.utils.rlp_decoder import RlpDecoder

class Deserializer:
SIGNATURE_SIZE = 64
Expand All @@ -23,77 +22,39 @@ def __init__(self, serialized: str):
self.serialized = unhexlify(serialized) if isinstance(serialized, str) else serialized
self.pointer = 0

self.encoded_rlp = '0x' + serialized[2:]

@staticmethod
def new(serialized: str):
return Deserializer(serialized)

def deserialize(self) -> AbstractTransaction:
data = {}
decoded_rlp = RlpDecoder.decode(self.encoded_rlp)

data = {
'network': Deserializer.parse_number(decoded_rlp[0]),
'nonce': Deserializer.parse_big_number(decoded_rlp[1]),
'gasPrice': Deserializer.parse_number(decoded_rlp[3]),
'gasLimit': Deserializer.parse_number(decoded_rlp[4]),
'recipientAddress': Deserializer.parse_address(decoded_rlp[5]),
'value': Deserializer.parse_big_number(decoded_rlp[6]),
'data': Deserializer.parse_hex(decoded_rlp[7]),
}

if len(decoded_rlp) == 12:
data['v'] = Deserializer.parse_number(decoded_rlp[9]) + Constants.ETHEREUM_RECOVERY_ID_OFFSET.value
data['r'] = Deserializer.parse_hex(decoded_rlp[10])
data['s'] = Deserializer.parse_hex(decoded_rlp[11])

self.deserialize_common(data)
self.deserialize_data(data)
transaction = self.guess_transaction_from_data(data)
self.deserialize_signatures(data)

transaction.data = data
transaction.recover_sender()

transaction.data['id'] = transaction.hash(skip_signature=False).hex()
transaction.data['id'] = transaction.get_id()

return transaction

def read_bytes(self, length: int) -> bytes:
result = self.serialized[self.pointer:self.pointer + length]
self.pointer += length
return result

def deserialize_common(self, data: dict):
data['network'] = read_bit8(self.serialized, self.pointer)
self.pointer += 1

nonce = read_bit64(self.serialized, self.pointer)
data['nonce'] = str(nonce)
self.pointer += 8

gas_price = read_bit32(self.serialized, self.pointer)
data['gasPrice'] = gas_price
self.pointer += 4

gas_limit = read_bit32(self.serialized, self.pointer)
data['gasLimit'] = gas_limit
self.pointer += 4

data['value'] = '0'

def deserialize_data(self, data: dict):
value = int.from_bytes(self.serialized[self.pointer:self.pointer + 32], byteorder='big')
self.pointer += 32

data['value'] = str(value)

recipient_marker = read_bit8(self.serialized, self.pointer)
self.pointer += 1

if recipient_marker == 1:
recipient_address_bytes = self.read_bytes(20)
recipient_address = '0x' + hexlify(recipient_address_bytes).decode()
data['recipientAddress'] = recipient_address

payload_length = read_bit32(self.serialized, self.pointer)
self.pointer += 4

payload_hex = ''
if payload_length > 0:
payload_bytes = self.read_bytes(payload_length)
payload_hex = hexlify(payload_bytes).decode()

data['data'] = payload_hex

def deserialize_signatures(self, data: dict):
signature_length = self.SIGNATURE_SIZE + self.RECOVERY_SIZE
signature_bytes = self.read_bytes(signature_length)
data['signature'] = hexlify(signature_bytes).decode()

def guess_transaction_from_data(self, data: dict) -> AbstractTransaction:
if data['value'] != '0':
return Transfer(data)
Expand All @@ -115,7 +76,7 @@ def guess_transaction_from_data(self, data: dict) -> AbstractTransaction:
else:
return EvmCall(data)

def decode_payload(self, data: dict) -> dict:
def decode_payload(self, data: dict) -> Optional[dict]:
payload = data.get('data', '')

if payload == '':
Expand All @@ -126,4 +87,21 @@ def decode_payload(self, data: dict) -> dict:
return decoder.decode_function_data(payload)
except Exception as e:
print(f"Error decoding payload: {str(e)}")
return None

return None

@staticmethod
def parse_number(value: str) -> int:
return 0 if value == '0x' else int(value, 16)

@staticmethod
def parse_big_number(value: str) -> str:
return str(Deserializer.parse_number(value))

@staticmethod
def parse_hex(value: str) -> str:
return re.sub(r'^0x', '', value)

@staticmethod
def parse_address(value: str) -> Optional[str]:
return None if value == '0x' else value
54 changes: 3 additions & 51 deletions crypto/transactions/serializer.py
Original file line number Diff line number Diff line change
@@ -1,11 +1,5 @@
from binascii import unhexlify
from crypto.transactions.types.abstract_transaction import AbstractTransaction
from crypto.configuration.network import get_network
from binary.unsigned_integer.writer import (
write_bit8,
write_bit32,
write_bit64,
)
from crypto.utils.transaction_utils import TransactionUtils

class Serializer:
def __init__(self, transaction: AbstractTransaction):
Expand All @@ -22,48 +16,6 @@ def get_bytes(transaction: AbstractTransaction, skip_signature: bool = False) ->
return transaction.serialize(skip_signature=skip_signature)

def serialize(self, skip_signature: bool = False) -> bytes:
bytes_data = bytes()
transaction_hash = TransactionUtils.to_buffer(self.transaction.data, skip_signature=skip_signature).decode()

bytes_data += self.serialize_common()
bytes_data += self.serialize_data()
if not skip_signature:
bytes_data += self.serialize_signatures()

return bytes_data

def serialize_common(self) -> bytes:
bytes_data = bytes()
network_version = self.transaction.data.get('network', get_network()['version'])
bytes_data += write_bit8(int(network_version))
bytes_data += write_bit64(int(self.transaction.data['nonce']))
bytes_data += write_bit32(int(self.transaction.data['gasPrice']))
bytes_data += write_bit32(int(self.transaction.data['gasLimit']))
return bytes_data

def serialize_data(self) -> bytes:
bytes_data = bytes()

bytes_data += int(self.transaction.data['value']).to_bytes(32, byteorder='big')

if 'recipientAddress' in self.transaction.data:
bytes_data += write_bit8(1)
recipient_address = self.transaction.data['recipientAddress']
bytes_data += unhexlify(recipient_address.replace('0x', ''))
else:
bytes_data += write_bit8(0)

payload_hex = self.transaction.data.get('data', '')
payload_length = len(payload_hex) // 2
bytes_data += write_bit32(payload_length)

if payload_length > 0:
bytes_data += unhexlify(payload_hex)

return bytes_data

def serialize_signatures(self) -> bytes:
bytes_data = bytes()
if 'signature' in self.transaction.data:
bytes_data += unhexlify(self.transaction.data['signature'])

return bytes_data
return bytes.fromhex(transaction_hash)
58 changes: 34 additions & 24 deletions crypto/transactions/types/abstract_transaction.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,9 +2,10 @@
from typing import Optional

from crypto.configuration.network import get_network
from crypto.enums.constants import Constants
from crypto.identity.address import address_from_public_key
from crypto.identity.private_key import PrivateKey
from crypto.utils.transaction_hasher import TransactionHasher
from crypto.utils.transaction_utils import TransactionUtils
from coincurve import PublicKey
from crypto.utils.abi_decoder import AbiDecoder

Expand All @@ -19,7 +20,7 @@ def get_payload(self) -> str:
def decode_payload(self, data: dict) -> Optional[dict]:
if 'data' not in data or data['data'] == '':
return None

payload = data['data']
decoder = AbiDecoder()

Expand All @@ -31,51 +32,59 @@ def refresh_payload_data(self):
self.data['data'] = self.get_payload().lstrip('0x')

def get_id(self) -> str:
return self.hash(skip_signature=False).hex()
return TransactionUtils.get_id(self.data)

def get_bytes(self, skip_signature: bool = False) -> bytes:
from crypto.transactions.serializer import Serializer

return Serializer.get_bytes(self, skip_signature)

def sign(self, private_key: PrivateKey):
hash_ = self.hash(skip_signature=True)
signature_with_recid = private_key.private_key.sign_recoverable(hash_, hasher=None)
signature_hex = signature_with_recid.hex()
self.data['signature'] = signature_hex
transaction_hash = TransactionUtils.to_buffer(self.data, skip_signature=True).decode()

message = bytes.fromhex(transaction_hash)

transaction_signature = private_key.sign_compact(message)

self.data['v'] = transaction_signature[0]
self.data['r'] = transaction_signature[1:33].hex()
self.data['s'] = transaction_signature[33:].hex()

return self

def get_public_key(self, compact_signature, hash_):
public_key = PublicKey.from_signature_and_message(compact_signature, hash_, hasher=None)

return public_key

def recover_sender(self):
signature_hex = self.data.get('signature')
if not signature_hex:
raise ValueError("No signature to recover from")
signature_with_recid = self.get_signature()
if not signature_with_recid:
return False

signature_with_recid = bytes.fromhex(signature_hex)
hash_ = self.hash(skip_signature=True)
hash_ = bytes.fromhex(self.hash(skip_signature=True))
public_key = self.get_public_key(signature_with_recid, hash_)
self.data['senderPublicKey'] = public_key.format().hex()
self.data['senderAddress'] = address_from_public_key(self.data['senderPublicKey'])

def verify(self) -> bool:
signature_hex = self.data.get('signature')
if not signature_hex:
signature_with_recid = self.get_signature()
if not signature_with_recid:
return False

signature_with_recid = bytes.fromhex(signature_hex)
hash_ = self.hash(skip_signature=True)
hash_ = bytes.fromhex(self.hash(skip_signature=True))
recovered_public_key = self.get_public_key(signature_with_recid, hash_)
sender_public_key_hex = self.data.get('senderPublicKey')
if not sender_public_key_hex:
return False

sender_public_key_bytes = bytes.fromhex(sender_public_key_hex)

return recovered_public_key.format() == sender_public_key_bytes

def serialize(self, skip_signature: bool = False) -> bytes:
from crypto.transactions.serializer import Serializer

return Serializer(self).serialize(skip_signature)

def to_dict(self) -> dict:
Expand All @@ -84,14 +93,15 @@ def to_dict(self) -> dict:
def to_json(self) -> str:
return json.dumps(self.to_dict())

def hash(self, skip_signature: bool) -> bytes:
hash_data = self.data.copy()
if skip_signature:
hash_data['signature'] = None
return TransactionHasher.to_hash(hash_data, skip_signature)
def hash(self, skip_signature: bool) -> str:
return TransactionUtils.to_hash(self.data, skip_signature=skip_signature)

def get_signature(self):
signature_hex = self.data.get('signature')
if signature_hex:
return bytes.fromhex(signature_hex)
recover_id = int(self.data.get('v', 0)) - Constants.ETHEREUM_RECOVERY_ID_OFFSET.value
r = self.data.get('r')
s = self.data.get('s')

if r and s:
return bytes.fromhex(r) + bytes.fromhex(s) + bytes([recover_id])

return None
Loading