56d7471b0c
Running sanitycheck from Windows is not supported yet. Unfortunately, the error message is obscure when a user is unaware of this and runs it from Windows anyway. This patch gives an easy to understand error message explaining this issue. Signed-off-by: Sebastian Bøe <sebastian.boe@nordicsemi.no>
3149 lines
114 KiB
Python
Executable file
3149 lines
114 KiB
Python
Executable file
#!/usr/bin/env python3
|
|
# vim: set syntax=python ts=4 :
|
|
"""Zephyr Sanity Tests
|
|
|
|
This script scans for the set of unit test applications in the git
|
|
repository and attempts to execute them. By default, it tries to
|
|
build each test case on one platform per architecture, using a precedence
|
|
list defined in an architecture configuration file, and if possible
|
|
run the tests in the QEMU emulator.
|
|
|
|
Test cases are detected by the presence of a 'testcase.yaml' or a 'sample.yaml'
file in the application's project directory. This file may contain one or more
|
|
blocks, each identifying a test scenario. The title of the block is a name for
|
|
the test case, which only needs to be unique for the test cases specified in
|
|
that testcase meta-data. The full canonical name for each test case is <path to
|
|
test case>/<block>.
|
|
|
|
Each test block in the testcase meta data can define the following key/value
|
|
pairs:
|
|
|
|
tags: <list of tags> (required)
|
|
A set of string tags for the testcase. Usually pertains to
|
|
functional domains but can be anything. Command line invocations
|
|
of this script can filter the set of tests to run based on tag.
|
|
|
|
skip: <True|False> (default False)
|
|
skip testcase unconditionally. This can be used for broken tests.
|
|
|
|
slow: <True|False> (default False)
|
|
Don't build or run this test case unless --enable-slow was passed
|
|
in on the command line. Intended for time-consuming test cases
|
|
that are only run under certain circumstances, like daily
|
|
builds.
|
|
|
|
extra_args: <list of extra arguments>
|
|
Extra cache entries to pass to CMake when building or running the
|
|
test case.
|
|
|
|
extra_configs: <list of extra configurations>
|
|
Extra configuration options to be merged with a master prj.conf
|
|
when building or running the test case.
|
|
|
|
build_only: <True|False> (default False)
|
|
If true, don't try to run the test under QEMU even if the
|
|
selected platform supports it.
|
|
|
|
build_on_all: <True|False> (default False)
|
|
If true, attempt to build test on all available platforms.
|
|
|
|
depends_on: <list of features>
|
|
A board or platform can announce what features it supports, this option
|
|
will enable the test only on those platforms that provide this feature.
|
|
|
|
min_ram: <integer>
|
|
minimum amount of RAM needed for this test to build and run. This is
|
|
compared with information provided by the board metadata.
|
|
|
|
min_flash: <integer>
|
|
minimum amount of ROM needed for this test to build and run. This is
|
|
compared with information provided by the board metadata.
|
|
|
|
timeout: <number of seconds>
|
|
Length of time to run test in QEMU before automatically killing it.
|
|
Default to 60 seconds.
|
|
|
|
arch_whitelist: <list of arches, such as x86, arm, arc>
|
|
Set of architectures that this test case should only be run for.
|
|
|
|
arch_exclude: <list of arches, such as x86, arm, arc>
|
|
Set of architectures that this test case should not run on.
|
|
|
|
platform_whitelist: <list of platforms>
|
|
Set of platforms that this test case should only be run for.
|
|
|
|
platform_exclude: <list of platforms>
|
|
Set of platforms that this test case should not run on.
|
|
|
|
extra_sections: <list of extra binary sections>
|
|
When computing sizes, sanitycheck will report errors if it finds
|
|
extra, unexpected sections in the Zephyr binary unless they are named
|
|
here. They will not be included in the size calculation.
|
|
|
|
filter: <expression>
|
|
Filter whether the testcase should be run by evaluating an expression
|
|
against an environment containing the following values:
|
|
|
|
{ ARCH : <architecture>,
|
|
PLATFORM : <platform>,
|
|
<all CONFIG_* key/value pairs in the test's generated defconfig>,
|
|
*<env>: any environment variable available
|
|
}
|
|
|
|
The grammar for the expression language is as follows:
|
|
|
|
expression ::= expression "and" expression
|
|
| expression "or" expression
|
|
| "not" expression
|
|
| "(" expression ")"
|
|
| symbol "==" constant
|
|
| symbol "!=" constant
|
|
| symbol "<" number
|
|
| symbol ">" number
|
|
| symbol ">=" number
|
|
| symbol "<=" number
|
|
| symbol "in" list
|
|
| symbol ":" string
|
|
| symbol
|
|
|
|
list ::= "[" list_contents "]"
|
|
|
|
list_contents ::= constant
|
|
| list_contents "," constant
|
|
|
|
constant ::= number
|
|
| string
|
|
|
|
|
|
For the case where expression ::= symbol, it evaluates to true
|
|
if the symbol is defined to a non-empty string.
|
|
|
|
Operator precedence, starting from lowest to highest:
|
|
|
|
or (left associative)
|
|
and (left associative)
|
|
not (right associative)
|
|
all comparison operators (non-associative)
|
|
|
|
arch_whitelist, arch_exclude, platform_whitelist, platform_exclude
|
|
are all syntactic sugar for these expressions. For instance
|
|
|
|
arch_exclude = x86 arc
|
|
|
|
Is the same as:
|
|
|
|
filter = not ARCH in ["x86", "arc"]
|
|
|
|
The ':' operator compiles the string argument as a regular expression,
|
|
and then returns a true value only if the symbol's value in the environment
|
|
matches. For example, if CONFIG_SOC="quark_se" then
|
|
|
|
filter = CONFIG_SOC : "quark.*"
|
|
|
|
Would match it.
|
|
|
|
The set of test cases that actually run depends on directives in the testcase
|
|
file and options passed in on the command line. If there is any confusion,
|
|
running with -v or --discard-report can help show why particular test cases
|
|
were skipped.
|
|
|
|
Metrics (such as pass/fail state and binary size) for the last code
|
|
release are stored in scripts/sanity_chk/sanity_last_release.csv.
|
|
To update this, pass the --all --release options.
|
|
|
|
To load arguments from a file, write '+' before the file name, e.g.,
|
|
+file_name. File content must be one or more valid arguments separated by
|
|
line break instead of white spaces.
|
|
|
|
Most everyday users will run with no arguments.
|
|
|
|
"""
|
|
|
|
import os
|
|
|
|
# Fail fast with a readable message instead of an obscure error further down:
# running this script on Windows is not supported.
if os.name == 'nt':
    print("Running sanitycheck on Windows is not supported yet.")
    print("https://github.com/zephyrproject-rtos/zephyr/issues/2664")
    # SystemExit does not depend on the exit() helper injected by site.
    raise SystemExit(1)
|
|
|
|
import contextlib
|
|
import string
|
|
import mmap
|
|
import argparse
|
|
import sys
|
|
import re
|
|
import subprocess
|
|
import multiprocessing
|
|
import select
|
|
import shutil
|
|
import signal
|
|
import threading
|
|
import time
|
|
import csv
|
|
import glob
|
|
import serial
|
|
import concurrent
|
|
import concurrent.futures
|
|
import xml.etree.ElementTree as ET
|
|
from xml.sax.saxutils import escape
|
|
from collections import OrderedDict
|
|
from itertools import islice
|
|
from functools import cmp_to_key
|
|
from pathlib import Path
|
|
from distutils.spawn import find_executable
|
|
|
|
import logging
|
|
from sanity_chk import scl
|
|
from sanity_chk import expr_parser
|
|
|
|
# Record layout used for everything emitted through the logging module.
log_format = "%(levelname)s %(name)s::%(module)s.%(funcName)s():%(lineno)d: %(message)s"
# logging.WARNING is the named form of the numeric level 30.
logging.basicConfig(format=log_format, level=logging.WARNING)
|
|
|
|
# Root of the Zephyr tree, taken from the environment; the script cannot
# locate anything without it.
ZEPHYR_BASE = os.environ.get("ZEPHYR_BASE")
if not ZEPHYR_BASE:
    sys.stderr.write("$ZEPHYR_BASE environment variable undefined.\n")
    sys.exit(1)

# Make modules under $ZEPHYR_BASE/scripts importable.
sys.path.insert(0, os.path.join(ZEPHYR_BASE, "scripts/"))
|
|
|
|
|
|
# Verbosity level: debug() prints at >= 1, verbose() at >= 2.
VERBOSE = 0
# Files kept under $ZEPHYR_BASE/scripts/sanity_chk.
LAST_SANITY = os.path.join(ZEPHYR_BASE, "scripts", "sanity_chk",
                           "last_sanity.csv")
LAST_SANITY_XUNIT = os.path.join(ZEPHYR_BASE, "scripts", "sanity_chk",
                                 "last_sanity.xml")
RELEASE_DATA = os.path.join(ZEPHYR_BASE, "scripts", "sanity_chk",
                            "sanity_last_release.csv")
# Default job count: twice the number of CPUs reported by the host.
JOBS = multiprocessing.cpu_count() * 2
|
|
|
|
# Emit ANSI colour escapes only when stdout is an interactive terminal.
# sys.stdout.isatty() is used instead of os.isatty(sys.stdout.fileno())
# because a replaced stdout (for example an in-memory stream) may not have a
# file descriptor, in which case fileno() raises.
if sys.stdout.isatty():
    TERMINAL = True
    COLOR_NORMAL = '\033[0m'
    COLOR_RED = '\033[91m'
    COLOR_GREEN = '\033[92m'
    COLOR_YELLOW = '\033[93m'
else:
    TERMINAL = False
    COLOR_NORMAL = ""
    COLOR_RED = ""
    COLOR_GREEN = ""
    COLOR_YELLOW = ""
|
|
|
|
class SanityCheckException(Exception):
    """Base class for every exception raised by this script."""
    pass


class SanityRuntimeError(SanityCheckException):
    """Error detected while the script is running."""
    pass


class ConfigurationError(SanityCheckException):
    """Problem found in a configuration file."""

    def __init__(self, cfile, message):
        """Constructor

        @param cfile Name of the offending configuration file
        @param message Description of the problem
        """
        super().__init__(cfile, message)
        self.cfile = cfile
        self.message = message

    def __str__(self):
        # repr() keeps the quoting the message has always been shown with.
        return repr(self.cfile + ": " + self.message)


class MakeError(SanityCheckException):
    """Base class for failures of the generated Make session."""
    pass


class BuildError(MakeError):
    """MakeError subclass for build failures."""
    pass


class ExecutionError(MakeError):
    """MakeError subclass for execution failures."""
    pass
|
|
|
|
|
|
# Optional file object mirroring console output; None disables mirroring.
log_file = None


# Debug Functions
def info(what):
    """Write a line to stdout and, when a log file is set, to that file too.

    @param what Text to print; a newline is appended
    """
    sys.stdout.write(what + "\n")
    sys.stdout.flush()
    if log_file:
        log_file.write(what + "\n")
        log_file.flush()
|
|
|
|
|
|
def error(what):
    """Write a line to stderr in red and, when a log file is set, to that file.

    The colour escapes are only written to stderr; the log file receives the
    plain text.

    @param what Text to print; a newline is appended
    """
    sys.stderr.write(COLOR_RED + what + COLOR_NORMAL + "\n")
    if log_file:
        # The file object must be written to, not called.
        log_file.write(what + "\n")
        log_file.flush()
|
|
|
|
|
|
def debug(what):
    """Print a message through info() when VERBOSE is at least 1.

    @param what Text to print
    """
    if VERBOSE >= 1:
        info(what)
|
|
|
|
|
|
def verbose(what):
    """Print a message through info() when VERBOSE is at least 2.

    @param what Text to print
    """
    if VERBOSE >= 2:
        info(what)
|
|
|
|
class HarnessImporter:
    """Looks up a class in the 'harness' module and instantiates it."""

    def __init__(self, name):
        """Constructor

        The created object is stored in self.instance.

        @param name Name of the class to fetch from the 'harness' module; a
            false value (such as an empty string) selects the 'Test' class
        """
        harness_dir = os.path.join(ZEPHYR_BASE, "scripts/sanity_chk")
        # An importer is created for every handler; add the directory only
        # once so sys.path does not grow with each construction.
        if harness_dir not in sys.path:
            sys.path.insert(0, harness_dir)
        module = __import__("harness")
        my_class = getattr(module, name if name else "Test")

        self.instance = my_class()
|
|
|
|
class Handler:
    """Common state shared by the execution handlers.

    Holds a state string and a metrics dictionary behind a lock, plus the
    paths and timeout copied from the test instance.
    """

    def __init__(self, instance):
        """Constructor

        @param instance Test instance; its name, outdir, test.timeout and
            test.test_path are copied onto the handler
        """
        self.lock = threading.Lock()
        self.state = "waiting"
        self.run = False
        self.metrics = {}
        self.metrics["handler_time"] = 0
        self.metrics["ram_size"] = 0
        self.metrics["rom_size"] = 0

        self.binary = None
        self.pid_fn = None
        self.call_make_run = False

        self.name = instance.name
        self.instance = instance
        self.timeout = instance.test.timeout
        self.sourcedir = instance.test.test_path
        self.outdir = instance.outdir
        self.log = os.path.join(self.outdir, "handler.log")
        self.returncode = 0
        self.set_state("running", {})

    def set_state(self, state, metrics):
        """Replace the state and merge new metrics while holding the lock.

        @param state New state string
        @param metrics Dictionary merged into the stored metrics
        """
        with self.lock:
            self.state = state
            self.metrics.update(metrics)

    def get_state(self):
        """Read the state and metrics while holding the lock.

        @return tuple (state, metrics); metrics is the handler's own
            dictionary, not a copy
        """
        with self.lock:
            ret = (self.state, self.metrics)
        return ret
|
|
|
|
class BinaryHandler(Handler):
    """Runs a test as a host process and feeds its stdout to a harness."""

    def __init__(self, instance):
        """Constructor

        @param instance Test Instance
        """
        super().__init__(instance)

        self.valgrind = False
        self.terminated = False

    def try_kill_process_by_pid(self):
        """Send SIGTERM to the process named in the pid file, if one is set.

        The pid file is removed and self.pid_fn is cleared afterwards.
        """
        if self.pid_fn is not None:
            with open(self.pid_fn) as pid_fp:
                pid = int(pid_fp.read())
            os.unlink(self.pid_fn)
            self.pid_fn = None  # clear so we don't try to kill the binary twice
            try:
                os.kill(pid, signal.SIGTERM)
            except ProcessLookupError:
                pass

    def _output_reader(self, proc, harness):
        """Copy the process output to the handler log and to the harness.

        Reading stops at end of output or as soon as the harness reports a
        state.

        @param proc Running subprocess.Popen object with a stdout pipe
        @param harness Harness object that parses each output line
        """
        with open(self.log, "wt") as log_out_fp:
            for raw_line in iter(proc.stdout.readline, b''):
                text = raw_line.decode('utf-8')
                stripped = text.rstrip()
                verbose("OUTPUT: {0}".format(stripped))
                log_out_fp.write(text)
                log_out_fp.flush()
                harness.handle(stripped)
                if harness.state:
                    try:
                        # POSIX arch based ztests end on their own,
                        # so let's give it up to 100ms to do so
                        proc.wait(0.1)
                    except subprocess.TimeoutExpired:
                        proc.terminate()
                        self.terminated = True
                    break

    def handle(self):
        """Run the binary (or the 'run' target) and record the outcome.

        The final state is "error" for a non-zero exit status that was not
        caused by this handler terminating the process, otherwise the state
        reported by the harness, otherwise "timeout".
        """
        harness_name = self.instance.test.harness.capitalize()
        harness_import = HarnessImporter(harness_name)
        harness = harness_import.instance
        harness.configure(self.instance)

        if self.call_make_run:
            generator_cmd = "ninja" if options.ninja else "make"
            command = [generator_cmd, "-C", self.outdir, "run"]
        else:
            command = [self.binary]

        if shutil.which("valgrind") and self.valgrind:
            command = ["valgrind", "--error-exitcode=2",
                       "--leak-check=full",
                       "--suppressions="+ZEPHYR_BASE+"/scripts/valgrind.supp",
                       "--log-file="+self.outdir+"/valgrind.log"
                       ] + command

        # NOTE(review): stderr is piped but never read; a process that writes
        # a lot to stderr could block. Left unchanged here.
        with subprocess.Popen(command, stdout=subprocess.PIPE, stderr=subprocess.PIPE) as proc:
            t = threading.Thread(target=self._output_reader, args=(proc, harness, ))
            t.start()
            t.join(self.timeout)
            if t.is_alive():
                self.try_kill_process_by_pid()
                proc.terminate()
                self.terminated = True
                t.join()
            proc.wait()
            self.returncode = proc.returncode

        if options.enable_coverage:
            # Pass an argument list without a shell and supply GCOV_PREFIX
            # through the environment. With shell=True and a list, only the
            # first item is executed by the shell, so the previous call ran
            # the bare variable assignment and gcov itself never started.
            gcov_env = dict(os.environ, GCOV_PREFIX=self.outdir)
            subprocess.call(["gcov", self.sourcedir, "-b", "-s", self.outdir],
                            env=gcov_env)

        self.try_kill_process_by_pid()

        # FIXME: This is needed when killing the simulator, the console is
        # garbled and needs to be reset. Did not find a better way to do that.

        subprocess.call(["stty", "sane"])
        self.instance.results = harness.tests
        if not self.terminated and self.returncode != 0:
            # When a process is killed, the default handler returns 128 + SIGTERM
            # so in that case the return code itself is not meaningful
            self.set_state("error", {})
        elif harness.state:
            self.set_state(harness.state, {})
        else:
            self.set_state("timeout", {})
|
|
|
|
class DeviceHandler(Handler):
    """Flashes a test onto a board and follows its output on a serial port."""

    def __init__(self, instance):
        """Constructor

        @param instance Test Instance
        """
        super().__init__(instance)

    def monitor_serial(self, ser, harness):
        """Copy serial output to the handler log and to the harness.

        The loop ends when the port is closed or the harness reports a state.

        @param ser Open serial port object
        @param harness Harness object that parses each output line
        """
        with open(self.log, "wt") as log_out_fp:
            while ser.isOpen():
                # Reset on every pass: if readline() raises, the name must not
                # be left unbound or still hold the previous line, which would
                # then be logged and handled a second time.
                serial_line = None
                try:
                    serial_line = ser.readline()
                except TypeError:
                    pass

                if serial_line:
                    sl = serial_line.decode('utf-8', 'ignore')
                    verbose("DEVICE: {0}".format(sl.rstrip()))

                    log_out_fp.write(sl)
                    log_out_fp.flush()
                    harness.handle(sl.rstrip())
                if harness.state:
                    ser.close()
                    break

    def handle(self):
        """Flash the board, watch the serial port and record the outcome.

        Test cases that the harness has no result for after a timeout are
        marked "BLOCK".
        """
        out_state = "failed"

        generator_cmd = "ninja" if options.ninja else "make"

        command = [generator_cmd, "-C", self.outdir, "flash"]

        device = options.device_serial
        ser = serial.Serial(
            device,
            baudrate=115200,
            parity=serial.PARITY_NONE,
            stopbits=serial.STOPBITS_ONE,
            bytesize=serial.EIGHTBITS,
            timeout=self.timeout
        )

        ser.flush()

        harness_name = self.instance.test.harness.capitalize()
        harness_import = HarnessImporter(harness_name)
        harness = harness_import.instance
        harness.configure(self.instance)

        # Start listening before flashing so no early output is missed.
        t = threading.Thread(target=self.monitor_serial, args=(ser, harness))
        t.start()

        try:
            subprocess.check_output(command, stderr=subprocess.PIPE)
        except subprocess.CalledProcessError:
            # A failed flash is not reported here; the run then ends through
            # the harness state or the timeout below.
            pass

        t.join(self.timeout)
        if t.is_alive():
            out_state = "timeout"
            ser.close()

        if ser.isOpen():
            ser.close()

        if out_state == "timeout":
            for c in self.instance.test.cases:
                if c not in harness.tests:
                    harness.tests[c] = "BLOCK"

        self.instance.results = harness.tests
        if harness.state:
            self.set_state(harness.state, {})
        else:
            self.set_state(out_state, {})
|
|
|
|
|
|
class QEMUHandler(Handler):
    """Spawns a thread to monitor QEMU output from pipes

    We pass QEMU_PIPE to 'make run' and monitor the pipes for output.
    We need to do this as once qemu starts, it runs forever until killed.
    Test cases emit special messages to the console as they run, we check
    for these to collect whether the test passed or failed.
    """

    @staticmethod
    def _thread(handler, timeout, outdir, logfile, fifo_fn, pid_fn, results, harness):
        """Thread body: read QEMU console output until a verdict or timeout.

        @param handler Handler whose state and metrics are updated at the end
        @param timeout Number of seconds to wait for output
        @param outdir Not used by this function
        @param logfile Path of the file that receives every complete line
        @param fifo_fn Base name of the FIFOs; ".in" and ".out" are appended
        @param pid_fn Path of the pid file of the process to terminate
        @param results Not used by this function
        @param harness Harness object that parses each output line
        """
        fifo_in = fifo_fn + ".in"
        fifo_out = fifo_fn + ".out"

        # These in/out nodes are named from QEMU's perspective, not ours
        if os.path.exists(fifo_in):
            os.unlink(fifo_in)
        os.mkfifo(fifo_in)
        if os.path.exists(fifo_out):
            os.unlink(fifo_out)
        os.mkfifo(fifo_out)

        # We don't do anything with out_fp but we need to open it for
        # writing so that QEMU doesn't block, due to the way pipes work
        out_fp = open(fifo_in, "wb")
        # Disable internal buffering, we don't
        # want read() or poll() to ever block if there is data in there
        in_fp = open(fifo_out, "rb", buffering=0)
        log_out_fp = open(logfile, "wt")

        start_time = time.time()
        timeout_time = start_time + timeout
        p = select.poll()
        p.register(in_fp, select.POLLIN)
        out_state = None

        metrics = {}
        line = ""
        timeout_extended = False
        while True:
            this_timeout = int((timeout_time - time.time()) * 1000)
            if this_timeout < 0 or not p.poll(this_timeout):
                if not out_state:
                    out_state = "timeout"
                break

            try:
                c = in_fp.read(1).decode("utf-8")
            except UnicodeDecodeError:
                # Test is writing something weird, fail
                out_state = "unexpected byte"
                break

            if c == "":
                # EOF, this shouldn't happen unless QEMU crashes
                out_state = "unexpected eof"
                break
            line = line + c
            if c != "\n":
                continue

            # line contains a full line of data output from QEMU
            log_out_fp.write(line)
            log_out_fp.flush()
            line = line.strip()
            verbose("QEMU: %s" % line)

            harness.handle(line)
            if harness.state:
                # if we have registered a fail make sure the state is not
                # overridden by a false success message coming from the
                # testsuite
                if out_state != 'failed':
                    out_state = harness.state

                # if we get some state, that means test is doing well, we reset
                # the timeout and wait for 2 more seconds just in case we have
                # crashed after test has completed

                if harness.type:
                    break
                else:
                    if not timeout_extended:
                        timeout_extended = True
                        timeout_time = time.time() + 2

            # TODO: Add support for getting numerical performance data
            # from test cases. Will involve extending test case reporting
            # APIs. Add whatever gets reported to the metrics dictionary
            line = ""

        metrics["handler_time"] = time.time() - start_time
        verbose("QEMU complete (%s) after %f seconds" %
                (out_state, metrics["handler_time"]))
        handler.set_state(out_state, metrics)

        log_out_fp.close()
        out_fp.close()
        in_fp.close()

        # If the pid file was never written there is nothing to kill; carry
        # on so the FIFOs below are still removed.
        if os.path.exists(pid_fn):
            with open(pid_fn) as pid_fp:
                pid = int(pid_fp.read())
            os.unlink(pid_fn)
            try:
                os.kill(pid, signal.SIGTERM)
            except ProcessLookupError:
                # Oh well, as long as it's dead! User probably sent Ctrl-C
                pass

        os.unlink(fifo_in)
        os.unlink(fifo_out)

    def __init__(self, instance):
        """Constructor

        Removes a stale pid file, configures the harness and starts the
        daemon thread that monitors the QEMU pipes.

        @param instance Test instance
        """

        super().__init__(instance)

        self.results = {}
        self.run = True

        # We pass this to QEMU which looks for fifos with .in and .out
        # suffixes.
        self.fifo_fn = os.path.join(instance.outdir, "qemu-fifo")

        self.pid_fn = os.path.join(instance.outdir, "qemu.pid")
        if os.path.exists(self.pid_fn):
            os.unlink(self.pid_fn)

        self.log_fn = self.log

        harness_import = HarnessImporter(instance.test.harness.capitalize())
        harness = harness_import.instance
        harness.configure(self.instance)
        self.thread = threading.Thread(name=self.name, target=QEMUHandler._thread,
                                       args=(self, self.timeout, self.outdir,
                                             self.log_fn, self.fifo_fn,
                                             self.pid_fn, self.results, harness))

        self.instance.results = harness.tests
        self.thread.daemon = True
        verbose("Spawning QEMU process for %s" % self.name)
        self.thread.start()

    def get_fifo(self):
        """Return the base name of the FIFOs (without .in/.out suffix)."""
        return self.fifo_fn
|
|
|
|
|
|
class SizeCalculator:
    """Computes RAM and ROM usage of an ELF binary from its section sizes."""

    # Sections counted towards RAM only.
    alloc_sections = ["bss", "noinit", "app_bss", "app_noinit", "ccm_bss",
                      "ccm_noinit"]
    # Sections counted towards both RAM and ROM.
    rw_sections = ["datas", "initlevel", "_k_task_list", "_k_event_list",
                   "_k_memory_pool", "exceptions", "initshell",
                   "_static_thread_area", "_k_timer_area",
                   "_k_mem_slab_area", "_k_mem_pool_area", "sw_isr_table",
                   "_k_sem_area", "_k_mutex_area",
                   "_k_fifo_area", "_k_lifo_area", "_k_stack_area",
                   "_k_msgq_area", "_k_mbox_area", "_k_pipe_area",
                   "net_if", "net_if_dev", "net_stack", "net_l2_data",
                   "_k_queue_area", "_net_buf_pool_area", "app_datas",
                   "kobject_data", "mmu_tables", "app_pad", "priv_stacks",
                   "ccm_data", "usb_descriptor", "usb_data", "usb_bos_desc",
                   'log_backends_sections', 'log_dynamic_sections',
                   'log_const_sections', "app_smem", 'shell_root_cmds_sections',
                   "font_entry_sections",
                   "priv_stacks_noinit", "_TEXT_SECTION_NAME_2",
                   '_GCOV_BSS_SECTION_NAME', 'gcov']

    # These get copied into RAM only on non-XIP
    ro_sections = ["text", "ctors", "init_array", "reset", "object_access",
                   "rodata", "devconfig", "net_l2", "vector", "sw_isr_table",
                   "_bt_settings_area"]

    def __init__(self, filename, extra_sections):
        """Constructor

        Prints a message and exits with status 2 when the file is not an ELF
        binary or has no symbol information.

        @param filename Path to the output binary
            The <filename> is parsed by objdump to determine section sizes
        @param extra_sections Section names that are accepted without being
            counted in the size calculation
        """
        # Make sure this is an ELF binary
        with open(filename, "rb") as f:
            magic = f.read(4)

        if magic != b'\x7fELF':
            print("%s is not an ELF binary" % filename)
            sys.exit(2)

        # Search for CONFIG_XIP in the ELF's list of symbols using nm. The
        # command is given as an argument list so a path containing spaces or
        # shell metacharacters is handled correctly; the filtering that a
        # shell pipeline used to do is done here in Python.
        with subprocess.Popen(["nm", filename], stdout=subprocess.PIPE,
                              stderr=subprocess.STDOUT) as nm_proc:
            nm_output = nm_proc.communicate()[0].decode("utf-8").strip()

        if nm_output.endswith("no symbols"):
            print("%s has no symbol information" % filename)
            sys.exit(2)

        # XIP when some symbol line mentions CONFIG_XIP and has a third field.
        self.is_xip = any(
            len(symbol_line.split()) > 2
            for symbol_line in nm_output.splitlines()
            if "CONFIG_XIP" in symbol_line
        )

        self.filename = filename
        self.sections = []
        self.rom_size = 0
        self.ram_size = 0
        self.extra_sections = extra_sections

        self._calculate_sizes()

    def get_ram_size(self):
        """Get the amount of RAM the application will use up on the device

        @return amount of RAM, in bytes
        """
        return self.ram_size

    def get_rom_size(self):
        """Get the size of the data that this application uses on device's flash

        @return amount of ROM, in bytes
        """
        return self.rom_size

    def unrecognized_sections(self):
        """Get a list of sections inside the binary that weren't recognized

        @return list of unrecognized section names
        """
        return [v["name"] for v in self.sections if not v["recognized"]]

    def _calculate_sizes(self):
        """ Calculate RAM and ROM usage by section """
        objdump_output = subprocess.check_output(
            ["objdump", "-h", self.filename]).decode("utf-8").splitlines()

        for line in objdump_output:
            words = line.split()

            # Skip lines that are too short to hold index, name, size and
            # both addresses
            if len(words) < 5:
                continue

            index = words[0]
            if not index[0].isdigit():  # Skip lines that do not start
                continue                # with a digit

            name = words[1]             # Skip lines with section names
            if name[0] == '.':          # starting with '.'
                continue

            # TODO this doesn't actually reflect the size in flash or RAM as
            # it doesn't include linker-imposed padding between sections.
            # It is close though.
            size = int(words[2], 16)
            if size == 0:
                continue

            load_addr = int(words[4], 16)
            virt_addr = int(words[3], 16)

            # Add section to memory use totals (for both non-XIP and XIP scenarios)
            # Unrecognized section names are not included in the calculations.
            recognized = True
            if name in SizeCalculator.alloc_sections:
                self.ram_size += size
                stype = "alloc"
            elif name in SizeCalculator.rw_sections:
                self.ram_size += size
                self.rom_size += size
                stype = "rw"
            elif name in SizeCalculator.ro_sections:
                self.rom_size += size
                if not self.is_xip:
                    self.ram_size += size
                stype = "ro"
            else:
                stype = "unknown"
                if name not in self.extra_sections:
                    recognized = False

            self.sections.append({"name": name, "load_addr": load_addr,
                                  "size": size, "virt_addr": virt_addr,
                                  "type": stype, "recognized": recognized})
|
|
|
|
|
|
class MakeGoal:
    """Metadata class representing one of the sub-makes called by MakeGenerator

    MakeGenerator returns a dictionary of these which can then be associated
    with TestInstances to get a complete picture of what happened during a test.
    MakeGenerator is used for tasks outside of building tests (such as
    defconfigs) which is why MakeGoal is a separate class from TestInstance.
    """

    def __init__(self, name, text, handler, make_log, build_log, run_log, handler_log):
        """Constructor

        @param name Name of the goal
        @param text Makefile text of the goal
        @param handler Execution handler, or None
        @param make_log Path of the top-level make log
        @param build_log Path of the build log
        @param run_log Path of the run log
        @param handler_log Path of the handler log
        """
        self.name = name
        self.text = text
        self.handler = handler
        self.make_log = make_log
        self.build_log = build_log
        self.run_log = run_log
        self.handler_log = handler_log
        self.make_state = "waiting"
        self.failed = False
        self.finished = False
        self.reason = None
        self.metrics = {}

    def get_error_log(self):
        """Get the log file that belongs to the current make state

        @return path of the log, or None for an unknown state
        """
        if self.make_state == "waiting":
            # Shouldn't ever see this; breakage in the main Makefile itself.
            return self.make_log
        elif self.make_state == "building":
            # Failure when calling the sub-make to build the code
            return self.build_log
        elif self.make_state == "running":
            # Failure in sub-make for "make run", qemu probably failed to start
            return self.run_log
        elif self.make_state == "finished":
            # Execution handler finished, but timed out or otherwise wasn't successful
            return self.handler_log
        return None

    def fail(self, reason):
        """Mark the goal as finished and failed

        @param reason Short description of the failure
        """
        self.failed = True
        self.finished = True
        self.reason = reason

    def success(self):
        """Mark the goal as finished"""
        self.finished = True

    def __str__(self):
        if not self.finished:
            return "[%s] in progress (%s)" % (self.name, self.make_state)
        if self.failed:
            return "[%s] failed (%s: see %s)" % (self.name, self.reason,
                                                 self.get_error_log())
        return "[%s] passed" % self.name
|
|
|
|
|
|
class MakeGenerator:
|
|
"""Generates a Makefile which just calls a bunch of sub-make sessions
|
|
|
|
In any given test suite we may need to build dozens if not hundreds of
|
|
test cases. The cleanest way to parallelize this is to just let Make
|
|
do the parallelization, sharing the jobserver among all the different
|
|
sub-make targets.
|
|
"""
|
|
|
|
# Opening of a goal: declares it phony and starts its rule.
GOAL_HEADER_TMPL = """.PHONY: {goal}
{goal}:
"""

# Recipe for a build phase: announce the phase, run cmake, then the generator.
MAKE_RULE_TMPL = """\t@echo sanity_test_{phase} {goal} >&2
\tcmake \\
\t\t-G"{generator}"\\
\t\t-H{directory}\\
\t\t-B{outdir}\\
\t\t-DEXTRA_CFLAGS="-Werror {cflags}"\\
\t\t-DEXTRA_AFLAGS=-Wa,--fatal-warnings\\
\t\t-DEXTRA_LDFLAGS="{ldflags}"\\
\t\t{args}\\
\t\t>{logfile} 2>&1
\t{generator_cmd} -C {outdir}\\
\t\t{verb} {make_args}\\
\t\t>>{logfile} 2>&1
"""
# Recipe for the run phase: announce the phase and run the generator only.
MAKE_RULE_TMPL_RUN = """\t@echo sanity_test_{phase} {goal} >&2
\t{generator_cmd} -C {outdir}\\
\t\t{verb} {make_args}\\
\t\t>>{logfile} 2>&1
"""

# Closing line of a goal: announces that it finished.
GOAL_FOOTER_TMPL = "\t@echo sanity_test_finished {goal} >&2\n\n"

# Matches the phase markers echoed by the recipes above and make's own
# "*** [target] Error" lines. Raw string so the backslashes reach the regex
# engine without relying on unrecognised string escapes.
re_make = re.compile(
    r"sanity_test_([A-Za-z0-9]+) (.+)|$|make[:] \*\*\* \[(.+:.+: )?(.+)\] Error.+$")
|
|
|
|
def __init__(self, base_outdir):
    """MakeGenerator constructor

    @param base_outdir Intended to be the base out directory. A make.log
        file will be created here which contains the output of the
        top-level Make session, as well as the dynamic control Makefile
    """
    self.goals = {}
    # exist_ok avoids failing when the directory appears between a separate
    # existence check and its creation.
    os.makedirs(base_outdir, exist_ok=True)
    self.logfile = os.path.join(base_outdir, "make.log")
    self.makefile = os.path.join(base_outdir, "Makefile")
    self.deprecations = options.error_on_deprecations
|
|
|
|
def _get_rule_header(self, name):
    """Get the Makefile text that opens the rule for a goal

    @param name Name of the goal
    """
    return MakeGenerator.GOAL_HEADER_TMPL.format(goal=name)
|
|
|
|
def _get_sub_make(self, name, phase, workdir, outdir,
                  logfile, args, make_args=""):
    """Get the Makefile recipe text for one phase of a goal

    The "running" phase only invokes the generator; every other phase runs
    cmake first.

    @param name Name of the goal
    @param phase Phase name echoed by the recipe
    @param workdir Source directory given to cmake
    @param outdir Build directory
    @param logfile File that receives the output of the commands
    @param args Arguments given to CMake
    @param make_args Arguments given to the Makefile generated by CMake
    """
    args = " ".join(["-D{}".format(a) for a in args])
    cflags = ""

    # NOTE(review): deprecation warnings are silenced when the option is
    # true; confirm this matches how the option is declared.
    if self.deprecations:
        cflags = cflags + " -Wno-deprecated-declarations"

    ldflags = "-Wl,--fatal-warnings"

    if options.ninja:
        generator = "Ninja"
        generator_cmd = "ninja -j1"
        verb = "-v" if VERBOSE else ""
    else:
        generator = "Unix Makefiles"
        generator_cmd = "$(MAKE)"
        verb = "VERBOSE=1" if VERBOSE else ""

    if phase == 'running':
        return MakeGenerator.MAKE_RULE_TMPL_RUN.format(
            generator_cmd=generator_cmd,
            phase=phase,
            goal=name,
            outdir=outdir,
            verb=verb,
            logfile=logfile,
            make_args=make_args
        )

    return MakeGenerator.MAKE_RULE_TMPL.format(
        generator=generator,
        generator_cmd=generator_cmd,
        phase=phase,
        goal=name,
        outdir=outdir,
        cflags=cflags,
        ldflags=ldflags,
        directory=workdir,
        verb=verb,
        args=args,
        logfile=logfile,
        make_args=make_args
    )
|
|
|
|
def _get_rule_footer(self, name):
    """Get the Makefile text that closes the rule for a goal

    @param name Name of the goal
    """
    return MakeGenerator.GOAL_FOOTER_TMPL.format(goal=name)
|
|
|
|
def add_build_goal(self, name, directory, outdir,
                   args, buildlog, make_args=""):
    """Add a goal to invoke a build session

    @param name A unique string name for this build goal. The results
        dictionary returned by execute() will be keyed by this name.
    @param directory Absolute path to working directory, will be passed
        to make -C
    @param outdir Absolute path to output directory, will be passed to
        cmake via -B=<path>
    @param args Extra command line arguments to pass to 'cmake', typically
        environment variables or specific Make goals
    @param buildlog Name of the build log file, created inside outdir
    @param make_args Arguments given to the Makefile generated by CMake
    """
    os.makedirs(outdir, exist_ok=True)

    build_logfile = os.path.join(outdir, buildlog)
    text = self._get_rule_header(name)
    text += self._get_sub_make(name, "building", directory, outdir, build_logfile,
                               args, make_args=make_args)
    text += self._get_rule_footer(name)

    self.goals[name] = MakeGoal(name, text, None, self.logfile, build_logfile,
                                None, None)
|
|
|
|
def add_goal(self, instance, type, args, make_args=""):
    """Add a goal to build a Zephyr project and then run it using a handler

    The generated make goal invokes Make twice, the first time it will
    build the default goal, and the second will invoke the 'qemu' goal.
    The output of the handler session will be monitored, and terminated
    either upon pass/fail result of the test program, or the timeout
    is reached.

    @param instance Test instance providing name, source path and outdir
    @param type Handler kind: "qemu", "native", "nsim", "unit", "device" or
        "renode"; any other value builds without a handler
    @param args Extra cache entries to define in CMake.
    @param make_args Arguments given to the Makefile generated by CMake
    """
    # Work on a copy: entries are appended below and the caller's list must
    # not be modified as a side effect.
    args = list(args)

    name = instance.name
    directory = instance.test.test_path
    outdir = instance.outdir

    build_logfile = os.path.join(outdir, "build.log")
    run_logfile = os.path.join(outdir, "run.log")

    os.makedirs(outdir, exist_ok=True)

    handler = None
    if type == "qemu":
        handler = QEMUHandler(instance)
    elif type == "native":
        handler = BinaryHandler(instance)
        handler.binary = os.path.join(outdir, "zephyr", "zephyr.exe")
    elif type == "nsim":
        handler = BinaryHandler(instance)
        handler.call_make_run = True
    elif type == "unit":
        handler = BinaryHandler(instance)
        handler.binary = os.path.join(outdir, "testbinary")
    elif type == "device":
        handler = DeviceHandler(instance)
    elif type == "renode":
        handler = BinaryHandler(instance)
        handler.pid_fn = os.path.join(instance.outdir, "renode.pid")
        handler.call_make_run = True

    if options.enable_coverage:
        args.append("EXTRA_LDFLAGS=--coverage")

    if type == 'qemu':
        args.append("QEMU_PIPE=%s" % handler.get_fifo())

    text = (self._get_rule_header(name) +
            self._get_sub_make(name, "building", directory,
                               outdir, build_logfile, args, make_args=make_args))
    # Only handlers with run set get a second, "running" recipe.
    if handler and handler.run:
        text += self._get_sub_make(name, "running", directory,
                                   outdir, run_logfile,
                                   args, make_args="run")

    text += self._get_rule_footer(name)

    self.goals[name] = MakeGoal(name, text, handler, self.logfile, build_logfile,
                                run_logfile, handler.log if handler else None)
|
|
|
|
|
|
def add_test_instance(self, ti, extra_args=None):
    """Add a goal to build/test a TestInstance object

    @param ti TestInstance object to build. The status dictionary returned
        by execute() will be keyed by its .name field.
    @param extra_args Optional iterable of additional arguments appended
        after the BOARD= entry. None (the default) means no extra arguments.
    """
    args = ti.test.extra_args[:]
    if len(ti.test.extra_configs) > 0 or options.coverage:
        args.append("OVERLAY_CONFIG=%s" %
                    os.path.join(ti.outdir, "overlay.conf"))

    if ti.test.type == "unit" and options.enable_coverage:
        args.append("COVERAGE=1")

    args.append("BOARD={}".format(ti.platform.name))
    if extra_args:
        args.extend(extra_args)

    do_build_only = ti.build_only or options.build_only
    do_run = not do_build_only
    skip_slow = ti.test.slow and not options.enable_slow

    # FIXME: Need refactoring and cleanup
    # The first matching branch wins. Note that the nsim/renode branches
    # leave handler_type as None when the simulator executable is missing;
    # they do not fall through to the "device" branch.
    handler_type = None
    if ti.platform.qemu_support and do_run:
        handler_type = "qemu"
    elif ti.test.type == "unit":
        handler_type = "unit"
    elif ti.platform.type == "native" and do_run:
        handler_type = "native"
    elif ti.platform.simulation == "nsim" and do_run:
        if find_executable("nsimdrv"):
            handler_type = "nsim"
    elif ti.platform.simulation == "renode" and do_run:
        if find_executable("renode"):
            handler_type = "renode"
    elif options.device_testing and (not ti.build_only) and (not options.build_only):
        handler_type = "device"

    if not skip_slow:
        self.add_goal(ti, handler_type, args)
    else:
        verbose("Skipping slow test: " + ti.name)
|
|
|
|
def execute(self, callback_fn=None, context=None):
    """Execute all the registered build goals

    Writes every goal's text plus an "all" rule into self.makefile, runs
    'make -k -j JOBS' on it, and follows make's stderr line by line to
    track the state of each goal. Every stderr line is also copied to
    self.logfile.

    @param callback_fn If not None, a callback function will be called
        after each recognized state line. It is invoked as
        callback_fn(context, goals, goal): the context object supplied
        here, the dictionary of all goals, and the goal the line refers to.
    @param context Context object to pass to the callback function.
        Type and semantics are specific to that callback function.
    @return The dictionary mapping goal names to their goal objects
        (self.goals), updated with the final status.
    """

    with open(self.makefile, "wt") as tf, \
            open(os.devnull, "wb") as devnull, \
            open(self.logfile, "wt") as make_log:
        # Create our dynamic Makefile and execute it.
        # Watch stderr output which is where we will keep
        # track of build state
        for name, goal in self.goals.items():
            tf.write(goal.text)
        tf.write("all: %s\n" % (" ".join(self.goals.keys())))
        # Make sure make sees the complete file; tf stays open until the
        # end of the 'with' block.
        tf.flush()

        cmd = ["make", "-k", "-j", str(JOBS), "-f", tf.name, "all"]

        # assure language neutral environment
        make_env = os.environ.copy()
        make_env['LC_MESSAGES'] = 'C.UTF-8'
        p = subprocess.Popen(cmd, stderr=subprocess.PIPE,
                             stdout=devnull, env=make_env)

        # readline() returns b'' at EOF, which ends the iteration.
        for line in iter(p.stderr.readline, b''):
            line = line.decode("utf-8")
            make_log.write(line)
            verbose("MAKE: " + repr(line.strip()))
            m = MakeGenerator.re_make.match(line)
            if not m:
                continue

            # When the 4th group is set, it names the goal that failed.
            state, name, _, error = m.groups()
            if error:
                goal = self.goals[error]
                # Sometimes QEMU will run an image and then crash out, which
                # will cause the 'make run' invocation to exit with
                # nonzero status.
                # Need to distinguish this case from a compilation failure.
                if goal.handler:
                    goal.fail("handler_crash")
                else:
                    goal.fail("build_error")
            else:
                goal = self.goals[name]
                goal.make_state = state

                if state == "finished":
                    if goal.handler:
                        # Only handlers that define handle() are driven
                        # from here.
                        if hasattr(goal.handler, "handle"):
                            goal.handler.handle()
                            goal.handler_log = goal.handler.log

                        thread_status, metrics = goal.handler.get_state()
                        goal.metrics.update(metrics)
                        if thread_status == "passed":
                            goal.success()
                        else:
                            goal.fail(thread_status)
                    else:
                        goal.success()

            if callback_fn:
                callback_fn(context, self.goals, goal)

        p.wait()
    return self.goals
|
|
|
|
|
|
# Type names understood by SanityConfigParser._cast_value():
#
#   "list"        - List of strings
#   "list:<type>" - List of <type>
#   "set"         - Set of unordered, unique strings
#   "set:<type>"  - Set of <type>
#   "float"       - Floating point
#   "int"         - Integer
#   "bool"        - Boolean
#   "str"         - String

# XXX Be sure to update __doc__ if you change any of this!!

# Keys accepted in a platform definition, with their type and default.
platform_valid_keys = {
    "qemu_support": {"type": "bool", "default": False},
    "supported_toolchains": {"type": "list", "default": []},
    "env": {"type": "list", "default": []},
}

# Keys accepted in a test case definition, with their type and, where
# given, their default or "required" flag.
testcase_valid_keys = {
    "tags": {"type": "set", "required": False},
    "type": {"type": "str", "default": "integration"},
    "extra_args": {"type": "list"},
    "extra_configs": {"type": "list"},
    "build_only": {"type": "bool", "default": False},
    "build_on_all": {"type": "bool", "default": False},
    "skip": {"type": "bool", "default": False},
    "slow": {"type": "bool", "default": False},
    "timeout": {"type": "int", "default": 60},
    "min_ram": {"type": "int", "default": 8},
    "depends_on": {"type": "set"},
    "min_flash": {"type": "int", "default": 32},
    "arch_whitelist": {"type": "set"},
    "arch_exclude": {"type": "set"},
    "extra_sections": {"type": "list", "default": []},
    "platform_exclude": {"type": "set"},
    "platform_whitelist": {"type": "set"},
    "toolchain_exclude": {"type": "set"},
    "toolchain_whitelist": {"type": "set"},
    "filter": {"type": "str"},
    "harness": {"type": "str"},
    "harness_config": {"type": "map", "default": {}},
}
|
|
|
|
|
|
class SanityConfigParser:
    """Class to read test case files with semantic checking
    """

    def __init__(self, filename, schema):
        """Instantiate a new SanityConfigParser object

        @param filename Source .yaml file to read
        @param schema Schema handed to scl.yaml_load_verify() together
            with the file name
        """
        self.data = scl.yaml_load_verify(filename, schema)
        self.filename = filename
        self.tests = {}
        self.common = {}
        if 'tests' in self.data:
            self.tests = self.data['tests']
        if 'common' in self.data:
            self.common = self.data['common']

    def _cast_value(self, value, typestr):
        """Convert a raw value to the Python type named by typestr.

        @param value Raw value from the .yaml data
        @param typestr One of "str", "float", "int", "bool", "list",
            "list:<type>", "set", "set:<type>" or "map"
        @return The converted value. "bool" and "map" values, and values
            that already are lists for a list type, are returned unchanged.
        @raise ValueError if the value cannot be converted to the type
        @raise ConfigurationError if typestr is not a known type
        """
        # Only strings can be stripped/split; remember whether we have one
        # instead of leaving a local unbound for other value types.
        stripped = value.strip() if isinstance(value, str) else None

        if typestr == "str":
            if stripped is None:
                raise ValueError("expected a string, got %r" % (value,))
            return stripped

        if typestr == "float":
            return float(value)

        if typestr == "int":
            return int(value)

        if typestr == "bool":
            return value

        if typestr.startswith("list"):
            if isinstance(value, list):
                return value
            if stripped is None:
                raise ValueError(
                    "expected a string or list, got %r" % (value,))
            items = stripped.split()
            # "list:<type>" converts every element to <type>
            if typestr[4:5] == ":":
                return [self._cast_value(item, typestr[5:])
                        for item in items]
            return items

        if typestr.startswith("set"):
            if stripped is not None:
                items = stripped.split()
            elif isinstance(value, (list, tuple, set, frozenset)):
                # Accept an already-split collection as well
                items = list(value)
            else:
                raise ValueError(
                    "expected a string or collection, got %r" % (value,))
            # "set:<type>" converts every element to <type>
            if typestr[3:4] == ":":
                return set(self._cast_value(item, typestr[4:])
                           for item in items)
            return set(items)

        if typestr.startswith("map"):
            return value

        # Report the offending type name (not the value being converted)
        raise ConfigurationError(
            self.filename, "unknown type '%s'" % typestr)

    def get_test(self, name, valid_keys):
        """Get a dictionary representing the keys/values within a test

        Values from the "common" section are used as the starting point.
        A per-test string value is appended (space separated) to a common
        string value of the same key.

        @param name The test in the .yaml file to retrieve data from
        @param valid_keys A dictionary representing the intended semantics
            for this test. Each key in this dictionary is a key that could
            be specified, if a key is given in the .yaml file which isn't in
            here, it will generate an error. Each value in this dictionary
            is another dictionary containing metadata:

                "default" - Default value if not given
                "type" - Data type to convert the text value to. Simple types
                    supported are "str", "float", "int", "bool" which will get
                    converted to respective Python data types. "set" and "list"
                    may also be specified which will split the value by
                    whitespace (but keep the elements as strings). finally,
                    "list:<type>" and "set:<type>" may be given which will
                    perform a type conversion after splitting the value up.
                "required" - If true, raise an error if not defined. If false
                    and "default" isn't specified, a type conversion will be
                    done on an empty string
        @return A dictionary containing the test key-value pairs with
            type conversion and default values filled in per valid_keys
        @raise ConfigurationError on an unknown key, a missing required
            key or a value that cannot be converted
        """

        d = dict(self.common)

        for k, v in self.tests[name].items():
            if k not in valid_keys:
                raise ConfigurationError(
                    self.filename,
                    "Unknown config key '%s' in definition for '%s'" %
                    (k, name))

            if k in d:
                # NOTE(review): when the common value is not a string, the
                # per-test value is ignored and the common one wins --
                # confirm this is intended.
                if isinstance(d[k], str):
                    d[k] += " " + v
            else:
                d[k] = v

        for k, kinfo in valid_keys.items():
            if k in d:
                try:
                    d[k] = self._cast_value(d[k], kinfo["type"])
                except ValueError as ve:
                    raise ConfigurationError(
                        self.filename,
                        "bad %s value '%s' for key '%s' in name '%s'" %
                        (kinfo["type"], d[k], k, name)) from ve
                continue

            if kinfo.get("required", False):
                raise ConfigurationError(
                    self.filename,
                    "missing required value for '%s' in test '%s'" %
                    (k, name))

            if "default" in kinfo:
                default = kinfo["default"]
                # Hand out a private copy of mutable defaults so that tests
                # do not end up sharing (and modifying) one object.
                if isinstance(default, (list, dict, set)):
                    default = default.copy()
            else:
                default = self._cast_value("", kinfo["type"])
            d[k] = default

        return d
|
|
|
|
|
|
class Platform:
    """Metadata describing one platform.

    Maps directly to BOARD when building"""

    yaml_platform_schema = scl.yaml_load(
        os.path.join(ZEPHYR_BASE, "scripts", "sanity_chk",
                     "sanitycheck-platform-schema.yaml"))

    def __init__(self, cfile):
        """Load the platform description from a configuration file.

        @param cfile Path to platform configuration file, which gives
            info about the platform to be added.
        """
        data = SanityConfigParser(cfile, self.yaml_platform_schema).data

        self.name = data['identifier']
        self.sanitycheck = data.get("sanitycheck", True)

        # Boards that give no sizes get 128K of RAM and 512K of flash.
        self.ram = data.get("ram", 128)
        self.flash = data.get("flash", 512)

        testing = data.get("testing", {})
        self.ignore_tags = testing.get("ignore_tags", [])
        self.default = testing.get("default", False)

        # Each "supported" entry may bundle several features joined by ':'.
        self.supported = {
            feature
            for entry in data.get("supported", [])
            for feature in entry.split(":")
        }

        self.arch = data['arch']
        self.type = data.get('type', "na")
        self.simulation = data.get('simulation', "na")
        self.qemu_support = self.simulation == 'qemu'
        self.supported_toolchains = data.get("toolchain", [])

        # The platform is usable only if every listed environment variable
        # is set.
        self.env = data.get("env", [])
        self.env_satisfied = all(var in os.environ for var in self.env)
        self.defconfig = None

    def __repr__(self):
        return "<%s on %s>" % (self.name, self.arch)
|
|
|
|
|
|
class Architecture:
    """Metadata for one architecture: its name and its platforms."""

    def __init__(self, name, platforms):
        """Architecture constructor

        @param name String name for this architecture
        @param platforms list of platforms belonging to this architecture
        """
        self.name = name
        self.platforms = platforms

    def __repr__(self):
        return "<arch {}>".format(self.name)
|
|
|
|
|
|
class TestCase:
    """Class representing a test application
    """

    def __init__(self, testcase_root, workdir, name, tc_dict, yamlfile):
        """TestCase constructor.

        This gets called by TestSuite as it finds and reads test yaml files.
        Multiple TestCase instances may be generated from a single
        testcase.yaml, each one corresponds to an entry within that file.

        We need to have a unique name for every single test case. Since
        a testcase.yaml can define multiple tests, the canonical name for
        the test case is <workdir>/<name>.

        @param testcase_root Absolute path to the root directory where
            all the test cases live
        @param workdir Relative path to the project directory for this
            test application from the test_case root.
        @param name Name of this test case, corresponding to the entry name
            in the test case configuration file. For many test cases that
            just define one test, can be anything and is usually "test".
            This is really only used to distinguish between different cases
            when the testcase.yaml defines multiple tests
        @param tc_dict Dictionary with test values for this test case
            from the testcase.yaml file
        @param yamlfile Path of the yaml file this test case was read from
        """
        self.test_path = os.path.join(testcase_root, workdir)

        self.id = name
        self.cases = []
        self.type = tc_dict["type"]
        self.tags = tc_dict["tags"]
        self.extra_args = tc_dict["extra_args"]
        self.extra_configs = tc_dict["extra_configs"]
        self.arch_whitelist = tc_dict["arch_whitelist"]
        self.arch_exclude = tc_dict["arch_exclude"]
        self.skip = tc_dict["skip"]
        self.platform_exclude = tc_dict["platform_exclude"]
        self.platform_whitelist = tc_dict["platform_whitelist"]
        self.toolchain_exclude = tc_dict["toolchain_exclude"]
        self.toolchain_whitelist = tc_dict["toolchain_whitelist"]
        self.tc_filter = tc_dict["filter"]
        self.timeout = tc_dict["timeout"]
        self.harness = tc_dict["harness"]
        self.harness_config = tc_dict["harness_config"]
        self.build_only = tc_dict["build_only"]
        self.build_on_all = tc_dict["build_on_all"]
        self.slow = tc_dict["slow"]
        self.min_ram = tc_dict["min_ram"]
        self.depends_on = tc_dict["depends_on"]
        self.min_flash = tc_dict["min_flash"]
        self.extra_sections = tc_dict["extra_sections"]

        self.name = self.get_unique(testcase_root, workdir, name)

        # Filled in later, keyed by platform
        self.defconfig = {}
        self.dt_config = {}
        self.yamlfile = yamlfile

    def get_unique(self, testcase_root, workdir, name):
        """Build the canonical name of a test case.

        @param testcase_root Root directory the test case was found under
        @param workdir Project directory relative to testcase_root
        @param name Entry name inside the yaml file
        @return Normalized path made of the root's path below ZEPHYR_BASE
            (only when ZEPHYR_BASE occurs in testcase_root), workdir and name
        """
        if ZEPHYR_BASE in testcase_root:
            # This is a Zephyr Test, so include path in name for uniqueness
            # FIXME: We should not depend on path of test for unique names.
            zephyr_base = os.path.realpath(ZEPHYR_BASE)
            short_path = os.path.normpath(
                testcase_root.replace(zephyr_base + "/", ""))
        else:
            short_path = ""

        return os.path.normpath(os.path.join(short_path, workdir, name))

    def scan_file(self, inf_name):
        """Extract the ztest sub-case names declared in a C source file.

        Only the text between the ztest_test_suite(...) declaration and the
        ztest_run_test_suite(...) call is searched.

        @param inf_name Path of the C file to scan
        @return (names, warnings). names is the list of declared test
            function names with every "test_" removed, or None when the
            file declares no test suite. warnings is None or a message
            listing the #ifdef/#endif directives found inside the suite.
        @raise ValueError if a suite is declared but never run
        """
        suite_regex = re.compile(
            # do not match until end-of-line, otherwise we won't allow
            # stc_regex below to catch the ones that are declared in the same
            # line--as we only search starting the end of this match
            br"^\s*ztest_test_suite\(\s*(?P<suite_name>[a-zA-Z0-9_]+)\s*,",
            re.MULTILINE)
        stc_regex = re.compile(
            br"^\s*"  # empty space at the beginning is ok
            # catch the case where it is declared in the same sentence, e.g:
            #
            # ztest_test_suite(mutex_complex, ztest_user_unit_test(TESTNAME));
            br"(?:ztest_test_suite\([a-zA-Z0-9_]+,\s*)?"
            # Catch ztest[_user]_unit_test-[_setup_teardown](TESTNAME)
            br"ztest_(?:user_)?unit_test(?:_setup_teardown)?"
            # Consume the argument that becomes the extra testcase
            br"\(\s*"
            br"(?P<stc_name>[a-zA-Z0-9_]+)"
            # _setup_teardown() variant has two extra arguments that we ignore
            br"(?:\s*,\s*[a-zA-Z0-9_]+\s*,\s*[a-zA-Z0-9_]+)?"
            br"\s*\)",
            # We don't check how it finishes; we don't care
            re.MULTILINE)
        suite_run_regex = re.compile(
            br"^\s*ztest_run_test_suite\((?P<suite_name>[a-zA-Z0-9_]+)\)",
            re.MULTILINE)
        achtung_regex = re.compile(
            br"(#ifdef|#endif)",
            re.MULTILINE)
        warnings = None

        with open(inf_name) as inf:
            with contextlib.closing(mmap.mmap(inf.fileno(), 0, mmap.MAP_PRIVATE,
                                              mmap.PROT_READ, 0)) as main_c:
                suite_regex_match = suite_regex.search(main_c)
                if not suite_regex_match:
                    # can't find ztest_test_suite, maybe a client, because
                    # it includes ztest.h
                    return None, None

                suite_run_match = suite_run_regex.search(main_c)
                if not suite_run_match:
                    raise ValueError("can't find ztest_run_test_suite")

                # Copy the suite body out of the mapping once and reuse it
                # for both searches.
                suite_body = main_c[
                    suite_regex_match.end():suite_run_match.start()]

        achtung_matches = achtung_regex.findall(suite_body)
        if achtung_matches:
            # sorted() keeps the message stable from run to run
            found = sorted(set(match.decode() for match in achtung_matches))
            warnings = "found invalid %s in ztest_test_suite()" \
                       % ", ".join(found)

        matches = [match.decode().replace("test_", "")
                   for match in stc_regex.findall(suite_body)]
        return matches, warnings

    def scan_path(self, path):
        """Collect the sub-case names of every src/*.c file below path.

        Warnings and scan failures are reported through error() and do not
        abort the scan.

        @param path Project directory of the test application
        @return List of sub-case names found
        """
        subcases = []
        for filename in glob.glob(os.path.join(path, "src", "*.c")):
            try:
                _subcases, warnings = self.scan_file(filename)
                if warnings:
                    error("%s: %s" % (filename, warnings))
                if _subcases:
                    subcases += _subcases
            except ValueError as e:
                # Pre-format the message, as every other error() call in
                # this file does, instead of passing logging-style extra
                # arguments.
                error("%s: can't find: %s" % (filename, e))
        return subcases

    def parse_subcases(self):
        """Fill self.cases with "<id>.<subcase>" for every sub-case found."""
        for sub in self.scan_path(self.test_path):
            self.cases.append("{}.{}".format(self.id, sub))

    def __str__(self):
        return self.name
|
|
|
|
|
|
class TestInstance:
    """Class representing the execution of a particular TestCase on a platform

    @param test The TestCase object we want to build/execute
    @param platform Platform object that we want to build and run against
    @param base_outdir Base directory for all test results. The actual
        out directory used is <outdir>/<platform>/<test case name>
    """

    def __init__(self, test, platform, base_outdir):
        self.test = test
        self.platform = platform
        self.name = os.path.join(platform.name, test.name)
        self.outdir = os.path.join(base_outdir, platform.name, test.name)

        self.build_only = options.build_only or test.build_only \
            or self.check_dependency()
        self.results = {}

    def check_dependency(self):
        """Tell whether the test's harness forces a build-only run.

        @return True when the harness is 'console' and needs a fixture
            that is not listed in options.fixture, or when any other
            harness is set; False otherwise.
        """
        harness = self.test.harness
        if harness == 'console':
            config = self.test.harness_config
            return "fixture" in config and \
                config['fixture'] not in options.fixture
        return bool(harness)

    def create_overlay(self, platform):
        """Write <outdir>/overlay.conf for this instance.

        The file holds the test's extra_configs, one per line, followed by
        CONFIG_COVERAGE=y when coverage is enabled for the platform. It is
        always created, possibly empty.

        @param platform Platform name looked up in options.coverage_platform
        """
        overlay_path = os.path.join(self.outdir, "overlay.conf")
        os.makedirs(self.outdir, exist_ok=True)

        content = ""
        if len(self.test.extra_configs) > 0:
            content = "\n".join(self.test.extra_configs)
        if options.enable_coverage:
            if platform in options.coverage_platform:
                content = content + "\nCONFIG_COVERAGE=y"

        # 'with' closes the file even if the write fails
        with open(overlay_path, "w") as f:
            f.write(content)

    def calculate_sizes(self):
        """Get the RAM/ROM sizes of a test case.

        This can only be run after the instance has been executed by
        MakeGenerator, otherwise there won't be any binaries to measure.

        @return A SizeCalculator object
        @raise BuildError unless exactly one candidate binary exists
        """
        fns = glob.glob(os.path.join(self.outdir, "zephyr", "*.elf"))
        fns.extend(glob.glob(os.path.join(self.outdir, "zephyr", "*.exe")))
        fns = [x for x in fns if not x.endswith('_prebuilt.elf')]
        if len(fns) != 1:
            raise BuildError("Missing/multiple output ELF binary")
        return SizeCalculator(fns[0], self.test.extra_sections)

    def __repr__(self):
        return "<TestCase %s on %s>" % (self.test.name, self.platform.name)
|
|
|
|
|
|
def defconfig_cb(context, goals, goal):
    """Report a defconfig goal that failed to build.

    Does nothing for goals that did not fail. For a failed goal, the error
    log is either dumped to stdout (and to log_file when set) if
    INLINE_LOGS is enabled, or its path is printed.

    @param context Unused
    @param goals Unused
    @param goal The goal whose state just changed
    """
    if goal.failed:
        headline = "%sCould not build defconfig for %s%s" % (
            COLOR_RED, goal.name, COLOR_NORMAL)
        info(headline)

        if not INLINE_LOGS:
            info("\tsee: " + COLOR_YELLOW + goal.get_error_log() + COLOR_NORMAL)
        else:
            with open(goal.get_error_log()) as fp:
                data = fp.read()
                sys.stdout.write(data)
                if log_file:
                    log_file.write(data)
|
|
|
|
|
|
class TestSuite:
    """Collection of the test cases and platforms found on disk.

    Holds the TestCase objects read from the test case roots, the Platform
    objects read from the board roots, and the TestInstance objects
    registered through add_instances().
    """
    # Matches a CONFIG_<NAME>=<value> line; group 1 is the symbol name and
    # group 2 the value without surrounding double quotes.
    config_re = re.compile('(CONFIG_[A-Za-z0-9_]+)[=]\"?([^\"]*)\"?$')
    # Same as config_re, but for any <NAME>=<value> line.
    dt_re = re.compile('([A-Za-z0-9_]+)[=]\"?([^\"]*)\"?$')

    # Schema handed to SanityConfigParser when reading test case yaml files.
    yaml_tc_schema = scl.yaml_load(
        os.path.join(ZEPHYR_BASE,
                     "scripts", "sanity_chk", "sanitycheck-tc-schema.yaml"))
|
|
|
|
def __init__(self, board_root_list, testcase_roots, outdir):
    """Discover test cases and platforms.

    Files that cannot be loaded are reported through error() and counted
    in self.load_errors; they do not abort the discovery.

    @param board_root_list Directories searched for platform files
        matching <root>/*/*/*.yaml
    @param testcase_roots Directories walked for sample.yaml or
        testcase.yaml files
    @param outdir Base output directory, stored as an absolute path
    """
    # Keep track of which test cases we've filtered out and why
    self.arches = {}
    self.testcases = {}
    self.platforms = []
    self.outdir = os.path.abspath(outdir)
    self.instances = {}
    self.goals = None
    self.discards = None
    self.load_errors = 0

    for root in testcase_roots:
        root = os.path.abspath(root)
        debug("Reading test case configuration files under %s..." % root)

        for dirpath, dirnames, filenames in os.walk(root, topdown=True):
            verbose("scanning %s" % dirpath)

            # sample.yaml takes precedence over testcase.yaml
            if 'sample.yaml' in filenames:
                yaml_name = 'sample.yaml'
            elif 'testcase.yaml' in filenames:
                yaml_name = 'testcase.yaml'
            else:
                continue

            verbose("Found possible test case in " + dirpath)
            # Emptying dirnames in place stops os.walk from descending
            # below a directory that holds a test definition.
            del dirnames[:]
            yaml_path = os.path.join(dirpath, yaml_name)
            try:
                parsed = SanityConfigParser(yaml_path, self.yaml_tc_schema)
                workdir = os.path.relpath(dirpath, root)

                for test_name in parsed.tests.keys():
                    tc = TestCase(
                        root, workdir, test_name,
                        parsed.get_test(test_name, testcase_valid_keys),
                        yaml_path)
                    tc.parse_subcases()
                    self.testcases[tc.name] = tc
            except Exception as e:
                error("E: %s: can't load (skipping): %s" % (yaml_path, e))
                self.load_errors += 1

    for board_root in board_root_list:
        board_root = os.path.abspath(board_root)
        debug("Reading platform configuration files under %s..." % board_root)

        pattern = os.path.join(board_root, "*", "*", "*.yaml")
        for fn in glob.glob(pattern):
            verbose("Found plaform configuration " + fn)
            try:
                platform = Platform(fn)
                # Platforms that opt out of sanitycheck are not kept
                if platform.sanitycheck:
                    self.platforms.append(platform)
            except RuntimeError as e:
                error("E: %s: can't load: %s" % (fn, e))
                self.load_errors += 1

    # Group the platforms by architecture name
    arch_names = set([plat.arch for plat in self.platforms])
    for arch_name in arch_names:
        members = [plat for plat in self.platforms if plat.arch == arch_name]
        self.arches[arch_name] = Architecture(arch_name, members)

    self.instances = {}
|
|
|
|
def get_last_failed(self):
    """List the (test, platform) pairs that did not pass in the last run.

    Reads the CSV file LAST_SANITY. Prints a message and exits the process
    with status 2 when that file does not exist.

    @return List of (test, platform) tuples for every row whose "passed"
        column is not "True"
    """
    if not os.path.exists(LAST_SANITY):
        # Printed through the exception type so the text is exactly what
        # str() of that exception yields.
        print(str(SanityRuntimeError("Couldn't find last sanity run.")))
        sys.exit(2)

    with open(LAST_SANITY, "r") as fp:
        return [(row["test"], row["platform"])
                for row in csv.DictReader(fp)
                if row["passed"] != "True"]
|
|
|
|
def load_from_file(self, file):
    """Register the test instances listed in a CSV file.

    Each row is read as: test path, test name, platform name, architecture
    name (the columns written by run_report()). An overlay file is created
    for every instance. Blank rows are skipped.

    Prints a message and exits the process with status 2 when the file
    does not exist or a row names a platform that is not part of the given
    architecture.

    @param file Path of the CSV file to read
    """
    try:
        if not os.path.exists(file):
            raise SanityRuntimeError(
                "Couldn't find input file with list of tests.")
    except Exception as e:
        print(str(e))
        sys.exit(2)

    instance_list = []
    with open(file, "r") as fp:
        for row in csv.reader(fp):
            if not row:
                continue

            name = os.path.join(row[0], row[1])

            # Look the platform up afresh for every row, so that an unknown
            # platform can never silently reuse the previous row's match.
            selected_platform = None
            for platform in self.arches[row[3]].platforms:
                if platform.name == row[2]:
                    selected_platform = platform
                    break

            if selected_platform is None:
                print("E: %s: unknown platform '%s' for arch '%s'" %
                      (file, row[2], row[3]))
                sys.exit(2)

            instance = TestInstance(self.testcases[name], selected_platform,
                                    self.outdir)
            instance.create_overlay(selected_platform.name)
            instance_list.append(instance)

    self.add_instances(instance_list)
|
|
|
|
def apply_filters(self):
    """Select the test instances to build and record why others are dropped.

    Works in three stages:

    1. For every test case/platform pair that passes the filters below and
       whose test case has a filter expression, queue a
       "config-sanitycheck" build goal and execute all of them.
    2. Parse the resulting .config and generated_dts_board.conf files into
       the test case's defconfig / dt_config dictionaries, keyed by platform.
    3. Walk all pairs again, record a discard reason for every rejected
       instance, evaluate the filter expression and register the surviving
       instances through add_instances(). Finally create the overlay file
       of every registered instance.

    Exits the process with status 2 when neither ZEPHYR_TOOLCHAIN_VARIANT
    nor ZEPHYR_GCC_VARIANT is set, or when a defconfig goal failed.

    @return Dictionary mapping each discarded TestInstance to the reason
        string; also stored in self.discards
    """

    toolchain = os.environ.get("ZEPHYR_TOOLCHAIN_VARIANT", None) or \
                os.environ.get("ZEPHYR_GCC_VARIANT", None)

    if toolchain == "gccarmemb":
        # Remove this translation when gccarmemb is no longer supported.
        toolchain = "gnuarmemb"

    try:
        if not toolchain:
            raise SanityRuntimeError("E: Variable ZEPHYR_TOOLCHAIN_VARIANT is not defined")
    except Exception as e:
        print(str(e))
        sys.exit(2)

    instances = []
    discards = {}
    platform_filter = options.platform
    last_failed = options.only_failed
    testcase_filter = run_individual_tests
    arch_filter = options.arch
    tag_filter = options.tag
    exclude_tag = options.exclude_tag
    config_filter = options.config
    extra_args = options.extra_args
    all_plats = options.all

    verbose("platform filter: " + str(platform_filter))
    verbose(" arch_filter: " + str(arch_filter))
    verbose(" tag_filter: " + str(tag_filter))
    verbose(" exclude_tag: " + str(exclude_tag))
    verbose(" config_filter: " + str(config_filter))

    if last_failed:
        failed_tests = self.get_last_failed()

    default_platforms = False

    if all_plats:
        info("Selecting all possible platforms per test case")
        # When --all used, any --platform arguments ignored
        platform_filter = []
    elif not platform_filter:
        info("Selecting default platforms per test case")
        default_platforms = True

    # Stage 1: queue a defconfig build for every candidate pair whose test
    # case carries a filter expression.
    mg = MakeGenerator(self.outdir)
    defconfig_list = {}
    dt_list = {}
    for tc_name, tc in self.testcases.items():
        for arch_name, arch in self.arches.items():
            for plat in arch.platforms:
                instance = TestInstance(tc, plat, self.outdir)

                # Unit tests only pair with the "unit" architecture and
                # vice versa.
                if (arch_name == "unit") != (tc.type == "unit"):
                    continue

                if tc.build_on_all and not platform_filter:
                    platform_filter = []

                if tc.skip:
                    continue

                if tag_filter and not tc.tags.intersection(tag_filter):
                    continue

                if exclude_tag and tc.tags.intersection(exclude_tag):
                    continue

                if testcase_filter and tc_name not in testcase_filter:
                    continue

                if last_failed and (
                        tc.name, plat.name) not in failed_tests:
                    continue

                if arch_filter and arch_name not in arch_filter:
                    continue

                if tc.arch_whitelist and arch.name not in tc.arch_whitelist:
                    continue

                if tc.arch_exclude and arch.name in tc.arch_exclude:
                    continue

                if tc.platform_exclude and plat.name in tc.platform_exclude:
                    continue

                if tc.toolchain_exclude and toolchain in tc.toolchain_exclude:
                    continue

                if platform_filter and plat.name not in platform_filter:
                    continue

                if plat.ram < tc.min_ram:
                    continue

                if set(plat.ignore_tags) & tc.tags:
                    continue

                # Every feature the test depends on must be supported
                if tc.depends_on:
                    dep_intersection = tc.depends_on.intersection(
                        set(plat.supported))
                    if dep_intersection != set(tc.depends_on):
                        continue

                if plat.flash < tc.min_flash:
                    continue

                if tc.platform_whitelist and plat.name not in tc.platform_whitelist:
                    continue

                if tc.toolchain_whitelist and toolchain not in tc.toolchain_whitelist:
                    continue

                if (plat.env_satisfied and tc.tc_filter
                        and (plat.default or all_plats or platform_filter)
                        and (toolchain in plat.supported_toolchains or options.force_toolchain)):
                    args = tc.extra_args[:]
                    args.append("BOARD={}".format(plat.name))
                    args.extend(extra_args)
                    # FIXME would be nice to use a common outdir for this so that
                    # conf, gen_idt, etc aren't rebuilt for every combination,
                    # need a way to avoid different Make processes from clobbering
                    # each other since they all try to build them
                    # simultaneously

                    o = os.path.join(self.outdir, plat.name, tc.name)
                    generated_dt_confg = "include/generated/generated_dts_board.conf"
                    dt_config_path = os.path.join(o, "zephyr", generated_dt_confg)
                    dt_list[tc, plat, tc.name.split("/")[-1]] = dt_config_path
                    defconfig_list[tc, plat, tc.name.split("/")[-1]] = os.path.join(o, "zephyr", ".config")
                    goal = "_".join([plat.name, "_".join(tc.name.split("/")), "config-sanitycheck"])
                    mg.add_build_goal(goal, os.path.join(ZEPHYR_BASE, tc.test_path),
                                      o, args, "config-sanitycheck.log", make_args="config-sanitycheck")

    info("Building testcase defconfigs...")
    results = mg.execute(defconfig_cb)

    # Any failed defconfig goal aborts the whole run
    for name, goal in results.items():
        try:
            if goal.failed:
                raise SanityRuntimeError("Couldn't build some defconfigs")
        except Exception as e:
            error(str(e))
            sys.exit(2)

    # Stage 2: read the generated configuration files back in
    for k, out_config in defconfig_list.items():
        test, plat, name = k
        defconfig = {}
        with open(out_config, "r") as fp:
            for line in fp.readlines():
                m = TestSuite.config_re.match(line)
                if not m:
                    # Blank lines and comments are expected; anything else
                    # is reported but otherwise ignored.
                    if line.strip() and not line.startswith("#"):
                        sys.stderr.write("Unrecognized line %s\n" % line)
                    continue
                defconfig[m.group(1)] = m.group(2).strip()
        test.defconfig[plat] = defconfig

    for k, out_config in dt_list.items():
        # The devicetree configuration file is optional
        if not os.path.exists(out_config):
            continue

        test, plat, name = k
        dt_conf = {}
        with open(out_config, "r") as fp:
            for line in fp.readlines():
                m = TestSuite.dt_re.match(line)
                if not m:
                    if line.strip() and not line.startswith("#"):
                        sys.stderr.write("Unrecognized line %s\n" % line)
                    continue
                dt_conf[m.group(1)] = m.group(2).strip()
        test.dt_config[plat] = dt_conf

    # Stage 3: apply all filters, this time recording the discard reasons
    for tc_name, tc in self.testcases.items():
        for arch_name, arch in self.arches.items():
            instance_list = []
            for plat in arch.platforms:
                instance = TestInstance(tc, plat, self.outdir)

                if (arch_name == "unit") != (tc.type == "unit"):
                    # Discard silently
                    continue

                if tc.skip:
                    discards[instance] = "Skip filter"
                    continue

                if tc.build_on_all and not platform_filter:
                    platform_filter = []

                if tag_filter and not tc.tags.intersection(tag_filter):
                    discards[instance] = "Command line testcase tag filter"
                    continue

                if exclude_tag and tc.tags.intersection(exclude_tag):
                    discards[instance] = "Command line testcase exclude filter"
                    continue

                if testcase_filter and tc_name not in testcase_filter:
                    discards[instance] = "Testcase name filter"
                    continue

                if last_failed and (
                        tc.name, plat.name) not in failed_tests:
                    discards[instance] = "Passed or skipped during last run"
                    continue

                if arch_filter and arch_name not in arch_filter:
                    discards[instance] = "Command line testcase arch filter"
                    continue

                if tc.arch_whitelist and arch.name not in tc.arch_whitelist:
                    discards[instance] = "Not in test case arch whitelist"
                    continue

                if tc.arch_exclude and arch.name in tc.arch_exclude:
                    discards[instance] = "In test case arch exclude"
                    continue

                if tc.platform_exclude and plat.name in tc.platform_exclude:
                    discards[instance] = "In test case platform exclude"
                    continue

                if tc.toolchain_exclude and toolchain in tc.toolchain_exclude:
                    discards[instance] = "In test case toolchain exclude"
                    continue

                if platform_filter and plat.name not in platform_filter:
                    discards[instance] = "Command line platform filter"
                    continue

                if tc.platform_whitelist and plat.name not in tc.platform_whitelist:
                    discards[instance] = "Not in testcase platform whitelist"
                    continue

                if tc.toolchain_whitelist and toolchain not in tc.toolchain_whitelist:
                    discards[instance] = "Not in testcase toolchain whitelist"
                    continue

                if not plat.env_satisfied:
                    discards[instance] = "Environment ({}) not satisfied".format(", ".join(plat.env))
                    continue

                if not options.force_toolchain \
                        and toolchain and (toolchain not in plat.supported_toolchains) \
                        and tc.type != 'unit':
                    discards[instance] = "Not supported by the toolchain"
                    continue

                if plat.ram < tc.min_ram:
                    discards[instance] = "Not enough RAM"
                    continue

                if tc.depends_on:
                    dep_intersection = tc.depends_on.intersection(set(plat.supported))
                    if dep_intersection != set(tc.depends_on):
                        discards[instance] = "No hardware support"
                        continue

                if plat.flash < tc.min_flash:
                    discards[instance] = "Not enough FLASH"
                    continue

                if set(plat.ignore_tags) & tc.tags:
                    discards[instance] = "Excluded tags per platform"
                    continue

                # Symbol table for the filter expression: ARCH/PLATFORM,
                # then the process environment, then the parsed .config and
                # devicetree values of this platform; later updates
                # override earlier ones.
                defconfig = {
                    "ARCH": arch.name,
                    "PLATFORM": plat.name
                }
                defconfig.update(os.environ)
                for p, tdefconfig in tc.defconfig.items():
                    if p == plat:
                        defconfig.update(tdefconfig)
                        break

                for p, tdefconfig in tc.dt_config.items():
                    if p == plat:
                        defconfig.update(tdefconfig)
                        break

                if tc.tc_filter:
                    try:
                        res = expr_parser.parse(tc.tc_filter, defconfig)
                    except (ValueError, SyntaxError) as se:
                        # Name the offending file before re-raising
                        sys.stderr.write(
                            "Failed processing %s\n" % tc.yamlfile)
                        raise se
                    if not res:
                        discards[instance] = (
                            "defconfig doesn't satisfy expression '%s'" %
                            tc.tc_filter)
                        continue

                instance_list.append(instance)

            if not instance_list:
                # Every platform in this arch was rejected already
                continue

            if default_platforms and not tc.build_on_all:
                if not tc.platform_whitelist:
                    instances = list(
                        filter(
                            lambda tc: tc.platform.default,
                            instance_list))
                    self.add_instances(instances)
                else:
                    # With a platform whitelist only the first surviving
                    # instance is registered.
                    self.add_instances(instance_list[:1])

                for instance in list(
                        filter(lambda tc: not tc.platform.default, instance_list)):
                    discards[instance] = "Not a default test platform"
            else:
                self.add_instances(instance_list)

    for name, case in self.instances.items():
        case.create_overlay(case.platform.name)

    self.discards = discards
    return discards
|
|
|
|
def add_instances(self, ti_list):
    """Register every test instance of *ti_list* in ``self.instances``.

    Instances are keyed by their ``name`` attribute; an instance whose name
    is already present replaces the earlier entry.
    """
    self.instances.update((entry.name, entry) for entry in ti_list)
|
|
|
|
def execute(self, cb, cb_context):
    """Run all registered instances through a MakeGenerator and add size metrics.

    Every instance in ``self.instances`` is added to a ``MakeGenerator``
    created for ``self.outdir`` (together with ``options.extra_args``), the
    generator is executed with *cb* / *cb_context*, and the goals it returns
    are stored in ``self.goals``.

    Unless ``options.disable_size_report`` is set, the "ram_size",
    "rom_size" and "unrecognized" metrics of every goal that did not fail
    are then filled in on a thread pool of ``JOBS`` workers; with the size
    report disabled, every goal gets zero / empty values instead.

    Returns the goals mapping (also kept in ``self.goals``).
    """

    def calc_one_elf_size(name, goal):
        # Failed goals are left without any size metrics.
        if goal.failed:
            return
        instance = self.instances[name]
        if instance.platform.type != "native":
            sc = instance.calculate_sizes()
            goal.metrics["ram_size"] = sc.get_ram_size()
            goal.metrics["rom_size"] = sc.get_rom_size()
            goal.metrics["unrecognized"] = sc.unrecognized_sections()
        else:
            # Native platforms are not measured; report neutral values.
            goal.metrics["ram_size"] = 0
            goal.metrics["rom_size"] = 0
            goal.metrics["unrecognized"] = []

    mg = MakeGenerator(self.outdir)
    for i in self.instances.values():
        mg.add_test_instance(i, options.extra_args)
    self.goals = mg.execute(cb, cb_context)

    if not options.disable_size_report:
        # Parallelize size calculation.  The context manager shuts the pool
        # down once all jobs are done so its worker threads do not linger.
        with concurrent.futures.ThreadPoolExecutor(JOBS) as executor:
            futures = [executor.submit(calc_one_elf_size, name, goal)
                       for name, goal in self.goals.items()]
            concurrent.futures.wait(futures)
    else:
        for goal in self.goals.values():
            goal.metrics["ram_size"] = 0
            goal.metrics["rom_size"] = 0
            goal.metrics["unrecognized"] = []

    return self.goals
|
|
|
|
def run_report(self, filename):
    """Append one CSV row per registered instance to *filename*.

    The columns are path, test, platform and arch, where path and test are
    the directory part and the last component of the test name.  The file
    is opened in append mode and no header row is written.
    """
    columns = ['path', 'test', 'platform', 'arch']
    with open(filename, "at") as csvfile:
        writer = csv.DictWriter(csvfile, columns, lineterminator=os.linesep)
        writer.writerows(
            {
                "path": os.path.dirname(inst.test.name),
                "test": os.path.basename(inst.test.name),
                "platform": inst.platform.name,
                "arch": inst.platform.arch,
            }
            for inst in self.instances.values()
        )
|
|
|
|
def discard_report(self, filename):
    """Write the discarded instances and their reasons to *filename* as CSV.

    The file gets a header row (test, arch, platform, reason) followed by
    one row per entry of ``self.discards``.  When ``self.discards`` is
    still None the problem is reported through ``error()`` and the process
    exits with status 2.
    """
    try:
        if self.discards is None:
            raise SanityRuntimeError("apply_filters() hasn't been run!")
    except Exception as exc:
        error(str(exc))
        sys.exit(2)

    columns = ["test", "arch", "platform", "reason"]
    with open(filename, "wt") as csvfile:
        writer = csv.DictWriter(csvfile, columns, lineterminator=os.linesep)
        writer.writeheader()
        writer.writerows(
            {
                "test": inst.test.name,
                "arch": inst.platform.arch,
                "platform": inst.platform.name,
                "reason": why,
            }
            for inst, why in self.discards.items()
        )
|
|
|
|
def compare_metrics(self, filename):
    """Compare the goals' size metrics with those saved in CSV *filename*.

    Returns a list of ``(instance, metric, current_value, delta,
    lower_is_better)`` tuples, one for every metric whose current value
    differs from the saved one.  Goals without a saved row, metrics missing
    from the goal and saved values that are empty strings are skipped.
    Returns an empty list when *filename* does not exist; exits with
    status 2 when ``self.goals`` is still None.
    """
    # name, datatype, lower results better
    interesting_metrics = [("ram_size", int, True),
                           ("rom_size", int, True)]

    try:
        if self.goals is None:
            raise SanityRuntimeError("execute() hasn't been run!")
    except Exception as exc:
        print(str(exc))
        sys.exit(2)

    if not os.path.exists(filename):
        info("Cannot compare metrics, %s not found" % filename)
        return []

    # Saved values stay strings here; they are converted when compared.
    metric_names = [m for m, _, _ in interesting_metrics]
    saved_metrics = {}
    with open(filename) as fp:
        for row in csv.DictReader(fp):
            saved_metrics[(row["test"], row["platform"])] = {
                m: row[m] for m in metric_names}

    results = []
    for name, goal in self.goals.items():
        instance = self.instances[name]
        saved = saved_metrics.get((instance.test.name, instance.platform.name))
        if saved is None:
            continue
        for metric, mtype, lower_better in interesting_metrics:
            if metric not in goal.metrics or saved[metric] == "":
                continue
            current = goal.metrics[metric]
            delta = current - mtype(saved[metric])
            if delta != 0:
                results.append((instance, metric, current, delta,
                                lower_better))
    return results
|
|
|
|
|
|
|
|
def encode_for_xml(self, unicode_data, encoding='ascii'):
    """Return *unicode_data* without characters that XML 1.0 cannot carry.

    Removed are the C0 control characters other than tab, line feed and
    carriage return (this includes NUL), lone surrogates and U+FFFE/U+FFFF.
    Log output may contain such characters, and a report holding any of
    them is not well-formed XML.

    *encoding* is accepted for backward compatibility and is not used.
    """
    return re.sub(
        r"[\x00-\x08\x0b\x0c\x0e-\x1f\ud800-\udfff\ufffe\uffff]",
        "",
        unicode_data)
|
|
|
|
def testcase_target_report(self, report_file):
|
|
|
|
run = "Sanitycheck"
|
|
eleTestsuite = None
|
|
append = options.only_failed
|
|
|
|
errors = 0
|
|
passes = 0
|
|
fails = 0
|
|
duration = 0
|
|
skips = 0
|
|
|
|
for identifier, ti in self.instances.items():
|
|
for k in ti.results.keys():
|
|
if ti.results[k] == 'PASS':
|
|
passes += 1
|
|
elif ti.results[k] == 'BLOCK':
|
|
errors += 1
|
|
elif ti.results[k] == 'SKIP':
|
|
skips += 1
|
|
else:
|
|
fails += 1
|
|
|
|
eleTestsuites = ET.Element('testsuites')
|
|
eleTestsuite = ET.SubElement(eleTestsuites, 'testsuite',
|
|
name=run, time="%d" % duration,
|
|
tests="%d" % (errors + passes + fails),
|
|
failures="%d" % fails,
|
|
errors="%d" % errors, skipped="%d" %skips)
|
|
|
|
handler_time = "0"
|
|
|
|
# print out test results
|
|
for identifier, ti in self.instances.items():
|
|
for k in ti.results.keys():
|
|
|
|
eleTestcase = ET.SubElement(
|
|
eleTestsuite, 'testcase', classname="%s:%s" %(ti.platform.name, os.path.basename(ti.test.name)),
|
|
name="%s" % (k), time=handler_time)
|
|
if ti.results[k] in ['FAIL', 'BLOCK']:
|
|
el = None
|
|
|
|
if ti.results[k] == 'FAIL':
|
|
el = ET.SubElement(
|
|
eleTestcase,
|
|
'failure',
|
|
type="failure",
|
|
message="failed")
|
|
elif ti.results[k] == 'BLOCK':
|
|
el = ET.SubElement(
|
|
eleTestcase,
|
|
'error',
|
|
type="failure",
|
|
message="failed")
|
|
p = os.path.join(options.outdir, ti.platform.name, ti.test.name)
|
|
bl = os.path.join(p, "handler.log")
|
|
|
|
if os.path.exists(bl):
|
|
with open(bl, "rb") as f:
|
|
log = f.read().decode("utf-8")
|
|
el.text = self.encode_for_xml(log)
|
|
|
|
elif ti.results[k] == 'SKIP':
|
|
el = ET.SubElement(
|
|
eleTestcase,
|
|
'skipped',
|
|
type="skipped",
|
|
message="Skipped")
|
|
|
|
result = ET.tostring(eleTestsuites)
|
|
f = open(report_file, 'wb')
|
|
f.write(result)
|
|
f.close()
|
|
|
|
|
|
def testcase_xunit_report(self, filename, duration):
|
|
try:
|
|
if self.goals is None:
|
|
raise SanityRuntimeError("execute() hasn't been run!")
|
|
except Exception as e:
|
|
print(str(e))
|
|
sys.exit(2)
|
|
|
|
fails = 0
|
|
passes = 0
|
|
errors = 0
|
|
|
|
for name, goal in self.goals.items():
|
|
if goal.failed:
|
|
if goal.reason in ['build_error', 'handler_crash']:
|
|
errors += 1
|
|
else:
|
|
fails += 1
|
|
else:
|
|
passes += 1
|
|
|
|
run = "Sanitycheck"
|
|
eleTestsuite = None
|
|
append = options.only_failed
|
|
|
|
if os.path.exists(filename) and append:
|
|
tree = ET.parse(filename)
|
|
eleTestsuites = tree.getroot()
|
|
eleTestsuite = tree.findall('testsuite')[0]
|
|
else:
|
|
eleTestsuites = ET.Element('testsuites')
|
|
eleTestsuite = ET.SubElement(eleTestsuites, 'testsuite',
|
|
name=run, time="%d" % duration,
|
|
tests="%d" % (errors + passes + fails),
|
|
failures="%d" % fails,
|
|
errors="%d" % errors, skip="0")
|
|
|
|
handler_time = "0"
|
|
for name, goal in self.goals.items():
|
|
|
|
i = self.instances[name]
|
|
if append:
|
|
for tc in eleTestsuite.findall('testcase'):
|
|
if tc.get('classname') == "%s:%s" % (
|
|
i.platform.name, i.test.name):
|
|
eleTestsuite.remove(tc)
|
|
|
|
if not goal.failed and goal.handler:
|
|
handler_time = "%s" %(goal.metrics["handler_time"])
|
|
|
|
eleTestcase = ET.SubElement(
|
|
eleTestsuite, 'testcase', classname="%s:%s" %
|
|
(i.platform.name, i.test.name), name="%s" %
|
|
(name), time=handler_time)
|
|
if goal.failed:
|
|
failure = ET.SubElement(
|
|
eleTestcase,
|
|
'failure',
|
|
type="failure",
|
|
message=goal.reason)
|
|
p = ("%s/%s/%s" % (options.outdir, i.platform.name, i.test.name))
|
|
bl = os.path.join(p, "build.log")
|
|
if goal.reason != 'build_error':
|
|
bl = os.path.join(p, "handler.log")
|
|
|
|
if os.path.exists(bl):
|
|
with open(bl, "rb") as f:
|
|
log = f.read().decode("utf-8")
|
|
filtered_string = ''.join(filter(lambda x: x in string.printable, log))
|
|
failure.text = filtered_string
|
|
f.close()
|
|
|
|
result = ET.tostring(eleTestsuites)
|
|
f = open(filename, 'wb')
|
|
f.write(result)
|
|
f.close()
|
|
|
|
def testcase_report(self, filename):
|
|
try:
|
|
if self.goals is None:
|
|
raise SanityRuntimeError("execute() hasn't been run!")
|
|
except Exception as e:
|
|
print(str(e))
|
|
sys.exit(2)
|
|
|
|
with open(filename, "wt") as csvfile:
|
|
fieldnames = ["test", "arch", "platform", "passed", "status",
|
|
"extra_args", "qemu", "handler_time", "ram_size",
|
|
"rom_size"]
|
|
cw = csv.DictWriter(csvfile, fieldnames, lineterminator=os.linesep)
|
|
cw.writeheader()
|
|
for name, goal in self.goals.items():
|
|
i = self.instances[name]
|
|
rowdict = {"test": i.test.name,
|
|
"arch": i.platform.arch,
|
|
"platform": i.platform.name,
|
|
"extra_args": " ".join(i.test.extra_args),
|
|
"qemu": i.platform.qemu_support}
|
|
if goal.failed:
|
|
rowdict["passed"] = False
|
|
rowdict["status"] = goal.reason
|
|
else:
|
|
rowdict["passed"] = True
|
|
if goal.handler:
|
|
rowdict["handler_time"] = goal.metrics["handler_time"]
|
|
rowdict["ram_size"] = goal.metrics["ram_size"]
|
|
rowdict["rom_size"] = goal.metrics["rom_size"]
|
|
cw.writerow(rowdict)
|
|
|
|
|
|
def parse_arguments():
    """Build the command line parser and parse ``sys.argv``.

    The module docstring is used as the program description.  Arguments
    may also be read from a file by prefixing the file name with '+'.

    Returns the ``argparse.Namespace`` produced by ``parse_args()``.
    """
    parser = argparse.ArgumentParser(
        description=__doc__,
        formatter_class=argparse.RawDescriptionHelpFormatter)
    parser.fromfile_prefix_chars = "+"

    parser.add_argument("--force-toolchain", action="store_true",
                        help="Do not filter based on toolchain, use the set "
                             " toolchain unconditionally")
    parser.add_argument(
        "-p", "--platform", action="append",
        help="Platform filter for testing. This option may be used multiple "
             "times. Testcases will only be built/run on the platforms "
             "specified. If this option is not used, then platforms marked "
             "as default in the platform metadata file will be chosen "
             "to build and test. ")
    parser.add_argument(
        "-a", "--arch", action="append",
        help="Arch filter for testing. Takes precedence over --platform. "
             "If unspecified, test all arches. Multiple invocations "
             "are treated as a logical 'or' relationship")
    parser.add_argument(
        "-t", "--tag", action="append",
        help="Specify tags to restrict which tests to run by tag value. "
             "Default is to not do any tag filtering. Multiple invocations "
             "are treated as a logical 'or' relationship")
    parser.add_argument("-e", "--exclude-tag", action="append",
                        help="Specify tags of tests that should not run. "
                             "Default is to run all tests with all tags.")
    parser.add_argument(
        "-f",
        "--only-failed",
        action="store_true",
        help="Run only those tests that failed the previous sanity check "
             "invocation.")
    parser.add_argument(
        "-c", "--config", action="append",
        help="Specify platform configuration values filtering. This can be "
             "specified two ways: <config>=<value> or just <config>. The "
             "defconfig for all platforms will be "
             "checked. For the <config>=<value> case, only match defconfig "
             "that have that value defined. For the <config> case, match "
             "defconfig that have that value assigned to any value. "
             "Prepend a '!' to invert the match.")

    parser.add_argument(
        "-s", "--test", action="append",
        help="Run only the specified test cases. These are named by "
             "<path to test project relative to "
             "--testcase-root>/<testcase.yaml section name>")

    parser.add_argument(
        "--sub-test", action="append",
        help="Run only the specified sub-test cases and its parent. These are named by "
             "test case name appended by test function, i.e. kernel.mutex.mutex_lock_unlock."
    )

    parser.add_argument(
        "-l", "--all", action="store_true",
        help="Build/test on all platforms. Any --platform arguments "
             "ignored.")

    parser.add_argument(
        "-o", "--testcase-report",
        help="Output a CSV spreadsheet containing results of the test run")
    parser.add_argument(
        "-d", "--discard-report",
        help="Output a CSV spreadsheet showing tests that were skipped "
             "and why")
    parser.add_argument("--compare-report",
                        help="Use this report file for size comparison")

    parser.add_argument(
        "-B", "--subset",
        help="Only run a subset of the tests, 1/4 for running the first 25%%, "
             "3/5 means run the 3rd fifth of the total. "
             "This option is useful when running a large number of tests on "
             "different hosts to speed up execution time.")

    parser.add_argument(
        "-N", "--ninja", action="store_true",
        help="Use the Ninja generator with CMake")

    parser.add_argument(
        "-y", "--dry-run", action="store_true",
        help="Create the filtered list of test cases, but don't actually "
             "run them. Useful if you're just interested in "
             "--discard-report")

    parser.add_argument("--list-tags", action="store_true",
                        help="list all tags in selected tests")

    parser.add_argument("--list-tests", action="store_true",
                        help="list all tests.")

    parser.add_argument("--export-tests", action="store",
                        metavar="FILENAME",
                        help="Export tests case meta-data to a file in CSV format.")

    parser.add_argument("--detailed-report",
                        action="store",
                        metavar="FILENAME",
                        help="Generate a junit report with detailed testcase results.")

    parser.add_argument(
        "-r", "--release", action="store_true",
        help="Update the benchmark database with the results of this test "
             "run. Intended to be run by CI when tagging an official "
             "release. This database is used as a basis for comparison "
             "when looking for deltas in metrics such as footprint")
    parser.add_argument("-w", "--warnings-as-errors", action="store_true",
                        help="Treat warning conditions as errors")
    parser.add_argument(
        "-v",
        "--verbose",
        action="count",
        default=0,
        help="Emit debugging information, call multiple times to increase "
             "verbosity")
    parser.add_argument(
        "-i", "--inline-logs", action="store_true",
        help="Upon test failure, print relevant log data to stdout "
             "instead of just a path to it")
    parser.add_argument("--log-file", metavar="FILENAME", action="store",
                        help="log also to file")
    parser.add_argument(
        "-m", "--last-metrics", action="store_true",
        help="Instead of comparing metrics from the last --release, "
             "compare with the results of the previous sanity check "
             "invocation")
    parser.add_argument(
        "-u",
        "--no-update",
        action="store_true",
        help="do not update the results of the last run of the sanity "
             "checks")
    parser.add_argument(
        "-F",
        "--load-tests",
        metavar="FILENAME",
        action="store",
        help="Load list of tests to be run from file.")

    parser.add_argument(
        "-E",
        "--save-tests",
        metavar="FILENAME",
        action="store",
        help="Save list of tests to be run to file.")

    parser.add_argument(
        "-b", "--build-only", action="store_true",
        help="Only build the code, do not execute any of it in QEMU")
    parser.add_argument(
        "-j", "--jobs", type=int,
        help="Number of jobs for building, defaults to number of CPU threads "
             "overcommited by factor 2")

    parser.add_argument(
        "--device-testing", action="store_true",
        help="Test on device directly. Specify the serial device to "
             "use with the --device-serial option.")

    parser.add_argument(
        "-X", "--fixture", action="append", default=[],
        help="Specify a fixture that a board might support")
    parser.add_argument(
        "--device-serial",
        help="Serial device for accessing the board (e.g., /dev/ttyACM0)")
    parser.add_argument(
        "--show-footprint", action="store_true",
        help="Show footprint statistics and deltas since last release."
    )
    parser.add_argument(
        "-H", "--footprint-threshold", type=float, default=5,
        help="When checking test case footprint sizes, warn the user if "
             "the new app size is greater then the specified percentage "
             "from the last release. Default is 5. 0 to warn on any "
             "increase on app size")
    parser.add_argument(
        "-D", "--all-deltas", action="store_true",
        help="Show all footprint deltas, positive or negative. Implies "
             "--footprint-threshold=0")
    parser.add_argument(
        "-O", "--outdir",
        default="%s/sanity-out" % os.getcwd(),
        help="Output directory for logs and binaries. "
             "Default is 'sanity-out' in the current directory. "
             "This directory will be deleted unless '--no-clean' is set.")
    parser.add_argument(
        "-n", "--no-clean", action="store_true",
        help="Do not delete the outdir before building. Will result in "
             "faster compilation since builds will be incremental")
    parser.add_argument(
        "-T", "--testcase-root", action="append", default=[],
        help="Base directory to recursively search for test cases. All "
             "testcase.yaml files under here will be processed. May be "
             "called multiple times. Defaults to the 'samples' and "
             "'tests' directories in the Zephyr tree.")

    board_root_list = ["%s/boards" % ZEPHYR_BASE,
                       "%s/scripts/sanity_chk/boards" % ZEPHYR_BASE]

    parser.add_argument(
        "-A", "--board-root", action="append", default=board_root_list,
        help="Directory to search for board configuration files. All .yaml "
             "files in the directory will be processed.")
    parser.add_argument(
        "-z", "--size", action="append",
        help="Don't run sanity checks. Instead, produce a report to "
             "stdout detailing RAM/ROM sizes on the specified filenames. "
             "All other command line arguments ignored.")
    parser.add_argument(
        "-S", "--enable-slow", action="store_true",
        help="Execute time-consuming test cases that have been marked "
             "as 'slow' in testcase.yaml. Normally these are only built.")
    parser.add_argument(
        "--disable-unrecognized-section-test", action="store_true",
        default=False,
        help="Skip the 'unrecognized section' test.")
    parser.add_argument("-R", "--enable-asserts", action="store_true",
                        default=True,
                        help="deprecated, left for compatibility")
    parser.add_argument("--disable-asserts", action="store_false",
                        dest="enable_asserts",
                        help="deprecated, left for compatibility")
    parser.add_argument("-Q", "--error-on-deprecations", action="store_false",
                        help="Error on deprecation warnings.")
    parser.add_argument("--disable-size-report", action="store_true",
                        help="Skip expensive computation of ram/rom segment sizes.")

    parser.add_argument(
        "-x", "--extra-args", action="append", default=[],
        help="""Extra CMake cache entries to define when building test cases.
        May be called multiple times. The key-value entries will be
        prefixed with -D before being passed to CMake.

        E.g
        "sanitycheck -x=USE_CCACHE=0"
        will translate to
        "cmake -DUSE_CCACHE=0"

        which will ultimately disable ccache.
        """
    )

    parser.add_argument("--enable-coverage", action="store_true",
                        help="Enable code coverage using gcov.")

    # The help text must name the option exactly as it is registered above.
    parser.add_argument("-C", "--coverage", action="store_true",
                        help="Generate coverage reports. Implies --enable-coverage")

    coverage_platforms = ["native_posix", "nrf52_bsim"]
    parser.add_argument("--coverage-platform", action="append", default=coverage_platforms,
                        help="Platforms to run coverage reports on. "
                             "This option may be used multiple times.")

    return parser.parse_args()
|
|
|
|
|
|
def log_info(filename):
    """Point the user at a log file, or dump its content when inlining is on.

    With ``INLINE_LOGS`` unset only the path (relative to the current
    directory) is reported.  Otherwise the file content is written to
    stdout, and to ``log_file`` when one is set, framed by two banner lines
    carrying the path.  A file that cannot be read is replaced by a short
    message naming the error.
    """
    filename = os.path.relpath(os.path.realpath(filename))

    if not INLINE_LOGS:
        info("\tsee: " + COLOR_YELLOW + filename + COLOR_NORMAL)
        return

    banner = "{:-^100}".format(filename)
    info(banner)

    try:
        with open(filename) as fp:
            data = fp.read()
    except Exception as exc:
        data = "Unable to read log data (%s)\n" % (str(exc))

    sys.stdout.write(data)
    if log_file:
        log_file.write(data)
    info(banner)
|
|
|
|
|
|
def terse_test_cb(instances, goals, goal):
    """Progress callback that keeps a single, continuously rewritten status line.

    *goal* is the goal being reported, *goals* the mapping of all goals and
    *instances* maps goal names to test instances.  When *goal* failed, its
    platform, test name and reason are reported together with its error
    log.  The status line shows finished/total counts, the percentage done
    and the number of failed goals.
    """
    total_tests = len(goals)
    total_done = sum(1 for g in goals.values() if g.finished)
    total_failed = sum(1 for g in goals.values() if g.failed)

    if goal.failed:
        failed_instance = instances[goal.name]
        info(
            "\n\n{:<25} {:<50} {}FAILED{}: {}".format(
                failed_instance.platform.name,
                failed_instance.test.name,
                COLOR_RED,
                COLOR_NORMAL,
                goal.reason))
        log_info(goal.get_error_log())
        info("")

    percent_done = int((float(total_done) / total_tests) * 100)
    if total_failed > 0:
        failed_color = COLOR_RED
    else:
        failed_color = COLOR_NORMAL

    # The leading "\r" rewrites the same terminal line on every call.
    sys.stdout.write(
        "\rtotal complete: %s%4d/%4d%s %2d%% failed: %s%4d%s" %
        (COLOR_GREEN, total_done, total_tests, COLOR_NORMAL,
         percent_done,
         failed_color, total_failed,
         COLOR_NORMAL))
    sys.stdout.flush()
|
|
|
|
|
|
def chatty_test_cb(instances, goals, goal):
    """Progress callback that reports one line per goal update.

    Unfinished goals are only reported when ``VERBOSE`` is at least 2.
    The line shows how many goals are finished, the platform and test name
    of *goal* and its status: FAILED with the reason, PASSED, or the goal's
    current make state.  The error log of a failed goal is reported too.
    """
    instance = instances[goal.name]

    if VERBOSE < 2 and not goal.finished:
        return

    total_tests = len(goals)
    total_tests_width = len(str(total_tests))
    total_done = sum(1 for g in goals.values() if g.finished)

    if goal.failed:
        status = COLOR_RED + "FAILED" + COLOR_NORMAL + ": " + goal.reason
    elif goal.finished:
        status = COLOR_GREEN + "PASSED" + COLOR_NORMAL
    else:
        status = goal.make_state

    line = "{:>{}}/{} {:<25} {:<50} {}".format(
        total_done, total_tests_width, total_tests,
        instance.platform.name, instance.test.name, status)
    info(line)

    if goal.failed:
        log_info(goal.get_error_log())
|
|
|
|
|
|
def size_report(sc):
    """Report the section table and ROM/RAM totals of a size calculator.

    *sc* must provide ``filename``, ``sections`` (a sequence of mappings
    with the keys "name", "virt_addr", "load_addr", "size" and "type"),
    ``rom_size`` and ``ram_size``.
    """
    info(sc.filename)
    # Column widths mirror the row format below so the header lines up
    # with the values printed underneath it.
    info("%-17s %10s %10s %8s %7s %s" %
         ("SECTION NAME", "VMA", "LMA", "SIZE", "HEX SZ", "TYPE"))
    for section in sc.sections:
        info("%-17s 0x%08x 0x%08x %8d 0x%05x %-7s" %
             (section["name"], section["virt_addr"], section["load_addr"],
              section["size"], section["size"], section["type"]))

    info("Totals: %d bytes (ROM), %d bytes (RAM)" %
         (sc.rom_size, sc.ram_size))
    info("")
|
|
|
|
def retrieve_data(intput_file):
    """Extract the gcov dump embedded in a log file.

    Only lines between the GCOV_COVERAGE_DUMP_START and
    GCOV_COVERAGE_DUMP_END markers are considered.  Within that region each
    line of the form ``*<file name><<hex dump>`` contributes one entry;
    every other line is ignored.

    Returns a dict mapping file name to hex dump string.
    """
    if VERBOSE:
        print("Working on %s" % intput_file)

    extracted_coverage_info = {}
    capture_data = False
    with open(intput_file, 'r') as fp:
        for line in fp:
            if "GCOV_COVERAGE_DUMP_START" in line:
                capture_data = True
                continue
            if "GCOV_COVERAGE_DUMP_END" in line:
                break
            # Skip everything before the dump and any non-dump line in it.
            if not capture_data or not line.startswith("*"):
                continue
            sp = line.split("<")
            if len(sp) > 1:
                # Drop the leading "*" delimiter from the file name and only
                # the line terminator from the dump, so a final line without
                # a newline keeps its last hex digit.
                extracted_coverage_info[sp[0][1:]] = sp[1].rstrip("\r\n")
    return extracted_coverage_info
|
|
|
|
def create_gcda_files(extracted_coverage_info):
    """Write the extracted hex dumps out as binary coverage data files.

    *extracted_coverage_info* maps a target file name to the hex string of
    its content.  Entries whose name contains "kobject_hash" are not
    written; the file with the last four characters of that name replaced
    by "gcno" is deleted instead.
    """
    if VERBOSE:
        print("Generating gcda files")

    for filename, hexdump_val in extracted_coverage_info.items():
        # if kobject_hash is given for coverage gcovr fails
        # hence skipping it problem only in gcovr v4.1
        if "kobject_hash" in filename:
            filename = (filename[:-4]) + "gcno"
            try:
                os.remove(filename)
            except OSError:
                # Best effort: the file may not exist or may not be
                # removable.  Only file system errors are ignored, so
                # KeyboardInterrupt and programming errors still surface.
                pass
            continue

        with open(filename, 'wb') as fp:
            fp.write(bytes.fromhex(hexdump_val))
|
|
|
|
def generate_coverage(outdir, ignores):
    """Turn the coverage dumps found under *outdir* into an HTML report.

    Every handler.log below *outdir* is scanned for coverage dumps, which
    are written out as data files.  lcov then captures them into
    coverage.info, the tests/ztest part is extracted into ztest.info
    (without tests/ztest/test), each pattern in *ignores* is removed from
    coverage.info, and genhtml renders the result into <outdir>/coverage.
    The standard output of all tools is appended to <outdir>/coverage.log.
    """
    for handler_log in glob.glob("%s/**/handler.log" % outdir, recursive=True):
        create_gcda_files(retrieve_data(handler_log))

    branch_rc = ["--rc", "lcov_branch_coverage=1"]
    coveragefile = os.path.join(outdir, "coverage.info")
    ztestfile = os.path.join(outdir, "ztest.info")

    with open(os.path.join(outdir, "coverage.log"), "a") as coveragelog:
        subprocess.call(
            ["lcov", "--capture", "--directory", outdir]
            + branch_rc
            + ["--output-file", coveragefile],
            stdout=coveragelog)

        # We want to remove tests/* and tests/ztest/test/* but save tests/ztest
        subprocess.call(
            ["lcov", "--extract", coveragefile,
             os.path.join(ZEPHYR_BASE, "tests", "ztest", "*"),
             "--output-file", ztestfile]
            + branch_rc,
            stdout=coveragelog)

        files = [coveragefile]
        if os.path.exists(ztestfile) and os.path.getsize(ztestfile) > 0:
            subprocess.call(
                ["lcov", "--remove", ztestfile,
                 os.path.join(ZEPHYR_BASE, "tests/ztest/test/*"),
                 "--output-file", ztestfile]
                + branch_rc,
                stdout=coveragelog)
            files.append(ztestfile)

        for pattern in ignores:
            subprocess.call(
                ["lcov", "--remove", coveragefile, pattern,
                 "--output-file", coveragefile]
                + branch_rc,
                stdout=coveragelog)

        genhtml_cmd = ["genhtml", "--legend", "--branch-coverage",
                       "-output-directory",
                       os.path.join(outdir, "coverage")]
        ret = subprocess.call(genhtml_cmd + files, stdout=coveragelog)
        if ret == 0:
            info("HTML report generated: %s" %
                 os.path.join(outdir, "coverage", "index.html"))
|
|
|
|
|
|
def main():
|
|
start_time = time.time()
|
|
global VERBOSE, INLINE_LOGS, JOBS, log_file
|
|
global options
|
|
global run_individual_tests
|
|
options = parse_arguments()
|
|
|
|
if options.coverage:
|
|
options.enable_coverage = True
|
|
|
|
if options.size:
|
|
for fn in options.size:
|
|
size_report(SizeCalculator(fn, []))
|
|
sys.exit(0)
|
|
|
|
|
|
if options.device_testing:
|
|
if options.device_serial is None or len(options.platform) != 1:
|
|
sys.exit(1)
|
|
|
|
VERBOSE += options.verbose
|
|
INLINE_LOGS = options.inline_logs
|
|
if options.log_file:
|
|
log_file = open(options.log_file, "w")
|
|
|
|
if options.jobs:
|
|
JOBS = options.jobs
|
|
|
|
# Decrease JOBS for Ninja, if jobs weren't explicitly set
|
|
if options.ninja and not options.jobs:
|
|
JOBS = int(JOBS * 0.75)
|
|
|
|
info("JOBS: %d" % JOBS);
|
|
|
|
if options.subset:
|
|
subset, sets = options.subset.split("/")
|
|
if int(subset) > 0 and int(sets) >= int(subset):
|
|
info("Running only a subset: %s/%s" % (subset, sets))
|
|
else:
|
|
error("You have provided a wrong subset value: %s." % options.subset)
|
|
return
|
|
|
|
if os.path.exists(options.outdir) and not options.no_clean:
|
|
info("Cleaning output directory " + options.outdir)
|
|
shutil.rmtree(options.outdir)
|
|
|
|
if not options.testcase_root:
|
|
options.testcase_root = [os.path.join(ZEPHYR_BASE, "tests"),
|
|
os.path.join(ZEPHYR_BASE, "samples")]
|
|
|
|
ts = TestSuite(options.board_root, options.testcase_root, options.outdir)
|
|
|
|
if ts.load_errors:
|
|
sys.exit(1)
|
|
|
|
if options.list_tags:
|
|
tags = set()
|
|
for n,tc in ts.testcases.items():
|
|
tags = tags.union(tc.tags)
|
|
|
|
for t in tags:
|
|
print("- {}".format(t))
|
|
|
|
return
|
|
|
|
|
|
def export_tests(filename, tests):
|
|
with open(filename, "wt") as csvfile:
|
|
fieldnames = ['section', 'subsection', 'title', 'reference']
|
|
cw = csv.DictWriter(csvfile, fieldnames, lineterminator=os.linesep)
|
|
for test in tests:
|
|
data = test.split(".")
|
|
subsec = " ".join(data[1].split("_")).title()
|
|
rowdict = {
|
|
"section": data[0].capitalize(),
|
|
"subsection": subsec,
|
|
"title": test,
|
|
"reference": test
|
|
}
|
|
cw.writerow(rowdict)
|
|
|
|
if options.export_tests:
|
|
cnt = 0
|
|
unq = []
|
|
for n,tc in ts.testcases.items():
|
|
for c in tc.cases:
|
|
unq.append(c)
|
|
|
|
tests = sorted(set(unq))
|
|
export_tests(options.export_tests, tests)
|
|
return
|
|
|
|
run_individual_tests = []
|
|
|
|
if options.test:
|
|
run_individual_tests = options.test
|
|
|
|
if options.list_tests or options.sub_test:
|
|
cnt = 0
|
|
unq = []
|
|
run_individual_tests = []
|
|
for n,tc in ts.testcases.items():
|
|
for c in tc.cases:
|
|
if options.sub_test and c in options.sub_test:
|
|
if tc.name not in run_individual_tests:
|
|
run_individual_tests.append(tc.name)
|
|
unq.append(c)
|
|
|
|
if options.sub_test:
|
|
if run_individual_tests:
|
|
info("Running the following tests:")
|
|
for t in run_individual_tests:
|
|
print(" - {}".format(t))
|
|
else:
|
|
info("Tests not found")
|
|
return
|
|
|
|
elif options.list_tests:
|
|
for u in sorted(set(unq)):
|
|
cnt = cnt + 1
|
|
print(" - {}".format(u))
|
|
print("{} total.".format(cnt))
|
|
return
|
|
|
|
discards = []
|
|
if options.load_tests:
|
|
ts.load_from_file(options.load_tests)
|
|
else:
|
|
discards = ts.apply_filters()
|
|
|
|
if options.discard_report:
|
|
ts.discard_report(options.discard_report)
|
|
|
|
if VERBOSE > 1 and discards:
|
|
# if we are using command line platform filter, no need to list every
|
|
# other platform as excluded, we know that already.
|
|
# Show only the discards that apply to the selected platforms on the
|
|
# command line
|
|
|
|
for i, reason in discards.items():
|
|
if options.platform and i.platform.name not in options.platform:
|
|
continue
|
|
debug(
|
|
"{:<25} {:<50} {}SKIPPED{}: {}".format(
|
|
i.platform.name,
|
|
i.test.name,
|
|
COLOR_YELLOW,
|
|
COLOR_NORMAL,
|
|
reason))
|
|
|
|
|
|
def native_and_unit_first(a, b):
    """Three-way comparator that groups entries by the prefix of their key.

    Entries are ordered into four classes, based on ``entry[0]``:

    0. keys starting with ``'unit_testing'``
    1. keys starting with ``'native_posix'``
    2. keys whose first ``/``-separated component ends with ``'_bsim'``
    3. everything else

    Entries in different classes compare by class.  Entries in the same
    class compare by their natural ordering, so the comparator is
    antisymmetric and returns 0 for equal entries, as ``cmp_to_key``
    based sorting requires for a deterministic result.

    Args:
        a: indexable entry whose element 0 is a string key.
        b: indexable entry whose element 0 is a string key.

    Returns:
        -1 if ``a`` sorts before ``b``, 1 if it sorts after, 0 if equal.
    """
    def priority(name):
        # The check order matters: the first matching rule decides the class.
        if name.startswith('unit_testing'):
            return 0
        if name.startswith('native_posix'):
            return 1
        if name.split("/", 1)[0].endswith("_bsim"):
            return 2
        return 3

    rank_a = priority(a[0])
    rank_b = priority(b[0])
    if rank_a != rank_b:
        return -1 if rank_a < rank_b else 1

    # Same class: fall back to the natural ordering of the entries instead of
    # unconditionally claiming "a comes first", which made the result depend
    # on the input order.
    return (a > b) - (a < b)
|
|
|
|
ts.instances = OrderedDict(sorted(ts.instances.items(),
|
|
key=cmp_to_key(native_and_unit_first)))
|
|
|
|
if options.save_tests:
|
|
ts.run_report(options.save_tests)
|
|
return
|
|
|
|
if options.subset:
|
|
|
|
subset, sets = options.subset.split("/")
|
|
total = len(ts.instances)
|
|
per_set = round(total / int(sets))
|
|
start = (int(subset) - 1) * per_set
|
|
if subset == sets:
|
|
end = total
|
|
else:
|
|
end = start + per_set
|
|
|
|
sliced_instances = islice(ts.instances.items(), start, end)
|
|
ts.instances = OrderedDict(sliced_instances)
|
|
|
|
info("%d tests selected, %d tests discarded due to filters" %
|
|
(len(ts.instances), len(discards)))
|
|
|
|
if options.dry_run:
|
|
return
|
|
|
|
if VERBOSE or not TERMINAL:
|
|
goals = ts.execute(
|
|
chatty_test_cb,
|
|
ts.instances)
|
|
else:
|
|
goals = ts.execute(
|
|
terse_test_cb,
|
|
ts.instances)
|
|
info("")
|
|
|
|
if options.detailed_report:
|
|
ts.testcase_target_report(options.detailed_report)
|
|
|
|
# figure out which report to use for size comparison
|
|
if options.compare_report:
|
|
report_to_use = options.compare_report
|
|
elif options.last_metrics:
|
|
report_to_use = LAST_SANITY
|
|
else:
|
|
report_to_use = RELEASE_DATA
|
|
|
|
deltas = ts.compare_metrics(report_to_use)
|
|
warnings = 0
|
|
if deltas and options.show_footprint:
|
|
for i, metric, value, delta, lower_better in deltas:
|
|
if not options.all_deltas and ((delta < 0 and lower_better) or
|
|
(delta > 0 and not lower_better)):
|
|
continue
|
|
|
|
percentage = (float(delta) / float(value - delta))
|
|
if not options.all_deltas and (percentage <
|
|
(options.footprint_threshold / 100.0)):
|
|
continue
|
|
|
|
info("{:<25} {:<60} {}{}{}: {} {:<+4}, is now {:6} {:+.2%}".format(
|
|
i.platform.name, i.test.name, COLOR_YELLOW,
|
|
"INFO" if options.all_deltas else "WARNING", COLOR_NORMAL,
|
|
metric, delta, value, percentage))
|
|
warnings += 1
|
|
|
|
if warnings:
|
|
info("Deltas based on metrics from last %s" %
|
|
("release" if not options.last_metrics else "run"))
|
|
|
|
failed = 0
|
|
for name, goal in goals.items():
|
|
if goal.failed:
|
|
failed += 1
|
|
elif goal.metrics.get("unrecognized") and not options.disable_unrecognized_section_test:
|
|
info("%sFAILED%s: %s has unrecognized binary sections: %s" %
|
|
(COLOR_RED, COLOR_NORMAL, goal.name,
|
|
str(goal.metrics["unrecognized"])))
|
|
failed += 1
|
|
|
|
if options.coverage:
|
|
info("Generating coverage files...")
|
|
generate_coverage(options.outdir, ["*generated*", "tests/*", "samples/*"])
|
|
|
|
duration = time.time() - start_time
|
|
info("%s%d of %d%s tests passed with %s%d%s warnings in %d seconds" %
|
|
(COLOR_RED if failed else COLOR_GREEN, len(goals) - failed,
|
|
len(goals), COLOR_NORMAL, COLOR_YELLOW if warnings else COLOR_NORMAL,
|
|
warnings, COLOR_NORMAL, duration))
|
|
|
|
if options.testcase_report:
|
|
ts.testcase_report(options.testcase_report)
|
|
if not options.no_update:
|
|
ts.testcase_xunit_report(LAST_SANITY_XUNIT, duration)
|
|
ts.testcase_report(LAST_SANITY)
|
|
if options.release:
|
|
ts.testcase_report(RELEASE_DATA)
|
|
if log_file:
|
|
log_file.close()
|
|
if failed or (warnings and options.warnings_as_errors):
|
|
sys.exit(1)
|
|
|
|
|
|
# Script entry point: call main() only when this file is executed directly,
# not when it is imported as a module.
if __name__ == "__main__":
    main()
|