diff --git a/README.md b/README.md index 7c29df6..1f82fb6 100644 --- a/README.md +++ b/README.md @@ -3,20 +3,126 @@ [![Build Status](https://travis-ci.org/clach04/python-tuya.svg?branch=master)](https://travis-ci.org/clach04/python-tuya) Python 2.7 and Python 3.6.1 interface to ESP8266MOD WiFi smart devices from Shenzhen Xenon. -If you are using the Jinvoo Smart App, this allows local control over the LAN. -NOTE requires the devices to have already been **activated** by Jinvoo Smart App (or similar). +If you are using the Jinvoo Smart app, this allows local control over the LAN. +NOTE requires the devices to have already been **activated** by Jinvoo Smart app (or similar). -## Key extraction - -https://github.com/clach04/python-tuya/wiki has background information for how to get device id and local key. -(the device id can be seen in Jinvoo Smart App, under "Device Info"). Known to work with: * SKYROKU SM-PW701U Wi-Fi Plug Smart Plug - see https://wikidevi.com/wiki/Xenon_SM-PW701U * Wuudi SM-S0301-US - WIFI Smart Power Socket Multi Plug with 4 AC Outlets and 4 USB Charging + * Jinvoo WiFi Curtain / Roller Shutter Switch -Demo: +## Key extraction + +- background knowledge + - The ``local-key`` is used for AES-based encryption of the messages sent between device and client. + - Key changes every time the device is reset and paired to a new account (e.g. using app) + - Key and further meta-data is regularly requested by the app from the cloud-server via HTTPS requests + - The request responses can be recorded by apps to extract the key and further data (such as name, ip, state, etc.) +- how to extract key + - Android + - Install your app used for setup and paring (tested with TuyaSmart) + - Install [SSL Capture](https://play.google.com/store/apps/details?id=com.minhui.networkcapture) + - Change app settings to only record your pairing app and start recording + - Go inside app and do something with one of your devices + - Go back inside the SSL Capture app, stop the recording + - Find the package with the longest response by the server + - Copy all information from the response body to your computer (e.g. via email) + - Use pytuya to extract key from the response stored inside a file: + ``pytuya utils extract_keys response.txt`` + - IOS + - > TODO + +- further resources: + - https://github.com/clach04/python-tuya/wiki has further information for how to get device id and local key. +(the device id can be seen in Jinvoo Smart app, under "Device Info"). + +## CLI - Commandline Interface +The command line tool ``pytuya`` can be used to send actions to devices. Simply executing ``pytuya`` after +installing displays the following options: + + >pytuya + Usage: pytuya [OPTIONS] COMMAND [ARGS]... + + Options: + -l, --debug / --no-debug + -c, --config_path PATH + --help Show this message and exit. + + Commands: + bulb + cover + outlet + update_config + utils + +In order to use this client interface, it is first necessary to update the configuration, which should +a name, ip, id and local key used for encryption for each device. Given a recorded API response extracted +from the app using SSL Capture (see above), the configuration can be automatically built as following: + + > pytuya update_config example_response.txt + + INFO:root:Querying devices + WARNING:root:wrote config at C:\Users\username\pytuya.yaml with content: + + Bedroom Blinds: + id: 51870625b4e62e4b2fc4 + ip: 192.168.1.116 + key: afab3d41b839c54c + Bedroom Lights: + id: 5517064584f3ec2e4095 + ip: 192.168.1.210 + key: bfa4804827714672 + +Once this config file exists, actions can be sent to the corresponding devices by referencing them via name: + + > pytuya cover close "bedroom blinds" + + INFO:root:sending close to device bedroom blinds at 192.168.1.116 + +In order to get help, simply use the ``--help`` flag, e.g.: + + > pytuya bulb --help + + Usage: pytuya bulb [OPTIONS] COMMAND [ARGS]... + + Options: + --help Show this message and exit. + + Commands: + brightness set brightness of device + colour set colour of device using provided R, G, B... + off sends turn off action to device + on sends turn on action to device + state sends turn off action to device + + > pytuya bulb colour --help + Usage: pytuya bulb colour [OPTIONS] NAME [R] [G] [B] + + set colour of device using provided R, G, B (red green and blue) + + Options: + --help Show this message and exit. + +### HomeAssistant Integration +HomeAssistant does already support tuya devices via the official cloud API. However, the direct communication +of the pytuya package may be preferable to relying on cloud services (pytuya works regardless of internet connectivity). +To use pytuya in HomeAssistant, simply use the commandline device components. +Here's an example configuration for a cover device with the name "bedroom blinds": + +```yaml +cover: + - platform: command_line + covers: + bedroom: + command_open: pytuya cover open "bedroom blinds" + command_close: pytuya cover close "bedroom blinds" + command_stop: pytuya cover stop "bedroom blinds" +``` +Note that pytuya needs to be set up first before these commands can work (see ``pytuya update_config``) + +## API-Demo: import pytuya diff --git a/pytuya/__init__.py b/pytuya/__init__.py index 737b420..737b14c 100644 --- a/pytuya/__init__.py +++ b/pytuya/__init__.py @@ -1,529 +1,8 @@ -# Python module to interface with Shenzhen Xenon ESP8266MOD WiFi smart devices -# E.g. https://wikidevi.com/wiki/Xenon_SM-PW701U -# SKYROKU SM-PW701U Wi-Fi Plug Smart Plug -# Wuudi SM-S0301-US - WIFI Smart Power Socket Multi Plug with 4 AC Outlets and 4 USB Charging Works with Alexa -# -# This would not exist without the protocol reverse engineering from -# https://github.com/codetheweb/tuyapi by codetheweb and blackrozes -# -# Tested with Python 2.7 and Python 3.6.1 only +from .devices import XenonDevice, Device, BulbDevice, OutletDevice, CoverDevice, SET +from .utils import query_devices, KeyExtractor - -import base64 -from hashlib import md5 -import json -import logging -import socket -import sys -import time -import colorsys - -try: - #raise ImportError - import Crypto - from Crypto.Cipher import AES # PyCrypto -except ImportError: - Crypto = AES = None - import pyaes # https://github.com/ricmoo/pyaes - - -version_tuple = (7, 0, 3) +version_tuple = (7, 0, 4) version = version_string = __version__ = '%d.%d.%d' % version_tuple __author__ = 'clach04' -log = logging.getLogger(__name__) -logging.basicConfig() # TODO include function name/line numbers in log -#log.setLevel(level=logging.DEBUG) # Debug hack! - -log.info('Python %s on %s', sys.version, sys.platform) -if Crypto is None: - log.info('Using pyaes version %r', pyaes.VERSION) - log.info('Using pyaes from %r', pyaes.__file__) -else: - log.info('Using PyCrypto %r', Crypto.version_info) - log.info('Using PyCrypto from %r', Crypto.__file__) - -SET = 'set' - -PROTOCOL_VERSION_BYTES = b'3.1' - -IS_PY2 = sys.version_info[0] == 2 - -class AESCipher(object): - def __init__(self, key): - #self.bs = 32 # 32 work fines for ON, does not work for OFF. Padding different compared to js version https://github.com/codetheweb/tuyapi/ - self.bs = 16 - self.key = key - def encrypt(self, raw): - if Crypto: - raw = self._pad(raw) - cipher = AES.new(self.key, mode=AES.MODE_ECB) - crypted_text = cipher.encrypt(raw) - else: - _ = self._pad(raw) - cipher = pyaes.blockfeeder.Encrypter(pyaes.AESModeOfOperationECB(self.key)) # no IV, auto pads to 16 - crypted_text = cipher.feed(raw) - crypted_text += cipher.feed() # flush final block - #print('crypted_text %r' % crypted_text) - #print('crypted_text (%d) %r' % (len(crypted_text), crypted_text)) - crypted_text_b64 = base64.b64encode(crypted_text) - #print('crypted_text_b64 (%d) %r' % (len(crypted_text_b64), crypted_text_b64)) - return crypted_text_b64 - def decrypt(self, enc): - enc = base64.b64decode(enc) - #print('enc (%d) %r' % (len(enc), enc)) - #enc = self._unpad(enc) - #enc = self._pad(enc) - #print('upadenc (%d) %r' % (len(enc), enc)) - if Crypto: - cipher = AES.new(self.key, AES.MODE_ECB) - raw = cipher.decrypt(enc) - #print('raw (%d) %r' % (len(raw), raw)) - return self._unpad(raw).decode('utf-8') - #return self._unpad(cipher.decrypt(enc)).decode('utf-8') - else: - cipher = pyaes.blockfeeder.Decrypter(pyaes.AESModeOfOperationECB(self.key)) # no IV, auto pads to 16 - plain_text = cipher.feed(enc) - plain_text += cipher.feed() # flush final block - return plain_text - def _pad(self, s): - padnum = self.bs - len(s) % self.bs - return s + padnum * chr(padnum).encode() - @staticmethod - def _unpad(s): - return s[:-ord(s[len(s)-1:])] - - -def bin2hex(x, pretty=False): - if pretty: - space = ' ' - else: - space = '' - if IS_PY2: - result = ''.join('%02X%s' % (ord(y), space) for y in x) - else: - result = ''.join('%02X%s' % (y, space) for y in x) - return result - - -def hex2bin(x): - if IS_PY2: - return x.decode('hex') - else: - return bytes.fromhex(x) - -# This is intended to match requests.json payload at https://github.com/codetheweb/tuyapi -payload_dict = { - "device": { - "status": { - "hexByte": "0a", - "command": {"gwId": "", "devId": ""} - }, - "set": { - "hexByte": "07", - "command": {"devId": "", "uid": "", "t": ""} - }, - "prefix": "000055aa00000000000000", # Next byte is command byte ("hexByte") some zero padding, then length of remaining payload, i.e. command + suffix (unclear if multiple bytes used for length, zero padding implies could be more than one byte) - "suffix": "000000000000aa55" - } -} - -class XenonDevice(object): - def __init__(self, dev_id, address, local_key=None, dev_type=None, connection_timeout=10): - """ - Represents a Tuya device. - - Args: - dev_id (str): The device id. - address (str): The network address. - local_key (str, optional): The encryption key. Defaults to None. - dev_type (str, optional): The device type. - It will be used as key for lookups in payload_dict. - Defaults to None. - - Attributes: - port (int): The port to connect to. - """ - self.id = dev_id - self.address = address - self.local_key = local_key - self.local_key = local_key.encode('latin1') - self.dev_type = dev_type - self.connection_timeout = connection_timeout - - self.port = 6668 # default - do not expect caller to pass in - - def __repr__(self): - return '%r' % ((self.id, self.address),) # FIXME can do better than this - - def _send_receive(self, payload): - """ - Send single buffer `payload` and receive a single buffer. - - Args: - payload(bytes): Data to send. - """ - s = socket.socket(socket.AF_INET, socket.SOCK_STREAM) - s.setsockopt(socket.IPPROTO_TCP, socket.TCP_NODELAY, 1) - s.settimeout(self.connection_timeout) - s.connect((self.address, self.port)) - s.send(payload) - data = s.recv(1024) - s.close() - return data - - def generate_payload(self, command, data=None): - """ - Generate the payload to send. - - Args: - command(str): The type of command. - This is one of the entries from payload_dict - data(dict, optional): The data to be send. - This is what will be passed via the 'dps' entry - """ - json_data = payload_dict[self.dev_type][command]['command'] - - if 'gwId' in json_data: - json_data['gwId'] = self.id - if 'devId' in json_data: - json_data['devId'] = self.id - if 'uid' in json_data: - json_data['uid'] = self.id # still use id, no seperate uid - if 't' in json_data: - json_data['t'] = str(int(time.time())) - - if data is not None: - json_data['dps'] = data - - # Create byte buffer from hex data - json_payload = json.dumps(json_data) - #print(json_payload) - json_payload = json_payload.replace(' ', '') # if spaces are not removed device does not respond! - json_payload = json_payload.encode('utf-8') - log.debug('json_payload=%r', json_payload) - - if command == SET: - # need to encrypt - #print('json_payload %r' % json_payload) - self.cipher = AESCipher(self.local_key) # expect to connect and then disconnect to set new - json_payload = self.cipher.encrypt(json_payload) - #print('crypted json_payload %r' % json_payload) - preMd5String = b'data=' + json_payload + b'||lpv=' + PROTOCOL_VERSION_BYTES + b'||' + self.local_key - #print('preMd5String %r' % preMd5String) - m = md5() - m.update(preMd5String) - #print(repr(m.digest())) - hexdigest = m.hexdigest() - #print(hexdigest) - #print(hexdigest[8:][:16]) - json_payload = PROTOCOL_VERSION_BYTES + hexdigest[8:][:16].encode('latin1') + json_payload - #print('data_to_send') - #print(json_payload) - #print('crypted json_payload (%d) %r' % (len(json_payload), json_payload)) - #print('json_payload %r' % repr(json_payload)) - #print('json_payload len %r' % len(json_payload)) - #print(bin2hex(json_payload)) - self.cipher = None # expect to connect and then disconnect to set new - - - postfix_payload = hex2bin(bin2hex(json_payload) + payload_dict[self.dev_type]['suffix']) - #print('postfix_payload %r' % postfix_payload) - #print('postfix_payload %r' % len(postfix_payload)) - #print('postfix_payload %x' % len(postfix_payload)) - #print('postfix_payload %r' % hex(len(postfix_payload))) - assert len(postfix_payload) <= 0xff - postfix_payload_hex_len = '%x' % len(postfix_payload) # TODO this assumes a single byte 0-255 (0x00-0xff) - buffer = hex2bin( payload_dict[self.dev_type]['prefix'] + - payload_dict[self.dev_type][command]['hexByte'] + - '000000' + - postfix_payload_hex_len ) + postfix_payload - #print('command', command) - #print('prefix') - #print(payload_dict[self.dev_type][command]['prefix']) - #print(repr(buffer)) - #print(bin2hex(buffer, pretty=True)) - #print(bin2hex(buffer, pretty=False)) - #print('full buffer(%d) %r' % (len(buffer), buffer)) - return buffer - -class Device(XenonDevice): - def __init__(self, dev_id, address, local_key=None, dev_type=None): - super(Device, self).__init__(dev_id, address, local_key, dev_type) - - def status(self): - log.debug('status() entry') - # open device, send request, then close connection - payload = self.generate_payload('status') - - data = self._send_receive(payload) - log.debug('status received data=%r', data) - - result = data[20:-8] # hard coded offsets - log.debug('result=%r', result) - #result = data[data.find('{'):data.rfind('}')+1] # naive marker search, hope neither { nor } occur in header/footer - #print('result %r' % result) - if result.startswith(b'{'): - # this is the regular expected code path - if not isinstance(result, str): - result = result.decode() - result = json.loads(result) - elif result.startswith(PROTOCOL_VERSION_BYTES): - # got an encrypted payload, happens occasionally - # expect resulting json to look similar to:: {"devId":"ID","dps":{"1":true,"2":0},"t":EPOCH_SECS,"s":3_DIGIT_NUM} - # NOTE dps.2 may or may not be present - result = result[len(PROTOCOL_VERSION_BYTES):] # remove version header - result = result[16:] # remove (what I'm guessing, but not confirmed is) 16-bytes of MD5 hexdigest of payload - cipher = AESCipher(self.local_key) - result = cipher.decrypt(result) - log.debug('decrypted result=%r', result) - if not isinstance(result, str): - result = result.decode() - result = json.loads(result) - else: - log.error('Unexpected status() payload=%r', result) - - return result - - def set_status(self, on, switch=1): - """ - Set status of the device to 'on' or 'off'. - - Args: - on(bool): True for 'on', False for 'off'. - switch(int): The switch to set - """ - # open device, send request, then close connection - if isinstance(switch, int): - switch = str(switch) # index and payload is a string - payload = self.generate_payload(SET, {switch:on}) - #print('payload %r' % payload) - - data = self._send_receive(payload) - log.debug('set_status received data=%r', data) - - return data - - def turn_on(self, switch=1): - """Turn the device on""" - self.set_status(True, switch) - - def turn_off(self, switch=1): - """Turn the device off""" - self.set_status(False, switch) - - def set_timer(self, num_secs): - """ - Set a timer. - - Args: - num_secs(int): Number of seconds - """ - # FIXME / TODO support schemas? Accept timer id number as parameter? - - # Dumb heuristic; Query status, pick last device id as that is probably the timer - status = self.status() - devices = status['dps'] - devices_numbers = list(devices.keys()) - devices_numbers.sort() - dps_id = devices_numbers[-1] - - payload = self.generate_payload(SET, {dps_id:num_secs}) - - data = self._send_receive(payload) - log.debug('set_timer received data=%r', data) - return data - -class OutletDevice(Device): - def __init__(self, dev_id, address, local_key=None): - dev_type = 'device' - super(OutletDevice, self).__init__(dev_id, address, local_key, dev_type) - -class BulbDevice(Device): - DPS_INDEX_ON = '1' - DPS_INDEX_MODE = '2' - DPS_INDEX_BRIGHTNESS = '3' - DPS_INDEX_COLOURTEMP = '4' - DPS_INDEX_COLOUR = '5' - - DPS = 'dps' - DPS_MODE_COLOUR = 'colour' - DPS_MODE_WHITE = 'white' - - def __init__(self, dev_id, address, local_key=None): - dev_type = 'device' - super(BulbDevice, self).__init__(dev_id, address, local_key, dev_type) - - @staticmethod - def _rgb_to_hexvalue(r, g, b): - """ - Convert an RGB value to the hex representation expected by tuya. - - Index '5' (DPS_INDEX_COLOUR) is assumed to be in the format: - rrggbb0hhhssvv - - While r, g and b are just hexadecimal values of the corresponding - Red, Green and Blue values, the h, s and v values (which are values - between 0 and 1) are scaled to 360 (h) and 255 (s and v) respectively. - - Args: - r(int): Value for the colour red as int from 0-255. - g(int): Value for the colour green as int from 0-255. - b(int): Value for the colour blue as int from 0-255. - """ - rgb = [r,g,b] - hsv = colorsys.rgb_to_hsv(rgb[0]/255, rgb[1]/255, rgb[2]/255) - - hexvalue = "" - for value in rgb: - temp = str(hex(int(value))).replace("0x","") - if len(temp) == 1: - temp = "0" + temp - hexvalue = hexvalue + temp - - hsvarray = [int(hsv[0] * 360), int(hsv[1] * 255), int(hsv[2] * 255)] - hexvalue_hsv = "" - for value in hsvarray: - temp = str(hex(int(value))).replace("0x","") - if len(temp) == 1: - temp = "0" + temp - hexvalue_hsv = hexvalue_hsv + temp - if len(hexvalue_hsv) == 7: - hexvalue = hexvalue + "0" + hexvalue_hsv - else: - hexvalue = hexvalue + "00" + hexvalue_hsv - - return hexvalue - - @staticmethod - def _hexvalue_to_rgb(hexvalue): - """ - Converts the hexvalue used by tuya for colour representation into - an RGB value. - - Args: - hexvalue(string): The hex representation generated by BulbDevice._rgb_to_hexvalue() - """ - r = int(hexvalue[0:2], 16) - g = int(hexvalue[2:4], 16) - b = int(hexvalue[4:6], 16) - - return (r, g, b) - - @staticmethod - def _hexvalue_to_hsv(hexvalue): - """ - Converts the hexvalue used by tuya for colour representation into - an HSV value. - - Args: - hexvalue(string): The hex representation generated by BulbDevice._rgb_to_hexvalue() - """ - h = int(hexvalue[7:10], 16) / 360 - s = int(hexvalue[10:12], 16) / 255 - v = int(hexvalue[12:14], 16) / 255 - - return (h, s, v) - - def set_colour(self, r, g, b): - """ - Set colour of an rgb bulb. - - Args: - r(int): Value for the colour red as int from 0-255. - g(int): Value for the colour green as int from 0-255. - b(int): Value for the colour blue as int from 0-255. - """ - if not 0 <= r <= 255: - raise ValueError("The value for red needs to be between 0 and 255.") - if not 0 <= g <= 255: - raise ValueError("The value for green needs to be between 0 and 255.") - if not 0 <= b <= 255: - raise ValueError("The value for blue needs to be between 0 and 255.") - - print(BulbDevice) - hexvalue = BulbDevice._rgb_to_hexvalue(r, g, b) - - payload = self.generate_payload(SET, { - self.DPS_INDEX_MODE: self.DPS_MODE_COLOUR, - self.DPS_INDEX_COLOUR: hexvalue}) - data = self._send_receive(payload) - return data - - def set_white(self, brightness, colourtemp): - """ - Set white coloured theme of an rgb bulb. - - Args: - brightness(int): Value for the brightness (25-255). - colourtemp(int): Value for the colour temperature (0-255). - """ - if not 25 <= brightness <= 255: - raise ValueError("The brightness needs to be between 25 and 255.") - if not 0 <= colourtemp <= 255: - raise ValueError("The colour temperature needs to be between 0 and 255.") - - payload = self.generate_payload(SET, { - self.DPS_INDEX_MODE: self.DPS_MODE_WHITE, - self.DPS_INDEX_BRIGHTNESS: brightness, - self.DPS_INDEX_COLOURTEMP: colourtemp}) - - data = self._send_receive(payload) - return data - - def set_brightness(self, brightness): - """ - Set the brightness value of an rgb bulb. - - Args: - brightness(int): Value for the brightness (25-255). - """ - if not 25 <= brightness <= 255: - raise ValueError("The brightness needs to be between 25 and 255.") - - payload = self.generate_payload(SET, {self.DPS_INDEX_BRIGHTNESS: brightness}) - data = self._send_receive(payload) - return data - - def set_colourtemp(self, colourtemp): - """ - Set the colour temperature of an rgb bulb. - - Args: - colourtemp(int): Value for the colour temperature (0-255). - """ - if not 0 <= colourtemp <= 255: - raise ValueError("The colour temperature needs to be between 0 and 255.") - - payload = self.generate_payload(SET, {self.DPS_INDEX_COLOURTEMP: colourtemp}) - data = self._send_receive(payload) - return data - - def brightness(self): - """Return brightness value""" - return self.status()[self.DPS][self.DPS_INDEX_BRIGHTNESS] - - def colourtemp(self): - """Return colour temperature""" - return self.status()[self.DPS][self.DPS_INDEX_COLOURTEMP] - - def colour_rgb(self): - """Return colour as RGB value""" - hexvalue = self.status()[self.DPS][self.DPS_INDEX_COLOUR] - return BulbDevice._hexvalue_to_rgb(hexvalue) - - def colour_hsv(self): - """Return colour as HSV value""" - hexvalue = self.status()[self.DPS][self.DPS_INDEX_COLOUR] - return BulbDevice._hexvalue_to_hsv(hexvalue) - def state(self): - status = self.status() - state = { - 'is_on' : status[self.DPS][self.DPS_INDEX_ON], - 'mode' : status[self.DPS][self.DPS_INDEX_MODE], - 'brightness' : status[self.DPS][self.DPS_INDEX_BRIGHTNESS], - 'colourtemp' : status[self.DPS][self.DPS_INDEX_COLOURTEMP], - 'colour' : status[self.DPS][self.DPS_INDEX_COLOUR], - } - return state diff --git a/pytuya/cli/__init__.py b/pytuya/cli/__init__.py new file mode 100644 index 0000000..36fce55 --- /dev/null +++ b/pytuya/cli/__init__.py @@ -0,0 +1,11 @@ +from pytuya.cli.main import cli_root, config, get_device_from_config +from pytuya.cli import bulb, cover, outlet, utils + + +def main(): + cli_root() + + +if __name__ == "__main__": + import sys, os + main() \ No newline at end of file diff --git a/pytuya/cli/bulb.py b/pytuya/cli/bulb.py new file mode 100644 index 0000000..ad66b36 --- /dev/null +++ b/pytuya/cli/bulb.py @@ -0,0 +1,73 @@ +import json +import click +from pytuya.cli import cli_root, get_device_from_config, config +from pytuya.devices import BulbDevice + + +@cli_root.group("bulb") +def bulb(): + pass + + +@bulb.command() +@click.argument('name', default=None) +def on(name): + """ sends turn on action to device """ + dev_props = get_device_from_config(config, name) + dev = BulbDevice(dev_props["id"], dev_props["ip"], dev_props["key"]) + dev.turn_on() + + +@bulb.command() +@click.argument('name', default=None) +def off(name): + """ sends turn off action to device """ + dev_props = get_device_from_config(config, name) + dev = BulbDevice(dev_props["id"], dev_props["ip"], dev_props["key"]) + dev.turn_off() + + +@bulb.command() +@click.argument('name', default=None) +@click.argument('brightness', default=255, type=click.types.IntRange(min=25, max=255, clamp=True)) +@click.option('-t', '--colour_temp', default=None, type=click.types.IntRange(min=25, max=255, clamp=True), + help="colour temperature") +def brightness(name, brightness, colour_temp): + """ set brightness of device""" + dev_props = get_device_from_config(config, name) + dev = BulbDevice(dev_props["id"], dev_props["ip"], dev_props["key"]) + if colour_temp is None: + dev.set_brightness(brightness) + else: + dev.set_white(brightness=brightness, colour_temp=colour_temp) + + +@bulb.command() +@click.argument('name', default=None) +@click.argument('r', default=255, type=click.types.IntRange(min=0, max=255, clamp=True)) +@click.argument('g', default=255, type=click.types.IntRange(min=0, max=255, clamp=True)) +@click.argument('b', default=255, type=click.types.IntRange(min=0, max=255, clamp=True)) +def colour(name, r, g, b): + """ set colour of device using provided R, G, B (red green and blue)""" + dev_props = get_device_from_config(config, name) + dev = BulbDevice(dev_props["id"], dev_props["ip"], dev_props["key"]) + dev.set_colour(r, g, b) + + +def get_json_state(dev_props): + return BulbDevice(dev_props["id"], dev_props["ip"], dev_props["key"]).status() + + +@bulb.command() +@click.argument('name', default=None) +def state(name): + """ prints the current state of device specified via NAME """ + dev_props = get_device_from_config(config, name) + print(json.dumps(get_json_state(dev_props))) + + +if __name__ == "__main__": + import sys + sys.argv = list(sys.argv) + ["bulb", "state", "garden lights"] + from pytuya.cli import cli_root + cli_root() diff --git a/pytuya/cli/cover.py b/pytuya/cli/cover.py new file mode 100644 index 0000000..9c37aa9 --- /dev/null +++ b/pytuya/cli/cover.py @@ -0,0 +1,61 @@ +import logging +import click +import json +from pytuya.devices import CoverDevice +from pytuya.cli.main import cli_root, config, get_device_from_config + + +@cli_root.group("cover") +def cover(): + pass + + +def exec_cover_action(name, action_name): + actions = dict(close=CoverDevice.action_close, open=CoverDevice.action_open, + stop=CoverDevice.action_stop) + dev_props = get_device_from_config(config, name) + logging.info("sending %s to device %s at %s" % (action_name, name, dev_props["ip"])) + dev = CoverDevice(dev_props["id"], dev_props["ip"], dev_props["key"]) + dev.send_action(action=actions.get(action_name)) + + +def add_cover_command(action_name): + cmd = cover.command(action_name)( + click.argument('name', default=None)( + lambda name: exec_cover_action(name, action_name))) + return cmd + + +for action in "open", "close", "stop": + add_cover_command(action) + + +def get_json_state(dev_props): + return CoverDevice(dev_props["id"], dev_props["ip"], dev_props["key"]).status() + + +def get_status_descr(status): + if type(status) is bytes: + return str(status) + return {'1': "open", '2': "closed", '3': "stopped"}.get(status.get('dps').get('1')) + + +@cover.command() +@click.argument('name', default=None) +def state(name): + """ sends turn off action to device specified via NAME """ + dev_props = get_device_from_config(config, name) + state = get_json_state(dev_props) + state["descr"] = get_status_descr(state) + print(json.dumps(state)) + + +if __name__ == "__main__": + import sys + + name = "study_blinds" + sys.argv = list(sys.argv) + ["cover", "state", name.lower()] + + print("\nexecuting test: " + " ".join(sys.argv[1:])) + + cli_root() diff --git a/pytuya/cli/main.py b/pytuya/cli/main.py new file mode 100644 index 0000000..11d94a6 --- /dev/null +++ b/pytuya/cli/main.py @@ -0,0 +1,125 @@ +import logging +import click +import yaml +import os + + +def_config_path = os.path.join(os.path.expanduser("~"), ".pytuya.yaml") + + +class Config(dict): + _path = None + + @property + def path(self): + return self._path + + @path.setter + def path(self, value): + self._path = value + if os.path.isfile(value): + with open(self._path, "r") as f: + content = yaml.load(f.read()) + + if content is None: + raise RuntimeError("Invalid Config: %s " % self.path) + + super().update(content) + + def __str__(self): + return yaml.dump(dict(**self), default_flow_style=False) + + def update(self, new_config, **kwargs): + if os.path.isfile(self._path): + try: + old_config = dict(**self) + except Exception as e: + logging.warning("%s: generating new config" % e) + old_config = {} + else: + old_config = {} + + old_config.update(new_config) + with open(self.path, "w") as f: + config_yaml = yaml.dump(old_config, default_flow_style=False) + f.write(config_yaml) + + super().update(new_config) + + logging.warning("wrote config at %s with content:\n%s" % (self._path, config_yaml)) + + +config = Config() + + +def get_keys_from_file(api_response_path): + # extracts a local key from recorded api responses stored in a file + from pytuya.utils import KeyExtractor + + with open(api_response_path, "rb") as f: + api_response = f.read() + + return KeyExtractor.parse_device_keys_from_api_response(api_response) + + +def build_config(api_response_path): + from pytuya.utils import query_devices + keys = get_keys_from_file(api_response_path) + logging.info("Querying devices") + devices = query_devices() + res = {} + for dev_id, props in keys.items(): + if dev_id not in devices or "ip" not in devices[dev_id]: + logging.warning("device %s with id %s not found" % (props["name"], dev_id)) + continue + res[props["name"]] = dict(key=props["key"], ip=devices[dev_id]["ip"], id=dev_id) + return res + + +def get_device_from_config(config, name): + dev_props = config.get(name) + if dev_props is None: + name_map = lambda name: name.lower().replace(" ", "").replace("_", "") + dev_props = {name_map(k): v for k, v in config.items()}.get(name_map(name)) + if dev_props is None: + raise RuntimeError("Device %s not found in config:\n%s" % (name, config)) + return dev_props + + +@click.group() +@click.option('-l', '--debug/--no-debug', default=False) +@click.option("-c", "--config_path", default=def_config_path, type=click.Path(file_okay=True, dir_okay=False)) +def cli_root(debug, config_path): + log_level = logging.DEBUG if debug else logging.INFO + logging.getLogger().setLevel(log_level) + logging.basicConfig( + level=log_level, + format='%(asctime)s - %(message)s', + datefmt='%Y-%m-%d %H:%M:%S', + ) + config.path = config_path + + +@cli_root.command("update_config") +@click.argument('api_response_path', default=None, type=click.Path(file_okay=True, dir_okay=False, readable=True)) +def update_config(api_response_path): + # updates a config file using info extracted from api response and queried devices + new_config = build_config(api_response_path) + config.update(new_config) + + +if __name__ == "__main__": + import sys + + if len(sys.argv) > 1: + cli_root() + + test_extract = list(sys.argv) + ["extract", "../../example_response.json"] + test_query = list(sys.argv) + ["query"] + test_update_config = list(sys.argv) + ["update_config", "../../example_response.json"] + + test = test_update_config + print("\nexecuting test: pytuya %s\n" % test) + sys.argv = test + + cli_root() diff --git a/pytuya/cli/outlet.py b/pytuya/cli/outlet.py new file mode 100644 index 0000000..77e792d --- /dev/null +++ b/pytuya/cli/outlet.py @@ -0,0 +1,49 @@ +import click +import json +from pytuya.cli import cli_root, get_device_from_config, config +from pytuya.devices import OutletDevice + + +@cli_root.group("outlet") +def outlet(): + pass + + +@outlet.command() +@click.argument('name', default=None) +@click.argument('switch', default=1) +def on(name, switch): + """ sends turn on action to device specified via NAME """ + dev_props = get_device_from_config(config, name) + dev = OutletDevice(dev_props["id"], dev_props["ip"], dev_props["key"]) + dev.turn_on(switch=switch) + + +@outlet.command() +@click.argument('name', default=None) +@click.argument('switch', default=1, type=click.types.IntRange(0, 3)) +def off(name, switch): + """ sends turn off action to device specified via NAME """ + dev_props = get_device_from_config(config, name) + dev = OutletDevice(dev_props["id"], dev_props["ip"], dev_props["key"]) + dev.turn_off(switch=switch) + + +def get_json_state(dev_props): + return OutletDevice(dev_props["id"], dev_props["ip"], dev_props["key"]).status() + + +@outlet.command() +@click.argument('name', default=None) +def state(name): + """ prints the current state of device specified via NAME """ + dev_props = get_device_from_config(config, name) + print(json.dumps(get_json_state(dev_props))) + + +if __name__ == "__main__": + import sys + # sys.argv = list(sys.argv) + ["outlet", "off", "garden lights", "2"] + sys.argv = list(sys.argv) + ["outlet", "state", "garden lights"] + from pytuya.cli import cli_root + cli_root() diff --git a/pytuya/cli/utils.py b/pytuya/cli/utils.py new file mode 100644 index 0000000..7ea1c34 --- /dev/null +++ b/pytuya/cli/utils.py @@ -0,0 +1,46 @@ +import yaml +import click +from pytuya.cli.main import cli_root, get_keys_from_file, build_config, config +from pytuya.utils import query_devices + + +@cli_root.group("utils") +def utils(): + pass + + +@utils.command("extract_keys") +@click.argument('api_response_path', default=None, type=click.Path(file_okay=True, dir_okay=False, readable=True)) +def extract_keys(api_response_path): + """ Extracts local keys from a recorded api response. API_RESPONSE_PATH is a file containing the response """ + result = get_keys_from_file(api_response_path) + pretty = yaml.dump({el["name"]: {k: v for k, v in el.items() if k != "name"} for el in result.values()}, + default_flow_style=False) + print(pretty) + + +@utils.command() +@click.option('-t', '--timeout', default=3.1, help="time spent for listening for device broadcasts") +def discover(timeout): + """ discovers tuya devices available on the network """ + result = query_devices(timeout_in_s=timeout) + pretty = yaml.dump(result, default_flow_style=False) + print(pretty) + + +if __name__ == "__main__": + import sys + + if len(sys.argv) > 1: + cli_root() + + test_extract = list(sys.argv) + ["utils", "extract_keys", "../../example_response.json"] + test_query = list(sys.argv) + ["utils", "discover"] + + for test in (test_extract, test_query): + print("\nexecuting test: pytuya %s\n" % test) + sys.argv = test + try: + cli_root() + except SystemExit: + pass diff --git a/pytuya/devices.py b/pytuya/devices.py new file mode 100644 index 0000000..f87d07c --- /dev/null +++ b/pytuya/devices.py @@ -0,0 +1,349 @@ +# Python module to interface with Shenzhen Xenon ESP8266MOD WiFi smart devices +# E.g. https://wikidevi.com/wiki/Xenon_SM-PW701U +# SKYROKU SM-PW701U Wi-Fi Plug Smart Plug +# Wuudi SM-S0301-US - WIFI Smart Power Socket Multi Plug with 4 AC Outlets and 4 USB Charging Works with Alexa +# +# This would not exist without the protocol reverse engineering from +# https://github.com/codetheweb/tuyapi by codetheweb and blackrozes +# +# Tested with Python 2.7 and Python 3.6.1 only + +from hashlib import md5 +import json +import logging +import socket +import time +import binascii +from pytuya.utils import hex2bin, bin2hex, AESCipher, Colour + +log = logging.getLogger(__name__) + +SET = 'set' +PROTOCOL_VERSION_BYTES = b'3.1' + +# This is intended to match requests.json payload at https://github.com/codetheweb/tuyapi +payload_dict = { + "device": { + "status": { + "hexByte": "0a", "command": {"gwId": "", "devId": ""} + }, + "set": { + "hexByte": "07", "command": {"devId": "", "uid": "", "t": ""} + }, + "prefix": "000055aa00000000000000", + # Next byte is command byte ("hexByte") some zero padding, then length of remaining payload, i.e. command + # + suffix (unclear if multiple bytes used for length, zero padding implies could be more than one byte) + "suffix": "000000000000aa55" + } +} + + +class XenonDevice(object): + def __init__(self, dev_id, address, local_key=None, dev_type='device', connection_timeout=10): + """ Represents a Tuya device. + + Args: + dev_id (str): The device id. + address (str): The network address. + local_key (str, optional): The encryption key. Defaults to None. + dev_type (str, optional): The device type. + It will be used as key for lookups in payload_dict. + Defaults to None. + + Attributes: + port (int): The port to connect to. + """ + self.id = dev_id + self.address = address + self.local_key = local_key.encode('latin1') + self.dev_type = dev_type + self.send_receive_max_tries = 3 + self.socket_timeout = connection_timeout / self.send_receive_max_tries + self.cipher = None + self.port = 6668 # default - do not expect caller to pass in + + def __repr__(self): + return '%r' % ((self.id, self.address),) # FIXME can do better than this + + def _send_receive(self, payload): + """ Send single buffer `payload` and receive a single buffer. + + Args: + payload(bytes): Data to send. + """ + + success, data = False, "" + for tries in range(1, self.send_receive_max_tries + 1): + try: + with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as s: + s.setsockopt(socket.IPPROTO_TCP, socket.TCP_NODELAY, 1) + s.settimeout(self.socket_timeout) + s.connect((self.address, self.port)) + s.send(payload) + data = s.recv(1024) + success = True + break + except ConnectionResetError as e: + logging.warning("Connection attempt %i/%i: %s" % (tries, self.send_receive_max_tries, e)) + except socket.timeout as e: + logging.warning("Connection attempt %i/%i: %s" % (tries, self.send_receive_max_tries, e)) + + if not success: + raise RuntimeError("Unable to communicate with device") + else: + return data + + def generate_payload(self, command, data=None): + """ Generate the payload to send. + + Args: + command(str): The type of command. This is one of the entries from payload_dict + data(dict, optional): The data to be send. This is what will be passed via the 'dps' entry + """ + json_data = payload_dict[self.dev_type][command]['command'].copy() + + if 'gwId' in json_data: + json_data['gwId'] = self.id + if 'devId' in json_data: + json_data['devId'] = self.id + if 'uid' in json_data: + json_data['uid'] = self.id # still use id, no seperate uid + if 't' in json_data: + json_data['t'] = str(int(time.time())) + + if data is not None: + json_data['dps'] = data + + # Create byte buffer from hex data + json_payload = json.dumps(json_data) + json_payload = json_payload.replace(' ', '') # if spaces are not removed device does not respond! + json_payload = json_payload.encode('utf-8') + log.debug('json_payload=%r', json_payload) + + if command == SET: + # need to encrypt + self.cipher = AESCipher(self.local_key) # expect to connect and then disconnect to set new + json_payload = self.cipher.encrypt(json_payload) + pre_md5_str = b'data=' + json_payload + b'||lpv=' + PROTOCOL_VERSION_BYTES + b'||' + self.local_key + m = md5() + m.update(pre_md5_str) + json_payload = PROTOCOL_VERSION_BYTES + m.hexdigest()[8:][:16].encode('latin1') + json_payload + self.cipher = None # expect to connect and then disconnect to set new + + suffix = payload_dict[self.dev_type]['suffix'] + payload = bin2hex(json_payload) + crc32 = "%.8x" % binascii.crc32(bytearray(payload.encode())) + suffix = crc32 + suffix[-8:] + postfix_payload = hex2bin(payload + suffix) + postfix_payload_hex_len = '%x' % len(postfix_payload) + return hex2bin(payload_dict[self.dev_type]['prefix'] + payload_dict[self.dev_type][command]['hexByte'] + + '000000' + postfix_payload_hex_len) + postfix_payload + + +class Device(XenonDevice): + def status(self): + log.debug('status() entry') + # open device, send request, then close connection + payload = self.generate_payload('status') + + data = self._send_receive(payload) + log.debug('status received data=%r', data) + + result = data[20:-8] # hard coded offsets + log.debug('result=%r', result) + if result.startswith(b'{'): + # this is the regular expected code path + if not isinstance(result, str): + result = result.decode() + result = json.loads(result) + elif result.startswith(PROTOCOL_VERSION_BYTES): + # got an encrypted payload, happens occasionally + # expect json to look similar to:: {"devId":"ID","dps":{"1":true,"2":0},"t":EPOCH_SECS,"s":3_DIGIT_NUM} + # NOTE dps.2 may or may not be present + result = result[len(PROTOCOL_VERSION_BYTES):] # remove version header + result = result[16:] # remove first 16-bytes - MD5 hexdigest of payload (guess, unconfirmed) + cipher = AESCipher(self.local_key) + result = cipher.decrypt(result) + log.debug('decrypted result=%r', result) + if not isinstance(result, str): + result = result.decode() + result = json.loads(result) + else: + log.error('Unexpected status() payload=%r', result) + result = dict(error=result) + + return result + + def set_status(self, on, switch=1): + """ Set status of the device to 'on' or 'off'. + + Args: + on(bool): True for 'on', False for 'off'. + switch(int): The switch to set + """ + # open device, send request, then close connection + if isinstance(switch, int): + switch = str(switch) # index and payload is a string + payload = self.generate_payload(SET, {switch: on}) + # print('payload %r' % payload) + + data = self._send_receive(payload) + log.debug('set_status received data=%r', data) + + return data + + def turn_on(self, switch=1): + """ Turn the device on """ + self.set_status(True, switch) + + def turn_off(self, switch=1): + """ Turn the device off """ + self.set_status(False, switch) + + def set_timer(self, num_secs): + """ Set a timer. + + Args: + num_secs(int): Number of seconds + """ + # FIXME / TODO support schemas? Accept timer id number as parameter? + + # Dumb heuristic; Query status, pick last device id as that is probably the timer + status = self.status() + devices = status['dps'] + devices_numbers = list(devices.keys()) + devices_numbers.sort() + dps_id = devices_numbers[-1] + + payload = self.generate_payload(SET, {dps_id: num_secs}) + + data = self._send_receive(payload) + log.debug('set_timer received data=%r', data) + return data + + +class OutletDevice(Device): + def __init__(self, dev_id, address, local_key=None): + super(OutletDevice, self).__init__(dev_id, address, local_key, dev_type='device') + + +class BulbDevice(Device): + DPS_INDEX_ON = '1' + DPS_INDEX_MODE = '2' + DPS_INDEX_BRIGHTNESS = '3' + DPS_INDEX_COLOUR_TEMP = '4' + DPS_INDEX_COLOUR = '5' + DPS_INDEX_COLOUR_SCENE = '6' + + DPS = 'dps' + DPS_MODE_COLOUR = 'colour' + DPS_MODE_COLOUR_SCENE = 'scene' + DPS_MODE_WHITE = 'white' + + def __init__(self, dev_id, address, local_key=None): + dev_type = 'device' + super(BulbDevice, self).__init__(dev_id, address, local_key, dev_type) + + def _send(self, mode=None, colour=None, brightness=None, colour_temp=None): + payload_data = {self.DPS_INDEX_MODE: mode, + self.DPS_INDEX_COLOUR: colour, + self.DPS_INDEX_BRIGHTNESS: brightness, + self.DPS_INDEX_COLOUR_TEMP: colour_temp} + payload = self.generate_payload(SET, {k: v for k, v in payload_data.items() if v is not None}) + return self._send_receive(payload) + + def set_colour(self, r, g, b): + """ Set colour of an rgb bulb. + Args: + r(int): Value for the colour red as int from 0-255. + g(int): Value for the colour green as int from 0-255. + b(int): Value for the colour blue as int from 0-255. """ + + for value, name in ((r, "red"), (b, "blue"), (g, "green")): + if not 0 <= value <= 255: + raise ValueError("The %s for red needs to be between 0 and 255." % name) + + return self._send(self.DPS_MODE_COLOUR, colour=Colour.rgb_to_hex_value(r, g, b)) + + def set_white(self, brightness, colour_temp): + """ Set white coloured theme of an rgb bulb. + Args: + brightness(int): Value for the brightness (25-255). + colour_temp(int): Value for the colour temperature (0-255). """ + if not 25 <= brightness <= 255: + raise ValueError("The brightness needs to be between 25 and 255.") + if not 0 <= colour_temp <= 255: + raise ValueError("The colour temperature needs to be between 0 and 255.") + + return self._send(self.DPS_MODE_WHITE, brightness=brightness, colour_temp=colour_temp) + + def set_brightness(self, brightness): + """ Set the brightness value of an rgb bulb. + Args: + brightness(int): Value for the brightness (25-255). """ + if not 25 <= brightness <= 255: + raise ValueError("The brightness needs to be between 25 and 255.") + + return self._send(brightness=brightness) + + def set_colour_temp(self, colour_temp): + """ Set the colour temperature of an rgb bulb. + Args: + colour_temp(int): Value for the colour temperature (0-255). """ + if not 0 <= colour_temp <= 255: + raise ValueError("The colour temperature needs to be between 0 and 255.") + + return self._send(colour_temp=colour_temp) + + def brightness(self): + """ Return brightness value """ + return self.status().get(self.DPS, {}).get(self.DPS_INDEX_BRIGHTNESS, 0) + + def colour_temp(self): + """ Return colour temperature """ + return self.status().get(self.DPS, {}).get(self.DPS_INDEX_COLOUR_TEMP, 0) + + def colour_rgb(self): + """ Return colour as RGB value """ + hex_value = self.status().get(self.DPS, {}).get(self.DPS_INDEX_COLOUR, "0"*6) + return Colour.hex_value_to_rgb(hex_value) + + def colour_hsv(self): + """ Return colour as HSV value """ + hex_value = self.status().get(self.DPS, {}).get(self.DPS_INDEX_COLOUR, "0"*14) + return Colour.hex_value_to_hsv(hex_value) + + def state(self): + dps = self.status().get(self.DPS, {}) + return {k: v for k, v in + dict(is_on=dps.get(self.DPS_INDEX_ON), + mode=dps.get(self.DPS_INDEX_MODE), + brightness=dps.get(self.DPS_INDEX_BRIGHTNESS), + colourtemp=dps.get(self.DPS_INDEX_COLOUR_TEMP), + colour=dps.get(self.DPS_INDEX_COLOUR)).items() if v is not None} + + +class CoverDevice(Device): + action_open = {'2': '1'} + action_close = {'2': '2'} + action_stop = {'2': '3'} + + def state(self): + status = self.status() + if type(status) is bytes: + return str(status) + return {'1': "opening or open", '2': "closing or closed", '3': "stopped"}.get(status.get('dps').get('1')) + + def send_action(self, action): + payload = self.generate_payload(command=SET, data=action) + self._send_receive(payload) + return + + def open(self): + self.send_action(self.action_open) + + def close(self): + self.send_action(self.action_close) + + def stop(self): + self.send_action(self.action_stop) diff --git a/pytuya/utils.py b/pytuya/utils.py new file mode 100644 index 0000000..7394158 --- /dev/null +++ b/pytuya/utils.py @@ -0,0 +1,237 @@ +import base64 +import sys +import logging +import time +import json +import colorsys +import re + +try: + # raise ImportError + import Crypto + from Crypto.Cipher import AES # PyCrypto +except ImportError: + Crypto = AES = None + import pyaes # https://github.com/ricmoo/pyaes + +log = logging.getLogger(__name__) +logging.basicConfig() # TODO include function name/line numbers in log +# log.setLevel(level=logging.DEBUG) # Debug hack! + +log.info('Python %s on %s', sys.version, sys.platform) +if Crypto is None: + log.info('Using pyaes version %r', pyaes.VERSION) + log.info('Using pyaes from %r', pyaes.__file__) +else: + log.info('Using PyCrypto %r', Crypto.version_info) + log.info('Using PyCrypto from %r', Crypto.__file__) + +SET = 'set' + +PROTOCOL_VERSION_BYTES = b'3.1' + +IS_PY2 = sys.version_info[0] == 2 + + +class AESCipher(object): + def __init__(self, key): + self.bs = 16 + self.key = key + + def encrypt(self, raw): + if Crypto: + raw = self._pad(raw) + cipher = AES.new(self.key, mode=AES.MODE_ECB) + crypted_text = cipher.encrypt(raw) + else: + _ = self._pad(raw) + cipher = pyaes.blockfeeder.Encrypter(pyaes.AESModeOfOperationECB(self.key)) # no IV, auto pads to 16 + crypted_text = cipher.feed(raw) + crypted_text += cipher.feed() # flush final block + crypted_text_b64 = base64.b64encode(crypted_text) + return crypted_text_b64 + + def decrypt(self, enc): + enc = base64.b64decode(enc) + if Crypto: + cipher = AES.new(self.key, AES.MODE_ECB) + raw = cipher.decrypt(enc) + return self._unpad(raw).decode('utf-8') + else: + cipher = pyaes.blockfeeder.Decrypter(pyaes.AESModeOfOperationECB(self.key)) # no IV, auto pads to 16 + plain_text = cipher.feed(enc) + plain_text += cipher.feed() # flush final block + return plain_text + + def _pad(self, s): + padnum = self.bs - len(s) % self.bs + return s + padnum * chr(padnum).encode() + + @staticmethod + def _unpad(s): + return s[:-ord(s[len(s) - 1:])] + + +def bin2hex(x, pretty=False): + space = ' ' if pretty else '' + if IS_PY2: + x = [ord(xi) for xi in x] + + return ''.join('%02X%s' % (y, space) for y in x) + + +def hex2bin(x): + if IS_PY2: + return x.decode('hex') + else: + return bytes.fromhex(x) + + +class Colour: + @staticmethod + def rgb_to_hex_value(r, g, b): + """ Convert an RGB value to the hex representation expected by tuya. + + Index '5' (DPS_INDEX_COLOUR) is assumed to be in the format: + rrggbb0hhhssvv + + While r, g and b are just hexadecimal values of the corresponding + Red, Green and Blue values, the h, s and v values (which are values + between 0 and 1) are scaled to 360 (h) and 255 (s and v) respectively. + + Args: + r(int): Value for the colour red as int from 0-255. + g(int): Value for the colour green as int from 0-255. + b(int): Value for the colour blue as int from 0-255. + """ + rgb = [r, g, b] + hsv = colorsys.rgb_to_hsv(rgb[0] / 255, rgb[1] / 255, rgb[2] / 255) + + hex_value = "" + for value in rgb: + temp = str(hex(int(value))).replace("0x", "") + if len(temp) == 1: + temp = "0" + temp + hex_value = hex_value + temp + + hsvarray = [int(hsv[0] * 360), int(hsv[1] * 255), int(hsv[2] * 255)] + hex_value_hsv = "" + for value in hsvarray: + temp = str(hex(int(value))).replace("0x", "") + if len(temp) == 1: + temp = "0" + temp + hex_value_hsv = hex_value_hsv + temp + if len(hex_value_hsv) == 7: + hex_value = hex_value + "0" + hex_value_hsv + else: + hex_value = hex_value + "00" + hex_value_hsv + + return hex_value + + @staticmethod + def hex_value_to_rgb(hex_value): + """ + Converts the hex_value used by tuya for colour representation into an RGB value. + + Args: + hex_value(string): The hex representation generated by Color.rgb_to_hex_value() + """ + r = int(hex_value[0:2], 16) + g = int(hex_value[2:4], 16) + b = int(hex_value[4:6], 16) + return r, g, b + + @staticmethod + def hex_value_to_hsv(hex_value): + """ + Converts the hex_value used by tuya for colour representation into an HSV value. + + Args: + hex_value(string): The hex representation generated by rgb_to_hex_value() + """ + h = int(hex_value[7:10], 16) / 360 + s = int(hex_value[10:12], 16) / 255 + v = int(hex_value[12:14], 16) / 255 + + return h, s, v + + +def query_devices(timeout_in_s=3.1, max_count=None): + from socket import socket, AF_INET, SOCK_DGRAM + + def decode_message(message): + json_msg = message[message.index(b"{"):-message[::-1].index(b"}")] + try: + return json.loads(json_msg) + except json.JSONDecodeError as e: + logging.warning("Error occurred while trying to decode json message: %s", e) + return {} + + s = socket(AF_INET, SOCK_DGRAM) + s.bind(('', 6666)) + s.settimeout(timeout_in_s) + + guids = {} + t_start = time.time() + while time.time() < t_start + timeout_in_s: + message = s.recv(1024) + data = decode_message(message) + if data.get("gwId") not in guids: + guids[data.get("gwId")] = data + if max_count and len(guids) == max_count: + break + + return guids + + +class KeyExtractor: + # extracts device ids and corresponding local keys from a server api response + # which can be obtained by sniffing app traffic (e.g. using ssl capture app). see howto for details. + # local keys are necessary to send commands to devices + + @staticmethod + def get_device_keys_hacky(api_response): + api_response = str(api_response) + + def get_json_value(key, json_str): + matches = re.findall("\"%s\":\"([a-zA-Z0-9\ -]*)\"" % key, json_str, re.DOTALL) + return matches[0] if len(matches) > 0 else None + + keys = {} + + # split api result by devId entry and extract next found localKey entry as corresponding key + # CAREFULLY NOTE: this assumes the devId entry always comes BEFORE the localKey entry in the dictionary + for dev_result in api_response.split("devId")[1:]: + dev_id = dev_result[dev_result.index(":") + 2:dev_result.index("\",")] + key = get_json_value("localKey", dev_result) + if key is None: + logging.warning("no key found for entry with id %s" % dev_id) + continue + + name = get_json_value("name", dev_result) + keys[dev_id] = dict(key=key, name=name, id=dev_id) + + return keys + + @staticmethod + def get_device_keys_json(api_response): + data = json.loads(api_response) + keys = {} + for entry in data.get("result", []): + if entry.get('a') == 'tuya.m.my.group.device.list': + for dev in entry.get('result'): + if 'devId' in dev: + keys[dev['devId']] = dict(key=dev['localKey'], name=dev.get("name", ""), id=dev['devId']) + if 'devId' in entry: + dev = entry + keys[dev['devId']] = dict(key=dev['localKey'], name=dev.get("name", ""), id=dev['devId']) + return keys + + @staticmethod + def parse_device_keys_from_api_response(api_response): + try: + keys = KeyExtractor.get_device_keys_json(api_response) + except Exception as e: + logging.info("getting device keys using json method failed: \n\t%s\ntrying hacky method instead.." % e) + keys = KeyExtractor.get_device_keys_hacky(api_response) + return keys diff --git a/requirements.txt b/requirements.txt index f7d3e23..609b1bd 100644 --- a/requirements.txt +++ b/requirements.txt @@ -1 +1,3 @@ pycrypto==2.6.1 +pyyaml +click \ No newline at end of file diff --git a/setup.py b/setup.py index cd02ba5..a5cff34 100644 --- a/setup.py +++ b/setup.py @@ -55,9 +55,14 @@ 'Topic :: Home Automation', ], keywords='home automation', - packages=['pytuya'], + packages=['pytuya', 'pytuya.cli'], platforms='any', install_requires=[ 'pyaes', # NOTE this is optional, AES can be provided via PyCrypto or PyCryptodome + 'pyyaml', + 'click' ], + entry_points={ + 'console_scripts': ['pytuya=pytuya.cli:main'], + } ) diff --git a/tests.py b/tests.py index 826b067..69b6f56 100755 --- a/tests.py +++ b/tests.py @@ -1,28 +1,26 @@ #!/usr/bin/env python3 +import json import logging - +import struct +import pytuya import unittest + try: from unittest.mock import MagicMock # Python 3 except ImportError: from mock import MagicMock # py2 use https://pypi.python.org/pypi/mock -from hashlib import md5 -import json -import logging -import struct + # Enable info logging to see version information log = logging.getLogger('pytuya') log.setLevel(level=logging.INFO) -#log.setLevel(level=logging.DEBUG) # Debug hack! - -import pytuya +# log.setLevel(level=logging.DEBUG) # Debug hack! LOCAL_KEY = '0123456789abcdef' - mock_byte_encoding = 'utf-8' + def compare_json_strings(json1, json2, ignoring_keys=None): json1 = json.loads(json1) json2 = json.loads(json2) @@ -33,51 +31,57 @@ def compare_json_strings(json1, json2, ignoring_keys=None): return json.dumps(json1, sort_keys=True) == json.dumps(json2, sort_keys=True) + def check_data_frame(data, expected_prefix, encrypted=True): prefix = data[:15] suffix = data[-8:] - + if encrypted: - payload_len = struct.unpack(">B",data[15:16])[0] # big-endian, unsigned char + payload_len = struct.unpack(">B", data[15:16])[0] # big-endian, unsigned char version = data[16:19] checksum = data[19:35] encrypted_json = data[35:-8] - - json_data = pytuya.AESCipher(LOCAL_KEY.encode(mock_byte_encoding)).decrypt(encrypted_json) + json_data = pytuya.utils.AESCipher(LOCAL_KEY.encode(mock_byte_encoding)).decrypt(encrypted_json) else: json_data = data[16:-8].decode(mock_byte_encoding) - + frame_ok = True - if prefix != pytuya.hex2bin(expected_prefix): + if prefix != pytuya.utils.hex2bin(expected_prefix): frame_ok = False - elif suffix != pytuya.hex2bin("000000000000aa55"): + elif suffix != pytuya.utils.hex2bin("000000000000aa55"): frame_ok = False elif encrypted: if payload_len != len(version) + len(checksum) + len(encrypted_json) + len(suffix): frame_ok = False elif version != b"3.1": frame_ok = False - + return json_data, frame_ok - + + def mock_send_receive_set_timer(data): + if not hasattr(mock_send_receive_set_timer, "call_counter"): + mock_send_receive_set_timer.call_counter = 0 if mock_send_receive_set_timer.call_counter == 0: - ret = 20*chr(0x0) + '{"devId":"DEVICE_ID","dps":{"1":false,"2":0}}' + 8*chr(0x0) + ret = 20 * chr(0x0) + '{"devId":"DEVICE_ID","dps":{"1":false,"2":0}}' + 8 * chr(0x0) elif mock_send_receive_set_timer.call_counter == 1: - expected = '{"uid":"DEVICE_ID_HERE","devId":"DEVICE_ID_HERE","t":"","dps":{"2":6666}}' + expected = '{"uid":"ID","devId":"ID","t":"","dps":{"2":6666}}' json_data, frame_ok = check_data_frame(data, "000055aa0000000000000007000000") - + if frame_ok and compare_json_strings(json_data, expected, ['t']): ret = '{"test_result":"SUCCESS"}' else: ret = '{"test_result":"FAIL"}' + else: + raise RuntimeError("unexpected counter of > 1") ret = ret.encode(mock_byte_encoding) mock_send_receive_set_timer.call_counter += 1 return ret - + + def mock_send_receive_set_status(data): - expected = '{"dps":{"1":true},"uid":"DEVICE_ID_HERE","t":"1516117564","devId":"DEVICE_ID_HERE"}' + expected = '{"dps":{"1":true},"uid":"ID","t":"1516117564","devId":"ID"}' json_data, frame_ok = check_data_frame(data, "000055aa0000000000000007000000") if frame_ok and compare_json_strings(json_data, expected, ['t']): @@ -89,8 +93,9 @@ def mock_send_receive_set_status(data): ret = ret.encode(mock_byte_encoding) return ret + def mock_send_receive_status(data): - expected = '{"devId":"DEVICE_ID_HERE","gwId":"DEVICE_ID_HERE"}' + expected = '{"devId":"ID","gwId":"ID"}' json_data, frame_ok = check_data_frame(data, "000055aa000000000000000a000000", False) # FIXME dead code block @@ -100,12 +105,13 @@ def mock_send_receive_status(data): logging.error("json data not the same: {} != {}".format(json_data, expected)) ret = '{"test_result":"FAIL"}' - ret = 20*chr(0) + ret + 8*chr(0) + ret = 20 * chr(0) + ret + 8 * chr(0) ret = ret.encode(mock_byte_encoding) return ret + def mock_send_receive_set_colour(data): - expected = '{"dps":{"2":"colour", "5":"ffffff000000ff"}, "devId":"DEVICE_ID_HERE","uid":"DEVICE_ID_HERE", "t":"1516117564"}' + expected = '{"dps":{"2":"colour", "5":"ffffff000000ff"}, "devId":"ID","uid":"ID", "t":"1516117564"}' json_data, frame_ok = check_data_frame(data, "000055aa0000000000000007000000") @@ -118,8 +124,9 @@ def mock_send_receive_set_colour(data): ret = ret.encode(mock_byte_encoding) return ret + def mock_send_receive_set_white(data): - expected = '{"dps":{"2":"white", "3":255, "4":255}, "devId":"DEVICE_ID_HERE","uid":"DEVICE_ID_HERE", "t":"1516117564"}' + expected = '{"dps":{"2":"white", "3":255, "4":255}, "devId":"ID","uid":"ID", "t":"1516117564"}' json_data, frame_ok = check_data_frame(data, "000055aa0000000000000007000000") if frame_ok and compare_json_strings(json_data, expected, ['t']): @@ -131,53 +138,56 @@ def mock_send_receive_set_white(data): ret = ret.encode(mock_byte_encoding) return ret + class TestXenonDevice(unittest.TestCase): def test_set_timer(self): - d = pytuya.OutletDevice('DEVICE_ID_HERE', 'IP_ADDRESS_HERE', LOCAL_KEY) + d = pytuya.OutletDevice('ID', 'IP_ADDRESS_HERE', LOCAL_KEY) d._send_receive = MagicMock(side_effect=mock_send_receive_set_timer) # Reset call_counter and start test mock_send_receive_set_timer.call_counter = 0 result = d.set_timer(6666) - result = result[result.find(b'{'):result.rfind(b'}')+1] - result = result.decode(mock_byte_encoding) # Python 3 (3.5.4 and earlier) workaround to json stdlib "behavior" https://docs.python.org/3/whatsnew/3.6.html#json + result = result[result.find(b'{'):result.rfind(b'}') + 1] + result = result.decode( + mock_byte_encoding) # Python 3 (3.5.4 and earlier) workaround to json stdlib "behavior" https://docs.python.org/3/whatsnew/3.6.html#json result = json.loads(result) # Make sure mock_send_receive_set_timer() has been called twice with correct parameters self.assertEqual(result['test_result'], "SUCCESS") def test_set_status(self): - d = pytuya.OutletDevice('DEVICE_ID_HERE', 'IP_ADDRESS_HERE', LOCAL_KEY) + d = pytuya.OutletDevice('ID', 'IP_ADDRESS_HERE', LOCAL_KEY) d._send_receive = MagicMock(side_effect=mock_send_receive_set_status) result = d.set_status(True, 1) - result = result.decode(mock_byte_encoding) # Python 3 (3.5.4 and earlier) workaround to json stdlib "behavior" https://docs.python.org/3/whatsnew/3.6.html#json + result = result.decode( + mock_byte_encoding) # Python 3 (3.5.4 and earlier) workaround to json stdlib "behavior" https://docs.python.org/3/whatsnew/3.6.html#json result = json.loads(result) # Make sure mock_send_receive_set_timer() has been called twice with correct parameters self.assertEqual(result['test_result'], "SUCCESS") def test_status(self): - d = pytuya.OutletDevice('DEVICE_ID_HERE', 'IP_ADDRESS_HERE', LOCAL_KEY) + d = pytuya.OutletDevice('ID', 'IP_ADDRESS_HERE', LOCAL_KEY) d._send_receive = MagicMock(side_effect=mock_send_receive_status) result = d.status() # Make sure mock_send_receive_set_timer() has been called twice with correct parameters self.assertEqual(result['test_result'], "SUCCESS") - + def test_set_colour(self): - d = pytuya.BulbDevice('DEVICE_ID_HERE', 'IP_ADDRESS_HERE', LOCAL_KEY) + d = pytuya.BulbDevice('ID', 'IP_ADDRESS_HERE', LOCAL_KEY) d._send_receive = MagicMock(side_effect=mock_send_receive_set_colour) - result = d.set_colour(255,255,255) + result = d.set_colour(255, 255, 255) result = result.decode(mock_byte_encoding) result = json.loads(result) self.assertEqual(result['test_result'], "SUCCESS") def test_set_white(self): - d = pytuya.BulbDevice('DEVICE_ID_HERE', 'IP_ADDRESS_HERE', LOCAL_KEY) + d = pytuya.BulbDevice('ID', 'IP_ADDRESS_HERE', LOCAL_KEY) d._send_receive = MagicMock(side_effect=mock_send_receive_set_white) result = d.set_white(255, 255) @@ -186,5 +196,6 @@ def test_set_white(self): self.assertEqual(result['test_result'], "SUCCESS") + if __name__ == '__main__': unittest.main()