Move test.py and report.py to core/
Change-Id: Ibb7fa5ab40bcf1e59705bdd2c2c5a76025b2b544
diff --git a/src/osmo_gsm_tester/core/report.py b/src/osmo_gsm_tester/core/report.py
new file mode 100644
index 0000000..f781695
--- /dev/null
+++ b/src/osmo_gsm_tester/core/report.py
@@ -0,0 +1,160 @@
+# osmo_gsm_tester: report: directory of binaries to be tested
+#
+# Copyright (C) 2016-2017 by sysmocom - s.f.m.c. GmbH
+#
+# Author: Pau Espin Pedrol <pespin@sysmocom.de>
+#
+# This program is free software: you can redistribute it and/or modify
+# it under the terms of the GNU General Public License as
+# published by the Free Software Foundation, either version 3 of the
+# License, or (at your option) any later version.
+#
+# This program is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+# GNU General Public License for more details.
+#
+# You should have received a copy of the GNU General Public License
+# along with this program. If not, see <http://www.gnu.org/licenses/>.
+
+# junit xml format: https://llg.cubic.org/docs/junit/
+
+import math
+import sys
+import re
+from datetime import datetime
+import xml.etree.ElementTree as et
+from xml.sax.saxutils import escape
+from . import test
+
+invalid_xml_char_ranges = [(0x00, 0x08), (0x0B, 0x0C), (0x0E, 0x1F), (0x7F, 0x84),
+ (0x86, 0x9F), (0xFDD0, 0xFDDF), (0xFFFE, 0xFFFF)]
+if sys.maxunicode >= 0x10000: # not narrow build
+ invalid_xml_char_ranges.extend([(0x1FFFE, 0x1FFFF), (0x2FFFE, 0x2FFFF),
+ (0x3FFFE, 0x3FFFF), (0x4FFFE, 0x4FFFF),
+ (0x5FFFE, 0x5FFFF), (0x6FFFE, 0x6FFFF),
+ (0x7FFFE, 0x7FFFF), (0x8FFFE, 0x8FFFF),
+ (0x9FFFE, 0x9FFFF), (0xAFFFE, 0xAFFFF),
+ (0xBFFFE, 0xBFFFF), (0xCFFFE, 0xCFFFF),
+ (0xDFFFE, 0xDFFFF), (0xEFFFE, 0xEFFFF),
+ (0xFFFFE, 0xFFFFF), (0x10FFFE, 0x10FFFF)])
+invalid_xml_char_ranges_str = ['%s-%s' % (chr(low), chr(high))
+ for (low, high) in invalid_xml_char_ranges]
+invalid_xml_char_ranges_regex = re.compile('[%s]' % ''.join(invalid_xml_char_ranges_str))
+
+def escape_xml_invalid_characters(str):
+ replacement_char = '\uFFFD' # Unicode replacement character
+ return invalid_xml_char_ranges_regex.sub(replacement_char, escape(str))
+
+def trial_to_junit_write(trial, junit_path):
+ elements = et.ElementTree(element=trial_to_junit(trial))
+ elements.write(junit_path)
+
+def trial_to_junit(trial):
+ testsuites = et.Element('testsuites')
+ num_tests = 0
+ num_failures = 0
+ num_errors = 0
+ time = 0
+ id = 0
+ for suite in trial.suites:
+ testsuite = suite_to_junit(suite)
+ testsuite.set('id', str(id))
+ id += 1
+ testsuites.append(testsuite)
+ num_tests += int(testsuite.get('tests'))
+ num_failures += int(testsuite.get('failures'))
+ num_errors += int(testsuite.get('errors'))
+ time += suite.duration
+ testsuites.set('tests', str(num_tests))
+ testsuites.set('errors', str(num_errors))
+ testsuites.set('failures', str(num_failures))
+ testsuites.set('time', str(math.ceil(time)))
+ return testsuites
+
+def suite_to_junit(suite):
+ testsuite = et.Element('testsuite')
+ testsuite.set('name', suite.name())
+ testsuite.set('hostname', 'localhost')
+ if suite.start_timestamp:
+ testsuite.set('timestamp', datetime.fromtimestamp(round(suite.start_timestamp)).isoformat())
+ testsuite.set('time', str(math.ceil(suite.duration)))
+ testsuite.set('tests', str(len(suite.tests)))
+ passed, skipped, failed, errors = suite.count_test_results()
+ testsuite.set('errors', str(errors))
+ testsuite.set('failures', str(failed))
+ testsuite.set('skipped', str(skipped))
+ testsuite.set('disabled', str(skipped))
+ for suite_test in suite.tests:
+ testcase = test_to_junit(suite_test)
+ testcase.set('classname', suite.name())
+ testsuite.append(testcase)
+ return testsuite
+
+def test_to_junit(t):
+ testcase = et.Element('testcase')
+ testcase.set('name', t.name())
+ testcase.set('time', str(math.ceil(t.duration)))
+ if t.status == test.Test.SKIP:
+ et.SubElement(testcase, 'skipped')
+ elif t.status == test.Test.FAIL:
+ failure = et.SubElement(testcase, 'failure')
+ failure.set('type', t.fail_type or 'failure')
+ failure.text = t.fail_message
+ if t.fail_tb:
+ system_err = et.SubElement(testcase, 'system-err')
+ system_err.text = t.fail_tb
+ elif t.status != test.Test.PASS:
+ error = et.SubElement(testcase, 'error')
+ error.text = 'could not run'
+ sout = et.SubElement(testcase, 'system-out')
+ sout.text = escape_xml_invalid_characters(t.report_stdout())
+ return testcase
+
+def trial_to_text(trial):
+ suite_failures = []
+ count_fail = 0
+ count_pass = 0
+ for suite in trial.suites:
+ if suite.passed():
+ count_pass += 1
+ else:
+ count_fail += 1
+ suite_failures.append(suite_to_text(suite))
+
+ summary = ['%s: %s' % (trial.name(), trial.status)]
+ if count_fail:
+ summary.append('%d suites failed' % count_fail)
+ if count_pass:
+ summary.append('%d suites passed' % count_pass)
+ msg = [', '.join(summary)]
+ msg.extend(suite_failures)
+ return '\n'.join(msg)
+
+def suite_to_text(suite):
+ if not suite.tests:
+ return 'no tests were run.'
+
+ passed, skipped, failed, errors = suite.count_test_results()
+ details = []
+ if failed:
+ details.append('fail: %d' % failed)
+ if errors:
+ details.append('errors: %d' % errors)
+ if passed:
+ details.append('pass: %d' % passed)
+ if skipped:
+ details.append('skip: %d' % skipped)
+ msgs = ['%s: %s (%s)' % (suite.status, suite.name(), ', '.join(details))]
+ msgs.extend([test_to_text(t) for t in suite.tests])
+ return '\n '.join(msgs)
+
+def test_to_text(t):
+ msgs = ['%s: %s' % (t.status, t.name())]
+ if t.start_timestamp:
+ msgs.append('(%.1f sec)' % t.duration)
+ if t.status == test.Test.FAIL:
+ msgs.append('%s: %s' % (t.fail_type, t.fail_message))
+ return ' '.join(msgs)
+
+# vim: expandtab tabstop=4 shiftwidth=4
diff --git a/src/osmo_gsm_tester/core/test.py b/src/osmo_gsm_tester/core/test.py
new file mode 100644
index 0000000..93dbf6a
--- /dev/null
+++ b/src/osmo_gsm_tester/core/test.py
@@ -0,0 +1,135 @@
+# osmo_gsm_tester: test class
+#
+# Copyright (C) 2017 by sysmocom - s.f.m.c. GmbH
+#
+# Author: Pau Espin Pedrol <pespin@sysmocom.de>
+#
+# This program is free software: you can redistribute it and/or modify
+# it under the terms of the GNU General Public License as
+# published by the Free Software Foundation, either version 3 of the
+# License, or (at your option) any later version.
+#
+# This program is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+# GNU General Public License for more details.
+#
+# You should have received a copy of the GNU General Public License
+# along with this program. If not, see <http://www.gnu.org/licenses/>.
+
+import os
+import sys
+import time
+import traceback
+from .. import testenv
+
+from . import log, util, resource
+
+class Test(log.Origin):
+ UNKNOWN = 'UNKNOWN' # matches junit 'error'
+ SKIP = 'skip'
+ PASS = 'pass'
+ FAIL = 'FAIL'
+
+ def __init__(self, suite_run, test_basename):
+ self.basename = test_basename
+ super().__init__(log.C_TST, self.basename)
+ self._run_dir = None
+ self.suite_run = suite_run
+ self.path = os.path.join(self.suite_run.definition.suite_dir, self.basename)
+ self.status = Test.UNKNOWN
+ self.start_timestamp = 0
+ self.duration = 0
+ self.fail_type = None
+ self.fail_message = None
+ self.log_target = None
+ self._report_stdout = None
+
+ def get_run_dir(self):
+ if self._run_dir is None:
+ self._run_dir = util.Dir(self.suite_run.get_run_dir().new_dir(self._name))
+ return self._run_dir
+
+ def run(self):
+ try:
+ self.log_target = log.FileLogTarget(self.get_run_dir().new_child('log')).set_all_levels(log.L_DBG).style_change(trace=True)
+ log.large_separator(self.suite_run.trial.name(), self.suite_run.name(), self.name(), sublevel=3)
+ self.status = Test.UNKNOWN
+ self.start_timestamp = time.time()
+ testenv.setup(self.suite_run, self)
+ with self.redirect_stdout():
+ util.run_python_file('%s.%s' % (self.suite_run.definition.name(), self.basename),
+ self.path)
+ if self.status == Test.UNKNOWN:
+ self.set_pass()
+ except Exception as e:
+ if hasattr(e, 'msg'):
+ msg = e.msg
+ else:
+ msg = str(e)
+ if isinstance(e, AssertionError):
+ # AssertionError lacks further information on what was
+ # asserted. Find the line where the code asserted:
+ msg += log.get_src_from_exc_info(sys.exc_info())
+ # add source file information to failure report
+ if hasattr(e, 'origins'):
+ msg += ' [%s]' % e.origins
+ tb_str = traceback.format_exc()
+ if isinstance(e, resource.NoResourceExn):
+ tb_str += self.suite_run.resource_status_str()
+ self.set_fail(type(e).__name__, msg, tb_str, log.get_src_from_exc_info())
+ except BaseException as e:
+ # when the program is aborted by a signal (like Ctrl-C), escalate to abort all.
+ self.err('TEST RUN ABORTED: %s' % type(e).__name__)
+ raise
+ finally:
+ if self.log_target:
+ self.log_target.remove()
+
+ def name(self):
+ l = log.get_line_for_src(self.path)
+ if l is not None:
+ return '%s:%s' % (self._name, l)
+ return super().name()
+
+ def set_fail(self, fail_type, fail_message, tb_str=None, src=4):
+ self.status = Test.FAIL
+ self.duration = time.time() - self.start_timestamp
+ self.fail_type = fail_type
+ self.fail_message = fail_message
+
+ if tb_str is None:
+ # populate an exception-less call to set_fail() with traceback info
+ tb_str = ''.join(traceback.format_stack()[:-1])
+
+ self.fail_tb = tb_str
+ self.err('%s: %s' % (self.fail_type, self.fail_message), _src=src)
+ if self.fail_tb:
+ self.log(self.fail_tb, _level=log.L_TRACEBACK)
+ self.log('Test FAILED (%.1f sec)' % self.duration)
+
+ def set_pass(self):
+ self.status = Test.PASS
+ self.duration = time.time() - self.start_timestamp
+ self.log('Test passed (%.1f sec)' % self.duration)
+
+ def set_skip(self):
+ self.status = Test.SKIP
+ self.duration = 0
+
+ def set_report_stdout(self, text):
+ 'Overwrite stdout text stored in report from inside a test'
+ self._report_stdout = text
+
+ def report_stdout(self):
+ # If test overwrote the text, provide it:
+ if self._report_stdout is not None:
+ return self._report_stdout
+ # Otherwise vy default provide the entire test log:
+ if self.log_target is not None and self.log_target.log_file_path() is not None:
+ with open(self.log_target.log_file_path(), 'r') as myfile:
+ return myfile.read()
+ else:
+ return 'test log file not available'
+
+# vim: expandtab tabstop=4 shiftwidth=4