twister: move runner code into runner class

Move all code related to test execution into runner class.

Signed-off-by: Anas Nashif <anas.nashif@intel.com>
This commit is contained in:
Anas Nashif 2022-06-10 07:27:28 -04:00
parent 20f257a97d
commit facc685ae9
2 changed files with 85 additions and 76 deletions

View file

@ -11,8 +11,11 @@ import subprocess
import pickle
import logging
import queue
import time
import multiprocessing
from colorama import Fore
from multiprocessing import Lock, Process, Value
from multiprocessing.managers import BaseManager
from twister.cmakecache import CMakeCache
@ -746,25 +749,89 @@ class ProjectBuilder(FilterBuilder):
class TwisterRunner:
def __init__(self, instances, jobs=1, env=None) -> None:
def __init__(self, instances, suites, env=None) -> None:
self.pipeline = None
self.options = env.options
self.env = env
self.instances = instances
self.jobs = jobs
self.suites = suites
self.jobs = 1
self.results = None
def update_counting(self, results=None):
def run(self):
retries = self.options.retry_failed + 1
completed = 0
BaseManager.register('LifoQueue', queue.LifoQueue)
manager = BaseManager()
manager.start()
self.results = ExecutionCounter(total=len(self.instances))
pipeline = manager.LifoQueue()
done_queue = manager.LifoQueue()
# Set number of jobs
if self.options.jobs:
self.jobs = self.options.jobs
elif self.options.build_only:
self.jobs = multiprocessing.cpu_count() * 2
else:
self.jobs = multiprocessing.cpu_count()
logger.info("JOBS: %d" % self.jobs)
self.update_counting()
logger.info("%d test scenarios (%d configurations) selected, %d configurations discarded due to filters." %
(len(self.suites), len(self.instances), self.results.skipped_configs))
while True:
completed += 1
if completed > 1:
logger.info("%d Iteration:" % (completed))
time.sleep(self.options.retry_interval) # waiting for the system to settle down
self.results.done = self.results.total - self.results.failed
if self.options.retry_build_errors:
self.results.failed = 0
self.results.error = 0
else:
self.results.failed = self.results.error
self.execute(pipeline, done_queue)
while True:
try:
inst = done_queue.get_nowait()
except queue.Empty:
break
else:
inst.metrics.update(self.instances[inst.name].metrics)
inst.metrics["handler_time"] = inst.execution_time
inst.metrics["unrecognized"] = []
self.instances[inst.name] = inst
print("")
retries = retries - 1
# There are cases where failed == error (only build failures),
# we do not try build failures.
if retries == 0 or (self.results.failed == self.results.error and not self.options.retry_build_errors):
break
def update_counting(self):
for instance in self.instances.values():
results.cases += len(instance.testsuite.testcases)
self.results.cases += len(instance.testsuite.testcases)
if instance.status == 'filtered':
results.skipped_filter += 1
results.skipped_configs += 1
self.results.skipped_filter += 1
self.results.skipped_configs += 1
elif instance.status == 'passed':
results.passed += 1
results.done += 1
self.results.passed += 1
self.results.done += 1
elif instance.status == 'error':
results.error += 1
results.done += 1
self.results.error += 1
self.results.done += 1
def add_tasks_to_queue(self, pipeline, build_only=False, test_only=False, retry_build_errors=False):
@ -797,7 +864,7 @@ class TwisterRunner:
return True
def execute(self, pipeline, done, results):
def execute(self, pipeline, done):
lock = Lock()
logger.info("Adding tasks to the queue...")
self.add_tasks_to_queue(pipeline, self.options.build_only, self.options.test_only,
@ -807,7 +874,7 @@ class TwisterRunner:
processes = []
for job in range(self.jobs):
logger.debug(f"Launch process {job}")
p = Process(target=self.pipeline_mgr, args=(pipeline, done, lock, results, ))
p = Process(target=self.pipeline_mgr, args=(pipeline, done, lock, self.results, ))
processes.append(p)
p.start()
@ -819,4 +886,3 @@ class TwisterRunner:
for p in processes:
p.terminate()
return results

View file

@ -171,12 +171,11 @@ import logging
import time
import shutil
from collections import OrderedDict
import multiprocessing
from itertools import islice
import colorama
from colorama import Fore
from pathlib import Path
from multiprocessing.managers import BaseManager
import queue
from zephyr_module import west_projects, parse_modules
@ -1230,67 +1229,11 @@ def main():
if options.short_build_path:
tplan.create_build_dir_links()
retries = options.retry_failed + 1
completed = 0
BaseManager.register('LifoQueue', queue.LifoQueue)
manager = BaseManager()
manager.start()
results = ExecutionCounter(total=len(tplan.instances))
pipeline = manager.LifoQueue()
done_queue = manager.LifoQueue()
# Set number of jobs
if options.jobs:
jobs = options.jobs
elif options.build_only:
jobs = multiprocessing.cpu_count() * 2
else:
jobs = multiprocessing.cpu_count()
logger.info("JOBS: %d" % jobs)
runner = TwisterRunner(tplan.instances, jobs, env)
runner.update_counting(results)
logger.info("%d test scenarios (%d configurations) selected, %d configurations discarded due to filters." %
(len(tplan.testsuites), len(tplan.instances), results.skipped_configs))
while True:
completed += 1
if completed > 1:
logger.info("%d Iteration:" % (completed))
time.sleep(options.retry_interval) # waiting for the system to settle down
results.done = results.total - results.failed
if options.retry_build_errors:
results.failed = 0
results.error = 0
else:
results.failed = results.error
results = runner.execute(pipeline, done_queue, results)
while True:
try:
inst = done_queue.get_nowait()
except queue.Empty:
break
else:
inst.metrics.update(tplan.instances[inst.name].metrics)
inst.metrics["handler_time"] = inst.execution_time
inst.metrics["unrecognized"] = []
tplan.instances[inst.name] = inst
print("")
retries = retries - 1
# There are cases where failed == error (only build failures),
# we do not try build failures.
if retries == 0 or (results.failed == results.error and not options.retry_build_errors):
break
runner = TwisterRunner(tplan.instances, tplan.testsuites, env)
runner.run()
# figure out which report to use for size comparison
report_to_use = None
@ -1307,9 +1250,9 @@ def main():
duration = time.time() - start_time
results.summary()
runner.results.summary()
report.summary(results, options.disable_unrecognized_section_test, duration)
report.summary(runner.results, options.disable_unrecognized_section_test, duration)
if options.coverage:
if not options.gcov_tool:
@ -1353,7 +1296,7 @@ def main():
)
logger.info("Run completed")
if results.failed or results.error or (tplan.warnings and options.warnings_as_errors):
if runner.results.failed or runner.results.error or (tplan.warnings and options.warnings_as_errors):
sys.exit(1)