Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Binary file added app_vsens.zip
Binary file not shown.
Empty file added application.json
Empty file.
2 changes: 1 addition & 1 deletion ble_legacy_dfu_controller.py
Original file line number Diff line number Diff line change
Expand Up @@ -197,7 +197,7 @@ def check_DFU_mode(self):
try:
res = self.ble_conn.expect('handle:.*', timeout=10)
# res = self.ble_conn.expect('handle:', timeout=10)
except pexpect.TIMEOUT, e:
except pexpect.TIMEOUT as e:
print("State timeout")
except:
pass
Expand Down
23 changes: 12 additions & 11 deletions ble_secure_dfu_controller.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
import math
import pexpect
import time
import binascii

from array import array
from util import *
Expand Down Expand Up @@ -71,9 +72,9 @@ def from_string(res_str):

class BleDfuControllerSecure(NrfBleDfuController):
# Class constants
UUID_BUTTONLESS = '8e400001-f315-4f60-9fb8-838830daea50'
UUID_CONTROL_POINT = '8ec90001-f315-4f60-9fb8-838830daea50'
UUID_PACKET = '8ec90002-f315-4f60-9fb8-838830daea50'
UUID_BUTTONLESS = '8ec90003-f315-4f60-9fb8-838830daea50'

# Constructor inherited from abstract base class

Expand All @@ -83,7 +84,6 @@ class BleDfuControllerSecure(NrfBleDfuController):
def start(self):
(_, self.ctrlpt_handle, self.ctrlpt_cccd_handle) = self._get_handles(self.UUID_CONTROL_POINT)
(_, self.data_handle, _) = self._get_handles(self.UUID_PACKET)

if verbose:
print('Control Point Handle: 0x%04x, CCCD: 0x%04x' % (self.ctrlpt_handle, self.ctrlpt_cccd_handle))
print('Packet handle: 0x%04x' % (self.data_handle))
Expand Down Expand Up @@ -111,23 +111,24 @@ def check_DFU_mode(self):
dfu_mode = False

try:
self.ble_conn.expect([self.UUID_BUTTONLESS], timeout=2)
except (pexpect.TIMEOUT , e):
self.ble_conn.expect([self.UUID_BUTTONLESS], timeout=5)
except pexpect.TIMEOUT as e:
dfu_mode = True

return dfu_mode

def switch_to_dfu_mode(self):
(_, bl_value_handle, bl_cccd_handle) = self._get_handles(self.UUID_BUTTONLESS)

self._enable_notifications(bl_cccd_handle)
self._enable_indications(bl_cccd_handle)

# Reset the board in DFU mode. After reset the board will be disconnected
cmd = 'char-write-req 0x%04x 01' % (bl_value_handle)
self.ble_conn.sendline(cmd)

# Wait some time for board to reboot
time.sleep(0.5)
time.sleep(1)
print("Switched to DFU mode Successfully")

# Increase the mac address by one and reconnect
self.target_mac_increase(1)
Expand Down Expand Up @@ -189,7 +190,7 @@ def _wait_and_parse_notify(self):
if result[1] != Results.SUCCESS:
raise Exception("Error in {} procedure, reason: {}".format(
Procedures.to_string(result[0]),
Results.to_string(result[1])))
Results.to_string(result[1])))

return result

Expand Down Expand Up @@ -274,14 +275,14 @@ def _dfu_send_object(self, offset, obj_max_size):
if offset == 0 or offset >= obj_max_size or crc32 != crc32_unsigned(self.bin_array[0:offset]):
# Create Data Object
size = min(obj_max_size, self.image_size - offset)
self._dfu_send_command(Procedures.CREATE, [Procedures.PARAM_DATA] + uint32_to_bytes_le(size))
self._dfu_send_command(Procedures.CREATE, [Procedures.PARAM_DATA] + uint32_to_bytes_le(int(size)))
self._wait_and_parse_notify()

segment_count = 0
segment_total = int(math.ceil(min(obj_max_size, self.image_size-offset)/float(self.pkt_payload_size)))

segment_begin = offset
segment_end = min(offset+obj_max_size, self.image_size)
segment_begin = int(offset)
segment_end = int(min(offset+obj_max_size, self.image_size))

for i in range(segment_begin, segment_end, self.pkt_payload_size):
num_bytes = min(self.pkt_payload_size, segment_end - i)
Expand All @@ -301,7 +302,7 @@ def _dfu_send_object(self, offset, obj_max_size):

if res != Results.SUCCESS:
raise Exception("bad notification status: {}".format(Results.to_string(res)))

if crc32 != crc32_unsigned(self.bin_array[0:offset]):
# Something went wrong, need to re-transmit this object
return 0
Expand Down
24 changes: 15 additions & 9 deletions dfu.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@

def main():

init_msg = """
init_msg = """
================================
== ==
== DFU Server ==
Expand Down Expand Up @@ -81,7 +81,7 @@ def main():

options, args = parser.parse_args()

except (Exception , e):
except Exception as e:
print(e)
print("For help use --help")
sys.exit(2)
Expand All @@ -108,7 +108,7 @@ def main():
#print(options.zipfile)
try:
hexfile, datfile = unpacker.unpack_zipfile(options.zipfile)
except (Exception, e):
except Exception as e:
print("ERR")
print(e)
pass
Expand Down Expand Up @@ -137,6 +137,7 @@ def main():
else:
ble_dfu = BleDfuControllerLegacy(options.address.upper(), hexfile, datfile)


# Initialize inputs
ble_dfu.input_setup()

Expand All @@ -147,21 +148,26 @@ def main():
success = ble_dfu.switch_to_dfu_mode()
if not success:
print("Couldn't reconnect")
else:
ble_dfu.start()
else:
print("Already in DFU mode")
ble_dfu.start()
else:
print("Couldn't connect, will try DFU MAC")
# The device might already be in DFU mode (MAC + 1)
ble_dfu.target_mac_increase(1)

# Try connection with new address
print("Couldn't connect, will try DFU MAC")
if not ble_dfu.scan_and_connect():
if ble_dfu.scan_and_connect():
ble_dfu.start()
else:
raise Exception("Can't connect to device")

ble_dfu.start()

# Disconnect from peer device if not done already and clean up.
ble_dfu.disconnect()

except (Exception, e):
except Exception as e:
# print(traceback.format_exc())
print("Exception at line {}: {}".format(sys.exc_info()[2].tb_lineno, e))
pass
Expand All @@ -171,7 +177,7 @@ def main():

# If Unpacker for zipfile used then delete Unpacker
if unpacker != None:
unpacker.delete()
unpacker.delete()

print("DFU Server done")

Expand Down
8 changes: 8 additions & 0 deletions manifest.json
Original file line number Diff line number Diff line change
@@ -0,0 +1,8 @@
{
"manifest": {
"application": {
"bin_file": "ble_app_vsens_pca10100_s113.bin",
"dat_file": "ble_app_vsens_pca10100_s113.dat"
}
}
}
55 changes: 39 additions & 16 deletions nrf_ble_dfu_controller.py
Original file line number Diff line number Diff line change
@@ -1,10 +1,12 @@
import os
import pexpect
import re
import zlib

from abc import ABCMeta, abstractmethod
from array import array
from util import *
from scan import Scan

verbose = False

Expand Down Expand Up @@ -60,9 +62,7 @@ def __init__(self, target_mac, firmware_path, datfile_path):

self.firmware_path = firmware_path
self.datfile_path = datfile_path

self.ble_conn = pexpect.spawn("sudo hcitool lescan")
self.ble_conn = pexpect.spawn("gatttool -b '%s' --interactive" % target_mac)
self.ble_conn = pexpect.spawn("gatttool -b '%s' -t random --interactive" % target_mac)
self.ble_conn.delaybeforesend = 0

# --------------------------------------------------------------------------
Expand Down Expand Up @@ -105,7 +105,6 @@ def input_setup(self):

self.image_size = len(self.bin_array)
print("Binary imge size: %d" % self.image_size)
print("Binary CRC32: %d" % crc32_unsigned(array_to_hex_string(self.bin_array)))

return

Expand All @@ -122,21 +121,26 @@ def input_setup(self):
# Perform a scan and connect via gatttool.
# Will return True if a connection was established, False otherwise
# --------------------------------------------------------------------------
def scan_and_connect(self, timeout=2):
def scan_and_connect(self, timeout=30):
if verbose: print("scan_and_connect")

self.scan_obj = Scan(None)
scan_list = self.scan_obj.scan()
print("scan_list:",scan_list)
print("Connecting to %s" % (self.target_mac))

try:
self.ble_conn.expect('\[LE\]>', timeout=timeout)
except (pexpect.TIMEOUT, e):
except pexpect.TIMEOUT as e:
print("scan error:",e)
return False

self.ble_conn.sendline('connect')

try:
res = self.ble_conn.expect('.*Connection successful.*', timeout=timeout)
except (pexpect.TIMEOUT, e):
print("Connection successfull")
except pexpect.TIMEOUT as e:
return False

return True
Expand All @@ -153,8 +157,7 @@ def target_mac_increase(self, inc):

# Re-start gatttool with the new address
self.disconnect()
self.ble_conn = pexpect.spawn("sudo hcitool lescan")
self.ble_conn = pexpect.spawn("gatttool -b '%s' --interactive" % self.target_mac)
self.ble_conn = pexpect.spawn("gatttool -b '%s' -t random --interactive" % self.target_mac)
self.ble_conn.delaybeforesend = 0

# --------------------------------------------------------------------------
Expand All @@ -167,10 +170,11 @@ def _get_handles(self, uuid):
self.ble_conn.sendline('characteristics')

try:
self.ble_conn.expect([uuid], timeout=2)
handles = re.findall('.*handle: (0x....),.*char value handle: (0x....)', self.ble_conn.before)
self.ble_conn.expect([uuid], timeout=10)
handles = re.findall('.*handle: (0x....),.*char value handle: (0x....)', self.ble_conn.before.decode('utf-8'))
(handle, value_handle) = handles[-1]
except (pexpect.TIMEOUT, e):

except pexpect.TIMEOUT as e:
raise Exception("UUID not found: {}".format(uuid))

return (int(handle, 16), int(value_handle, 16), int(value_handle, 16)+1)
Expand Down Expand Up @@ -202,7 +206,7 @@ def _dfu_wait_for_notify(self):
# continue to wait.
#
self.ble_conn.sendline('')
string = self.ble_conn.before
string = self.ble_conn.before.decode('utf-8')
if '[ ]' in string:
print('Connection lost! ')
raise Exception('Connection Lost')
Expand All @@ -211,7 +215,7 @@ def _dfu_wait_for_notify(self):
if index == 0:
after = self.ble_conn.after
hxstr = after.split()[3:]
handle = long(float.fromhex(hxstr[0]))
handle = int(float.fromhex(hxstr[0].decode('utf-8')))
return hxstr[2:]

else:
Expand All @@ -234,7 +238,8 @@ def _dfu_send_command(self, procedure, params=[]):
# Verify that command was successfully written
try:
res = self.ble_conn.expect('Characteristic value was written successfully.*', timeout=10)
except (pexpect.TIMEOUT, e):
except pexpect.TIMEOUT as e:

print("State timeout")

# --------------------------------------------------------------------------
Expand Down Expand Up @@ -264,5 +269,23 @@ def _enable_notifications(self, cccd_handle):
# Verify that command was successfully written
try:
res = self.ble_conn.expect('Characteristic value was written successfully.*', timeout=10)
except (pexpect.TIMEOUT, e):
except pexpect.TIMEOUT as e:
print("State timeout")

# --------------------------------------------------------------------------
# Enable notifications from the Control Point Handle
# --------------------------------------------------------------------------
def _enable_indications(self, cccd_handle):
if verbose: print('_enable_notifications')

cmd = 'char-write-req 0x%04x %s' % (cccd_handle, '0200')

if verbose: print(cmd)

self.ble_conn.sendline(cmd)

# Verify that command was successfully written
try:
res = self.ble_conn.expect('Characteristic value was written successfully.*', timeout=10)
except pexpect.TIMEOUT as e:
print("State timeout")
20 changes: 16 additions & 4 deletions scan.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,8 @@

import pexpect
import signal
import sys
import time

#------------------------------------------------------------------------------
# Bluetooth LE scan for advertising peripheral devices
Expand All @@ -21,10 +23,16 @@ def __init__( self, advert_name ):
def scan( self ):

try:
self.hcitool = pexpect.spawn('hcitool lescan')
self.hcitool = pexpect.spawn('hciconfig hci0 down')
self.hcitool = pexpect.spawn('hciconfig hci0 up')
self.hcitool = pexpect.spawn('hcitool lescan')
#self.hcitool.logfile = sys.stdout
index = self.hcitool.expect(['LE Scan ...'], 1)
index = self.hcitool.expect(['LE Scan ...'])
time.sleep(10)
print("scan after:",self.hcitool.after)

except pexpect.EOF:

self.hcitool.terminate(force=True)
return []
except Exception as err:
Expand All @@ -38,10 +46,13 @@ def scan( self ):
return []

list = []
for dummy in range(0, 2):
for dummy in range(0, 4):
#print("for loop:",self.hcitool.readline())
try:
list.append(self.hcitool.readline())
#print("list:",list)
except pexpect.TIMEOUT:
return list
break

if list == []:
Expand All @@ -60,7 +71,7 @@ def scan( self ):

# Strip newline from items in list
list = [item.strip() for item in list]
list = list[0:2]
#list = list

# Close pexpect (release device for subsequent use)
self.hcitool.terminate(force=True)
Expand Down Expand Up @@ -91,6 +102,7 @@ def scan(self):
print("scan: pexpect.TIMEOUT")
pass
except Exception as e:
print(e)
print("scan: exception: {0} ".format(sys.exc_info()[0]))
pass

Expand Down
Loading