core implementation
code bomb implementing the bulk of the osmo-gsm-tester
Change-Id: I53610becbf643ed51b90cfd9debc6992fe211ec9
diff --git a/src/osmo_gsm_tester/suite.py b/src/osmo_gsm_tester/suite.py
index fb7c34d..0b8927f 100644
--- a/src/osmo_gsm_tester/suite.py
+++ b/src/osmo_gsm_tester/suite.py
@@ -18,9 +18,12 @@
# along with this program. If not, see <http://www.gnu.org/licenses/>.
import os
-from . import config, log, template, utils
+import sys
+import time
+from . import config, log, template, util, resource, schema, ofono_client, osmo_nitb
+from . import test
-class Suite(log.Origin):
+class SuiteDefinition(log.Origin):
'''A test suite reserves resources for a number of tests.
Each test requires a specific number of modems, BTSs etc., which are
reserved beforehand by a test suite. This way several test suites can be
@@ -29,14 +32,122 @@
CONF_FILENAME = 'suite.conf'
- CONF_SCHEMA = {
- 'resources.nitb_iface': config.INT,
- 'resources.nitb': config.INT,
- 'resources.bts': config.INT,
- 'resources.msisdn': config.INT,
- 'resources.modem': config.INT,
- 'defaults.timeout': config.STR,
- }
+    # Schema for suite.conf: the 'defaults.timeout' entry plus one
+    # 'resources.<key>' entry for every key of resource.WANT_SCHEMA, both
+    # passed to util.dict_add().
+    CONF_SCHEMA = util.dict_add(
+        {'defaults.timeout': schema.STR},
+        {'resources.%s' % key: kind
+         for key, kind in resource.WANT_SCHEMA.items()},
+    )
+
+
+ def __init__(self, suite_dir):
+ '''Create the definition of the suite stored in directory suite_dir.
+
+ The suite is named after the basename of suite_dir, and its
+ configuration is read right away via read_conf().'''
+ self.set_log_category(log.C_CNF)
+ self.suite_dir = suite_dir
+ self.set_name(os.path.basename(self.suite_dir))
+ self.read_conf()
+
+ def read_conf(self):
+ '''Read CONF_FILENAME from suite_dir into self.conf, passing
+ CONF_SCHEMA to config.read(), then load the suite's tests.
+
+ Raises RuntimeError if suite_dir is not a directory.'''
+ with self:
+ self.dbg('reading %s' % SuiteDefinition.CONF_FILENAME)
+ # Check for the directory first, before attempting to read the file.
+ if not os.path.isdir(self.suite_dir):
+ raise RuntimeError('No such directory: %r' % self.suite_dir)
+ self.conf = config.read(os.path.join(self.suite_dir,
+ SuiteDefinition.CONF_FILENAME),
+ SuiteDefinition.CONF_SCHEMA)
+ self.load_tests()
+
+
+    def load_tests(self):
+        '''Fill self.tests with one Test per '*.py' entry of suite_dir.
+
+        Entries are taken in sorted filename order; anything not ending in
+        '.py' is ignored.'''
+        with self:
+            self.tests = []
+            self.tests.extend(
+                Test(self, entry)
+                for entry in sorted(os.listdir(self.suite_dir))
+                if entry.endswith('.py'))
+
+    def add_test(self, test):
+        '''Append an existing Test instance to this suite's tests.
+
+        A test without a suite is adopted by this suite. Raises ValueError
+        if test is not a Test instance or already belongs to a different
+        suite.'''
+        with self:
+            if not isinstance(test, Test):
+                raise ValueError('add_test(): pass a Test() instance, not %s' % type(test))
+            if test.suite is None:
+                # Unowned test: adopt it.
+                test.suite = self
+            elif test.suite is not self:
+                raise ValueError('add_test(): test already belongs to another suite')
+            self.tests.append(test)
+
+
+
+class Test(log.Origin):
+    '''A single test script (a '*.py' file) that belongs to a suite.'''
+
+    def __init__(self, suite, test_basename):
+        '''Describe the script test_basename located in suite.suite_dir.'''
+        self.suite = suite
+        self.basename = test_basename
+        self.path = os.path.join(self.suite.suite_dir, self.basename)
+        super().__init__(self.path)
+        self.set_name(self.basename)
+        self.set_log_category(log.C_TST)
+
+    def run(self, suite_run):
+        '''Run this test script in the context of suite_run.
+
+        suite_run must be a run of the suite definition this test belongs
+        to. Logs START, then PASS or FAIL. Any exception raised while
+        running the script propagates to the caller; for
+        resource.NoResourceExn the reserved resources of suite_run are
+        logged first.'''
+        assert self.suite is suite_run.definition
+        with self:
+            test.setup(suite_run, self, ofono_client)
+            success = False
+            try:
+                self.log('START')
+                with self.redirect_stdout():
+                    util.run_python_file('%s.%s' % (self.suite.name(), self.name()),
+                                         self.path)
+                    success = True
+            except resource.NoResourceExn:
+                # The reservation lives on the suite run. A bare
+                # 'reserved_resources' is not defined in this scope and
+                # would raise NameError, hiding the original error.
+                self.err('Current resource state:\n', repr(suite_run.reserved_resources))
+                raise
+            finally:
+                # Runs for every outcome, so exactly one verdict is logged.
+                if success:
+                    self.log('PASS')
+                else:
+                    self.log('FAIL')
+
+    def name(self):
+        '''Return the test name, with ':<line>' appended whenever
+        log.get_line_for_src() reports a line for this script's path.'''
+        line = log.get_line_for_src(self.path)
+        if line is not None:
+            return '%s:%s' % (self._name, line)
+        return super().name()
+
+class SuiteRun(log.Origin):
+
+ # Set from the current_trial argument of __init__().
+ trial = None
+ # resource.ResourcesPool instance created in __init__().
+ resources_pool = None
+ # Result of resources_pool.reserve(); set by reserve_resources().
+ reserved_resources = None
+ # Cached result of combined('resources'); see resource_requirements().
+ _resource_requirements = None
+ # Cached result of combined('config'); see config().
+ _config = None
+ # Processes registered via remember_to_stop(); None until the first one.
+ _processes = None
+
+    def __init__(self, current_trial, suite_definition, scenarios=None):
+        '''Prepare a run of suite_definition within current_trial.
+
+        scenarios is an optional list of scenario configurations that
+        combined() layers on top of the suite definition's configuration;
+        it defaults to no scenarios. The run is named after the suite
+        definition and gets its own resource.ResourcesPool.'''
+        self.trial = current_trial
+        self.definition = suite_definition
+        # Use a fresh list per instance: a mutable default argument would be
+        # one list object shared by every run created without scenarios.
+        self.scenarios = [] if scenarios is None else scenarios
+        self.set_name(suite_definition.name())
+        self.set_log_category(log.C_TST)
+        self.resources_pool = resource.ResourcesPool()
+
+    def combined(self, conf_name):
+        '''Return the suite definition's conf_name section combined with the
+        same-named section of every scenario, in scenario order.
+
+        Scenarios that have no such section are skipped. The result is
+        built on a deep copy, so the suite definition's own configuration
+        (which load() caches and hands to every run of that suite) is never
+        modified.'''
+        import copy
+
+        combination = copy.deepcopy(self.definition.conf.get(conf_name) or {})
+        for scenario in self.scenarios:
+            c = scenario.get(conf_name)
+            if c is None:
+                continue
+            # config.combine() is called for its effect on its first
+            # argument; its return value is not used.
+            config.combine(combination, c)
+        return combination
+
+    def resource_requirements(self):
+        '''Return the combined 'resources' section, computed on first use
+        and cached in _resource_requirements.'''
+        cached = self._resource_requirements
+        if cached is None:
+            cached = self._resource_requirements = self.combined('resources')
+        return cached
+
+    def config(self):
+        '''Return the combined 'config' section, computed on first use and
+        cached in _config.'''
+        cached = self._config
+        if cached is None:
+            cached = self._config = self.combined('config')
+        return cached
class Results:
def __init__(self):
@@ -54,97 +165,162 @@
self.all_passed = bool(self.passed) and not bool(self.failed)
return self
- def __init__(self, suite_dir):
- self.set_log_category(log.C_CNF)
- self.suite_dir = suite_dir
- self.set_name(os.path.basename(self.suite_dir))
- self.read_conf()
+        def __str__(self):
+            '''Summarize the results in one of three forms: a FAIL report
+            listing the names of the failed tests, a notice that nothing
+            was run, or a pass message with the number of passed tests.'''
+            if self.failed:
+                failed_count = len(self.failed)
+                total_count = len(self.failed) + len(self.passed)
+                failed_names = '\n '.join([t.name() for t in self.failed])
+                return 'FAIL: %d of %d tests failed:\n %s' % (
+                    failed_count, total_count, failed_names)
+            if self.passed:
+                return 'pass: all %d tests passed.' % len(self.passed)
+            return 'no tests were run.'
- def read_conf(self):
+ def reserve_resources(self):
+ '''Reserve what resource_requirements() asks for from resources_pool
+ and keep the result in reserved_resources.
+
+ Raises RuntimeError if resources are already reserved for this run.'''
+ if self.reserved_resources:
+ raise RuntimeError('Attempt to reserve resources twice for a SuiteRun')
+ self.log('reserving resources...')
with self:
- if not os.path.isdir(self.suite_dir):
- raise RuntimeError('No such directory: %r' % self.suite_dir)
- self.conf = config.read(os.path.join(self.suite_dir,
- Suite.CONF_FILENAME),
- Suite.CONF_SCHEMA)
- self.load_tests()
+ self.reserved_resources = self.resources_pool.reserve(self, self.resource_requirements())
- def load_tests(self):
- with self:
- self.tests = []
- for basename in os.listdir(self.suite_dir):
- if not basename.endswith('.py'):
- continue
- self.tests.append(Test(self, basename))
-
- def add_test(self, test):
- with self:
- if not isinstance(test, Test):
- raise ValueError('add_test(): pass a Test() instance, not %s' % type(test))
- if test.suite is None:
- test.suite = self
- if test.suite is not self:
- raise ValueError('add_test(): test already belongs to another suite')
- self.tests.append(test)
-
- def run_tests(self):
- results = Suite.Results()
- for test in self.tests:
+ def run_tests(self, names=None):
+ '''Run the tests of the suite definition and return the concluded
+ Results.
+
+ Resources are reserved first if that has not happened yet. When names
+ is given and non-empty, only tests whose name() is contained in names
+ are run. Afterwards stop_processes() is called.'''
+ if not self.reserved_resources:
+ self.reserve_resources()
+ results = SuiteRun.Results()
+ for test in self.definition.tests:
+ # NOTE(review): Test.name() may carry a ':<line>' suffix; confirm that
+ # names is expected to match that form.
+ if names and not test.name() in names:
+ continue
self._run_test(test, results)
- return results.conclude()
-
- def run_tests_by_name(self, *names):
- results = Suite.Results()
- for name in names:
- basename = name
- if not basename.endswith('.py'):
- basename = name + '.py'
- for test in self.tests:
- if basename == test.basename:
- self._run_test(test, results)
- break
+ self.stop_processes()
return results.conclude()
def _run_test(self, test, results):
+ '''Run one test against this suite run and record it in results as
+ passed or failed; a failure is logged via log_exn() and not re-raised.'''
try:
with self:
- test.run()
+ test.run(self)
results.add_pass(test)
+ # NOTE(review): a bare except also catches KeyboardInterrupt and
+ # SystemExit, which are then recorded as an ordinary test failure.
except:
results.add_fail(test)
self.log_exn()
-class Test(log.Origin):
+    def remember_to_stop(self, process):
+        '''Register process so that stop_processes() terminates it and
+        poll() polls it. The list is created on first use.'''
+        registered = self._processes
+        if registered is None:
+            registered = self._processes = []
+        registered.append(process)
- def __init__(self, suite, test_basename):
- self.suite = suite
- self.basename = test_basename
- self.set_name(self.basename)
- self.set_log_category(log.C_TST)
- self.path = os.path.join(self.suite.suite_dir, self.basename)
- with self:
- with open(self.path, 'r') as f:
- self.script = f.read()
+    def stop_processes(self):
+        '''Call terminate() on every process registered via
+        remember_to_stop(); does nothing when none are registered.'''
+        # 'or ()' covers both the initial None and an empty list.
+        for process in self._processes or ():
+            process.terminate()
- def run(self):
- with self:
- self.code = compile(self.script, self.path, 'exec')
- with self.redirect_stdout():
- exec(self.code, self.test_globals())
- self._success = True
+    def nitb_iface(self):
+        '''Return the resource.R_NITB_IFACE entry of the reserved
+        resources.'''
+        reserved = self.reserved_resources
+        return reserved.get(resource.R_NITB_IFACE)
- def test_globals(self):
- test_globals = {
- 'this': utils.dict2obj({
- 'suite': self.suite.suite_dir,
- 'test': self.basename,
- }),
- 'resources': utils.dict2obj({
- }),
- }
- return test_globals
+    def nitb(self, nitb_iface=None):
+        '''Return a new osmo_nitb.OsmoNitb for this suite run, using
+        nitb_iface or, when that is None, the result of self.nitb_iface().'''
+        iface = self.nitb_iface() if nitb_iface is None else nitb_iface
+        return osmo_nitb.OsmoNitb(self, iface)
-def load(suite_dir):
- return Suite(suite_dir)
+    def bts(self):
+        '''Return the object bts_obj() builds from the resource.R_BTS entry
+        of the reserved resources.'''
+        bts_conf = self.reserved_resources.get(resource.R_BTS)
+        return bts_obj(self, bts_conf)
+
+    def modem(self):
+        '''Return the object modem_obj() builds from the resource.R_MODEM
+        entry of the reserved resources.'''
+        modem_conf = self.reserved_resources.get(resource.R_MODEM)
+        return modem_obj(modem_conf)
+
+ def msisdn(self):
+ '''Fetch the next MSISDN from resources_pool, log it and return it.'''
+ # NOTE(review): self.origin is not assigned anywhere in the code visible
+ # here; confirm that log.Origin provides it, or whether self was meant.
+ msisdn = self.resources_pool.next_msisdn(self.origin)
+ self.log('using MSISDN', msisdn)
+ return msisdn
+
+    def wait(self, condition, *condition_args, timeout=300, **condition_kwargs):
+        '''Poll until condition(*condition_args, **condition_kwargs) is true.
+
+        Returns True as soon as the condition holds, and False once more
+        than timeout seconds have elapsed without it holding. poll() is
+        called before every check, and the loop pauses 0.1 seconds between
+        checks.
+
+        Raises RuntimeError if timeout is None, zero or negative.'''
+        if not timeout or timeout < 0:
+            raise RuntimeError('wait() *must* time out at some point. timeout=%r' % timeout)
+
+        # Measure elapsed time on the monotonic clock, so that adjustments of
+        # the system time can neither cut the wait short nor stretch it.
+        started = time.monotonic()
+        while True:
+            self.poll()
+            if condition(*condition_args, **condition_kwargs):
+                return True
+            if time.monotonic() - started > timeout:
+                return False
+            time.sleep(.1)
+
+    def sleep(self, seconds):
+        '''Keep polling for the given number of seconds, by waiting on a
+        condition that never becomes true.'''
+        def never():
+            return False
+
+        self.wait(never, timeout=seconds)
+
+    def poll(self):
+        '''Call poll() on every registered process, then ofono_client.poll().'''
+        # 'or ()' covers both the initial None and an empty list.
+        for process in self._processes or ():
+            process.poll()
+        ofono_client.poll()
+
+    def prompt(self, *msgs, **msg_details):
+        '''Ask for user interaction. Do not use in tests that should run automatically!
+
+        The prompt text is the space-joined msgs, followed by the keyword
+        details rendered as '{key=value, ...}' in sorted key order, or
+        'Hit Enter to continue' when that is empty. It is written to the
+        original stdout; the text obtained from util.input_polling() is
+        logged and returned.'''
+        parts = list(msgs)
+        if msg_details:
+            details = ', '.join('%s=%r' % (key, val)
+                                for key, val in sorted(msg_details.items()))
+            parts.append('{%s}' % details)
+        msg = ' '.join(parts) or 'Hit Enter to continue'
+        self.log('prompt:', msg)
+        # sys.__stdout__ is the interpreter's original stdout.
+        out = sys.__stdout__
+        out.write(msg)
+        out.write('\n> ')
+        out.flush()
+        entered = util.input_polling(self.poll)
+        self.log('prompt entered:', entered)
+        return entered
+
+
+loaded_suite_definitions = {}
+
+def load(suite_name):
+    '''Return the SuiteDefinition named suite_name, creating it on first use.
+
+    Definitions are kept in loaded_suite_definitions, so repeated calls
+    with the same name return the same object. Raises RuntimeError if the
+    name does not exist in the suites directory or is not a directory.'''
+    cached = loaded_suite_definitions.get(suite_name)
+    if cached is not None:
+        return cached
+
+    suites_dir = config.get_suites_dir()
+    suite_dir = suites_dir.child(suite_name)
+    if not suites_dir.exists(suite_name):
+        raise RuntimeError('Suite not found: %r in %r' % (suite_name, suites_dir))
+    if not suites_dir.isdir(suite_name):
+        raise RuntimeError('Suite name found, but not a directory: %r' % (suite_dir))
+
+    loaded_suite_definitions[suite_name] = definition = SuiteDefinition(suite_dir)
+    return definition
+
+def parse_suite_scenario_str(suite_scenario_str):
+    '''Split '<suite>[:<scenario>[+<scenario>...]]' into its parts.
+
+    Returns a tuple (suite_name, scenario_names); scenario_names is an
+    empty list when the string contains no ':'. Raises RuntimeError if the
+    string contains more than one ':'.'''
+    suite_name, colon, scenarios_part = suite_scenario_str.partition(':')
+    # A second ':' would end up in the part after the first one.
+    if ':' in scenarios_part:
+        raise RuntimeError('invalid combination string: %r' % suite_scenario_str)
+
+    scenario_names = scenarios_part.split('+') if colon else []
+    return suite_name, scenario_names
+
+def load_suite_scenario_str(suite_scenario_str):
+    '''Parse suite_scenario_str and return (suite, scenarios): the suite
+    from load() and one config.get_scenario() result per scenario name.'''
+    suite_name, scenario_names = parse_suite_scenario_str(suite_scenario_str)
+    suite = load(suite_name)
+    scenarios = []
+    for scenario_name in scenario_names:
+        scenarios.append(config.get_scenario(scenario_name))
+    return (suite, scenarios)
+
+def bts_obj(suite_run, conf):
+    '''Instantiate the class that resource.KNOWN_BTS_TYPES lists for
+    conf's 'type' entry, passing it suite_run and conf.
+
+    Raises RuntimeError if no class is listed for that type.'''
+    bts_type = conf.get('type')
+    log.dbg(None, None, 'create BTS object', type=bts_type)
+    bts_class = resource.KNOWN_BTS_TYPES.get(bts_type)
+    if bts_class is not None:
+        return bts_class(suite_run, conf)
+    raise RuntimeError('No such BTS type is defined: %r' % bts_type)
+
+def modem_obj(conf):
+    '''Log the modem configuration at debug level and return a new
+    ofono_client.Modem built from it.'''
+    log.dbg(None, None, 'create Modem object', conf=conf)
+    modem = ofono_client.Modem(conf)
+    return modem
# vim: expandtab tabstop=4 shiftwidth=4