zephyr/scripts/west_commands/runners/canopen_program.py
Henrik Brix Andersen d151424a6d scripts: west: runners: canopen: add SDO block transfer support
Add SDO block transfer support for the CANopen program download west
runner. SDO block transfers are experimental in the underlying CANopen for
Python.

Reduce the program download chunk size to half the program download buffer
size to avoid blocking IO.

Signed-off-by: Henrik Brix Andersen <hebad@vestas.com>
2023-01-11 09:39:57 +01:00

349 lines
14 KiB
Python

# Copyright (c) 2020 Vestas Wind Systems A/S
#
# SPDX-License-Identifier: Apache-2.0
'''Runner for performing program download over CANopen (DSP 302-3).'''
import argparse
import os
import time
from runners.core import ZephyrBinaryRunner, RunnerCaps
try:
import canopen
from progress.bar import Bar
MISSING_REQUIREMENTS = False
except ImportError:
MISSING_REQUIREMENTS = True
# Default Python-CAN context to use, see python-can documentation for details
DEFAULT_CAN_CONTEXT = 'default'
# Default program number
DEFAULT_PROGRAM_NUMBER = 1
# Program download buffer size in bytes
PROGRAM_DOWNLOAD_BUFFER_SIZE = 1024
# Program download chunk size in bytes
PROGRAM_DOWNLOAD_CHUNK_SIZE = PROGRAM_DOWNLOAD_BUFFER_SIZE // 2
# Default timeouts and retries
DEFAULT_TIMEOUT = 10.0 # seconds
DEFAULT_SDO_TIMEOUT = 0.3 # seconds
DEFAULT_SDO_RETRIES = 1
# Object dictionary indexes
H1F50_PROGRAM_DATA = 0x1F50
H1F51_PROGRAM_CTRL = 0x1F51
H1F56_PROGRAM_SWID = 0x1F56
H1F57_FLASH_STATUS = 0x1F57
# Program control commands
PROGRAM_CTRL_STOP = 0x00
PROGRAM_CTRL_START = 0x01
PROGRAM_CTRL_RESET = 0x02
PROGRAM_CTRL_CLEAR = 0x03
PROGRAM_CTRL_ZEPHYR_CONFIRM = 0x80
class ToggleAction(argparse.Action):
'''Toggle argument parser'''
def __call__(self, parser, namespace, values, option_string=None):
setattr(namespace, self.dest, not option_string.startswith('--no-'))
class CANopenBinaryRunner(ZephyrBinaryRunner):
'''Runner front-end for CANopen.'''
def __init__(self, cfg, dev_id, can_context=DEFAULT_CAN_CONTEXT,
program_number=DEFAULT_PROGRAM_NUMBER, confirm=True,
confirm_only=True, timeout=DEFAULT_TIMEOUT,
sdo_retries=DEFAULT_SDO_RETRIES, sdo_timeout=DEFAULT_SDO_TIMEOUT,
block_transfer=False):
if MISSING_REQUIREMENTS:
raise RuntimeError('one or more Python dependencies were missing; '
"see the getting started guide for details on "
"how to fix")
super().__init__(cfg)
self.dev_id = dev_id # Only use for error checking in do_run()
self.bin_file = cfg.bin_file
self.confirm = confirm
self.confirm_only = confirm_only
self.timeout = timeout
self.downloader = CANopenProgramDownloader(logger=self.logger,
node_id=dev_id,
can_context=can_context,
program_number=program_number,
sdo_retries=sdo_retries,
sdo_timeout=sdo_timeout,
block_transfer=block_transfer)
@classmethod
def name(cls):
return 'canopen'
@classmethod
def capabilities(cls):
return RunnerCaps(commands={'flash'}, dev_id=True, flash_addr=False)
@classmethod
def dev_id_help(cls) -> str:
return 'CANopen Node ID.'
@classmethod
def do_add_parser(cls, parser):
# Optional:
parser.add_argument('--node-id', dest='dev_id',
help=cls.dev_id_help())
parser.add_argument('--can-context', default=DEFAULT_CAN_CONTEXT,
help=f'Python-CAN context to use (default: {DEFAULT_CAN_CONTEXT})')
parser.add_argument('--program-number', type=int, default=DEFAULT_PROGRAM_NUMBER,
help=f'program number (default: {DEFAULT_PROGRAM_NUMBER})')
parser.add_argument('--confirm', '--no-confirm',
dest='confirm', nargs=0,
action=ToggleAction,
help='confirm after starting? (default: yes)')
parser.add_argument('--confirm-only', default=False, action='store_true',
help='confirm only, no program download (default: no)')
parser.add_argument('--timeout', type=float, default=DEFAULT_TIMEOUT,
help=f'Timeout in seconds (default: {DEFAULT_TIMEOUT})')
parser.add_argument('--sdo-retries', type=int, default=DEFAULT_SDO_RETRIES,
help=f'CANopen SDO request retries (default: {DEFAULT_SDO_RETRIES})')
parser.add_argument('--sdo-timeout', type=float, default=DEFAULT_SDO_TIMEOUT,
help=f'''CANopen SDO response timeout in seconds
(default: {DEFAULT_SDO_TIMEOUT})''')
parser.add_argument('--block-transfer', default=False, action='store_true',
help='Use SDO block transfers (experimental, default: no)')
parser.set_defaults(confirm=True)
@classmethod
def do_create(cls, cfg, args):
return CANopenBinaryRunner(cfg, int(args.dev_id),
can_context=args.can_context,
program_number=args.program_number,
confirm=args.confirm,
confirm_only=args.confirm_only,
timeout=args.timeout,
sdo_retries=args.sdo_retries,
sdo_timeout=args.sdo_timeout,
block_transfer=args.block_transfer)
def do_run(self, command, **kwargs):
if not self.dev_id:
raise RuntimeError('Please specify a CANopen node ID with the '
'-i/--dev-id or --node-id command-line switch.')
if command == 'flash':
self.flash(**kwargs)
def flash(self, **kwargs):
'''Download program to flash over CANopen'''
self.ensure_output('bin')
self.logger.info('Using Node ID %d, program number %d',
self.downloader.node_id,
self.downloader.program_number)
self.downloader.connect()
status = self.downloader.wait_for_flash_status_ok(self.timeout)
if status == 0:
self.downloader.swid()
else:
self.logger.warning('Flash status 0x{:02x}, '
'skipping software identification'.format(status))
self.downloader.enter_pre_operational()
if self.confirm_only:
self.downloader.zephyr_confirm_program()
self.downloader.disconnect()
return
if self.bin_file is None:
raise ValueError('Cannot download program; bin_file is missing')
self.downloader.stop_program()
self.downloader.clear_program()
self.downloader.wait_for_flash_status_ok(self.timeout)
self.downloader.download(self.bin_file)
status = self.downloader.wait_for_flash_status_ok(self.timeout)
if status != 0:
raise ValueError('Program download failed: '
'flash status 0x{:02x}'.format(status))
self.downloader.swid()
self.downloader.start_program()
self.downloader.wait_for_bootup(self.timeout)
self.downloader.swid()
if self.confirm:
self.downloader.enter_pre_operational()
self.downloader.zephyr_confirm_program()
self.downloader.disconnect()
class CANopenProgramDownloader(object):
'''CANopen program downloader'''
def __init__(self, logger, node_id, can_context=DEFAULT_CAN_CONTEXT,
program_number=DEFAULT_PROGRAM_NUMBER,
sdo_retries=DEFAULT_SDO_RETRIES, sdo_timeout=DEFAULT_SDO_TIMEOUT,
block_transfer=False):
super(CANopenProgramDownloader, self).__init__()
self.logger = logger
self.node_id = node_id
self.can_context = can_context
self.program_number = program_number
self.network = canopen.Network()
self.node = self.network.add_node(self.node_id,
self.create_object_dictionary())
self.data_sdo = self.node.sdo[H1F50_PROGRAM_DATA][self.program_number]
self.ctrl_sdo = self.node.sdo[H1F51_PROGRAM_CTRL][self.program_number]
self.swid_sdo = self.node.sdo[H1F56_PROGRAM_SWID][self.program_number]
self.flash_sdo = self.node.sdo[H1F57_FLASH_STATUS][self.program_number]
self.node.sdo.MAX_RETRIES = sdo_retries
self.node.sdo.RESPONSE_TIMEOUT = sdo_timeout
self.block_transfer = block_transfer
def connect(self):
'''Connect to CAN network'''
try:
self.network.connect(context=self.can_context)
except:
raise ValueError('Unable to connect to CAN network')
def disconnect(self):
'''Disconnect from CAN network'''
self.network.disconnect()
def enter_pre_operational(self):
'''Enter pre-operational NMT state'''
self.logger.info("Entering pre-operational mode")
try:
self.node.nmt.state = 'PRE-OPERATIONAL'
except:
raise ValueError('Failed to enter pre-operational mode')
def _ctrl_program(self, cmd):
'''Write program control command to CANopen object dictionary (0x1f51)'''
try:
self.ctrl_sdo.raw = cmd
except:
raise ValueError('Unable to write control command 0x{:02x}'.format(cmd))
def stop_program(self):
'''Write stop control command to CANopen object dictionary (0x1f51)'''
self.logger.info('Stopping program')
self._ctrl_program(PROGRAM_CTRL_STOP)
def start_program(self):
'''Write start control command to CANopen object dictionary (0x1f51)'''
self.logger.info('Starting program')
self._ctrl_program(PROGRAM_CTRL_START)
def clear_program(self):
'''Write clear control command to CANopen object dictionary (0x1f51)'''
self.logger.info('Clearing program')
self._ctrl_program(PROGRAM_CTRL_CLEAR)
def zephyr_confirm_program(self):
'''Write confirm control command to CANopen object dictionary (0x1f51)'''
self.logger.info('Confirming program')
self._ctrl_program(PROGRAM_CTRL_ZEPHYR_CONFIRM)
def swid(self):
'''Read software identification from CANopen object dictionary (0x1f56)'''
try:
swid = self.swid_sdo.raw
except:
raise ValueError('Failed to read software identification')
self.logger.info('Program software identification: 0x{:08x}'.format(swid))
return swid
def flash_status(self):
'''Read flash status identification'''
try:
status = self.flash_sdo.raw
except:
raise ValueError('Failed to read flash status identification')
return status
def download(self, bin_file):
'''Download program to CANopen object dictionary (0x1f50)'''
self.logger.info('Downloading program: %s', bin_file)
try:
size = os.path.getsize(bin_file)
infile = open(bin_file, 'rb')
outfile = self.data_sdo.open('wb', buffering=PROGRAM_DOWNLOAD_BUFFER_SIZE,
size=size, block_transfer=self.block_transfer)
progress = Bar('%(percent)d%%', max=size, suffix='%(index)d/%(max)dB')
while True:
chunk = infile.read(PROGRAM_DOWNLOAD_CHUNK_SIZE)
if not chunk:
break
outfile.write(chunk)
progress.next(n=len(chunk))
except:
raise ValueError('Failed to download program')
finally:
progress.finish()
infile.close()
outfile.close()
def wait_for_bootup(self, timeout=DEFAULT_TIMEOUT):
'''Wait for boot-up message reception'''
self.logger.info('Waiting for boot-up message...')
try:
self.node.nmt.wait_for_bootup(timeout=timeout)
except:
raise ValueError('Timeout waiting for boot-up message')
def wait_for_flash_status_ok(self, timeout=DEFAULT_TIMEOUT):
'''Wait for flash status ok'''
self.logger.info('Waiting for flash status ok')
end_time = time.time() + timeout
while True:
now = time.time()
status = self.flash_status()
if status == 0:
break
if now > end_time:
return status
return status
@staticmethod
def create_object_dictionary():
'''Create a synthetic CANopen object dictionary for program download'''
objdict = canopen.objectdictionary.ObjectDictionary()
array = canopen.objectdictionary.Array('Program data', 0x1f50)
member = canopen.objectdictionary.Variable('', 0x1f50, subindex=1)
member.data_type = canopen.objectdictionary.DOMAIN
array.add_member(member)
objdict.add_object(array)
array = canopen.objectdictionary.Array('Program control', 0x1f51)
member = canopen.objectdictionary.Variable('', 0x1f51, subindex=1)
member.data_type = canopen.objectdictionary.UNSIGNED8
array.add_member(member)
objdict.add_object(array)
array = canopen.objectdictionary.Array('Program software ID', 0x1f56)
member = canopen.objectdictionary.Variable('', 0x1f56, subindex=1)
member.data_type = canopen.objectdictionary.UNSIGNED32
array.add_member(member)
objdict.add_object(array)
array = canopen.objectdictionary.Array('Flash error ID', 0x1f57)
member = canopen.objectdictionary.Variable('', 0x1f57, subindex=1)
member.data_type = canopen.objectdictionary.UNSIGNED32
array.add_member(member)
objdict.add_object(array)
return objdict