324 lines
13 KiB
Python
324 lines
13 KiB
Python
import math
|
|
import pexpect
|
|
import time
|
|
|
|
from array import array
|
|
from util import *
|
|
|
|
from nrf_ble_dfu_controller import NrfBleDfuController
|
|
|
|
verbose = False
|
|
|
|
class Procedures:
|
|
CREATE = 0x01
|
|
SET_PRN = 0x02
|
|
CALC_CHECKSUM = 0x03
|
|
EXECUTE = 0x04
|
|
SELECT = 0x06
|
|
RESPONSE = 0x60
|
|
|
|
PARAM_COMMAND = 0x01
|
|
PARAM_DATA = 0x02
|
|
|
|
string_map = {
|
|
CREATE : "CREATE",
|
|
SET_PRN : "SET_PRN",
|
|
CALC_CHECKSUM : "CALC_CHECKSUM",
|
|
EXECUTE : "EXECUTE",
|
|
SELECT : "SELECT",
|
|
RESPONSE : "RESPONSE",
|
|
}
|
|
|
|
@staticmethod
|
|
def to_string(proc):
|
|
return Procedures.string_map[proc]
|
|
|
|
@staticmethod
|
|
def from_string(proc_str):
|
|
return int(proc_str, 16)
|
|
|
|
class Results:
|
|
INVALID_CODE = 0x00
|
|
SUCCESS = 0x01
|
|
OPCODE_NOT_SUPPORTED = 0x02
|
|
INVALID_PARAMETER = 0x03
|
|
INSUFF_RESOURCES = 0x04
|
|
INVALID_OBJECT = 0x05
|
|
UNSUPPORTED_TYPE = 0x07
|
|
OPERATION_NOT_PERMITTED = 0x08
|
|
OPERATION_FAILED = 0x0A
|
|
|
|
string_map = {
|
|
INVALID_CODE : "INVALID_CODE",
|
|
SUCCESS : "SUCCESS",
|
|
OPCODE_NOT_SUPPORTED : "OPCODE_NOT_SUPPORTED",
|
|
INVALID_PARAMETER : "INVALID_PARAMETER",
|
|
INSUFF_RESOURCES : "INSUFFICIENT_RESOURCES",
|
|
INVALID_OBJECT : "INVALID_OBJECT",
|
|
UNSUPPORTED_TYPE : "UNSUPPORTED_TYPE",
|
|
OPERATION_NOT_PERMITTED : "OPERATION_NOT_PERMITTED",
|
|
OPERATION_FAILED : "OPERATION_FAILED",
|
|
}
|
|
|
|
@staticmethod
|
|
def to_string(res):
|
|
return Results.string_map[res]
|
|
|
|
@staticmethod
|
|
def from_string(res_str):
|
|
return int(res_str, 16)
|
|
|
|
|
|
class BleDfuControllerSecure(NrfBleDfuController):
|
|
# Class constants
|
|
UUID_BUTTONLESS = '8e400001-f315-4f60-9fb8-838830daea50'
|
|
UUID_CONTROL_POINT = '8ec90001-f315-4f60-9fb8-838830daea50'
|
|
UUID_PACKET = '8ec90002-f315-4f60-9fb8-838830daea50'
|
|
|
|
# Constructor inherited from abstract base class
|
|
|
|
# --------------------------------------------------------------------------
|
|
# Start the firmware update process
|
|
# --------------------------------------------------------------------------
|
|
def start(self):
|
|
(_, self.ctrlpt_handle, self.ctrlpt_cccd_handle) = self._get_handles(self.UUID_CONTROL_POINT)
|
|
(_, self.data_handle, _) = self._get_handles(self.UUID_PACKET)
|
|
|
|
if verbose:
|
|
print('Control Point Handle: 0x%04x, CCCD: 0x%04x' % (self.ctrlpt_handle, self.ctrlpt_cccd_handle))
|
|
print('Packet handle: 0x%04x' % (self.data_handle))
|
|
|
|
# Subscribe to notifications from Control Point characteristic
|
|
self._enable_notifications(self.ctrlpt_cccd_handle)
|
|
|
|
# Set the Packet Receipt Notification interval
|
|
prn = uint16_to_bytes_le(self.pkt_receipt_interval)
|
|
self._dfu_send_command(Procedures.SET_PRN, prn)
|
|
|
|
self._dfu_send_init()
|
|
|
|
self._dfu_send_image()
|
|
|
|
# --------------------------------------------------------------------------
|
|
# Check if the peripheral is running in bootloader (DFU) or application mode
|
|
# Returns True if the peripheral is in DFU mode
|
|
# --------------------------------------------------------------------------
|
|
def check_DFU_mode(self):
|
|
print("Checking DFU State...")
|
|
|
|
self.ble_conn.sendline('characteristics')
|
|
|
|
dfu_mode = False
|
|
|
|
try:
|
|
self.ble_conn.expect([self.UUID_BUTTONLESS], timeout=2)
|
|
except pexpect.TIMEOUT as e:
|
|
dfu_mode = True
|
|
|
|
return dfu_mode
|
|
|
|
def switch_to_dfu_mode(self):
|
|
(_, bl_value_handle, bl_cccd_handle) = self._get_handles(self.UUID_BUTTONLESS)
|
|
|
|
self._enable_notifications(bl_cccd_handle)
|
|
|
|
# Reset the board in DFU mode. After reset the board will be disconnected
|
|
cmd = 'char-write-req 0x%04x 01' % (bl_value_handle)
|
|
self.ble_conn.sendline(cmd)
|
|
|
|
# Wait some time for board to reboot
|
|
time.sleep(0.5)
|
|
|
|
# Increase the mac address by one and reconnect
|
|
self.target_mac_increase(1)
|
|
return self.scan_and_connect()
|
|
|
|
# --------------------------------------------------------------------------
|
|
# Parse notification status results
|
|
# --------------------------------------------------------------------------
|
|
def _dfu_parse_notify(self, notify):
|
|
if len(notify) < 3:
|
|
print("notify data length error")
|
|
return None
|
|
|
|
if verbose: print(notify)
|
|
|
|
dfu_notify_opcode = Procedures.from_string(notify[0])
|
|
if dfu_notify_opcode == Procedures.RESPONSE:
|
|
|
|
dfu_procedure = Procedures.from_string(notify[1])
|
|
dfu_result = Results.from_string(notify[2])
|
|
|
|
procedure_str = Procedures.to_string(dfu_procedure)
|
|
result_str = Results.to_string(dfu_result)
|
|
|
|
# if verbose: print "opcode: {0}, proc: {1}, res: {2}".format(dfu_notify_opcode, procedure_str, result_str)
|
|
if verbose: print("opcode: 0x%02x, proc: %s, res: %s" % (dfu_notify_opcode, procedure_str, result_str))
|
|
|
|
# Packet Receipt notifications are sent in the exact same format
|
|
# as responses to the CALC_CHECKSUM procedure.
|
|
if(dfu_procedure == Procedures.CALC_CHECKSUM and dfu_result == Results.SUCCESS):
|
|
offset = bytes_to_uint32_le(notify[3:7])
|
|
crc32 = bytes_to_uint32_le(notify[7:11])
|
|
|
|
return (dfu_procedure, dfu_result, offset, crc32)
|
|
|
|
elif(dfu_procedure == Procedures.SELECT and dfu_result == Results.SUCCESS):
|
|
max_size = bytes_to_uint32_le(notify[3:7])
|
|
offset = bytes_to_uint32_le(notify[7:11])
|
|
crc32 = bytes_to_uint32_le(notify[11:15])
|
|
|
|
return (dfu_procedure, dfu_result, max_size, offset, crc32)
|
|
|
|
else:
|
|
return (dfu_procedure, dfu_result)
|
|
|
|
# --------------------------------------------------------------------------
|
|
# Wait for a notification and parse the response
|
|
# --------------------------------------------------------------------------
|
|
def _wait_and_parse_notify(self):
|
|
if verbose: print("Waiting for notification")
|
|
notify = self._dfu_wait_for_notify()
|
|
|
|
if notify is None:
|
|
raise Exception("No notification received")
|
|
|
|
if verbose: print("Parsing notification")
|
|
|
|
result = self._dfu_parse_notify(notify)
|
|
if result[1] != Results.SUCCESS:
|
|
raise Exception("Error in {} procedure, reason: {}".format(
|
|
Procedures.to_string(result[0]),
|
|
Results.to_string(result[1])))
|
|
|
|
return result
|
|
|
|
# --------------------------------------------------------------------------
|
|
# Send the Init info (*.dat file contents) to peripheral device.
|
|
# --------------------------------------------------------------------------
|
|
def _dfu_send_init(self):
|
|
if verbose: print("dfu_send_init")
|
|
|
|
# Open the DAT file and create array of its contents
|
|
init_bin_array = array('B', open(self.datfile_path, 'rb').read())
|
|
init_size = len(init_bin_array)
|
|
init_crc = 0;
|
|
|
|
# Select command
|
|
self._dfu_send_command(Procedures.SELECT, [Procedures.PARAM_COMMAND]);
|
|
(proc, res, max_size, offset, crc32) = self._wait_and_parse_notify()
|
|
|
|
if offset != init_size or crc32 != init_crc:
|
|
if offset == 0 or offset > init_size:
|
|
# Create command
|
|
self._dfu_send_command(Procedures.CREATE, [Procedures.PARAM_COMMAND] + uint32_to_bytes_le(init_size))
|
|
res = self._wait_and_parse_notify()
|
|
|
|
segment_count = 0
|
|
segment_total = int(math.ceil(init_size/float(self.pkt_payload_size)))
|
|
|
|
for i in range(0, init_size, self.pkt_payload_size):
|
|
segment = init_bin_array[i:i + self.pkt_payload_size]
|
|
self._dfu_send_data(segment)
|
|
segment_count += 1
|
|
|
|
if (segment_count % self.pkt_receipt_interval) == 0:
|
|
(proc, res, offset, crc32) = self._wait_and_parse_notify()
|
|
|
|
if res != Results.SUCCESS:
|
|
raise Exception("bad notification status: {}".format(Results.to_string(res)))
|
|
|
|
# Calculate CRC
|
|
self._dfu_send_command(Procedures.CALC_CHECKSUM)
|
|
self._wait_and_parse_notify()
|
|
|
|
# Execute command
|
|
self._dfu_send_command(Procedures.EXECUTE)
|
|
self._wait_and_parse_notify()
|
|
|
|
print("Init packet successfully transferred")
|
|
|
|
# --------------------------------------------------------------------------
|
|
# Send the Firmware image to peripheral device.
|
|
# --------------------------------------------------------------------------
|
|
def _dfu_send_image(self):
|
|
if verbose: print("dfu_send_image")
|
|
|
|
# Select Data Object
|
|
self._dfu_send_command(Procedures.SELECT, [Procedures.PARAM_DATA])
|
|
(proc, res, max_size, offset, crc32) = self._wait_and_parse_notify()
|
|
|
|
# Split the firmware into multiple objects
|
|
num_objects = int(math.ceil(self.image_size / float(max_size)))
|
|
print("Max object size: %d, num objects: %d, offset: %d, total size: %d" % (max_size, num_objects, offset, self.image_size))
|
|
|
|
time_start = time.time()
|
|
last_send_time = time.time()
|
|
|
|
obj_offset = (offset/max_size)*max_size
|
|
while(obj_offset < self.image_size):
|
|
# print "\nSending object {} of {}".format(obj_offset/max_size+1, num_objects)
|
|
obj_offset += self._dfu_send_object(obj_offset, max_size)
|
|
|
|
# Image uploaded successfully, update the progress bar
|
|
print_progress(self.image_size, self.image_size, prefix = 'Progress:', suffix = 'Complete', barLength = 50)
|
|
|
|
duration = time.time() - time_start
|
|
print("\nUpload complete in {} minutes and {} seconds".format(int(duration / 60), int(duration % 60)))
|
|
|
|
# --------------------------------------------------------------------------
|
|
# Send a single data object of given size and offset.
|
|
# --------------------------------------------------------------------------
|
|
def _dfu_send_object(self, offset, obj_max_size):
|
|
if offset != self.image_size:
|
|
if offset == 0 or offset >= obj_max_size or crc32 != crc32_unsigned(self.bin_array[0:offset]):
|
|
# Create Data Object
|
|
size = min(obj_max_size, self.image_size - offset)
|
|
self._dfu_send_command(Procedures.CREATE, [Procedures.PARAM_DATA] + uint32_to_bytes_le(size))
|
|
self._wait_and_parse_notify()
|
|
|
|
segment_count = 0
|
|
segment_total = int(math.ceil(min(obj_max_size, self.image_size-offset)/float(self.pkt_payload_size)))
|
|
|
|
segment_begin = offset
|
|
segment_end = min(offset+obj_max_size, self.image_size)
|
|
|
|
for i in range(segment_begin, segment_end, self.pkt_payload_size):
|
|
num_bytes = min(self.pkt_payload_size, segment_end - i)
|
|
segment = self.bin_array[i:i + num_bytes]
|
|
self._dfu_send_data(segment)
|
|
segment_count += 1
|
|
|
|
# print "j: {} i: {}, end: {}, bytes: {}, size: {} segment #{} of {}".format(
|
|
# offset, i, segment_end, num_bytes, self.image_size, segment_count, segment_total)
|
|
|
|
if (segment_count % self.pkt_receipt_interval) == 0:
|
|
try:
|
|
(proc, res, offset, crc32) = self._wait_and_parse_notify()
|
|
except e:
|
|
# Likely no notification received, need to re-transmit object
|
|
return 0
|
|
|
|
if res != Results.SUCCESS:
|
|
raise Exception("bad notification status: {}".format(Results.to_string(res)))
|
|
|
|
if crc32 != crc32_unsigned(self.bin_array[0:offset]):
|
|
# Something went wrong, need to re-transmit this object
|
|
return 0
|
|
|
|
print_progress(offset, self.image_size, prefix = 'Progress:', suffix = 'Complete', barLength = 50)
|
|
|
|
# Calculate CRC
|
|
self._dfu_send_command(Procedures.CALC_CHECKSUM)
|
|
(proc, res, offset, crc32) = self._wait_and_parse_notify()
|
|
if(crc32 != crc32_unsigned(self.bin_array[0:offset])):
|
|
# Need to re-transmit object
|
|
return 0
|
|
|
|
# Execute command
|
|
self._dfu_send_command(Procedures.EXECUTE)
|
|
self._wait_and_parse_notify()
|
|
|
|
# If everything executed correctly, return amount of bytes transferred
|
|
return obj_max_size
|