Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
40 changes: 20 additions & 20 deletions bootloader.py
Original file line number Diff line number Diff line change
Expand Up @@ -40,21 +40,18 @@ class Bootloader:
ih: intelhex.IntelHex
board: boards.Board
timeout: int
ui_callback: Callable

def __init__(
self,
bus: can.Bus,
board: boards.Board,
ui_callback: Callable,
ih: intelhex.IntelHex = None,
timeout: int = 5,
) -> None:
self.bus: can.Bus = bus
self.ih: intelhex.IntelHex = ih
self.board: boards.Board = board
self.timeout: int = timeout
self.ui_callback: Callable = ui_callback

def goto_bootloader(self) -> bool:
"""
Expand Down Expand Up @@ -130,7 +127,7 @@ def _validator(msg: can.Message) -> bool:
self._await_can_msg(validator=_validator, timeout=self.timeout) is not None
)

def erase_sectors(self, sectors) -> bool:
def erase_sectors(self, sectors, ui_callback: Optional[Callable] = None) -> bool:
"""
Erase specific sectors of FLASH, sets all bytes to 0xFF. FLASH memory must first be
erased before it can be programmed.
Expand All @@ -154,8 +151,8 @@ def _validator(msg: can.Message):
erase_progress = 0

for sector in sectors:
if self.ui_callback:
self.ui_callback("Erasing FLASH sectors", erase_size, erase_progress)
if ui_callback is not None:
ui_callback("Erasing FLASH sectors", erase_size, erase_progress)

if sector.write_protect:
raise RuntimeError("Attempted to write to a readonly memory sector!")
Expand All @@ -173,12 +170,12 @@ def _validator(msg: can.Message):

erase_progress += sector.size

if self.ui_callback:
self.ui_callback("Erasing FLASH sectors", erase_size, erase_size)
if ui_callback is not None:
ui_callback("Erasing FLASH sectors", erase_size, erase_size)

return True

def program(self) -> None:
def program(self, ui_callback: Optional[Callable] = None) -> None:
"""
Program the binary into flash. There is no CAN handshake here to reduce
latency during programming. Also, the bootloader will verify the app's code is valid
Expand All @@ -188,8 +185,8 @@ def program(self) -> None:
for i, address in enumerate(
range(self.ih.minaddr(), self.ih.minaddr() + self.size_bytes(), 8)
):
if self.ui_callback and i % 128 == 0:
self.ui_callback("Programming data", self.size_bytes(), i * 8)
if ui_callback is not None and i % 128 == 0:
ui_callback("Programming data", self.size_bytes(), i * 8)

data = [self.ih[address + i] for i in range(0, 8)]

Expand All @@ -207,8 +204,8 @@ def program(self) -> None:
success = True
except can.interfaces.vector.exceptions.VectorOperationError:
pass
if self.ui_callback:
self.ui_callback("Programming data", self.size_bytes(), self.size_bytes())
if ui_callback is not None:
ui_callback("Programming data", self.size_bytes(), self.size_bytes())

def status(self) -> Optional[int]:
"""
Expand Down Expand Up @@ -247,7 +244,7 @@ def _validator(msg: can.Message):

return rx_msg.data[0]

def update(self) -> None:
def update(self, ui_callback: Optional[Callable] = None) -> None:
"""
Run the update procedure for this bootloader.

Expand Down Expand Up @@ -284,7 +281,8 @@ def _intersect(a_min, a_max, b_min, b_max):
self.program()
time.sleep(0.5)

self.ui_callback("Verifying programming", self.size_bytes(), 0)
if ui_callback is not None:
ui_callback("Verifying programming", self.size_bytes(), 0)
boot_status = self.status()
if boot_status is not None:
if boot_status != BOOT_STATUS_APP_VALID:
Expand All @@ -296,10 +294,11 @@ def _intersect(a_min, a_max, b_min, b_max):
f"Bootloader for {self.board.name} did not respond to command to verify application integrity."
)

self.ui_callback("Verifying programming", self.size_bytes(), self.size_bytes())
if ui_callback is not None:
ui_callback("Verifying programming", self.size_bytes(), self.size_bytes())
time.sleep(0.5)

def erase(self) -> None:
def erase(self, ui_callback: Optional[Callable] = None) -> None:
"""
Erase this bootloader's application.

Expand All @@ -322,7 +321,8 @@ def erase(self) -> None:
)
time.sleep(0.5)

self.ui_callback("Verifying erase", erase_size, 0)
if ui_callback is not None:
ui_callback("Verifying erase", erase_size, 0)
boot_status = self.status()
if boot_status is not None:
if boot_status != BOOT_STATUS_NO_APP:
Expand All @@ -333,8 +333,8 @@ def erase(self) -> None:
raise RuntimeError(
f"Bootloader for {self.board.name} did not respond to command to erase flash."
)

self.ui_callback("Verifying erase", erase_size, erase_size)
if ui_callback is not None:
ui_callback("Verifying erase", erase_size, erase_size)
time.sleep(0.5)

def _await_can_msg(
Expand Down
96 changes: 45 additions & 51 deletions update.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@

import argparse
import os
from concurrent.futures import ThreadPoolExecutor
from typing import List

import can
Expand All @@ -27,7 +28,7 @@
)


def all_goto_bootloader(live: Live, bootloaders: List[bootloader.Bootloader]):
def all_goto_bootloader(bootloaders: List[bootloader.Bootloader], live: Live):
live.console.log("Putting all boards into bootloader mode")
# first put everybody into bootloader mode
bootload_task = progress.add_task("Jump to Bootloader")
Expand All @@ -46,7 +47,7 @@ def all_goto_bootloader(live: Live, bootloaders: List[bootloader.Bootloader]):
live.console.log(f"[bold green]All boards pushed into bootloader mode successfully")


def all_goto_app(live: Live, bootloaders: List[bootloader.Bootloader]):
def all_goto_app(bootloaders: List[bootloader.Bootloader], live: Live):
live.console.log("Pushing all boards out of bootloader mode")
app_task = progress.add_task("Jump to App")
for b_idx, bootload_board in enumerate(bootloaders):
Expand All @@ -66,20 +67,28 @@ def all_goto_app(live: Live, bootloaders: List[bootloader.Bootloader]):
)


def update_board(bootload_board: bootloader.Bootloader, live: Live):
steps_task = progress.add_task(
f"Updating board [blue bold]{bootload_board.board.name}"
)
bootload_board.update(
ui_callback=lambda description, total, completed: progress.update(
task_id=steps_task,
total=total,
description=description,
completed=completed,
)
)
live.console.log(f"[green]{bootload_board.board.name} updated successfully")
progress.remove_task(steps_task)


def update(configs: List[boards.Board], build_dir: str) -> None:
"""Update and handle UI."""
num_boards = len(configs)
steps_task = progress.add_task("Steps")
bootloaders: List[bootloader.Bootloader] = [
bootloader.Bootloader(
bus=bus,
board=board,
ui_callback=lambda description, total, completed: progress.update(
task_id=steps_task,
total=total,
description=description,
completed=completed,
),
ih=intelhex.IntelHex(os.path.join(build_dir, board.path)),
)
for board in configs
Expand All @@ -88,69 +97,54 @@ def update(configs: List[boards.Board], build_dir: str) -> None:
# push all boards into bootloader
with Live(Group(status, progress), transient=True) as live:
# push all boards into bootloader
all_goto_bootloader(live, bootloaders)
all_goto_bootloader(bootloaders, live)
live.console.log(
f"Updating firmware for boards: [blue bold]{', '.join(board.name for board in configs)}"
)
for b_idx, bootload_board in enumerate(bootloaders):
# TODO do this in parallel
progress.update(
task_id=steps_task,
total=0,
completed=0,
description=f"Starting update for {bootload_board.board.name}",
)
status.update(
f"Updating board [yellow]{b_idx + 1}/{num_boards}[/]: [blue bold]{bootload_board.board.name}"
)
bootload_board.update()
live.console.log(f"[green]{bootload_board.board.name} updated successfully")
progress.remove_task(steps_task)
with ThreadPoolExecutor(max_workers=(len(configs))) as executor:
executor.map(update_board, [(b, live) for b in bootloaders])
live.console.log(
f"[bold green]Firmware update successfully ({num_boards} board{'s' if num_boards > 1 else ''} updated)"
f"[bold green]Firmware update successfully ({len(configs)} board{'s' if len(configs) > 1 else ''} updated)"
)
# push all boards out of bootloader
all_goto_app(live, bootloaders)
all_goto_app(bootloaders, live)


def erase_board(bootloader_board: bootloader.Bootloader, live: Live):
steps_task = progress.add_task(
f"Erasing board [blue bold]{bootloader_board.board.name}"
)
bootloader_board.erase(
ui_callback=lambda description, total, completed: progress.update(
task_id=steps_task,
total=total,
description=description,
completed=completed,
)
)
live.console.log(f"[green]{bootloader_board.board.name} erased successfully")
progress.remove_task(steps_task)


def erase(configs: List[boards.Board]) -> None:
"""Erase and handle UI."""
# push all boards into bootloader
num_boards = len(configs)
steps_task = progress.add_task("Steps")
bootloaders = [
bootloader.Bootloader(
bus=bus,
board=board,
ui_callback=lambda description, total, completed: progress.update(
task_id=steps_task,
total=total,
description=description,
completed=completed,
),
)
for board in configs
]

with Live(Group(status, progress), transient=True) as live:
all_goto_bootloader(live, bootloaders)

all_goto_bootloader(bootloaders, live)
live.console.log(
f"Erasing with config: [blue bold]{', '.join(board.name for board in configs)}"
)
for b_idx, bootloader_board in enumerate(bootloaders):
# TODO do this in parallel
status.update(f"Sending board {bootloader_board.board.name} to bootloader")
status.update(
f"Erasing board [yellow]{b_idx + 1}/{num_boards}[/]: [blue bold]{bootloader_board.board.name}"
)
bootloader_board.erase()
live.console.log(
f"[green]{bootloader_board.board.name} erased successfully"
)
progress.remove_task(steps_task)
with ThreadPoolExecutor(max_workers=(len(configs))) as executor:
executor.map(update_board, [(b, live) for b in bootloaders])
live.console.log(
f"[bold green]Erase successful ({num_boards} board{'s' if num_boards > 1 else ''} erased)"
f"[bold green]Erase successful ({len(configs)} board{'s' if len(configs) > 1 else ''} erased)"
)


Expand Down Expand Up @@ -187,7 +181,7 @@ def erase(configs: List[boards.Board]) -> None:
for board in boards.CONFIGS[config_name.strip()]
}
)
with can.interface.Bus(
with can.ThreadSafeBus(
interface=args.bus, channel=args.channel, bitrate=args.bit_rate
) as bus:
if args.erase:
Expand Down