scripts: pylib: twister: twisterlib: Handlers refactor

Simple refactoring aiming to reduce the average method length.
Minor corrections of the handlers module, removing wholly unused
variables, etc.

Signed-off-by: Lukasz Mrugala <lukaszx.mrugala@intel.com>
This commit is contained in:
Lukasz Mrugala 2023-08-25 09:07:13 +02:00 committed by Anas Nashif
parent b961be53a0
commit 2d3d22d666

View file

@ -5,19 +5,20 @@
# Copyright 2022 NXP
# SPDX-License-Identifier: Apache-2.0
import csv
import logging
import math
import os
import sys
import csv
import time
import signal
import logging
import shlex
import subprocess
import threading
import select
import re
import psutil
import re
import select
import shlex
import signal
import subprocess
import sys
import threading
import time
from twisterlib.environment import ZEPHYR_BASE
from twisterlib.error import TwisterException
sys.path.insert(0, os.path.join(ZEPHYR_BASE, "scripts/pylib/build_helpers"))
@ -223,10 +224,7 @@ class BinaryHandler(Handler):
except subprocess.TimeoutExpired:
self.terminate(proc)
def handle(self, harness):
robot_test = getattr(harness, "is_robot_test", False)
def _create_command(self, robot_test):
if robot_test:
command = [self.generator_cmd, "run_renode_test"]
elif self.call_make_run:
@ -236,7 +234,6 @@ class BinaryHandler(Handler):
else:
command = [self.binary]
run_valgrind = False
if self.options.enable_valgrind:
command = ["valgrind", "--error-exitcode=2",
"--leak-check=full",
@ -244,7 +241,6 @@ class BinaryHandler(Handler):
"--log-file=" + self.build_dir + "/valgrind.log",
"--track-origins=yes",
] + command
run_valgrind = True
# Only valid for native_posix
if self.seed is not None:
@ -252,12 +248,9 @@ class BinaryHandler(Handler):
if self.extra_test_args is not None:
command.extend(self.extra_test_args)
logger.debug("Spawning process: " +
" ".join(shlex.quote(word) for word in command) + os.linesep +
"in directory: " + self.build_dir)
start_time = time.time()
return command
def _create_env(self):
env = os.environ.copy()
if self.options.enable_asan:
env["ASAN_OPTIONS"] = "log_path=stdout:" + \
@ -269,6 +262,40 @@ class BinaryHandler(Handler):
env["UBSAN_OPTIONS"] = "log_path=stdout:halt_on_error=1:" + \
env.get("UBSAN_OPTIONS", "")
return env
def _update_instance_info(self, harness_state, handler_time):
self.instance.execution_time = handler_time
if not self.terminated and self.returncode != 0:
self.instance.status = "failed"
if self.options.enable_valgrind and self.returncode == 2:
self.instance.reason = "Valgrind error"
else:
# When a process is killed, the default handler returns 128 + SIGTERM
# so in that case the return code itself is not meaningful
self.instance.reason = "Failed"
elif harness_state:
self.instance.status = harness_state
if harness_state == "failed":
self.instance.reason = "Failed"
else:
self.instance.status = "failed"
self.instance.reason = "Timeout"
self.instance.add_missing_case_status("blocked", "Timeout")
def handle(self, harness):
robot_test = getattr(harness, "is_robot_test", False)
command = self._create_command(robot_test)
logger.debug("Spawning process: " +
" ".join(shlex.quote(word) for word in command) + os.linesep +
"in directory: " + self.build_dir)
start_time = time.time()
env = self._create_env()
if robot_test:
harness.run_robot_test(command, self)
return
@ -297,23 +324,7 @@ class BinaryHandler(Handler):
if sys.stdout.isatty():
subprocess.call(["stty", "sane"], stdin=sys.stdout)
self.instance.execution_time = handler_time
if not self.terminated and self.returncode != 0:
self.instance.status = "failed"
if run_valgrind and self.returncode == 2:
self.instance.reason = "Valgrind error"
else:
# When a process is killed, the default handler returns 128 + SIGTERM
# so in that case the return code itself is not meaningful
self.instance.reason = "Failed"
elif harness.state:
self.instance.status = harness.state
if harness.state == "failed":
self.instance.reason = "Failed"
else:
self.instance.status = "failed"
self.instance.reason = "Timeout"
self.instance.add_missing_case_status("blocked", "Timeout")
self._update_instance_info(harness.state, handler_time)
self._final_handle_actions(harness, handler_time)
@ -333,6 +344,7 @@ class SimulationHandler(BinaryHandler):
self.binary = os.path.join(instance.build_dir, "zephyr", "zephyr.exe")
self.ready = True
class DeviceHandler(Handler):
def __init__(self, instance, type_str):
@ -451,45 +463,7 @@ class DeviceHandler(Handler):
proc.communicate()
logger.error("{} timed out".format(script))
def get_hardware(self):
hardware = None
try:
hardware = self.device_is_available(self.instance)
while not hardware:
time.sleep(1)
hardware = self.device_is_available(self.instance)
except TwisterException as error:
self.instance.status = "failed"
self.instance.reason = str(error)
logger.error(self.instance.reason)
return hardware
def handle(self, harness):
runner = None
hardware = self.get_hardware()
if hardware:
self.instance.dut = hardware.id
if not hardware:
return
runner = hardware.runner or self.options.west_runner
serial_pty = hardware.serial_pty
ser_pty_process = None
if serial_pty:
master, slave = pty.openpty()
try:
ser_pty_process = subprocess.Popen(re.split(',| ', serial_pty), stdout=master, stdin=master, stderr=master)
except subprocess.CalledProcessError as error:
logger.error("Failed to run subprocess {}, error {}".format(serial_pty, error.output))
return
serial_device = os.ttyname(slave)
else:
serial_device = hardware.serial
logger.debug(f"Using serial device {serial_device} @ {hardware.baud} baud")
def _create_command(self, runner, hardware):
if (self.options.west_flash is not None) or runner:
command = ["west", "flash", "--skip-rebuild", "-d", self.build_dir]
command_extra_args = []
@ -516,44 +490,50 @@ class DeviceHandler(Handler):
command_extra_args.append(board_id)
elif runner == "openocd" and product == "STM32 STLink":
command_extra_args.append("--cmd-pre-init")
command_extra_args.append("hla_serial %s" % (board_id))
command_extra_args.append("hla_serial %s" % board_id)
elif runner == "openocd" and product == "STLINK-V3":
command_extra_args.append("--cmd-pre-init")
command_extra_args.append("hla_serial %s" % (board_id))
command_extra_args.append("hla_serial %s" % board_id)
elif runner == "openocd" and product == "EDBG CMSIS-DAP":
command_extra_args.append("--cmd-pre-init")
command_extra_args.append("cmsis_dap_serial %s" % (board_id))
command_extra_args.append("cmsis_dap_serial %s" % board_id)
elif runner == "jlink":
command.append("--tool-opt=-SelectEmuBySN %s" % (board_id))
command.append("--tool-opt=-SelectEmuBySN %s" % board_id)
elif runner == "stm32cubeprogrammer":
command.append("--tool-opt=sn=%s" % (board_id))
command.append("--tool-opt=sn=%s" % board_id)
# Receive parameters from runner_params field.
if hardware.runner_params:
for param in hardware.runner_params:
command.append(param)
if command_extra_args != []:
if command_extra_args:
command.append('--')
command.extend(command_extra_args)
else:
command = [self.generator_cmd, "-C", self.build_dir, "flash"]
pre_script = hardware.pre_script
post_flash_script = hardware.post_flash_script
post_script = hardware.post_script
return command
if pre_script:
self.run_custom_script(pre_script, 30)
def _update_instance_info(self, harness_state, handler_time, flash_error):
self.instance.execution_time = handler_time
if harness_state:
self.instance.status = harness_state
if harness_state == "failed":
self.instance.reason = "Failed"
elif not flash_error:
self.instance.status = "failed"
self.instance.reason = "Timeout"
flash_timeout = hardware.flash_timeout
if hardware.flash_with_test:
flash_timeout += self.timeout
if self.instance.status in ["error", "failed"]:
self.instance.add_missing_case_status("blocked", self.instance.reason)
def _create_serial_connection(self, serial_device, hardware_baud,
flash_timeout, serial_pty, ser_pty_process):
try:
ser = serial.Serial(
serial_device,
baudrate=hardware.baud,
baudrate=hardware_baud,
parity=serial.PARITY_NONE,
stopbits=serial.STOPBITS_ONE,
bytesize=serial.EIGHTBITS,
@ -574,6 +554,86 @@ class DeviceHandler(Handler):
self.make_device_available(serial_pty)
else:
self.make_device_available(serial_device)
raise
return ser
def get_hardware(self):
hardware = None
try:
hardware = self.device_is_available(self.instance)
while not hardware:
time.sleep(1)
hardware = self.device_is_available(self.instance)
except TwisterException as error:
self.instance.status = "failed"
self.instance.reason = str(error)
logger.error(self.instance.reason)
return hardware
def _get_serial_device(self, serial_pty, hardware_serial):
ser_pty_process = None
if serial_pty:
master, slave = pty.openpty()
try:
ser_pty_process = subprocess.Popen(
re.split('[, ]', serial_pty),
stdout=master,
stdin=master,
stderr=master
)
except subprocess.CalledProcessError as error:
logger.error(
"Failed to run subprocess {}, error {}".format(
serial_pty,
error.output
)
)
return
serial_device = os.ttyname(slave)
else:
serial_device = hardware_serial
return serial_device, ser_pty_process
def handle(self, harness):
runner = None
hardware = self.get_hardware()
if hardware:
self.instance.dut = hardware.id
if not hardware:
return
runner = hardware.runner or self.options.west_runner
serial_pty = hardware.serial_pty
serial_device, ser_pty_process = self._get_serial_device(serial_pty, hardware.serial)
logger.debug(f"Using serial device {serial_device} @ {hardware.baud} baud")
command = self._create_command(runner, hardware)
pre_script = hardware.pre_script
post_flash_script = hardware.post_flash_script
post_script = hardware.post_script
if pre_script:
self.run_custom_script(pre_script, 30)
flash_timeout = hardware.flash_timeout
if hardware.flash_with_test:
flash_timeout += self.timeout
try:
ser = self._create_serial_connection(
serial_device,
hardware.baud,
flash_timeout,
serial_pty,
ser_pty_process
)
except serial.SerialException:
return
halt_monitor_evt = threading.Event()
@ -592,7 +652,7 @@ class DeviceHandler(Handler):
try:
(stdout, stderr) = proc.communicate(timeout=flash_timeout)
# ignore unencodable unicode chars
logger.debug(stdout.decode(errors = "ignore"))
logger.debug(stdout.decode(errors="ignore"))
if proc.returncode != 0:
self.instance.status = "error"
@ -645,17 +705,7 @@ class DeviceHandler(Handler):
handler_time = time.time() - start_time
self.instance.execution_time = handler_time
if harness.state:
self.instance.status = harness.state
if harness.state == "failed":
self.instance.reason = "Failed"
elif not flash_error:
self.instance.status = "failed"
self.instance.reason = "Timeout"
if self.instance.status in ["error", "failed"]:
self.instance.add_missing_case_status("blocked", self.instance.reason)
self._update_instance_info(harness.state, handler_time, flash_error)
self._final_handle_actions(harness, handler_time)
@ -667,6 +717,7 @@ class DeviceHandler(Handler):
else:
self.make_device_available(serial_device)
class QEMUHandler(Handler):
"""Spawns a thread to monitor QEMU output from pipes
@ -707,11 +758,14 @@ class QEMUHandler(Handler):
return cpu_time.user + cpu_time.system
@staticmethod
def _thread(handler, timeout, outdir, logfile, fifo_fn, pid_fn, results, harness,
ignore_unexpected_eof=False):
def _thread_get_fifo_names(fifo_fn):
fifo_in = fifo_fn + ".in"
fifo_out = fifo_fn + ".out"
return fifo_in, fifo_out
@staticmethod
def _thread_open_files(fifo_in, fifo_out, logfile):
# These in/out nodes are named from QEMU's perspective, not ours
if os.path.exists(fifo_in):
os.unlink(fifo_in)
@ -728,6 +782,51 @@ class QEMUHandler(Handler):
in_fp = open(fifo_out, "rb", buffering=0)
log_out_fp = open(logfile, "wt")
return out_fp, in_fp, log_out_fp
@staticmethod
def _thread_close_files(fifo_in, fifo_out, pid, out_fp, in_fp, log_out_fp):
log_out_fp.close()
out_fp.close()
in_fp.close()
if pid:
try:
if pid:
os.kill(pid, signal.SIGTERM)
except ProcessLookupError:
# Oh well, as long as it's dead! User probably sent Ctrl-C
pass
os.unlink(fifo_in)
os.unlink(fifo_out)
@staticmethod
def _thread_update_instance_info(handler, handler_time, out_state):
handler.instance.execution_time = handler_time
if out_state == "timeout":
handler.instance.status = "failed"
handler.instance.reason = "Timeout"
elif out_state == "failed":
handler.instance.status = "failed"
handler.instance.reason = "Failed"
elif out_state in ['unexpected eof', 'unexpected byte']:
handler.instance.status = "failed"
handler.instance.reason = out_state
else:
handler.instance.status = out_state
handler.instance.reason = "Unknown"
@staticmethod
def _thread(handler, timeout, outdir, logfile, fifo_fn, pid_fn, results,
harness, ignore_unexpected_eof=False):
fifo_in, fifo_out = QEMUHandler._thread_get_fifo_names(fifo_fn)
out_fp, in_fp, log_out_fp = QEMUHandler._thread_open_files(
fifo_in,
fifo_out,
logfile
)
start_time = time.time()
timeout_time = start_time + timeout
p = select.poll()
@ -746,9 +845,9 @@ class QEMUHandler(Handler):
if this_timeout < 0 or not p.poll(this_timeout):
try:
if pid and this_timeout > 0:
#there's possibility we polled nothing because
#of not enough CPU time scheduled by host for
#QEMU process during p.poll(this_timeout)
# there's possibility we polled nothing because
# of not enough CPU time scheduled by host for
# QEMU process during p.poll(this_timeout)
cpu_time = QEMUHandler._get_cpu_time(pid)
if cpu_time < timeout and not out_state:
timeout_time = time.time() + (timeout - cpu_time)
@ -810,62 +909,62 @@ class QEMUHandler(Handler):
handler_time = time.time() - start_time
logger.debug(f"QEMU ({pid}) complete ({out_state}) after {handler_time} seconds")
handler.instance.execution_time = handler_time
if out_state == "timeout":
handler.instance.status = "failed"
handler.instance.reason = "Timeout"
elif out_state == "failed":
handler.instance.status = "failed"
handler.instance.reason = "Failed"
elif out_state in ['unexpected eof', 'unexpected byte']:
handler.instance.status = "failed"
handler.instance.reason = out_state
else:
handler.instance.status = out_state
handler.instance.reason = "Unknown"
QEMUHandler._thread_update_instance_info(handler, handler_time, out_state)
log_out_fp.close()
out_fp.close()
in_fp.close()
if pid:
try:
if pid:
os.kill(pid, signal.SIGTERM)
except ProcessLookupError:
# Oh well, as long as it's dead! User probably sent Ctrl-C
pass
os.unlink(fifo_in)
os.unlink(fifo_out)
def handle(self, harness):
self.results = {}
self.run = True
# We pass this to QEMU which looks for fifos with .in and .out
# suffixes.
QEMUHandler._thread_close_files(fifo_in, fifo_out, pid, out_fp, in_fp, log_out_fp)
def _get_sysbuild_build_dir(self):
if self.instance.testsuite.sysbuild:
# Load domain yaml to get default domain build directory
# Note: for targets using QEMU, we assume that the target will
# have added any additional images to the run target manually
domain_path = os.path.join(self.build_dir, "domains.yaml")
domains = Domains.from_file(domain_path)
logger.debug("Loaded sysbuild domain data from %s" % (domain_path))
logger.debug("Loaded sysbuild domain data from %s" % domain_path)
build_dir = domains.get_default_domain().build_dir
else:
build_dir = self.build_dir
return build_dir
def _set_qemu_filenames(self, sysbuild_build_dir):
# We pass this to QEMU which looks for fifos with .in and .out suffixes.
# QEMU fifo will use main build dir
self.fifo_fn = os.path.join(self.instance.build_dir, "qemu-fifo")
# PID file will be created in the main sysbuild app's build dir
self.pid_fn = os.path.join(build_dir, "qemu.pid")
self.pid_fn = os.path.join(sysbuild_build_dir, "qemu.pid")
if os.path.exists(self.pid_fn):
os.unlink(self.pid_fn)
self.log_fn = self.log
def _create_command(self, sysbuild_build_dir):
command = [self.generator_cmd]
command += ["-C", sysbuild_build_dir, "run"]
return command
def _update_instance_info(self, harness_state, is_timeout):
if (self.returncode != 0 and not self.ignore_qemu_crash) or not harness_state:
self.instance.status = "failed"
if is_timeout:
self.instance.reason = "Timeout"
else:
if not self.instance.reason:
self.instance.reason = "Exited with {}".format(self.returncode)
self.instance.add_missing_case_status("blocked")
def handle(self, harness):
self.results = {}
self.run = True
sysbuild_build_dir = self._get_sysbuild_build_dir()
command = self._create_command(sysbuild_build_dir)
self._set_qemu_filenames(sysbuild_build_dir)
self.thread = threading.Thread(name=self.name, target=QEMUHandler._thread,
args=(self, self.timeout, self.build_dir,
self.log_fn, self.fifo_fn,
@ -879,8 +978,6 @@ class QEMUHandler(Handler):
subprocess.call(["stty", "sane"], stdin=sys.stdout)
logger.debug("Running %s (%s)" % (self.name, self.type_str))
command = [self.generator_cmd]
command += ["-C", build_dir, "run"]
is_timeout = False
qemu_pid = None
@ -919,14 +1016,7 @@ class QEMUHandler(Handler):
logger.debug(f"return code from QEMU ({qemu_pid}): {self.returncode}")
if (self.returncode != 0 and not self.ignore_qemu_crash) or not harness.state:
self.instance.status = "failed"
if is_timeout:
self.instance.reason = "Timeout"
else:
if not self.instance.reason:
self.instance.reason = "Exited with {}".format(self.returncode)
self.instance.add_missing_case_status("blocked")
self._update_instance_info(harness.state, is_timeout)
self._final_handle_actions(harness, 0)