diff --git a/pytuya/__init__.py b/pytuya/__init__.py index c6f3a5a..add7ff2 100644 --- a/pytuya/__init__.py +++ b/pytuya/__init__.py @@ -10,17 +10,17 @@ import base64 -from hashlib import md5 +import binascii +import colorsys import json import logging import socket import sys import time -import colorsys -import binascii +from hashlib import md5 +from typing import Union try: - #raise ImportError import Crypto from Crypto.Cipher import AES # PyCrypto except ImportError: @@ -29,10 +29,7 @@ from pytuya.const import __version__ - log = logging.getLogger(__name__) -#logging.basicConfig() # TODO include function name/line numbers in log -#log.setLevel(level=logging.DEBUG) # Debug hack! log.info('%s version %s', __name__, __version__) log.info('Python %s on %s', sys.version, sys.platform) @@ -51,12 +48,14 @@ IS_PY2 = sys.version_info[0] == 2 + class AESCipher(object): def __init__(self, key): - #self.bs = 32 # 32 work fines for ON, does not work for OFF. Padding different compared to js version https://github.com/codetheweb/tuyapi/ + # self.bs = 32 # 32 work fines for ON, does not work for OFF. Padding different compared to js version https://github.com/codetheweb/tuyapi/ self.bs = 16 self.key = key - def encrypt(self, raw, use_base64 = True): + + def encrypt(self, raw, use_base64=True): if Crypto: raw = self._pad(raw) cipher = AES.new(self.key, mode=AES.MODE_ECB) @@ -66,8 +65,6 @@ def encrypt(self, raw, use_base64 = True): cipher = pyaes.blockfeeder.Encrypter(pyaes.AESModeOfOperationECB(self.key)) # no IV, auto pads to 16 crypted_text = cipher.feed(raw) crypted_text += cipher.feed() # flush final block - #print('crypted_text %r' % crypted_text) - #print('crypted_text (%d) %r' % (len(crypted_text), crypted_text)) if use_base64: return base64.b64encode(crypted_text) else: @@ -76,27 +73,23 @@ def encrypt(self, raw, use_base64 = True): def decrypt(self, enc, use_base64=True): if use_base64: enc = base64.b64decode(enc) - #print('enc (%d) %r' % (len(enc), enc)) - #enc = self._unpad(enc) - #enc = self._pad(enc) - #print('upadenc (%d) %r' % (len(enc), enc)) if Crypto: cipher = AES.new(self.key, AES.MODE_ECB) raw = cipher.decrypt(enc) - #print('raw (%d) %r' % (len(raw), raw)) return self._unpad(raw).decode('utf-8') - #return self._unpad(cipher.decrypt(enc)).decode('utf-8') else: cipher = pyaes.blockfeeder.Decrypter(pyaes.AESModeOfOperationECB(self.key)) # no IV, auto pads to 16 plain_text = cipher.feed(enc) plain_text += cipher.feed() # flush final block return plain_text + def _pad(self, s): padnum = self.bs - len(s) % self.bs return s + padnum * chr(padnum).encode() + @staticmethod def _unpad(s): - return s[:-ord(s[len(s)-1:])] + return s[:-ord(s[len(s) - 1:])] def bin2hex(x, pretty=False): @@ -117,6 +110,7 @@ def hex2bin(x): else: return bytes.fromhex(x) + # This is intended to match requests.json payload at https://github.com/codetheweb/tuyapi payload_dict = { "device": { @@ -133,6 +127,7 @@ def hex2bin(x): } } + class XenonDevice(object): def __init__(self, dev_id, address, local_key=None, dev_type=None, connection_timeout=10): """ @@ -207,7 +202,6 @@ def generate_payload(self, command, data=None): # Create byte buffer from hex data json_payload = json.dumps(json_data) - #print(json_payload) json_payload = json_payload.replace(' ', '') # if spaces are not removed device does not respond! json_payload = json_payload.encode('utf-8') log.debug('json_payload=%r', json_payload) @@ -221,55 +215,32 @@ def generate_payload(self, command, data=None): json_payload = PROTOCOL_VERSION_BYTES_33 + b"\0\0\0\0\0\0\0\0\0\0\0\0" + json_payload elif command == SET: # need to encrypt - #print('json_payload %r' % json_payload) self.cipher = AESCipher(self.local_key) # expect to connect and then disconnect to set new json_payload = self.cipher.encrypt(json_payload) - #print('crypted json_payload %r' % json_payload) preMd5String = b'data=' + json_payload + b'||lpv=' + PROTOCOL_VERSION_BYTES_31 + b'||' + self.local_key - #print('preMd5String %r' % preMd5String) m = md5() m.update(preMd5String) - #print(repr(m.digest())) hexdigest = m.hexdigest() - #print(hexdigest) - #print(hexdigest[8:][:16]) json_payload = PROTOCOL_VERSION_BYTES_31 + hexdigest[8:][:16].encode('latin1') + json_payload - #print('data_to_send') - #print(json_payload) - #print('crypted json_payload (%d) %r' % (len(json_payload), json_payload)) - #print('json_payload %r' % repr(json_payload)) - #print('json_payload len %r' % len(json_payload)) - #print(bin2hex(json_payload)) self.cipher = None # expect to connect and then disconnect to set new - postfix_payload = hex2bin(bin2hex(json_payload) + payload_dict[self.dev_type]['suffix']) - #print('postfix_payload %r' % postfix_payload) - #print('postfix_payload %r' % len(postfix_payload)) - #print('postfix_payload %x' % len(postfix_payload)) - #print('postfix_payload %r' % hex(len(postfix_payload))) assert len(postfix_payload) <= 0xff postfix_payload_hex_len = '%x' % len(postfix_payload) # TODO this assumes a single byte 0-255 (0x00-0xff) - buffer = hex2bin( payload_dict[self.dev_type]['prefix'] + - payload_dict[self.dev_type][command]['hexByte'] + - '000000' + - postfix_payload_hex_len ) + postfix_payload + buffer = hex2bin(payload_dict[self.dev_type]['prefix'] + + payload_dict[self.dev_type][command]['hexByte'] + + '000000' + + postfix_payload_hex_len) + postfix_payload # calc the CRC of everything except where the CRC goes and the suffix hex_crc = format(binascii.crc32(buffer[:-8]) & 0xffffffff, '08X') buffer = buffer[:-8] + hex2bin(hex_crc) + buffer[-4:] - #print('command', command) - #print('prefix') - #print(payload_dict[self.dev_type][command]['prefix']) - #print(repr(buffer)) - #print(bin2hex(buffer, pretty=True)) - #print(bin2hex(buffer, pretty=False)) - #print('full buffer(%d) %r' % (len(buffer), " ".join("{:02x}".format(ord(c)) for c in buffer))) return buffer -class Device(XenonDevice): - def __init__(self, dev_id, address, local_key=None, dev_type=None): - super(Device, self).__init__(dev_id, address, local_key, dev_type) + +class TuyaDevice(XenonDevice): + def __init__(self, dev_id, address, local_key=None, dev_type="device"): + super(TuyaDevice, self).__init__(dev_id, address, local_key, dev_type) def status(self): log.debug('status() entry') @@ -281,8 +252,7 @@ def status(self): result = data[20:-8] # hard coded offsets log.debug('result=%r', result) - #result = data[data.find('{'):data.rfind('}')+1] # naive marker search, hope neither { nor } occur in header/footer - #print('result %r' % result) + if result.startswith(b'{'): # this is the regular expected code path if not isinstance(result, str): @@ -293,7 +263,8 @@ def status(self): # expect resulting json to look similar to:: {"devId":"ID","dps":{"1":true,"2":0},"t":EPOCH_SECS,"s":3_DIGIT_NUM} # NOTE dps.2 may or may not be present result = result[len(PROTOCOL_VERSION_BYTES_31):] # remove version header - result = result[16:] # remove (what I'm guessing, but not confirmed is) 16-bytes of MD5 hexdigest of payload + result = result[ + 16:] # remove (what I'm guessing, but not confirmed is) 16-bytes of MD5 hexdigest of payload cipher = AESCipher(self.local_key) result = cipher.decrypt(result) log.debug('decrypted result=%r', result) @@ -312,41 +283,41 @@ def status(self): return result - def set_status(self, on, switch=1): + def set_value(self, index: Union[str, int], value: Union[bool, int, float, str]): """ - Set status of the device to 'on' or 'off'. + Set int value of any index. Args: - on(bool): True for 'on', False for 'off'. - switch(int): The switch to set + index: index to set + value: new value for the index """ # open device, send request, then close connection - if isinstance(switch, int): - switch = str(switch) # index and payload is a string - payload = self.generate_payload(SET, {switch:on}) - #print('payload %r' % payload) + if isinstance(index, int): + index = str(index) # index and payload is a string - data = self._send_receive(payload) - log.debug('set_status received data=%r', data) + payload = self.generate_payload(SET, {index: value}) + return self._send_receive(payload) - return data - def set_value(self, index, value): +class Device(TuyaDevice): + def __init__(self, dev_id, address, local_key=None, dev_type=None): + super(Device, self).__init__(dev_id, address, local_key, dev_type) + + def set_status(self, on, switch=1): """ - Set int value of any index. + Set status of the device to 'on' or 'off'. Args: - index(int): index to set - value(int): new value for the index + on(bool): True for 'on', False for 'off'. + switch(int): The switch to set """ # open device, send request, then close connection - if isinstance(index, int): - index = str(index) # index and payload is a string - - payload = self.generate_payload(SET, { - index: value}) + if isinstance(switch, int): + switch = str(switch) # index and payload is a string + payload = self.generate_payload(SET, {switch: on}) data = self._send_receive(payload) + log.debug('set_status received data=%r', data) return data @@ -374,35 +345,37 @@ def set_timer(self, num_secs): devices_numbers.sort() dps_id = devices_numbers[-1] - payload = self.generate_payload(SET, {dps_id:num_secs}) + payload = self.generate_payload(SET, {dps_id: num_secs}) data = self._send_receive(payload) log.debug('set_timer received data=%r', data) return data + class OutletDevice(Device): def __init__(self, dev_id, address, local_key=None): dev_type = 'device' super(OutletDevice, self).__init__(dev_id, address, local_key, dev_type) + class BulbDevice(Device): - DPS_INDEX_ON = '1' - DPS_INDEX_MODE = '2' + DPS_INDEX_ON = '1' + DPS_INDEX_MODE = '2' DPS_INDEX_BRIGHTNESS = '3' DPS_INDEX_COLOURTEMP = '4' - DPS_INDEX_COLOUR = '5' + DPS_INDEX_COLOUR = '5' - DPS = 'dps' + DPS = 'dps' DPS_MODE_COLOUR = 'colour' - DPS_MODE_WHITE = 'white' + DPS_MODE_WHITE = 'white' DPS_2_STATE = { - '1':'is_on', - '2':'mode', - '3':'brightness', - '4':'colourtemp', - '5':'colour', - } + '1': 'is_on', + '2': 'mode', + '3': 'brightness', + '4': 'colourtemp', + '5': 'colour', + } def __init__(self, dev_id, address, local_key=None): dev_type = 'device' @@ -425,12 +398,12 @@ def _rgb_to_hexvalue(r, g, b): g(int): Value for the colour green as int from 0-255. b(int): Value for the colour blue as int from 0-255. """ - rgb = [r,g,b] - hsv = colorsys.rgb_to_hsv(rgb[0]/255, rgb[1]/255, rgb[2]/255) + rgb = [r, g, b] + hsv = colorsys.rgb_to_hsv(rgb[0] / 255, rgb[1] / 255, rgb[2] / 255) hexvalue = "" for value in rgb: - temp = str(hex(int(value))).replace("0x","") + temp = str(hex(int(value))).replace("0x", "") if len(temp) == 1: temp = "0" + temp hexvalue = hexvalue + temp @@ -438,7 +411,7 @@ def _rgb_to_hexvalue(r, g, b): hsvarray = [int(hsv[0] * 360), int(hsv[1] * 255), int(hsv[2] * 255)] hexvalue_hsv = "" for value in hsvarray: - temp = str(hex(int(value))).replace("0x","") + temp = str(hex(int(value))).replace("0x", "") if len(temp) == 1: temp = "0" + temp hexvalue_hsv = hexvalue_hsv + temp @@ -495,7 +468,6 @@ def set_colour(self, r, g, b): if not 0 <= b <= 255: raise ValueError("The value for blue needs to be between 0 and 255.") - #print(BulbDevice) hexvalue = BulbDevice._rgb_to_hexvalue(r, g, b) payload = self.generate_payload(SET, { @@ -576,7 +548,7 @@ def state(self): state = {} for key in status[self.DPS].keys(): - if(int(key)<=5): - state[self.DPS_2_STATE[key]]=status[self.DPS][key] + if (int(key) <= 5): + state[self.DPS_2_STATE[key]] = status[self.DPS][key] return state diff --git a/tests.py b/tests.py index 481d06d..b0554b0 100755 --- a/tests.py +++ b/tests.py @@ -36,8 +36,9 @@ def compare_json_strings(json1, json2, ignoring_keys=None): def check_data_frame(data, expected_prefix, encrypted=True): prefix = data[:15] - suffix = data[-8:] - + crc = data[-8:-4] + suffix = data[-4:] + if encrypted: payload_len = struct.unpack(">B",data[15:16])[0] # big-endian, unsigned char version = data[16:19] @@ -51,10 +52,10 @@ def check_data_frame(data, expected_prefix, encrypted=True): frame_ok = True if prefix != pytuya.hex2bin(expected_prefix): frame_ok = False - elif suffix != pytuya.hex2bin("000000000000aa55"): + elif suffix != pytuya.hex2bin("0000aa55"): frame_ok = False elif encrypted: - if payload_len != len(version) + len(checksum) + len(encrypted_json) + len(suffix): + if payload_len != len(version) + len(checksum) + len(encrypted_json) + len(crc) + len(suffix): frame_ok = False elif version != b"3.1": frame_ok = False