diff --git a/dfu.py b/dfu.py index bf4f4fc..281567f 100755 --- a/dfu.py +++ b/dfu.py @@ -1,188 +1,132 @@ #!/usr/bin/env python -""" ------------------------------------------------------------------------------- - DFU Server for Nordic nRF51 based systems. - Conforms to nRF51_SDK 11.0 BLE_DFU requirements. ------------------------------------------------------------------------------- -""" -import os, re + +from __future__ import print_function + +import argparse +import os +import re import sys -import optparse -import time -import math -import traceback -from unpacker import Unpacker +import util from ble_secure_dfu_controller import BleDfuControllerSecure from ble_legacy_dfu_controller import BleDfuControllerLegacy +from unpacker import Unpacker -def main(): - - init_msg = """ - ================================ - == == - == DFU Server == - == == - ================================ - """ - - # print "DFU Server start" - print init_msg - try: - parser = optparse.OptionParser(usage='%prog -f -a \n\nExample:\n\tdfu.py -f application.hex -d application.dat -a cd:e3:4a:47:1c:e4', - version='0.5') - - parser.add_option('-a', '--address', - action='store', - dest="address", - type="string", - default=None, - help='DFU target address.' - ) - - parser.add_option('-f', '--file', - action='store', - dest="hexfile", - type="string", - default=None, - help='hex file to be uploaded.' - ) - - parser.add_option('-d', '--dat', - action='store', - dest="datfile", - type="string", - default=None, - help='dat file to be uploaded.' - ) - - parser.add_option('-z', '--zip', - action='store', - dest="zipfile", - type="string", - default=None, - help='zip file to be used.' - ) - - parser.add_option('--secure', - action='store_true', - dest='secure_dfu', - default=True, - help='Use secure bootloader (Nordic SDK > 12)' - ) - - parser.add_option('--legacy', - action='store_false', - dest='secure_dfu', - help='Use secure bootloader (Nordic SDK < 12)' - ) - - options, args = parser.parse_args() - - except Exception, e: - print e - print "For help use --help" +def main(): + purpose = 'Support for Over The Air (OTA) Device Firmware Update (DFU) ' \ + 'process of Nordic Semiconductor nRF5 (nRF51 or nRF52) based ' \ + 'Bluetooth Low Energy (BLE) peripherals.' + parser = argparse.ArgumentParser(description=purpose) + parser.add_argument( + '--address', '-a', + dest="address", + required=True, + help="target address of DFU capable device, " + "like: 'DE:AD:BE:EF:01:02' or 'deadbeef0102'" + ) + parser.add_argument( + '--file', '-f', + dest="hex_file", + help='the .hex file to be uploaded' + ) + parser.add_argument( + '--dat', '-d', + dest="dat_file", + help='the .dat file to be uploaded' + ) + parser.add_argument( + '--zip', '-z', + dest="zip_file", + help='the .zip file to be used (with .bin / .dat files)' + ) + parser.add_argument( + '--legacy', + dest="secure_dfu", + action='store_false', + help='use legacy bootloader (Nordic SDK < 12)' + ) + parser.add_argument( + '--secure', + dest="secure_dfu", + action='store_true', + help='use secure bootloader (Nordic SDK >= 12)' + ) + args = parser.parse_args() + + # ensure a proper formatted address + mac_address = util.normalize_address(args.address) + if not mac_address: + print("Incorrect MAC-address '{}'".format(args.address)) sys.exit(2) - try: - - ''' Validate input parameters ''' - - if not options.address: - parser.print_help() - exit(2) - - unpacker = None - hexfile = None - datfile = None - - if options.zipfile != None: - - if (options.hexfile != None) or (options.datfile != None): - print "Conflicting input directives" - exit(2) - - unpacker = Unpacker() - #print options.zipfile - try: - hexfile, datfile = unpacker.unpack_zipfile(options.zipfile) - except Exception, e: - print "ERR" - print e - pass - - else: - if (not options.hexfile) or (not options.datfile): - parser.print_help() - exit(2) - - if not os.path.isfile(options.hexfile): - print "Error: Hex file doesn't exist" - exit(2) - - if not os.path.isfile(options.datfile): - print "Error: DAT file doesn't exist" - exit(2) - - hexfile = options.hexfile - datfile = options.datfile + # determine the actual firmware files to use + unpacker = Unpacker() + hex_fname = args.hex_file or '' + dat_fname = args.dat_file or '' + if args.zip_file: + if args.hex_file or args.dat_file: + print("Conflicting input directives, too many files specified.") + sys.exit(2) + try: + hex_fname, dat_fname = unpacker.unpack_zipfile(args.zip_file) + except Exception as err: + print(err) + elif (not args.hex_file) or (not args.dat_file): + print("Missing input directives, too few files specified.") + sys.exit(2) + # check that files exist + if not os.path.isfile(hex_fname): + print("Error: .hex file '{}' doesn't exist".format(hex_fname)) + exit(2) + if not os.path.isfile(dat_fname): + print("Error: .dat file '{}' doesn't exist".format(dat_fname)) + exit(2) - ''' Start of Device Firmware Update processing ''' + # initialize the DFU handler to use + if args.secure_dfu: + ble_dfu = BleDfuControllerSecure(mac_address, hex_fname, dat_fname) + else: + ble_dfu = BleDfuControllerLegacy(mac_address, hex_fname, dat_fname) - if options.secure_dfu: - ble_dfu = BleDfuControllerSecure(options.address.upper(), hexfile, datfile) - else: - ble_dfu = BleDfuControllerLegacy(options.address.upper(), hexfile, datfile) - - # Initialize inputs + try: + # initialize inputs ble_dfu.input_setup() - # Connect to peer device. Assume application mode. + # connect to peripheral; assume application mode if ble_dfu.scan_and_connect(): if not ble_dfu.check_DFU_mode(): - print "Need to switch to DFU mode" + print("Need to switch to DFU mode") success = ble_dfu.switch_to_dfu_mode() if not success: - print "Couldn't reconnect" + print("Couldn't reconnect") else: - # The device might already be in DFU mode (MAC + 1) + # the device might already be in DFU mode (MAC + 1) ble_dfu.target_mac_increase(1) - - # Try connection with new address - print "Couldn't connect, will try DFU MAC" + # try connection with new address + print("Couldn't connect, will try DFU MAC") if not ble_dfu.scan_and_connect(): raise Exception("Can't connect to device") + # perfom the DFU process ble_dfu.start() - # Disconnect from peer device if not done already and clean up. + # disconnect from peer device if not done already and clean up ble_dfu.disconnect() - except Exception, e: - # print traceback.format_exc() - print "Exception at line {}: {}".format(sys.exc_info()[2].tb_lineno, e) + except Exception as err: + print("Exception at line {}: {}".format( + sys.exc_info()[2].tb_lineno, err)) pass - except: - pass + # if Unpacker for zipfile used then delete Unpacker + if unpacker: + unpacker.delete() - # If Unpacker for zipfile used then delete Unpacker - if unpacker != None: - unpacker.delete() + print("Done.") - print "DFU Server done" -""" ------------------------------------------------------------------------------- - ------------------------------------------------------------------------------- -""" if __name__ == '__main__': - - # Do not litter the world with broken .pyc files. - sys.dont_write_bytecode = True - main() diff --git a/util.py b/util.py index b67da7f..348ec67 100644 --- a/util.py +++ b/util.py @@ -2,56 +2,84 @@ import binascii import re + +# The standard (IEEE 802) format for printing MAC-48 addresses +# in human-friendly form is six groups of two hexadecimal digits, +# separated by "-" hyphens or ":" colons. In order to extract +# this from input by REGEX_HEX12, first remove hyphens/colons. +REGEX_HEX12 = re.compile('^([0-9A-F]{12})$') + + +def normalize_address(address, ignore=':-'): + """ + Normalize given address in uppercase mac-address with colons. + Returns given address formatted like 'DE:AD:BE:EF:01:02' + or an empty string if the given address doesn't match. + """ + address = address.strip().upper() + for c in ignore: + address = address.replace(c, '') + m = REGEX_HEX12.match(address) + if m: + address = m.group(0) + return ':'.join(address[i:i + 2] for i in range(0, 12, 2)) + return '' + + def bytes_to_uint32_le(bytes): return (int(bytes[3], 16) << 24) | (int(bytes[2], 16) << 16) | (int(bytes[1], 16) << 8) | (int(bytes[0], 16) << 0) + def uint32_to_bytes_le(uint32): return [(uint32 >> 0) & 0xff, (uint32 >> 8) & 0xff, (uint32 >> 16) & 0xff, (uint32 >> 24) & 0xff] + def uint16_to_bytes_le(value): return [(value >> 0 & 0xFF), (value >> 8 & 0xFF)] + def zero_pad_array_le(data, padsize): for i in range(0, padsize): data.insert(0, 0) + def array_to_hex_string(arr): hex_str = "" for val in arr: if val > 255: raise Exception("Value is greater than it is possible to represent with one byte") hex_str += "%02x" % val - return hex_str + def crc32_unsigned(bytestring): return binascii.crc32(bytestring) % (1 << 32) + def mac_string_to_uint(mac): parts = list(re.match('(..):(..):(..):(..):(..):(..)', mac).groups()) ints = map(lambda x: int(x, 16), parts) - res = 0 for i in range(0, len(ints)): res += (ints[len(ints)-1 - i] << 8*i) - return res + def uint_to_mac_string(mac): ints = [0, 0, 0, 0, 0, 0] for i in range(0, len(ints)): ints[len(ints)-1 - i] = (mac >> 8*i) & 0xff - return ':'.join(map(lambda x: '{:02x}'.format(x).upper(), ints)) -# Print a nice console progress bar + def print_progress(iteration, total, prefix = '', suffix = '', decimals = 1, barLength = 100): """ - Call in a loop to create terminal progress bar + Print a nice console progress bar. + Call in a loop to provide a console progress bar. @params: iteration - Required : current iteration (Int) total - Required : total iterations (Int)