ci: overhaul check-compliance.py script

- Support sending status to github
- Support license checking
- Make every test a class
- Use junitparser instead of custom xml for junit output

Signed-off-by: Anas Nashif <anas.nashif@intel.com>
This commit is contained in:
Anas Nashif 2018-11-19 18:37:48 -05:00
parent 6d5ac1c089
commit 4c0e3e8697

View file

@ -3,21 +3,26 @@ import sys
import subprocess
import re
import os
import xml.etree.ElementTree as ET
from email.utils import parseaddr
import sh
import logging
import argparse
#from check_identity import verify_signed_off
from junitparser import TestCase, TestSuite, JUnitXml, Skipped, Error, Failure, Attr
from github import Github
from shutil import copyfile, copytree
import json
from pprint import pprint
if "ZEPHYR_BASE" not in os.environ:
logging.error("$ZEPHYR_BASE environment variable undefined.\n")
exit(1)
logging.warn("$ZEPHYR_BASE environment variable undefined.\n")
repository_path = os.getcwd()
else:
repository_path = os.environ['ZEPHYR_BASE']
logger = None
DOCS_WARNING_FILE = "doc.warnings"
repository_path = os.environ['ZEPHYR_BASE']
sh_special_args = {
'_tty_out': False,
'_cwd': repository_path
@ -29,6 +34,240 @@ sys.path.insert(0, os.path.join(repository_path, "scripts/kconfig"))
import kconfiglib
def get_shas(refspec):
sha_list = sh.git("rev-list",
'--max-count={0}'.format(-1 if "." in refspec else 1),
refspec, **sh_special_args).split()
return sha_list
class MyCase(TestCase):
classname = Attr()
doc = Attr()
class ComplianceTest:
_name = ""
_title = ""
_doc = "https://docs.zephyrproject.org/latest/contribute/contribute_guidelines.html"
def __init__(self, suite, range):
self.case = None
self.suite = suite
self.commit_range = range
def prepare(self):
self.case = MyCase(self._name)
self.case.classname = "Guidelines"
print("Running {} tests...".format(self._name))
def run(self):
pass
class CheckPatch(ComplianceTest):
_name = "checkpatch"
_doc = "https://docs.zephyrproject.org/latest/contribute/contribute_guidelines.html#coding-style"
def run(self):
self.prepare()
diff = subprocess.Popen(('git', 'diff', '%s' %(self.commit_range)), stdout=subprocess.PIPE)
try:
output = subprocess.check_output(('%s/scripts/checkpatch.pl' %repository_path,
'--mailback', '--no-tree', '-'), stdin=diff.stdout,
stderr=subprocess.STDOUT, shell=True)
except subprocess.CalledProcessError as ex:
m = re.search("([1-9][0-9]*) errors,", ex.output.decode('utf8'))
if m:
self.case.result = Failure("Checkpatch issues", "failure")
self.case.result._elem.text = (ex.output.decode('utf8'))
class KconfigCheck(ComplianceTest):
_name = "Kconfig"
_doc = "https://docs.zephyrproject.org/latest/application/kconfig-tips.html"
def run(self):
self.prepare()
# Look up Kconfig files relative to ZEPHYR_BASE
os.environ["srctree"] = repository_path
# Parse the entire Kconfig tree, to make sure we see all symbols
os.environ["SOC_DIR"] = "soc/"
os.environ["BOARD_DIR"] = "boards/*/*"
os.environ["ARCH"] = "*"
# Enable strict Kconfig mode in Kconfiglib, which assumes there's just a
# single Kconfig tree and warns for all references to undefined symbols
os.environ["KCONFIG_STRICT"] = "y"
undef_ref_warnings = []
for warning in kconfiglib.Kconfig().warnings:
if "undefined symbol" in warning:
undef_ref_warnings.append(warning)
# Generating multiple JUnit <failure>s would be neater, but Shippable only
# seems to display the first one
if undef_ref_warnings:
self.case.result = Failure("undefined Kconfig symbols", "failure")
self.case.result._elem.text = "\n\n\n".join(undef_ref_warnings)
class Documentation(ComplianceTest):
_name = "Documentation"
_doc = "https://docs.zephyrproject.org/latest/contribute/doc-guidelines.html"
def run(self):
self.prepare()
if os.path.exists(DOCS_WARNING_FILE) and os.path.getsize(DOCS_WARNING_FILE) > 0:
with open(DOCS_WARNING_FILE, "rb") as f:
log = f.read()
self.case.result = Error("Documentation Issues", "failure")
self.case.result._elem.text = log.decode('utf8')
else:
self.case.result = Skipped()
class GitLint(ComplianceTest):
_name = "Gitlint"
_doc = "https://docs.zephyrproject.org/latest/contribute/contribute_guidelines.html#commit-guidelines"
def run(self):
self.prepare()
proc = subprocess.Popen('gitlint --commits %s' % (self.commit_range),
shell=True, stdout=subprocess.PIPE, stderr=subprocess.STDOUT)
msg = ""
if proc.wait() != 0:
msg = proc.stdout.read()
if msg != "":
text = (msg.decode('utf8'))
self.case.result = Failure("commit message syntax issues", "failure")
self.case.result._elem.text = text
class License(ComplianceTest):
_name = "License"
_doc = "https://docs.zephyrproject.org/latest/contribute/contribute_guidelines.html#licensing"
def run(self):
self.prepare()
scancode = "/opt/scancode-toolkit/scancode"
if not os.path.exists(scancode):
self.case.result = Skipped()
return
os.makedirs("scancode-files", exist_ok=True)
new_files = sh.git("diff", "--name-only", "--diff-filter=A", self.commit_range, **sh_special_args)
if len(new_files) == 0:
self.case.result = Skipped()
return
for newf in new_files:
f = str(newf).rstrip()
os.makedirs(os.path.join('scancode-files', os.path.dirname(f)), exist_ok=True)
copy = os.path.join("scancode-files", f)
copyfile(f, copy)
try:
cmd = [scancode, '--verbose', '--copyright', '--license', '--license-diag', '--info',
'--classify', '--summary', '--json', 'scancode.json', 'scancode-files/']
cmd_str = " ".join(cmd)
logging.info(cmd_str)
out = subprocess.check_output(cmd_str, stderr=subprocess.STDOUT, shell=True)
except subprocess.CalledProcessError as e:
logging.error(e.output)
self.case.result = Skipped()
return
report = ""
with open ('scancode.json', 'r') as json_fp:
scancode_results = json.load(json_fp)
for file in scancode_results['files']:
if file['type'] == 'directory':
continue
original_fp = str(file['path']).replace('scancode-files/', '')
licenses = file['licenses']
if (file['is_script'] or file['is_source']) and (file['programming_language'] not in ['CMake']) and (file['extension'] not in ['.yaml']):
if len(file['licenses']) == 0:
report += ("* {} missing license.\n".format(original_fp))
else:
for l in licenses:
if l['key'] != "apache-2.0":
report += ("* {} is not apache-2.0 licensed: {}\n".format(original_fp, l['key']))
if l['category'] != 'Permissive':
report += ("* {} has non-permissive license: {}\n".format(original_fp, l['key']))
if len(file['copyrights']) == 0:
report += ("* {} missing copyright.\n".format(original_fp))
if report != "":
self.case.result = Failure("License/Copyright issues", "failure")
self.case.result._elem.text = report
class Identity(ComplianceTest):
_name = "Identity/Emails"
_doc = "https://docs.zephyrproject.org/latest/contribute/contribute_guidelines.html#commit-guidelines"
def run(self):
self.prepare()
for f in get_shas(self.commit_range):
commit = sh.git("log", "--decorate=short", "-n 1", f, **sh_special_args)
signed = []
author = ""
sha = ""
parsed_addr = None
for line in commit.split("\n"):
match = re.search("^commit\s([^\s]*)", line)
if match:
sha = match.group(1)
match = re.search("^Author:\s(.*)", line)
if match:
author = match.group(1)
parsed_addr = parseaddr(author)
match = re.search("signed-off-by:\s(.*)", line, re.IGNORECASE)
if match:
signed.append(match.group(1))
error1 = "%s: author email (%s) needs to match one of the signed-off-by entries." % (sha, author)
error2 = "%s: author email (%s) does not follow the syntax: First Last <email>." % (sha, author)
failure = None
if author not in signed:
failure = error1
if not parsed_addr or len(parsed_addr[0].split(" ")) < 2:
if not failure:
failure = error2
else:
failure = failure + "\n" + error2
if failure:
self.case.result = Failure("identity/email issues", "failure")
self.case.result._elem.text = failure
def init_logs():
global logger
log_lev = os.environ.get('LOG_LEVEL', None)
@ -52,206 +291,105 @@ def parse_args():
description="Check for coding style and documentation warnings.")
parser.add_argument('-c', '--commits', default=None,
help="Commit range in the form: a..b")
parser.add_argument('-g', '--github', action="store_true",
help="Send results to github as a comment.")
parser.add_argument('-r', '--repo', default=None,
help="Github repository")
parser.add_argument('-p', '--pull-request', default=0, type=int,
help="Pull request number")
parser.add_argument('-s', '--status', action="store_true", help="Set status to pending")
parser.add_argument('-S', '--sha', default=None, help="Commit SHA")
return parser.parse_args()
def get_shas(refspec):
sha_list = sh.git("rev-list",
'--max-count={0}'.format(-1 if "." in refspec else 1),
refspec, **sh_special_args).split()
return sha_list
def set_status(gh, repo, sha):
def run_gitlint(tc, commit_range):
proc = subprocess.Popen('gitlint --commits %s' %(commit_range),
shell=True, stdout=subprocess.PIPE, stderr=subprocess.STDOUT)
msg = ""
if proc.wait() != 0:
msg = proc.stdout.read()
if msg != "":
failure = ET.SubElement(tc, 'failure', type="failure", message="commit message error on range: %s" %commit_range)
failure.text = (msg.decode('utf8'))
return 1
return 0
def run_checkpatch(tc, commit_range):
output = None
out = ""
diff = subprocess.Popen(('git', 'diff', '%s' %(commit_range)), stdout=subprocess.PIPE)
try:
output = subprocess.check_output(('%s/scripts/checkpatch.pl' %repository_path,
'--mailback', '--no-tree', '-'), stdin=diff.stdout,
stderr=subprocess.STDOUT, shell=True)
except subprocess.CalledProcessError as ex:
m = re.search("([1-9][0-9]*) errors,", ex.output.decode('utf8'))
if m:
failure = ET.SubElement(tc, 'failure', type="failure", message="checkpatch issues")
failure.text = (ex.output.decode('utf8'))
return 1
return 0
def run_kconfig_undef_ref_check(tc, commit_range):
# Look up Kconfig files relative to ZEPHYR_BASE
os.environ["srctree"] = repository_path
# Parse the entire Kconfig tree, to make sure we see all symbols
os.environ["SOC_DIR"] = "soc/"
os.environ["BOARD_DIR"] = "boards/*/*"
os.environ["ARCH"] = "*"
# Enable strict Kconfig mode in Kconfiglib, which assumes there's just a
# single Kconfig tree and warns for all references to undefined symbols
os.environ["KCONFIG_STRICT"] = "y"
undef_ref_warnings = []
for warning in kconfiglib.Kconfig().warnings:
if "undefined symbol" in warning:
undef_ref_warnings.append(warning)
# Generating multiple JUnit <failure>s would be neater, but Shippable only
# seems to display the first one
if undef_ref_warnings:
failure = ET.SubElement(tc, "failure", type="failure",
message="undefined Kconfig symbols")
failure.text = "\n\n\n".join(undef_ref_warnings)
return 1
return 0
def verify_signed_off(tc, commit):
signed = []
author = ""
sha = ""
parsed_addr = None
for line in commit.split("\n"):
match = re.search("^commit\s([^\s]*)", line)
if match:
sha = match.group(1)
match = re.search("^Author:\s(.*)", line)
if match:
author = match.group(1)
parsed_addr = parseaddr(author)
match = re.search("signed-off-by:\s(.*)", line, re.IGNORECASE)
if match:
signed.append(match.group(1))
error1 = "%s: author email (%s) needs to match one of the signed-off-by entries." %(sha, author)
error2 = "%s: author email (%s) does not follow the syntax: First Last <email>." %(sha, author)
error = 0
failure = None
if author not in signed:
failure = ET.SubElement(tc, 'failure', type="failure", message="identity error")
failure.text = error1
error = 1
if not parsed_addr or len(parsed_addr[0].split(" ")) < 2:
if not failure:
failure = ET.SubElement(tc, 'failure', type="failure", message="identity error")
failure.text = error2
else:
failure.text = failure.text + "\n" + error2
error = 1
return error
def run_check_identity(tc, range):
error = 0
for f in get_shas(range):
commit = sh.git("log","--decorate=short", "-n 1", f, **sh_special_args)
error += verify_signed_off(tc, commit)
return error
def check_doc(tc, range):
if os.path.exists(DOCS_WARNING_FILE) and os.path.getsize(DOCS_WARNING_FILE) > 0:
with open(DOCS_WARNING_FILE, "rb") as f:
log = f.read()
failure = ET.SubElement(tc, 'failure', type="failure",
message="documentation issues")
failure.text = (log.decode('utf8'))
return 1
return 0
tests = {
"gitlint": {
"call": run_gitlint,
"name": "Commit message style",
},
"identity": {
"call": run_check_identity,
"name": "Author Identity verification",
},
"checkpatch": {
"call": run_checkpatch,
"name": "Code style check using checkpatch",
},
"checkkconfig": {
"call": run_kconfig_undef_ref_check,
"name": "Check Kconfig files for references to undefined symbols",
},
"documentation": {
"call": check_doc,
"name": "New warnings and errors when building documentation",
}
}
def run_tests(range):
run = "Commit Message / Documentation / Coding Style"
eleTestsuite = None
fails = 0
passes = 0
errors = 0
total = 0
filename = "compliance.xml"
eleTestsuites = ET.Element('testsuites')
eleTestsuite = ET.SubElement(eleTestsuites, 'testsuite', name=run, time="0",
tests="%d" %(errors + passes + fails), failures="%d" %fails, errors="%d" %errors, skip="0")
for test in tests.keys():
total += 1
eleTestcase = ET.SubElement(eleTestsuite, 'testcase', classname="%s"
%(test), name="%s" %(tests[test]['name']), time="0")
fails += tests[test]['call'](eleTestcase, range)
eleTestsuite.set("tests", "%s" %total)
eleTestsuite.set("failures", "%s" %fails)
result = ET.tostring(eleTestsuites)
f = open(filename, 'wb')
f.write(result)
f.close()
return fails
repo = gh.get_repo(repo)
commit = repo.get_commit(sha)
for Test in ComplianceTest.__subclasses__():
t = Test(None, "")
print("Creating status for %s" %(t._name))
commit.create_status('pending',
'%s' %t._doc,
'Verification in progress',
'{}'.format(t._name))
def main():
args = parse_args()
github_token = ''
gh = None
if args.github:
github_token = os.environ['GH_TOKEN']
gh = Github(github_token)
if args.status and args.sha != None and args.repo and gh:
set_status(gh, args.repo, args.sha)
sys.exit(0)
if not args.commits:
exit(1)
fails = run_tests(args.commits)
print(fails)
sys.exit(fails)
sys.exit(1)
suite = TestSuite("Compliance")
docs = {}
for Test in ComplianceTest.__subclasses__():
t = Test(suite, args.commits)
t.run()
suite.add_testcase(t.case)
docs[t.case.name] = t._doc
xml = JUnitXml()
xml.add_testsuite(suite)
xml.update_statistics()
xml.write('compliance.xml')
if args.github:
repo = gh.get_repo(args.repo)
pr = repo.get_pull(int(args.pull_request))
commit = repo.get_commit(args.sha)
comment = "Found the following issues, please fix and resubmit:\n\n"
comment_count = 0
print("Processing results...")
for case in suite:
if case.result and case.result != Skipped:
comment_count += 1
comment += ("## {}\n".format(case.result.message))
comment += "\n"
if case.name not in ['Gitlint', 'Identity/Emails', 'License']:
comment += "```\n"
comment += ("{}\n".format(case.result._elem.text))
if case.name not in ['Gitlint', 'Identity/Emails', 'License']:
comment += "```\n"
commit.create_status('failure',
docs[case.name],
'Verification failed',
'{}'.format(case.name))
else:
commit.create_status('success',
docs[case.name],
'Verifications passed',
'{}'.format(case.name))
if args.repo and args.pull_request and comment_count > 0:
comments = pr.get_issue_comments()
commented = False
for c in comments:
if 'Found the following issues, please fix and resubmit' in c.body:
c.edit(comment)
commented = True
break
if not commented:
pr.create_issue_comment(comment)
if __name__ == "__main__":
#init_logs()
main()