| # osmo_gsm_tester: test suite |
| # |
| # Copyright (C) 2016-2017 by sysmocom - s.f.m.c. GmbH |
| # |
| # Author: Neels Hofmeyr <neels@hofmeyr.de> |
| # |
| # This program is free software: you can redistribute it and/or modify |
| # it under the terms of the GNU Affero General Public License as |
| # published by the Free Software Foundation, either version 3 of the |
| # License, or (at your option) any later version. |
| # |
| # This program is distributed in the hope that it will be useful, |
| # but WITHOUT ANY WARRANTY; without even the implied warranty of |
| # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the |
| # GNU Affero General Public License for more details. |
| # |
| # You should have received a copy of the GNU Affero General Public License |
| # along with this program. If not, see <http://www.gnu.org/licenses/>. |
| |
| import os |
| import sys |
| import time |
| import copy |
| from . import config, log, template, util, resource, schema, ofono_client, osmo_nitb |
| from . import test |
| |
| class Timeout(Exception): |
| pass |
| |
| class SuiteDefinition(log.Origin): |
| '''A test suite reserves resources for a number of tests. |
| Each test requires a specific number of modems, BTSs etc., which are |
| reserved beforehand by a test suite. This way several test suites can be |
| scheduled dynamically without resource conflicts arising halfway through |
| the tests.''' |
| |
| CONF_FILENAME = 'suite.conf' |
| |
| CONF_SCHEMA = util.dict_add( |
| { |
| 'defaults.timeout': schema.STR, |
| }, |
| dict([('resources.%s' % k, t) for k,t in resource.WANT_SCHEMA.items()]) |
| ) |
| |
| |
| def __init__(self, suite_dir): |
| self.set_log_category(log.C_CNF) |
| self.suite_dir = suite_dir |
| self.set_name(os.path.basename(self.suite_dir)) |
| self.read_conf() |
| |
| def read_conf(self): |
| with self: |
| self.dbg('reading %s' % SuiteDefinition.CONF_FILENAME) |
| if not os.path.isdir(self.suite_dir): |
| raise RuntimeError('No such directory: %r' % self.suite_dir) |
| self.conf = config.read(os.path.join(self.suite_dir, |
| SuiteDefinition.CONF_FILENAME), |
| SuiteDefinition.CONF_SCHEMA) |
| self.load_tests() |
| |
| def load_tests(self): |
| with self: |
| self.tests = [] |
| for basename in sorted(os.listdir(self.suite_dir)): |
| if not basename.endswith('.py'): |
| continue |
| self.tests.append(Test(self, basename)) |
| |
| def add_test(self, test): |
| with self: |
| if not isinstance(test, Test): |
| raise ValueError('add_test(): pass a Test() instance, not %s' % type(test)) |
| if test.suite is None: |
| test.suite = self |
| if test.suite is not self: |
| raise ValueError('add_test(): test already belongs to another suite') |
| self.tests.append(test) |
| |
| |
| |
| class Test(log.Origin): |
| |
| def __init__(self, suite, test_basename): |
| self.suite = suite |
| self.basename = test_basename |
| self.path = os.path.join(self.suite.suite_dir, self.basename) |
| super().__init__(self.path) |
| self.set_name(self.basename) |
| self.set_log_category(log.C_TST) |
| |
| def run(self, suite_run): |
| assert self.suite is suite_run.definition |
| with self: |
| test.setup(suite_run, self, ofono_client, sys.modules[__name__]) |
| success = False |
| try: |
| self.log('START') |
| with self.redirect_stdout(): |
| util.run_python_file('%s.%s' % (self.suite.name(), self.name()), |
| self.path) |
| success = True |
| except resource.NoResourceExn: |
| self.err('Current resource state:\n', repr(suite_run.reserved_resources)) |
| raise |
| finally: |
| if success: |
| self.log('PASS') |
| else: |
| self.log('FAIL') |
| |
| def name(self): |
| l = log.get_line_for_src(self.path) |
| if l is not None: |
| return '%s:%s' % (self._name, l) |
| return super().name() |
| |
| class SuiteRun(log.Origin): |
| |
| trial = None |
| resources_pool = None |
| reserved_resources = None |
| _resource_requirements = None |
| _config = None |
| _processes = None |
| |
| def __init__(self, current_trial, suite_scenario_str, suite_definition, scenarios=[]): |
| self.trial = current_trial |
| self.definition = suite_definition |
| self.scenarios = scenarios |
| self.set_name(suite_scenario_str) |
| self.set_log_category(log.C_TST) |
| self.resources_pool = resource.ResourcesPool() |
| |
| def combined(self, conf_name): |
| self.dbg(combining=conf_name) |
| with log.Origin(combining_scenarios=conf_name): |
| combination = copy.deepcopy(self.definition.conf.get(conf_name) or {}) |
| self.dbg(definition_conf=combination) |
| for scenario in self.scenarios: |
| with scenario: |
| c = scenario.get(conf_name) |
| self.dbg(scenario=scenario.name(), conf=c) |
| if c is None: |
| continue |
| config.combine(combination, c) |
| return combination |
| |
| def resource_requirements(self): |
| if self._resource_requirements is None: |
| self._resource_requirements = self.combined('resources') |
| return self._resource_requirements |
| |
| def config(self): |
| if self._config is None: |
| self._config = self.combined('config') |
| return self._config |
| |
| class Results: |
| def __init__(self): |
| self.passed = [] |
| self.failed = [] |
| self.all_passed = None |
| |
| def add_pass(self, test): |
| self.passed.append(test) |
| |
| def add_fail(self, test): |
| self.failed.append(test) |
| |
| def conclude(self): |
| self.all_passed = bool(self.passed) and not bool(self.failed) |
| return self |
| |
| def __str__(self): |
| if self.failed: |
| return 'FAIL: %d of %d tests failed:\n %s' % ( |
| len(self.failed), |
| len(self.failed) + len(self.passed), |
| '\n '.join([t.name() for t in self.failed])) |
| if not self.passed: |
| return 'no tests were run.' |
| return 'pass: all %d tests passed.' % len(self.passed) |
| |
| def reserve_resources(self): |
| if self.reserved_resources: |
| raise RuntimeError('Attempt to reserve resources twice for a SuiteRun') |
| self.log('reserving resources in', self.resources_pool.state_dir, '...') |
| with self: |
| self.reserved_resources = self.resources_pool.reserve(self, self.resource_requirements()) |
| |
| def run_tests(self, names=None): |
| self.log('Suite run start') |
| if not self.reserved_resources: |
| self.reserve_resources() |
| results = SuiteRun.Results() |
| for test in self.definition.tests: |
| if names and not test.name() in names: |
| continue |
| self._run_test(test, results) |
| self.stop_processes() |
| return results.conclude() |
| |
| def _run_test(self, test, results): |
| try: |
| with self: |
| test.run(self) |
| results.add_pass(test) |
| except: |
| results.add_fail(test) |
| self.log_exn() |
| |
| def remember_to_stop(self, process): |
| if self._processes is None: |
| self._processes = [] |
| self._processes.insert(0, process) |
| |
| def stop_processes(self): |
| if not self._processes: |
| return |
| for process in self._processes: |
| process.terminate() |
| |
| def nitb_iface(self): |
| return self.reserved_resources.get(resource.R_NITB_IFACE) |
| |
| def nitb(self, nitb_iface=None): |
| if nitb_iface is None: |
| nitb_iface = self.nitb_iface() |
| return osmo_nitb.OsmoNitb(self, nitb_iface) |
| |
| def bts(self): |
| return bts_obj(self, self.reserved_resources.get(resource.R_BTS)) |
| |
| def modem(self): |
| return modem_obj(self.reserved_resources.get(resource.R_MODEM)) |
| |
| def modems(self, count): |
| l = [] |
| for i in range(count): |
| l.append(self.modem()) |
| return l |
| |
| def msisdn(self): |
| msisdn = self.resources_pool.next_msisdn(self.origin) |
| self.log('using MSISDN', msisdn) |
| return msisdn |
| |
| def _wait(self, condition, condition_args, condition_kwargs, timeout, timestep): |
| if not timeout or timeout < 0: |
| raise RuntimeError('wait() *must* time out at some point. timeout=%r' % timeout) |
| if timestep < 0.1: |
| timestep = 0.1 |
| |
| started = time.time() |
| while True: |
| self.poll() |
| if condition(*condition_args, **condition_kwargs): |
| return True |
| waited = time.time() - started |
| if waited > timeout: |
| return False |
| time.sleep(timestep) |
| |
| def wait(self, condition, *condition_args, timeout=300, timestep=1, **condition_kwargs): |
| if not self._wait(condition, condition_args, condition_kwargs, timeout, timestep): |
| raise Timeout('Timeout expired') |
| |
| def sleep(self, seconds): |
| assert seconds > 0. |
| self._wait(lambda: False, [], {}, timeout=seconds, timestep=min(seconds, 1)) |
| |
| def poll(self): |
| ofono_client.poll() |
| if self._processes: |
| for process in self._processes: |
| if process.terminated(): |
| process.log_stdout_tail() |
| process.log_stderr_tail() |
| process.raise_exn('Process ended prematurely') |
| |
| def prompt(self, *msgs, **msg_details): |
| 'ask for user interaction. Do not use in tests that should run automatically!' |
| if msg_details: |
| msgs = list(msgs) |
| msgs.append('{%s}' % |
| (', '.join(['%s=%r' % (k,v) |
| for k,v in sorted(msg_details.items())]))) |
| msg = ' '.join(msgs) or 'Hit Enter to continue' |
| self.log('prompt:', msg) |
| sys.__stdout__.write('\n\n--- PROMPT ---\n') |
| sys.__stdout__.write(msg) |
| sys.__stdout__.write('\n') |
| sys.__stdout__.flush() |
| entered = util.input_polling('> ', self.poll) |
| self.log('prompt entered:', repr(entered)) |
| return entered |
| |
| loaded_suite_definitions = {} |
| |
| def load(suite_name): |
| global loaded_suite_definitions |
| |
| suite = loaded_suite_definitions.get(suite_name) |
| if suite is not None: |
| return suite |
| |
| suites_dir = config.get_suites_dir() |
| suite_dir = suites_dir.child(suite_name) |
| if not suites_dir.exists(suite_name): |
| raise RuntimeError('Suite not found: %r in %r' % (suite_name, suites_dir)) |
| if not suites_dir.isdir(suite_name): |
| raise RuntimeError('Suite name found, but not a directory: %r' % (suite_dir)) |
| |
| suite_def = SuiteDefinition(suite_dir) |
| loaded_suite_definitions[suite_name] = suite_def |
| return suite_def |
| |
| def parse_suite_scenario_str(suite_scenario_str): |
| tokens = suite_scenario_str.split(':') |
| if len(tokens) > 2: |
| raise RuntimeError('invalid combination string: %r' % suite_scenario_str) |
| |
| suite_name = tokens[0] |
| if len(tokens) <= 1: |
| scenario_names = [] |
| else: |
| scenario_names = tokens[1].split('+') |
| |
| return suite_name, scenario_names |
| |
| def load_suite_scenario_str(suite_scenario_str): |
| suite_name, scenario_names = parse_suite_scenario_str(suite_scenario_str) |
| suite = load(suite_name) |
| scenarios = [config.get_scenario(scenario_name) for scenario_name in scenario_names] |
| return (suite_scenario_str, suite, scenarios) |
| |
| def bts_obj(suite_run, conf): |
| bts_type = conf.get('type') |
| log.dbg(None, None, 'create BTS object', type=bts_type) |
| bts_class = resource.KNOWN_BTS_TYPES.get(bts_type) |
| if bts_class is None: |
| raise RuntimeError('No such BTS type is defined: %r' % bts_type) |
| return bts_class(suite_run, conf) |
| |
| def modem_obj(conf): |
| log.dbg(None, None, 'create Modem object', conf=conf) |
| return ofono_client.Modem(conf) |
| |
| # vim: expandtab tabstop=4 shiftwidth=4 |