runners: canopen: poll for flash ready

Poll the flash status instead of just reading the flash status once. Add
support for controlling the number of SDO retries and the SDO timeouts.

These changes allows for greater control of the CANopen program
download, which is especially useful on noisy or congested CAN networks
and on devices with slower flash access.

Fixes: #39409

Signed-off-by: Klaus H. Sorensen <khso@vestas.com>
Signed-off-by: Henrik Brix Andersen <hebad@vestas.com>
This commit is contained in:
Henrik Brix Andersen 2021-10-04 15:06:10 +02:00 committed by Carles Cufí
parent 9e97b3b0f0
commit 10fb5c203b
2 changed files with 75 additions and 23 deletions

View file

@ -6,6 +6,7 @@
import argparse
import os
import time
from runners.core import ZephyrBinaryRunner, RunnerCaps
@ -19,6 +20,14 @@ except ImportError:
# Default Python-CAN context to use, see python-can documentation for details
DEFAULT_CAN_CONTEXT = 'default'
# Default program number
DEFAULT_PROGRAM_NUMBER = 1
# Default timeouts and retries
DEFAULT_TIMEOUT = 10.0 # seconds
DEFAULT_SDO_TIMEOUT = 0.3 # seconds
DEFAULT_SDO_RETRIES = 1
# Object dictionary indexes
H1F50_PROGRAM_DATA = 0x1F50
H1F51_PROGRAM_CTRL = 0x1F51
@ -40,8 +49,9 @@ class ToggleAction(argparse.Action):
class CANopenBinaryRunner(ZephyrBinaryRunner):
'''Runner front-end for CANopen.'''
def __init__(self, cfg, dev_id, can_context=DEFAULT_CAN_CONTEXT,
program_number=1, confirm=True,
confirm_only=True, timeout=10):
program_number=DEFAULT_PROGRAM_NUMBER, confirm=True,
confirm_only=True, timeout=DEFAULT_TIMEOUT,
sdo_retries=DEFAULT_SDO_RETRIES, sdo_timeout=DEFAULT_SDO_TIMEOUT):
if MISSING_REQUIREMENTS:
raise RuntimeError('one or more Python dependencies were missing; '
"see the getting started guide for details on "
@ -56,7 +66,9 @@ class CANopenBinaryRunner(ZephyrBinaryRunner):
self.downloader = CANopenProgramDownloader(logger=self.logger,
node_id=dev_id,
can_context=can_context,
program_number=program_number)
program_number=program_number,
sdo_retries=sdo_retries,
sdo_timeout=sdo_timeout)
@classmethod
def name(cls):
@ -77,17 +89,22 @@ class CANopenBinaryRunner(ZephyrBinaryRunner):
help=cls.dev_id_help())
parser.add_argument('--can-context', default=DEFAULT_CAN_CONTEXT,
help='Custom Python-CAN context to use')
parser.add_argument('--program-number', default=1,
help='program number, default is 1')
help=f'Python-CAN context to use (default: {DEFAULT_CAN_CONTEXT})')
parser.add_argument('--program-number', type=int, default=DEFAULT_PROGRAM_NUMBER,
help=f'program number (default: {DEFAULT_PROGRAM_NUMBER})')
parser.add_argument('--confirm', '--no-confirm',
dest='confirm', nargs=0,
action=ToggleAction,
help='confirm after starting? (default: yes)')
parser.add_argument('--confirm-only', default=False, action='store_true',
help='confirm only, no program download (default: no)')
parser.add_argument('--timeout', default=10,
help='boot-up timeout, default is 10 seconds')
parser.add_argument('--timeout', type=float, default=DEFAULT_TIMEOUT,
help=f'Timeout in seconds (default: {DEFAULT_TIMEOUT})')
parser.add_argument('--sdo-retries', type=int, default=DEFAULT_SDO_RETRIES,
help=f'CANopen SDO request retries (default: {DEFAULT_SDO_RETRIES})')
parser.add_argument('--sdo-timeout', type=float, default=DEFAULT_SDO_TIMEOUT,
help=f'''CANopen SDO response timeout in seconds
(default: {DEFAULT_SDO_TIMEOUT})''')
parser.set_defaults(confirm=True)
@ -95,10 +112,12 @@ class CANopenBinaryRunner(ZephyrBinaryRunner):
def do_create(cls, cfg, args):
return CANopenBinaryRunner(cfg, int(args.dev_id),
can_context=args.can_context,
program_number=int(args.program_number),
program_number=args.program_number,
confirm=args.confirm,
confirm_only=args.confirm_only,
timeout=int(args.timeout))
timeout=args.timeout,
sdo_retries=args.sdo_retries,
sdo_timeout=args.sdo_timeout)
def do_run(self, command, **kwargs):
if not self.dev_id:
@ -115,7 +134,7 @@ class CANopenBinaryRunner(ZephyrBinaryRunner):
self.downloader.program_number)
self.downloader.connect()
status = self.downloader.flash_status()
status = self.downloader.wait_for_flash_status_ok(self.timeout)
if status == 0:
self.downloader.swid()
else:
@ -134,13 +153,15 @@ class CANopenBinaryRunner(ZephyrBinaryRunner):
self.downloader.stop_program()
self.downloader.clear_program()
self.downloader.wait_for_flash_status_ok(self.timeout)
self.downloader.download(self.bin_file)
status = self.downloader.flash_status()
status = self.downloader.wait_for_flash_status_ok(self.timeout)
if status != 0:
raise ValueError('Program download failed: '
'flash status 0x{:02x}'.format(status))
self.downloader.swid()
self.downloader.start_program()
self.downloader.wait_for_bootup(self.timeout)
self.downloader.swid()
@ -154,7 +175,8 @@ class CANopenBinaryRunner(ZephyrBinaryRunner):
class CANopenProgramDownloader(object):
'''CANopen program downloader'''
def __init__(self, logger, node_id, can_context=DEFAULT_CAN_CONTEXT,
program_number=1):
program_number=DEFAULT_PROGRAM_NUMBER,
sdo_retries=DEFAULT_SDO_RETRIES, sdo_timeout=DEFAULT_SDO_TIMEOUT):
super(CANopenProgramDownloader, self).__init__()
self.logger = logger
self.node_id = node_id
@ -168,6 +190,9 @@ class CANopenProgramDownloader(object):
self.swid_sdo = self.node.sdo[H1F56_PROGRAM_SWID][self.program_number]
self.flash_sdo = self.node.sdo[H1F57_FLASH_STATUS][self.program_number]
self.node.sdo.MAX_RETRIES = sdo_retries
self.node.sdo.RESPONSE_TIMEOUT = sdo_timeout
def connect(self):
'''Connect to CAN network'''
try:
@ -246,13 +271,14 @@ class CANopenProgramDownloader(object):
break
outfile.write(chunk)
progress.next(n=len(chunk))
except:
raise ValueError('Failed to download program')
finally:
progress.finish()
infile.close()
outfile.close()
except:
raise ValueError('Failed to download program')
def wait_for_bootup(self, timeout=10):
def wait_for_bootup(self, timeout=DEFAULT_TIMEOUT):
'''Wait for boot-up message reception'''
self.logger.info('Waiting for boot-up message...')
try:
@ -260,6 +286,21 @@ class CANopenProgramDownloader(object):
except:
raise ValueError('Timeout waiting for boot-up message')
def wait_for_flash_status_ok(self, timeout=DEFAULT_TIMEOUT):
'''Wait for flash status ok'''
self.logger.info('Waiting for flash status ok')
end_time = time.time() + timeout
while True:
now = time.time()
status = self.flash_status()
if status == 0:
break
if now > end_time:
return status
return status
@staticmethod
def create_object_dictionary():
'''Create a synthetic CANopen object dictionary for program download'''

View file

@ -22,13 +22,15 @@ TEST_ALT_CONTEXT = 'alternate'
# Test cases
#
TEST_CASES = [(n, x, p, c, o, t)
TEST_CASES = [(n, x, p, c, o, t, r, s)
for n in range(1, 3)
for x in (None, TEST_ALT_CONTEXT)
for p in range(1, 3)
for c in (False, True)
for o in (False, True)
for t in range(1, 3)]
for t in range(1, 3)
for r in range(1, 3)
for s in range(1, 3)]
os_path_isfile = os.path.isfile
@ -41,7 +43,7 @@ def os_path_isfile_patch(filename):
@patch('runners.canopen_program.CANopenProgramDownloader')
def test_canopen_program_create(cpd, test_case, runner_config):
'''Test CANopen runner created from command line parameters.'''
node_id, context, program_number, confirm, confirm_only, timeout = test_case
node_id, context, program_number, confirm, confirm_only, timeout, sdo_retries, sdo_timeout = test_case
args = ['--node-id', str(node_id)]
if context is not None:
@ -54,9 +56,14 @@ def test_canopen_program_create(cpd, test_case, runner_config):
args.append('--confirm-only')
if timeout:
args.extend(['--timeout', str(timeout)])
if sdo_retries:
args.extend(['--sdo-retries', str(sdo_retries)])
if sdo_timeout:
args.extend(['--sdo-timeout', str(sdo_timeout)])
mock = cpd.return_value
mock.flash_status.return_value = 0
mock.wait_for_flash_status_ok.return_value = 0
mock.swid.return_value = 0
parser = argparse.ArgumentParser()
@ -71,17 +78,21 @@ def test_canopen_program_create(cpd, test_case, runner_config):
assert cpd.call_args == call(node_id=node_id,
can_context=context,
logger=runner.logger,
program_number=program_number)
program_number=program_number,
sdo_retries=sdo_retries,
sdo_timeout=sdo_timeout)
else:
assert cpd.call_args == call(node_id=node_id,
can_context=TEST_DEF_CONTEXT,
logger=runner.logger,
program_number=program_number)
program_number=program_number,
sdo_retries=sdo_retries,
sdo_timeout=sdo_timeout)
mock.connect.assert_called_once()
if confirm_only:
mock.flash_status.assert_called_once()
mock.wait_for_flash_status_ok.assert_called_with(timeout)
mock.swid.assert_called_once()
mock.enter_pre_operational.assert_called_once()
mock.zephyr_confirm_program.assert_called_once()
@ -92,7 +103,7 @@ def test_canopen_program_create(cpd, test_case, runner_config):
mock.wait_for_bootup.assert_not_called()
else:
mock.enter_pre_operational.assert_called()
mock.flash_status.assert_called()
mock.wait_for_flash_status_ok.assert_called_with(timeout)
mock.swid.assert_called()
mock.stop_program.assert_called_once()
mock.clear_program.assert_called_once()