Create a 'core' directory to contain most of the code that is not in the object classes used by tests
Change-Id: I9aec6c55ccd71894182057d36e0025b69925d314
diff --git a/src/osmo_gsm_tester/core/__init__.py b/src/osmo_gsm_tester/core/__init__.py
new file mode 100644
index 0000000..e69de29
--- /dev/null
+++ b/src/osmo_gsm_tester/core/__init__.py
diff --git a/src/osmo_gsm_tester/core/config.py b/src/osmo_gsm_tester/core/config.py
new file mode 100644
index 0000000..9333601
--- /dev/null
+++ b/src/osmo_gsm_tester/core/config.py
@@ -0,0 +1,394 @@
+# osmo_gsm_tester: read and manage config files and global config
+#
+# Copyright (C) 2016-2017 by sysmocom - s.f.m.c. GmbH
+#
+# Author: Neels Hofmeyr <neels@hofmeyr.de>
+#
+# This program is free software: you can redistribute it and/or modify
+# it under the terms of the GNU General Public License as
+# published by the Free Software Foundation, either version 3 of the
+# License, or (at your option) any later version.
+#
+# This program is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+# GNU General Public License for more details.
+#
+# You should have received a copy of the GNU General Public License
+# along with this program. If not, see <http://www.gnu.org/licenses/>.
+
+# discussion for choice of config file format:
+#
+# Python syntax is insane, because it allows the config file to run arbitrary
+# python commands.
+#
+# INI file format is nice and simple, but it doesn't allow having the same
+# section numerous times (e.g. to define several modems or BTS models) and does
+# not support nesting.
+#
+# JSON has too many braces and quotes to be easy to type
+#
+# YAML formatting is lean, but:
+# - too powerful. The normal load() allows arbitrary code execution. There is
+# safe_load().
+# - allows several alternative ways of formatting, better to have just one
+# authoritative style.
+# - tries to detect types. It would be better to receive every setting as
+# simple string rather than e.g. an IMSI as an integer.
+# - e.g. an IMSI starting with a zero is interpreted as octal value, resulting
+# in super confusing error messages if the user merely forgets to quote it.
+# - does not tell me which line a config item came from, so no detailed error
+# message is possible.
+#
+# The Python ConfigParserShootout page has numerous contestants, but many of
+# those seem to be not widely used / standardized or even tested.
+# https://wiki.python.org/moin/ConfigParserShootout
+#
+# The optimum would be a stripped down YAML format.
+# In the lack of that, we shall go with yaml.safe_load() + a round trip
+# (feeding back to itself), converting keys to lowercase and values to string.
+# There is no solution for octal interpretations nor config file source lines
+# unless, apparently, we implement our own config parser.
+
+import yaml
+import os
+import copy
+
+from . import log, schema, util, template
+from .util import is_dict, is_list, Dir, get_tempdir
+
+ENV_PREFIX = 'OSMO_GSM_TESTER_'
+ENV_CONF = os.getenv(ENV_PREFIX + 'CONF')
+
+override_conf = None
+
+DEFAULT_CONFIG_LOCATIONS = [
+ '.',
+ os.path.join(os.getenv('HOME'), '.config', 'osmo-gsm-tester'),
+ '/usr/local/etc/osmo-gsm-tester',
+ '/etc/osmo-gsm-tester'
+ ]
+
+PATHS_CONF = 'paths.conf'
+DEFAULT_SUITES_CONF = 'default-suites.conf'
+DEFAULTS_CONF = 'defaults.conf'
+RESOURCES_CONF = 'resources.conf'
+
+PATH_STATE_DIR = 'state_dir'
+PATH_SUITES_DIR = 'suites_dir'
+PATH_SCENARIOS_DIR = 'scenarios_dir'
+PATHS_SCHEMA = {
+ PATH_STATE_DIR: schema.STR,
+ PATH_SUITES_DIR: schema.STR,
+ PATH_SCENARIOS_DIR: schema.STR,
+ }
+
+PATHS_TEMPDIR_STR = '$TEMPDIR'
+
+PATHS = None
+
def _get_config_file(basename, fail_if_missing=True):
    '''Locate 'basename' in the config search locations.

    Returns (real path of the file, real path of the directory it was found
    in). If no location contains the file, raise RuntimeError, or return
    (None, None) when fail_if_missing=False.'''
    if override_conf:
        locations = [ override_conf ]
    elif ENV_CONF:
        locations = [ ENV_CONF ]
    else:
        locations = DEFAULT_CONFIG_LOCATIONS

    for loc in locations:
        real_dir = os.path.realpath(loc)
        candidate = os.path.realpath(os.path.join(real_dir, basename))
        if not os.path.isfile(candidate):
            continue
        log.dbg('Found config file', basename, 'as', candidate, 'in', loc, 'which is', real_dir, _category=log.C_CNF)
        return (candidate, real_dir)

    if fail_if_missing:
        raise RuntimeError('configuration file not found: %r in %r' % (basename,
            [os.path.abspath(p) for p in locations]))
    return None, None
+
def get_config_file(basename, fail_if_missing=True):
    '''Like _get_config_file(), but return only the file path (or None).'''
    return _get_config_file(basename, fail_if_missing)[0]
+
def read_config_file(basename, validation_schema=None, if_missing_return=False):
    '''Read, standardize and optionally validate the config file 'basename'
    from the config search path. When if_missing_return is set (anything
    but False), a missing file yields that value instead of raising.'''
    fail_if_missing = (if_missing_return is False)
    path = get_config_file(basename, fail_if_missing=fail_if_missing)
    if path is None:
        return if_missing_return
    return read(path, validation_schema=validation_schema, if_missing_return=if_missing_return)
+
def get_configured_path(label, allow_unset=False):
    '''Return the path configured for 'label' (e.g. 'state_dir').

    Lookup order: the OSMO_GSM_TESTER_<LABEL> environment variable first,
    then paths.conf. Relative paths from paths.conf are resolved against
    the directory the conf file was found in; a leading '$TEMPDIR' is
    replaced by this run's temp dir.

    Raises RuntimeError when the label is not configured, or returns None
    when allow_unset=True.'''
    global PATHS

    env_name = ENV_PREFIX + label.upper()
    env_path = os.getenv(env_name)
    if env_path:
        real_env_path = os.path.realpath(env_path)
        log.dbg('Found path', label, 'as', env_path, 'in', '$' + env_name, 'which is', real_env_path, _category=log.C_CNF)
        return real_env_path

    if PATHS is None:
        paths_file, found_in = _get_config_file(PATHS_CONF)
        PATHS = read(paths_file, PATHS_SCHEMA)
        # sorted for deterministic regression test results
        for key, path in sorted(PATHS.items()):
            # Fix: detect absolute paths with os.path.isabs(). The previous
            # check 'path.startswith(os.pathsep)' compared against ':' and
            # hence never matched an absolute path like '/x'.
            if not os.path.isabs(path):
                PATHS[key] = os.path.realpath(os.path.join(found_in, path))
                log.dbg(paths_file + ': relative path', path, 'is', PATHS[key], _category=log.C_CNF)
    p = PATHS.get(label)
    if p is None:
        # Fix: bail out before dereferencing p. Previously, allow_unset=True
        # with an unset label fell through and crashed on p.startswith().
        if not allow_unset:
            raise RuntimeError('missing configuration in %s: %r' % (PATHS_CONF, label))
        return None

    log.dbg('Found path', label, 'as', p, _category=log.C_CNF)
    if p.startswith(PATHS_TEMPDIR_STR):
        p = os.path.join(get_tempdir(), p[len(PATHS_TEMPDIR_STR):])
        log.dbg('Path', label, 'contained', PATHS_TEMPDIR_STR, 'and becomes', p, _category=log.C_CNF)
    return p
+
def get_state_dir():
    '''Return a Dir wrapping the configured 'state_dir' path.'''
    return Dir(get_configured_path(PATH_STATE_DIR))

def get_suites_dir():
    '''Return a Dir wrapping the configured 'suites_dir' path.'''
    return Dir(get_configured_path(PATH_SUITES_DIR))

def get_scenarios_dir():
    '''Return a Dir wrapping the configured 'scenarios_dir' path.'''
    return Dir(get_configured_path(PATH_SCENARIOS_DIR))
+
def read(path, validation_schema=None, if_missing_return=False):
    '''Load the YAML file at 'path', standardize it (lowercased keys,
    stringified values) and optionally validate against validation_schema.
    When if_missing_return is set (anything but False), a missing file
    yields that value instead of an error.'''
    log.ctx(path)
    missing = not os.path.isfile(path)
    if missing and if_missing_return is not False:
        return if_missing_return
    with open(path, 'r') as f:
        loaded = yaml.safe_load(f)
    standardized = _standardize(loaded)
    if validation_schema:
        schema.validate(standardized, validation_schema)
    return standardized
+
def write(path, config):
    '''Write 'config' to the file at 'path' as standardized YAML.'''
    log.ctx(path)
    with open(path, 'w') as f:
        f.write(tostr(config))

def tostr(config):
    '''Return 'config' as a standardized YAML string.'''
    return _tostr(_standardize(config))

def _tostr(config):
    # default_flow_style=False: always emit block style, one item per line.
    return yaml.dump(config, default_flow_style=False)
+
+def _standardize_item(item):
+ if item is None:
+ return None
+ if isinstance(item, (tuple, list)):
+ return [_standardize_item(i) for i in item]
+ if isinstance(item, dict):
+ return dict([(key.lower(), _standardize_item(val)) for key,val in item.items()])
+ return str(item)
+
def _standardize(config):
    # Round-trip through YAML: dumping the normalized config and feeding it
    # back through safe_load guarantees the result is plain,
    # YAML-representable data (see the file header discussion).
    config = yaml.safe_load(_tostr(_standardize_item(config)))
    return config
+
def get_defaults(for_kind):
    '''Return the defaults.conf sub-dict for 'for_kind', or {} if absent.'''
    all_defaults = read_config_file(DEFAULTS_CONF, if_missing_return={})
    return all_defaults.get(for_kind, {})
+
class Scenario(log.Origin, dict):
    '''One scenario config file loaded as a dict, optionally parametrized.

    A file name like "foo@p1,p2.conf" carries the parameters p1 and p2,
    which are rendered into the (templated) file contents as param1,
    param2, ... by read_from_file().'''

    def __init__(self, name, path, param_list=None):
        '''Fix: param_list used to default to a mutable [] shared between
        all parameterless instances; use a None sentinel instead.'''
        super().__init__(log.C_TST, name)
        self.path = path
        self.param_list = param_list if param_list is not None else []

    @classmethod
    def count_cont_char_backward(cls, s, before_pos, c):
        '''Count how many consecutive characters 'c' immediately precede
        position 'before_pos' in string 's'.
        (parameter renamed from 'str' to stop shadowing the builtin)'''
        n = 0
        i = before_pos - 1
        while i >= 0:
            if s[i] != c:
                break
            n += 1
            i -= 1
        return n

    @classmethod
    def split_scenario_parameters(cls, s):
        '''Split a comma separated parameter string into a list, honoring
        backslash escapes: '\\,' is a literal comma, '\\\\' a literal
        backslash.'''
        cur_pos = 0
        param_li = []
        saved = ''
        # Split into a list, but we want to escape '\,' to avoid splitting parameters containing commas.
        while True:
            prev_pos = cur_pos
            cur_pos = s.find(',', prev_pos)
            if cur_pos == -1:
                # Fix: include 'saved' here too; previously any escaped-comma
                # content accumulated before the final segment was dropped
                # (e.g. 'a\,b' yielded ['b'] instead of ['a,b']).
                param_li.append(saved + s[prev_pos:])
                break
            if cur_pos == 0:
                param_li.append('')
            elif cur_pos != 0 and s[cur_pos - 1] == '\\' and cls.count_cont_char_backward(s, cur_pos, '\\') % 2 == 1:
                # odd number of preceding backslashes: this comma is escaped
                saved += s[prev_pos:cur_pos - 1] + ','
            else:
                param_li.append(saved + s[prev_pos:cur_pos])
                saved = ''
            cur_pos += 1
        # Also unescape '\\' -> '\'
        for i in range(len(param_li)):
            param_li[i] = param_li[i].replace('\\\\', '\\')
        return param_li

    @classmethod
    def from_param_list_str(cls, name, path, param_list_str):
        '''Build a Scenario from the raw parameter substring of its name.'''
        param_list = cls.split_scenario_parameters(param_list_str)
        return cls(name, path, param_list)

    def read_from_file(self, validation_schema):
        '''Load self.path; if this scenario is parametrized, render
        param1..paramN into the templated contents first. Standardize,
        optionally validate, then merge the resulting dict into self.'''
        with open(self.path, 'r') as f:
            config_str = f.read()
        if len(self.param_list) != 0:
            param_dict = {}
            for i, param in enumerate(self.param_list, start=1):
                param_dict['param' + str(i)] = param
            self.dbg(param_dict=param_dict)
            config_str = template.render_strbuf_inline(config_str, param_dict)
        config = yaml.safe_load(config_str)
        config = _standardize(config)
        if validation_schema:
            schema.validate(config, validation_schema)
        self.update(config)
+
def get_scenario(name, validation_schema=None):
    '''Load scenario 'name' from the configured scenarios dir.

    'name' may omit the '.conf' suffix. A name containing '@' denotes a
    parametrized scenario, e.g. "scenario@param1,param2.conf".'''
    scenarios_dir = get_scenarios_dir()
    if not name.endswith('.conf'):
        name = name + '.conf'
    is_parametrized_file = '@' in name
    param_list = []  # NOTE(review): unused here; parameters are parsed below
    path = scenarios_dir.child(name)
    if not is_parametrized_file:
        if not os.path.isfile(path):
            raise RuntimeError('No such scenario file: %r' % path)
        sc = Scenario(name, path)
    else: # parametrized scenario:
        # Allow first matching complete matching names (eg: scenario@param1,param2.conf),
        # this allows setting specific content in different files for specific values.
        if not os.path.isfile(path):
            # get "scenario@.conf" from "scenario@param1,param2.conf":
            prefix_name = name[:name.index("@")+1] + '.conf'
            path = scenarios_dir.child(prefix_name)
            if not os.path.isfile(path):
                raise RuntimeError('No such scenario file: %r (nor %s)' % (path, name))
        # At this point, we have existing file path. Let's now scrap the parameter(s):
        # get param1,param2 str from scenario@param1,param2.conf
        param_list_str = name.split('@', 1)[1][:-len('.conf')]
        sc = Scenario.from_param_list_str(name, path, param_list_str)
    sc.read_from_file(validation_schema)
    return sc
+
def add(dest, src):
    '''Merge 'src' into 'dest' in place, without replacing existing values.

    dicts: keys missing in dest are copied; common keys are add()ed
    recursively. lists: src items are appended. scalars: must be equal,
    otherwise ValueError is raised.'''
    if is_dict(dest):
        if not is_dict(src):
            raise ValueError('cannot add to dict a value of type: %r' % type(src))

        for key, val in src.items():
            dest_val = dest.get(key)
            if dest_val is None:
                dest[key] = val
            else:
                log.ctx(key=key)
                add(dest_val, val)
        return
    if is_list(dest):
        if not is_list(src):
            raise ValueError('cannot add to list a value of type: %r' % type(src))
        dest.extend(src)
        return
    if dest == src:
        return
    raise ValueError('cannot add dicts, conflicting items (values %r and %r)'
                     % (dest, src))
+
def combine(dest, src):
    '''Merge 'src' into 'dest' in place, recursing into containers.

    dicts: missing keys copied, common keys combined recursively.
    lists of dicts/lists/tuples: combined element-by-element (treated as
    sorted lists). lists of scalars: treated as unsorted sets, only new
    elements are appended. scalars: must be equal or ValueError is raised.'''
    if is_dict(dest):
        if not is_dict(src):
            raise ValueError('cannot combine dict with a value of type: %r' % type(src))

        for key, val in src.items():
            log.ctx(key=key)
            dest_val = dest.get(key)
            if dest_val is None:
                dest[key] = val
            else:
                combine(dest_val, val)
        return
    if is_list(dest):
        if not is_list(src):
            raise ValueError('cannot combine list with a value of type: %r' % type(src))
        # Validate that all elements in both lists are of the same type:
        t = util.list_validate_same_elem_type(src + dest)
        if t is None:
            return # both lists are empty, return
        # For lists of complex objects, we expect them to be sorted lists:
        if t in (dict, list, tuple):
            for i in range(len(dest)):
                log.ctx(idx=i)
                # pad src with empty instances when it is shorter than dest
                src_it = src[i] if i < len(src) else util.empty_instance_type(t)
                combine(dest[i], src_it)
            for i in range(len(dest), len(src)):
                log.ctx(idx=i)
                dest.append(src[i])
        else: # for lists of basic elements, we handle them as unsorted sets:
            for elem in src:
                if elem not in dest:
                    dest.append(elem)
        return
    if dest == src:
        return
    raise ValueError('cannot combine dicts, conflicting items (values %r and %r)'
                     % (dest, src))
+
def overlay(dest, src):
    '''Recursively overlay 'src' onto 'dest' and return the result.

    dicts: each key in src overwrites/extends the corresponding dest key.
    lists: src items replace dest items by index; extra src items append.
    scalars: src wins. dest is modified in place for containers.'''
    if is_dict(dest):
        if not is_dict(src):
            # Fix: error messages used to say 'combine', copy-pasted from
            # combine() above; this is overlay().
            raise ValueError('cannot overlay dict with a value of type: %r' % type(src))

        for key, val in src.items():
            log.ctx(key=key)
            dest_val = dest.get(key)
            dest[key] = overlay(dest_val, val)
        return dest
    if is_list(dest):
        if not is_list(src):
            raise ValueError('cannot overlay list with a value of type: %r' % type(src))
        copy_len = min(len(src), len(dest))
        for i in range(copy_len):
            log.ctx(idx=i)
            dest[i] = overlay(dest[i], src[i])
        for i in range(copy_len, len(src)):
            dest.append(src[i])
        return dest
    return src
+
def replicate_times(d):
    '''
    replicate items that have a "times" > 1

    'd' is a dict matching WANT_SCHEMA, which is the same as
    the RESOURCES_SCHEMA, except each entity that can be reserved has a 'times'
    field added, to indicate how many of those should be reserved.
    '''
    d = copy.deepcopy(d)
    for item_list in d.values():
        idx = 0
        while idx < len(item_list):
            entry = item_list[idx]
            # 'times' is consumed here; the replicated items do not keep it.
            count = int(entry.pop('times', 1))
            clones = [copy.deepcopy(entry) for _ in range(count - 1)]
            item_list[idx + 1:idx + 1] = clones
            idx += count
    return d
+
+# vim: expandtab tabstop=4 shiftwidth=4
diff --git a/src/osmo_gsm_tester/core/event_loop.py b/src/osmo_gsm_tester/core/event_loop.py
new file mode 100644
index 0000000..fe88ef4
--- /dev/null
+++ b/src/osmo_gsm_tester/core/event_loop.py
@@ -0,0 +1,121 @@
+# osmo_gsm_tester: Event loop
+#
+# Copyright (C) 2016-2017 by sysmocom - s.f.m.c. GmbH
+#
+# Author: Pau Espin Pedrol <pespin@sysmocom.de>
+#
+# This program is free software: you can redistribute it and/or modify
+# it under the terms of the GNU General Public License as
+# published by the Free Software Foundation, either version 3 of the
+# License, or (at your option) any later version.
+#
+# This program is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+# GNU General Public License for more details.
+#
+# You should have received a copy of the GNU General Public License
+# along with this program. If not, see <http://www.gnu.org/licenses/>.
+
+import time
+from gi.repository import GLib, GObject
+
+from . import log
+
class DeferredHandling:
    '''FIFO queue of (handler, args, kwargs) callbacks whose execution is
    postponed until handle_queue() is called.'''

    def __init__(self):
        self.defer_queue = []

    def defer(self, handler, *args, **kwargs):
        '''Queue handler(*args, **kwargs) for a later handle_queue() run.'''
        self.defer_queue.append((handler, args, kwargs))

    def handle_queue(self):
        '''Run and drain all queued callbacks, oldest first; callbacks that
        defer new work extend the queue and are handled in the same run.'''
        while self.defer_queue:
            func, f_args, f_kwargs = self.defer_queue.pop(0)
            func(*f_args, **f_kwargs)
+
class WaitRequest:
    '''State of one wait-for-condition request: remembers when the wait
    started and, on each poll, flags either condition success or timeout.

    NOTE: 'timestep' is accepted for interface symmetry but not stored; the
    polling interval is driven by the caller's timer.'''

    def __init__(self, condition, condition_args, condition_kwargs, timeout, timestep):
        self.timeout_ack = False
        self.condition_ack = False
        self.timeout_started = time.time()
        self.timeout = timeout
        self.condition = condition
        self.condition_args = condition_args
        self.condition_kwargs = condition_kwargs

    def condition_check(self):
        '''Poll once: set condition_ack if the condition now holds,
        otherwise set timeout_ack once the timeout has elapsed.'''
        elapsed = time.time() - self.timeout_started
        if self.condition(*self.condition_args, **self.condition_kwargs):
            self.condition_ack = True
        elif elapsed > self.timeout:
            self.timeout_ack = True
+
class EventLoop:
    '''Thin wrapper around a GLib main loop: runs registered poll functions
    periodically and provides wait/sleep primitives on top of it.'''

    def __init__(self):
        self.poll_funcs = []
        self.gloop = GLib.MainLoop()
        self.gctx = self.gloop.get_context()
        self.deferred_handling = DeferredHandling()

    def _trigger_cb_func(self, user_data):
        # GLib timer callback: only queue the real work; it runs from the
        # next poll() iteration via handle_queue().
        self.defer(user_data)
        return True #to retrigger the timeout

    def defer(self, handler, *args, **kwargs):
        '''Queue handler(*args, **kwargs) to run in the next poll().'''
        self.deferred_handling.defer(handler, *args, **kwargs)

    def register_poll_func(self, func, timestep=1):
        '''Call func every 'timestep' seconds while the loop is polled.'''
        id = GObject.timeout_add(timestep*1000, self._trigger_cb_func, func) # in 1/1000th of a sec
        self.poll_funcs.append((func, id))

    def unregister_poll_func(self, func):
        '''Remove the first registration of func (see register_poll_func).'''
        for pair in self.poll_funcs:
            f, id = pair
            if f == func:
                GObject.source_remove(id)
                self.poll_funcs.remove(pair)
                return

    def poll(self, may_block=False):
        # One GLib context iteration, then run whatever was deferred by
        # timer callbacks during that iteration.
        self.gctx.iteration(may_block)
        self.deferred_handling.handle_queue()

    def wait_no_raise(self, log_obj, condition, condition_args, condition_kwargs, timeout, timestep):
        '''Poll 'condition' every 'timestep' seconds until it returns true
        or 'timeout' seconds elapse. Returns True on success, False on
        timeout; raises log.Error for a missing/negative timeout.'''
        if not timeout or timeout < 0:
            # Deliberate hack: rebinding 'self' makes Origin.find_on_stack()
            # (which inspects the 'self' local of each stack frame) attribute
            # the error below to log_obj instead of to this EventLoop.
            self = log_obj
            raise log.Error('wait() *must* time out at some point.', timeout=timeout)
        if timestep < 0.1:
            # lower bound on the polling interval to avoid busy-looping
            timestep = 0.1

        wait_req = WaitRequest(condition, condition_args, condition_kwargs, timeout, timestep)
        wait_id = GObject.timeout_add(timestep*1000, self._trigger_cb_func, wait_req.condition_check)
        while True:
            try:
                self.poll(may_block=True)
            except Exception: # cleanup of temporary resources in the wait scope
                GObject.source_remove(wait_id)
                raise
            if wait_req.condition_ack or wait_req.timeout_ack:
                GObject.source_remove(wait_id)
                success = wait_req.condition_ack
                return success

    def wait(self, log_obj, condition, *condition_args, timeout=300, timestep=1, **condition_kwargs):
        '''Like wait_no_raise(), but raise log.Error on timeout.'''
        if not self.wait_no_raise(log_obj, condition, condition_args, condition_kwargs, timeout, timestep):
            log.ctx(log_obj)
            raise log.Error('Wait timeout', condition=condition, timeout=timeout, timestep=timestep)

    def sleep(self, log_obj, seconds):
        '''Sleep for 'seconds' while still iterating the event loop.'''
        assert seconds > 0.
        self.wait_no_raise(log_obj, lambda: False, [], {}, timeout=seconds, timestep=seconds)
+
+
# Module-wide singleton event loop instance.
MainLoop = EventLoop()
+
+
+# vim: expandtab tabstop=4 shiftwidth=4
diff --git a/src/osmo_gsm_tester/core/log.py b/src/osmo_gsm_tester/core/log.py
new file mode 100644
index 0000000..d60bf0b
--- /dev/null
+++ b/src/osmo_gsm_tester/core/log.py
@@ -0,0 +1,604 @@
+# osmo_gsm_tester: global logging
+#
+# Copyright (C) 2016-2017 by sysmocom - s.f.m.c. GmbH
+#
+# Author: Neels Hofmeyr <neels@hofmeyr.de>
+#
+# This program is free software: you can redistribute it and/or modify
+# it under the terms of the GNU General Public License as
+# published by the Free Software Foundation, either version 3 of the
+# License, or (at your option) any later version.
+#
+# This program is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+# GNU General Public License for more details.
+#
+# You should have received a copy of the GNU General Public License
+# along with this program. If not, see <http://www.gnu.org/licenses/>.
+
+import os
+import sys
+import time
+import traceback
+import contextlib
+import atexit
+from datetime import datetime # we need this for strftime as the one from time doesn't carry microsecond info
+from inspect import getframeinfo, stack
+
+from .util import is_dict
+
+L_ERR = 30
+L_LOG = 20
+L_DBG = 10
+L_TRACEBACK = 'TRACEBACK'
+
+LEVEL_STRS = {
+ 'err': L_ERR,
+ 'log': L_LOG,
+ 'dbg': L_DBG,
+ }
+
+C_NET = 'net'
+C_RUN = 'run'
+C_TST = 'tst'
+C_CNF = 'cnf'
+C_BUS = 'bus'
+C_DEFAULT = '---'
+
+LOG_CTX_VAR = '_log_ctx_'
+
# The keyword-only parameters are underscore-prefixed so they cannot collide
# with arbitrary **named_items keys supplied by callers.
def dbg(*messages, _origin=None, _category=None, _src=None, **named_items):
    '''Log on debug level. See also log()'''
    _log(messages, named_items, origin=_origin, category=_category, level=L_DBG, src=_src)

def log(*messages, _origin=None, _category=None, _level=L_LOG, _src=None, **named_items):
    '''Log a message. The origin, an Origin class instance, is normally
    determined by stack magic, only pass _origin to override. The category is
    taken from the origin. _src is normally an integer indicating how many
    levels up the stack sits the interesting source file to log about, can also
    be a string. The log message is composed of all *messages and
    **named_items, for example:
      log('frobnicate:', thing, key=current_key, prop=erty)
    '''
    _log(messages, named_items, origin=_origin, category=_category, level=_level, src=_src)

def err(*messages, _origin=None, _category=None, _src=None, **named_items):
    '''Log on error level. See also log()'''
    _log(messages, named_items, origin=_origin, category=_category, level=L_ERR, src=_src)
+
def _log(messages=None, named_items=None, origin=None, category=None, level=L_LOG, src=None):
    '''Internal dispatcher: hand one log line to every registered LogTarget.

    origin defaults to the nearest Origin on the call stack; category
    defaults to that origin's category; src is how many stack levels up the
    interesting source line is, or an already-formatted string.'''
    # Fix: use None sentinels instead of mutable default arguments ([] / {}),
    # so that any mutation by a log target cannot leak between calls.
    if messages is None:
        messages = []
    if named_items is None:
        named_items = {}
    if origin is None:
        origin = Origin.find_on_stack()
    if category is None and isinstance(origin, Origin):
        category = origin._log_category
    if src is None:
        # two levels up: the caller of dbg()/log()/err()
        src = 2
    if isinstance(src, int):
        src = get_src_from_caller(src + 1)
    for target in LogTarget.all_targets:
        target.log(origin, category, level, src, messages, named_items)
+
+
# Timestamp formats; datetime.strftime is used because time.strftime does not
# support microseconds (%f) -- see the datetime import note at the file top.
LONG_DATEFMT = '%Y-%m-%d_%H:%M:%S.%f'
DATEFMT = '%H:%M:%S.%f'

# may be overridden by regression tests
get_process_id = lambda: '%d-%d' % (os.getpid(), time.time())
+
class Error(Exception):
    '''Base exception of this module: its message is composed like a log
    line from positional messages and named items (via the module-level
    compose_message()).'''
    def __init__(self, *messages, **named_items):
        super().__init__(compose_message(messages, named_items))
+
class LogTarget:
    '''One log sink (defaulting to stdout) plus its output formatting
    configuration. Each instance registers itself in LogTarget.all_targets;
    _log() fans every message out to all registered targets.'''
    all_targets = []

    do_log_time = None
    do_log_category = None
    do_log_level = None
    do_log_origin = None
    do_log_all_origins_on_levels = None
    do_log_traceback = None
    do_log_src = None
    origin_width = None
    origin_fmt = None
    all_levels = None

    # redirected by logging test
    get_time_str = lambda self: datetime.now().strftime(self.log_time_fmt)

    # sink that gets each complete logging line
    log_write_func = None

    category_levels = None

    def __init__(self, log_write_func=None):
        if log_write_func is None:
            log_write_func = sys.__stdout__.write
        self.log_write_func = log_write_func
        self.category_levels = {}
        self.style()
        LogTarget.all_targets.append(self)

    def remove(self):
        '''Unregister this target; it receives no further log messages.'''
        LogTarget.all_targets.remove(self)

    def style(self, time=True, time_fmt=DATEFMT, category=True, level=True, origin=True, origin_width=32, src=True, trace=False, all_origins_on_levels=(L_ERR, L_LOG, L_DBG, L_TRACEBACK)):
        '''
        set all logging format aspects, to defaults if not passed:
        time: log timestamps;
        time_fmt: format of timestamps;
        category: print the logging category (three letters);
        level: print the logging level, unless it is L_LOG;
        origin: print which object(s) the message originated from;
        origin_width: fill up the origin string with whitespace to this width;
        src: log the source file and line number the log comes from;
        trace: on exceptions, log the full stack trace;
        all_origins_on_levels: pass a tuple of logging levels that should have a full trace of origins
        '''
        self.log_time_fmt = time_fmt
        self.do_log_time = bool(time)
        if not self.log_time_fmt:
            self.do_log_time = False
        self.do_log_category = bool(category)
        self.do_log_level = bool(level)
        self.do_log_origin = bool(origin)
        self.origin_width = int(origin_width)
        self.origin_fmt = '{:>%ds}' % self.origin_width
        self.do_log_src = src
        self.do_log_traceback = trace
        self.do_log_all_origins_on_levels = tuple(all_origins_on_levels or [])
        return self

    def style_change(self, time=None, time_fmt=None, category=None, level=None, origin=None, origin_width=None, src=None, trace=None, all_origins_on_levels=None):
        'modify only the given aspects of the logging format'
        self.style(
            time=(time if time is not None else self.do_log_time),
            time_fmt=(time_fmt if time_fmt is not None else self.log_time_fmt),
            category=(category if category is not None else self.do_log_category),
            level=(level if level is not None else self.do_log_level),
            origin=(origin if origin is not None else self.do_log_origin),
            origin_width=(origin_width if origin_width is not None else self.origin_width),
            src=(src if src is not None else self.do_log_src),
            trace=(trace if trace is not None else self.do_log_traceback),
            all_origins_on_levels=(all_origins_on_levels if all_origins_on_levels is not None else self.do_log_all_origins_on_levels),
            )
        return self

    def set_level(self, category, level):
        'set global logging log.L_* level for a given log.C_* category'
        self.category_levels[category] = level
        return self

    def set_all_levels(self, level):
        # when not None, overrides all per-category levels
        self.all_levels = level
        return self

    def is_enabled(self, category, level):
        '''Whether a message of this category/level gets written at all.'''
        if level == L_TRACEBACK:
            return self.do_log_traceback
        if self.all_levels is not None:
            is_level = self.all_levels
        else:
            is_level = self.category_levels.get(category)
        if is_level is None:
            is_level = L_LOG
        if level < is_level:
            return False
        return True

    def log(self, origin, category, level, src, messages, named_items):
        '''Format one message per the configured style and write it to the
        sink.'''
        if category and len(category) != 3:
            self.log_write_func('WARNING: INVALID LOGGING CATEGORY %r\n' % category)
            # (stray trailing semicolon removed)
            self.log_write_func('origin=%r category=%r level=%r\n' % (origin, category, level))

        if not category:
            category = C_DEFAULT
        if not self.is_enabled(category, level):
            return

        log_pre = []
        if self.do_log_time:
            log_pre.append(self.get_time_str())

        if self.do_log_category:
            log_pre.append(category)

        deeper_origins = ''
        if self.do_log_origin:
            # Fix: initialize 'name' so that an origin of an unexpected type
            # falls through to the class-name fallback below; previously
            # 'name' was unbound in that case and raised NameError.
            name = None
            if origin is None:
                name = '-'
            elif isinstance(origin, Origin):
                name = origin.name()
                # only log ancestry when there is more than one
                if origin._parent is not None:
                    deeper_origins = origin.ancestry_str()
            elif isinstance(origin, str):
                name = origin or None
            if not name:
                name = str(origin.__class__.__name__)
            log_pre.append(self.origin_fmt.format(name))

        if self.do_log_level and level != L_LOG:
            loglevel = '%s: ' % (level_str(level) or ('loglevel=' + str(level)))
        else:
            loglevel = ''

        log_line = [compose_message(messages, named_items)]

        if deeper_origins and (level in self.do_log_all_origins_on_levels):
            log_line.append(' [%s]' % deeper_origins)

        if self.do_log_src and src:
            log_line.append(' [%s]' % str(src))

        log_str = '%s%s%s%s' % (' '.join(log_pre),
                                ': ' if log_pre else '',
                                loglevel,
                                ' '.join(log_line))

        if not log_str.endswith('\n'):
            log_str = log_str + '\n'
        self.log_write_func(log_str)

    def large_separator(self, *msgs, sublevel=1, space_above=True):
        '''Write a prominent separator block; sublevel 1 (widest) to 3.'''
        sublevel = max(1, min(3, sublevel))
        msg = ' '.join(msgs)
        sep = '-' * int(23 * (5 - sublevel))
        if not msg:
            msg = sep
        lines = [sep, msg, sep, '']
        if space_above:
            lines.insert(0, '')
        self.log_write_func('\n'.join(lines))
+
def level_str(level):
    '''Human-readable name for a logging level value.'''
    if level == L_TRACEBACK:
        return L_TRACEBACK
    for threshold, name in ((L_DBG, 'DBG'), (L_LOG, 'LOG')):
        if level <= threshold:
            return name
    return 'ERR'
+
def _log_all_targets(origin, category, level, src, messages, named_items=None):
    '''Fan one message out to every registered target; like _log(), but the
    origin/category/level are passed explicitly and src may already be a
    formatted string.'''
    if origin is None:
        origin = Origin.find_on_stack()
    if isinstance(src, int):
        src = get_src_from_caller(src + 1)
    for target in LogTarget.all_targets:
        target.log(origin, category, level, src, messages, named_items)

def large_separator(*msgs, sublevel=1, space_above=True):
    '''Write a large separator block to every registered log target.'''
    for target in LogTarget.all_targets:
        target.large_separator(*msgs, sublevel=sublevel, space_above=space_above)
+
def get_src_from_caller(levels_up=1):
    '''Return "file.py:NN" describing the frame levels_up above this call.'''
    # Poke into internals to avoid hitting the linecache, which would make
    # one or more calls to stat(2).
    frame = sys._getframe(levels_up)
    filename = os.path.basename(frame.f_code.co_filename)
    return '{}:{}'.format(filename, frame.f_lineno)
+
def get_src_from_exc_info(exc_info=None, levels_up=1):
    '''Return "file.py:NN: code" for a traceback entry of exc_info
    (defaulting to the exception currently being handled), counting
    levels_up entries back from the innermost frame.'''
    if exc_info is None:
        exc_info = sys.exc_info()
    frames = traceback.extract_tb(exc_info[2])
    filename, line, _function, code = frames[-levels_up]
    return '%s:%s: %s' % (os.path.basename(filename), line, code)
+
def get_line_for_src(src_path):
    '''find a given source file on the stack and return the line number for
    that file. (Used to indicate the position in a test script.)'''
    # Prefer the traceback of an exception currently being handled, if any.
    _etype, _exception, tb = sys.exc_info()
    if tb:
        for filename, lineno, _func, _code in traceback.extract_tb(tb):
            if filename.endswith(src_path):
                return lineno

    # Otherwise walk the live call stack.
    for frame in stack():
        caller = getframeinfo(frame[0])
        if caller.filename.endswith(src_path):
            return caller.lineno
    return None
+
def ctx(*name_items, **detail_items):
    '''Store log context in the current frame. This string will appear as
    origin information for exceptions thrown within the calling scope.
    Calling ctx() with no arguments clears the stored context.'''
    if not name_items and not detail_items:
        ctx_obj(None)
        # Fix: return after clearing. Previously execution fell through and
        # immediately re-set the context to an empty composed message
        # instead of leaving it cleared.
        return
    if not detail_items and len(name_items) == 1 and isinstance(name_items[0], Origin):
        ctx_obj(name_items[0])
    else:
        ctx_obj(compose_message(name_items, detail_items))
+
def ctx_obj(origin_or_str):
    '''Set (or clear, when passed None) the log-context variable in the
    grandparent frame, i.e. in the scope of whoever called ctx().'''
    # _getframe(2): frame 0 is ctx_obj, 1 is ctx(), 2 is ctx()'s caller.
    f = sys._getframe(2)
    if origin_or_str is None:
        f.f_locals.pop(LOG_CTX_VAR, None)
    else:
        f.f_locals[LOG_CTX_VAR] = origin_or_str
+
class OriginLoopError(Error):
    '''Raised when setting an Origin's parent would create a parent cycle.'''
    pass
+
+class Origin:
+ '''
+ Base class for all classes that want to appear in the log.
+ It is a simple named marker to find in the stack frames.
+ This depends on the object instance named 'self' in each member class.
+
+ In addition, it provides a logging category and a globally unique ID for
+ each instance.
+
+ Each child class *must* call super().__init__(category, name), to allow
+ noting its parent origins.
+ '''
+
+ _global_id = None
+
+ _name = None
+ _origin_id = None
+ _log_category = None
+ _parent = None
+
+ @staticmethod
+ def find_on_stack(except_obj=None, f=None):
+ if f is None:
+ f = sys._getframe(2)
+ log_ctx_obj = None
+ origin = None
+ while f is not None:
+ l = f.f_locals
+
+ # if there is a log_ctx in the scope, add it, pointing to the next
+ # actual Origin class in the stack
+ log_ctx = l.get(LOG_CTX_VAR)
+ if log_ctx:
+ if isinstance(log_ctx, Origin):
+ new_log_ctx_obj = log_ctx
+ else:
+ new_log_ctx_obj = Origin(None, log_ctx, find_parent=False)
+ if log_ctx_obj is None:
+ log_ctx_obj = new_log_ctx_obj
+ else:
+ log_ctx_obj.highest_ancestor()._set_parent(new_log_ctx_obj)
+
+ obj = l.get('self')
+ if obj and isinstance(obj, Origin) and (except_obj is not obj):
+ origin = obj
+ break
+ f = f.f_back
+
+ if (origin is not None) and (log_ctx_obj is not None):
+ log_ctx_obj.highest_ancestor()._set_parent(origin)
+ p = log_ctx_obj
+ while p:
+ p._set_log_category(origin._log_category)
+ p = p._parent
+ if log_ctx_obj is not None:
+ return log_ctx_obj
+ # may return None
+ return origin
+
+ @staticmethod
+ def find_in_exc_info(exc_info):
+ tb = exc_info[2]
+ # get last tb ... I hope that's right
+ while tb.tb_next:
+ tb = tb.tb_next
+ return Origin.find_on_stack(f=tb.tb_frame)
+
+ def __init__(self, category, *name_items, find_parent=True, **detail_items):
+ self._set_log_category(category)
+ self.set_name(*name_items, **detail_items)
+ if find_parent:
+ self._set_parent(Origin.find_on_stack(except_obj=self))
+
+ def _set_parent(self, parent):
+ # make sure to avoid loops
+ p = parent
+ while p:
+ if p is self:
+ raise OriginLoopError('Origin parent loop')
+ p = p._parent
+ self._parent = parent
+
+ def set_name(self, *name_items, **detail_items):
+ '''Change the origin's name for log output; rather use the constructor.
+ This function can be used to change the name in case naming info
+ becomes available only after class creation (like a pid)'''
+ if name_items:
+ name = '-'.join([str(i) for i in name_items])
+ elif not detail_items:
+ name = self.__class__.__name__
+ else:
+ name = ''
+ if detail_items:
+ details = '(%s)' % (', '.join([("%s=%r" % (k,v))
+ for k,v in sorted(detail_items.items())]))
+ else:
+ details = ''
+ self._name = name + details
+
+ def name(self):
+ return self._name or self.__class__.__name__
+
+ __str__ = name
+ __repr__ = name
+
+ def origin_id(self):
+ if not self._origin_id:
+ if not Origin._global_id:
+ Origin._global_id = get_process_id()
+ self._origin_id = '%s-%s' % (self.name(), Origin._global_id)
+ return self._origin_id
+
+ def _set_log_category(self, category):
+ self._log_category = category
+
+ def redirect_stdout(self):
+ return contextlib.redirect_stdout(SafeRedirectStdout(self))
+
+ def ancestry(self):
+ origins = []
+ n = 10
+ origin = self
+ while origin:
+ origins.insert(0, origin)
+ origin = origin._parent
+ n -= 1
+ if n < 0:
+ break
+ return origins
+
+ def ancestry_str(self):
+ return '↪'.join([o.name() for o in self.ancestry()])
+
+ def highest_ancestor(self):
+ if self._parent:
+ return self._parent.highest_ancestor()
+ return self
+
+ def log(self, *messages, _src=3, **named_items):
+ '''same as log.log() but passes this object to skip looking up an origin'''
+ log(*messages, _origin=self, _src=_src, **named_items)
+
    def dbg(self, *messages, _src=3, **named_items):
        '''same as log.dbg() but passes this object to skip looking up an origin'''
        # _src is forwarded so the line is attributed to the caller's source
        # location -- presumably a stack offset; confirm against log.dbg().
        dbg(*messages, _origin=self, _src=_src, **named_items)
+
    def err(self, *messages, _src=3, **named_items):
        '''same as log.err() but passes this object to skip looking up an origin'''
        # _src is forwarded so the line is attributed to the caller's source
        # location -- presumably a stack offset; confirm against log.err().
        err(*messages, _origin=self, _src=_src, **named_items)
+
class SafeRedirectStdout:
    '''
    To be able to use 'print' in test scripts, this is used to redirect stdout
    to a test class' log() function. However, it turns out doing that breaks
    python debugger sessions -- it uses extended features of stdout, and will
    fail dismally if it finds this wrapper in sys.stdout. Luckily, overriding
    __getattr__() to return the original sys.__stdout__ attributes for anything
    else than write() makes the debugger session work nicely again!
    '''
    # Holds an unterminated trailing line between write() calls.
    _log_line_buf = None

    def __init__(self, origin):
        # origin: the log.Origin the redirected output is attributed to.
        self.origin = origin

    def write(self, message):
        # Emit each complete line as one log entry; keep a partial trailing
        # line buffered until a newline arrives.
        lines = message.splitlines()
        if not lines:
            return
        # Prepend any partial line left over from the previous write().
        if self._log_line_buf:
            lines[0] = self._log_line_buf + lines[0]
            self._log_line_buf = None
        # No trailing newline: last line is incomplete, buffer it.
        if not message.endswith('\n'):
            self._log_line_buf = lines[-1]
            lines = lines[:-1]
        for line in lines:
            _log(messages=(line,),
                 origin=self.origin, level=L_LOG, src=2)

    def __getattr__(self, name):
        # Delegate everything except write() to the real stdout.
        return sys.__stdout__.__getattribute__(name)
+
def trace(exc_info=None, origin=None):
    '''Log a formatted traceback at L_TRACEBACK level.
    exc_info defaults to the exception currently being handled; origin is
    looked up from the exception's traceback frames when not given.'''
    if exc_info is None:
        exc_info = sys.exc_info()
    if origin is None:
        origin = Origin.find_in_exc_info(exc_info)
    _log(messages=traceback.format_exception(*exc_info),
         origin=origin, level=L_TRACEBACK)
+
def log_exn():
    '''Log the exception currently being handled: its traceback at
    L_TRACEBACK level, followed by a one-line L_ERR summary.'''
    exc_info = sys.exc_info()
    origin = Origin.find_in_exc_info(exc_info)

    # Some exceptions carry a .msg attribute; prefer it over str().
    etype, exception, tb = exc_info
    if hasattr(exception, 'msg'):
        msg = exception.msg
    else:
        msg = str(exception)

    trace(exc_info, origin=origin)
    _log(messages=('%s:' % str(etype.__name__), msg),
         origin=origin, level=L_ERR, src=get_src_from_exc_info(exc_info))
+
+
def set_all_levels(level):
    '''Call set_all_levels(level) on every registered LogTarget.'''
    for log_target in LogTarget.all_targets:
        log_target.set_all_levels(level)
+
def set_level(category, level):
    '''Call set_level(category, level) on every registered LogTarget.'''
    for log_target in LogTarget.all_targets:
        log_target.set_level(category, level)
+
def style(**kwargs):
    '''Forward style(**kwargs) to every registered LogTarget.'''
    for log_target in LogTarget.all_targets:
        log_target.style(**kwargs)
+
def style_change(**kwargs):
    '''Forward style_change(**kwargs) to every registered LogTarget.'''
    for log_target in LogTarget.all_targets:
        log_target.style_change(**kwargs)
+
class TestsTarget(LogTarget):
    'LogTarget producing deterministic results for regression tests'
    def __init__(self, log_write_func=None):
        super().__init__(log_write_func)
        # Timestamps and source locations differ between runs; disable them
        # (and origin-name padding) so expected output compares verbatim.
        self.style(time=False, src=False, origin_width=0)
+
class FileLogTarget(LogTarget):
    'LogTarget to log to a file system path'
    # Open file object while active; None once closed by remove()/at_exit().
    log_file = None

    def __init__(self, log_path):
        '''Open log_path for appending and register a flush-and-close hook
        to run at interpreter exit.'''
        atexit.register(self.at_exit)
        self.path = log_path
        self.log_file = open(log_path, 'a')
        super().__init__(self.write_to_log_and_flush)

    def remove(self):
        '''Unregister this target and close the log file (idempotent).'''
        super().remove()
        if self.log_file is not None:
            self.log_file.close()
        self.log_file = None

    def write_to_log_and_flush(self, msg):
        '''Write msg and flush immediately so logs survive a crash.'''
        self.log_file.write(msg)
        self.log_file.flush()

    def at_exit(self):
        '''atexit hook: flush and close the file if still open.

        Resets log_file to None so the 'is not None' guards keep reflecting
        "file is open" even after this hook has run.'''
        if self.log_file is not None:
            self.log_file.flush()
            self.log_file.close()
            self.log_file = None

    def log_file_path(self):
        '''Return the filesystem path this target logs to.'''
        return self.path
+
def run_logging_exceptions(func, *func_args, return_on_failure=None, **func_kwargs):
    '''Call func(*func_args, **func_kwargs), logging any error instead of
    propagating it.

    Returns func's return value on success, return_on_failure when func
    raised an Exception (which is logged via log_exn()).'''
    try:
        return func(*func_args, **func_kwargs)
    except Exception:
        # A bare 'except:' would also swallow SystemExit and
        # KeyboardInterrupt; only catch genuine errors.
        log_exn()
        return return_on_failure
+
def _compose_named_items(item):
    'make sure dicts are output sorted, for test expectations'
    if not is_dict(item):
        return repr(item)
    parts = ['%s=%s' % (key, _compose_named_items(value))
             for key, value in sorted(item.items())]
    return '{%s}' % ', '.join(parts)
+
def compose_message(messages, named_items):
    '''Join stringified messages and, when present, the sorted rendering of
    named_items into a single space-separated log line.'''
    parts = [str(msg) for msg in messages]
    if named_items:
        # unfortunately needs to be sorted to get deterministic results
        parts.append(_compose_named_items(named_items))
    return ' '.join(parts)
+
+# vim: expandtab tabstop=4 shiftwidth=4
diff --git a/src/osmo_gsm_tester/core/process.py b/src/osmo_gsm_tester/core/process.py
new file mode 100644
index 0000000..5d02ab5
--- /dev/null
+++ b/src/osmo_gsm_tester/core/process.py
@@ -0,0 +1,424 @@
+# osmo_gsm_tester: process management
+#
+# Copyright (C) 2016-2017 by sysmocom - s.f.m.c. GmbH
+#
+# Author: Neels Hofmeyr <neels@hofmeyr.de>
+#
+# This program is free software: you can redistribute it and/or modify
+# it under the terms of the GNU General Public License as
+# published by the Free Software Foundation, either version 3 of the
+# License, or (at your option) any later version.
+#
+# This program is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+# GNU General Public License for more details.
+#
+# You should have received a copy of the GNU General Public License
+# along with this program. If not, see <http://www.gnu.org/licenses/>.
+
+import os
+import time
+import subprocess
+import signal
+from abc import ABCMeta, abstractmethod
+from datetime import datetime
+
+from . import log
+from .event_loop import MainLoop
+from .util import Dir
+
class TerminationStrategy(log.Origin, metaclass=ABCMeta):
    """A baseclass for terminating a collection of processes."""

    def __init__(self):
        # NOTE(review): log.Origin.__init__ is not invoked here; this relies
        # on class-level defaults in Origin -- TODO confirm.
        self._processes = []

    def add_process(self, process):
        """Remembers a process that needs to be terminated."""
        self._processes.append(process)

    @abstractmethod
    def terminate_all(self):
        # The original docstring opened with one quote and closed with three,
        # silently producing an implicit string concatenation; fixed quoting.
        """Terminates all scheduled processes and waits for the termination."""
        pass
+
+
class ParallelTerminationStrategy(TerminationStrategy):
    """Processes will be terminated in parallel."""

    def _prune_dead_processes(self, poll_first):
        """Removes all dead processes from the list."""
        # Remove all processes that terminated!
        self._processes = list(filter(lambda proc: proc.is_running(poll_first), self._processes))

    def _build_process_map(self):
        """Builds a mapping from pid to process."""
        self._process_map = {}
        for process in self._processes:
            pid = process.pid()
            # Processes never launched have no pid; skip them.
            if pid is None:
                continue
            self._process_map[pid] = process

    def _poll_once(self):
        """Polls for to be collected children once.

        Returns True when one of our tracked children was reaped."""
        # waitpid(0, WNOHANG): reap any child of our process group without
        # blocking; (0, 0) means no child has exited yet.
        pid, result = os.waitpid(0, os.WNOHANG)
        # Did some other process die?
        if pid == 0:
            return False
        proc = self._process_map.get(pid)
        if proc is None:
            self.dbg("Unknown process with pid(%d) died." % pid)
            return False
        # Update the process state and forget about it
        self.log("PID %d died..." % pid)
        proc.result = result
        proc.cleanup()
        self._processes.remove(proc)
        del self._process_map[pid]
        return True

    def _poll_for_termination(self, time_to_wait_for_term=5):
        """Waits for the termination of processes until timeout|all ended."""

        wait_step = 0.001
        waited_time = 0
        while len(self._processes) > 0:
            # Collect processes until there are none to be collected.
            while True:
                try:
                    if not self._poll_once():
                        break
                except ChildProcessError:
                    # No child processes left to wait for.
                    break

            # All processes died and we can return before sleeping
            if len(self._processes) == 0:
                break
            waited_time += wait_step
            # make wait_step approach 1.0
            wait_step = (1. + 5. * wait_step) / 6.
            if waited_time >= time_to_wait_for_term:
                break
            time.sleep(wait_step)

    def terminate_all(self):
        """Terminate all registered processes, escalating SIGTERM -> SIGINT
        -> SIGKILL with a bounded wait between the steps."""
        num_processes = len(self._processes)
        self.dbg("Scheduled to terminate %d processes." % num_processes)
        if num_processes == 0:
            return
        self._prune_dead_processes(True)
        self._build_process_map()

        # Iterate through all signals.
        for sig in [signal.SIGTERM, signal.SIGINT, signal.SIGKILL]:
            self.dbg("Starting to kill with %s" % sig.name)
            for process in self._processes:
                process.kill(sig)
            if sig == signal.SIGKILL:
                # Nothing left to escalate to; do not wait again.
                continue
            self._poll_for_termination()
            if len(self._processes) == 0:
                return
+
+
class Process(log.Origin):
    '''A locally launched subprocess: pipes stdout/stderr into log files in
    a run directory, terminates with signal escalation, and records the
    exit result.'''

    def __init__(self, name, run_dir, popen_args, **popen_kwargs):
        '''name: display name for logging; run_dir: str or util.Dir used as
        cwd and for output logs; popen_args/popen_kwargs are forwarded to
        subprocess.Popen by launch().'''
        super().__init__(log.C_RUN, name)
        self.process_obj = None  # subprocess.Popen object once launched
        self.result = None       # exit code, or None while running
        self.killed = None       # signal used to kill, if any
        self.name_str = name
        self.run_dir = run_dir
        self.popen_args = popen_args
        self.popen_kwargs = popen_kwargs
        self.outputs = {}        # name -> (path, open file object or None)
        if not isinstance(self.run_dir, Dir):
            self.run_dir = Dir(os.path.abspath(str(self.run_dir)))

    def set_env(self, key, value):
        '''Set an environment variable for the (future) launch.'''
        env = self.popen_kwargs.get('env') or {}
        env[key] = value
        self.popen_kwargs['env'] = env

    def make_output_log(self, name):
        '''
        create a non-existing log output file in run_dir to pipe stdout and
        stderr from this process to.
        '''
        path = self.run_dir.new_child(name)
        f = open(path, 'w')
        self.dbg(path)
        f.write('(launched: %s)\n' % datetime.now().strftime(log.LONG_DATEFMT))
        f.flush()
        self.outputs[name] = (path, f)
        return f

    def launch(self):
        '''Start the subprocess without blocking; see launch_sync()/wait().'''
        preexec_fn = None
        log.dbg('cd %r; %s %s' % (
                os.path.abspath(str(self.run_dir)),
                ' '.join(['%s=%r'%(k,v) for k,v in self.popen_kwargs.get('env', {}).items()]),
                ' '.join(self.popen_args)))

        if self.popen_args[0] == "sudo":
            # sudo drops forwarding of signals sent by processes of the same
            # process group, which means by default will drop signals from
            # parent and children processes. By moving it to another group, we
            # will later be able to kill it.
            # Note: sudo documentation is wrong, since it states it only drops
            # signals from children.
            preexec_fn = os.setpgrp

        self.process_obj = subprocess.Popen(
            self.popen_args,
            stdout=self.make_output_log('stdout'),
            stderr=self.make_output_log('stderr'),
            stdin=subprocess.PIPE,
            preexec_fn=preexec_fn,
            shell=False,
            cwd=self.run_dir.path,
            **self.popen_kwargs)
        # Re-name so the pid shows up in subsequent log lines.
        self.set_name(self.name_str, pid=self.process_obj.pid)
        self.log('Launched')

    def launch_sync(self, raise_nonsuccess=True):
        '''
        calls launch() method and block waiting for it to finish, serving the
        mainloop meanwhile.
        '''
        try:
            self.launch()
            self.wait()
        except Exception as e:
            self.terminate()
            raise e
        if raise_nonsuccess and self.result != 0:
            log.ctx(self)
            raise log.Error('Exited in error %d' % self.result)
        return self.result

    def respawn(self):
        '''Relaunch a fully terminated process without blocking.'''
        self.dbg('respawn')
        assert not self.is_running()
        self.result = None
        self.killed = None
        return self.launch()

    def respawn_sync(self, raise_nonsuccess=True):
        '''Relaunch a fully terminated process and block until it finishes.'''
        self.dbg('respawn_sync')
        assert not self.is_running()
        self.result = None
        self.killed = None
        return self.launch_sync(raise_nonsuccess)

    def _poll_termination(self, time_to_wait_for_term=5):
        '''Poll with growing sleep intervals until the process exits or the
        timeout elapses. Returns True if it terminated in time.'''
        wait_step = 0.001
        waited_time = 0
        while True:
            # poll returns None if proc is still running
            self.result = self.process_obj.poll()
            if self.result is not None:
                return True
            waited_time += wait_step
            # make wait_step approach 1.0
            wait_step = (1. + 5. * wait_step) / 6.
            if waited_time >= time_to_wait_for_term:
                break
            time.sleep(wait_step)
        return False

    def send_signal(self, sig):
        '''Deliver sig to the child process.'''
        os.kill(self.process_obj.pid, sig)

    def pid(self):
        '''Return the child's pid, or None if it was never launched.'''
        if self.process_obj is None:
            return None
        return self.process_obj.pid

    def kill(self, sig):
        """Kills the process with the given signal and remembers it."""
        self.log('Terminating (%s)' % sig.name)
        self.send_signal(sig)
        self.killed = sig

    def terminate(self):
        '''Terminate the process, escalating SIGINT -> SIGTERM -> SIGKILL,
        then reap it and close the output logs.'''
        if self.process_obj is None:
            return
        if self.result is not None:
            return

        while True:
            # first try SIGINT to allow stdout+stderr flushing
            self.kill(signal.SIGINT)
            if self._poll_termination():
                break

            # SIGTERM maybe?
            self.kill(signal.SIGTERM)
            if self._poll_termination():
                break

            # out of patience
            self.kill(signal.SIGKILL)
            break;

        self.process_obj.wait()
        self.cleanup()

    def cleanup(self):
        '''Close output logs and log the final result; on unexpected exit
        (non-zero result without a kill) also log the stderr tail.'''
        self.dbg('Cleanup')
        self.close_output_logs()
        if self.result == 0:
            self.log('Terminated: ok', rc=self.result)
        elif self.killed:
            self.log('Terminated', rc=self.result)
        else:
            self.err('Terminated: ERROR', rc=self.result)
            #self.log_stdout_tail()
            self.log_stderr_tail()

    def log_stdout_tail(self):
        '''Log the last lines of captured stdout, if any.'''
        m = self.get_stdout_tail(prefix='| ')
        if not m:
            return
        self.log('stdout:\n', m, '\n')

    def log_stderr_tail(self):
        '''Log the last lines of captured stderr, if any.'''
        m = self.get_stderr_tail(prefix='| ')
        if not m:
            return
        self.log('stderr:\n', m, '\n')

    def close_output_logs(self):
        '''Flush and close all output log files, keeping their paths for
        later reading via get_output().'''
        for k, v in self.outputs.items():
            path, f = v
            if f:
                f.flush()
                f.close()
            self.outputs[k] = (path, None)

    def poll(self):
        '''Check for termination without blocking; runs cleanup() once on
        the transition to terminated.'''
        if self.process_obj is None:
            return
        if self.result is not None:
            return
        self.result = self.process_obj.poll()
        if self.result is not None:
            self.cleanup()

    def is_running(self, poll_first=True):
        '''Return True while the child is launched and has no result yet.'''
        if poll_first:
            self.poll()
        return self.process_obj is not None and self.result is None

    def get_output(self, which):
        '''Return the full contents of the named output log file, or None
        when no such output was created.'''
        v = self.outputs.get(which)
        if not v:
            return None
        path, f = v
        # Re-open by path: the writing file object may already be closed.
        with open(path, 'r') as f2:
            return f2.read()

    def get_output_tail(self, which, tail=10, prefix=''):
        '''Return the last 'tail' lines of an output log, each prepended
        with prefix; None when there is no output.'''
        out = self.get_output(which)
        if not out:
            return None
        out = out.splitlines()
        tail = min(len(out), tail)
        return prefix + ('\n' + prefix).join(out[-tail:])

    def get_stdout(self):
        '''Return the full captured stdout, or None.'''
        return self.get_output('stdout')

    def get_stderr(self):
        '''Return the full captured stderr, or None.'''
        return self.get_output('stderr')

    def get_stdout_tail(self, tail=10, prefix=''):
        '''Return the tail of captured stdout; see get_output_tail().'''
        return self.get_output_tail('stdout', tail, prefix)

    def get_stderr_tail(self, tail=10, prefix=''):
        '''Return the tail of captured stderr; see get_output_tail().'''
        return self.get_output_tail('stderr', tail, prefix)

    def terminated(self, poll_first=True):
        '''Return True once the process has an exit result.'''
        if poll_first:
            self.poll()
        return self.result is not None

    def wait(self, timeout=300):
        '''Block (serving the main loop) until terminated or timeout.'''
        MainLoop.wait(self, self.terminated, timeout=timeout)

    def stdin_write(self, cmd):
        '''
        Send a cmd to the stdin of a process (convert to byte before)
        '''
        if self.process_obj.stdin is not None:
            self.process_obj.stdin.write(cmd.encode("utf-8"))
            self.process_obj.stdin.flush()
+
class RemoteProcess(Process):
    '''Process run on a remote host by prefixing the command with ssh.

    stdout/stderr of the remote command travel over the ssh link and are
    captured by the local Process machinery.'''

    def __init__(self, name, run_dir, remote_user, remote_host, remote_cwd, popen_args, remote_env=None, **popen_kwargs):
        super().__init__(name, run_dir, popen_args, **popen_kwargs)
        self.remote_user = remote_user
        self.remote_host = remote_host
        self.remote_cwd = remote_cwd
        # Avoid the shared mutable-default pitfall: default to a fresh dict
        # per instance instead of one dict shared by all calls.
        self.remote_env = remote_env if remote_env is not None else {}

        # hacky: instead of just prepending ssh, i.e. piping stdout and stderr
        # over the ssh link, we should probably run on the remote side,
        # monitoring the process remotely.
        if self.remote_cwd:
            cd = 'cd "%s";' % self.remote_cwd
        else:
            cd = ''
        # We need double -t to force tty and be able to forward signals to
        # processes (SIGHUP) when we close ssh on the local side. As a result,
        # stderr seems to be merged into stdout in ssh client.
        self.popen_args = ['ssh', '-t', '-t', self.remote_user+'@'+self.remote_host,
                           '%s %s %s' % (cd,
                                         ' '.join(['%s=%r'%(k,v) for k,v in self.remote_env.items()]),
                                         ' '.join(self.popen_args))]
        self.dbg(self.popen_args, dir=self.run_dir, conf=self.popen_kwargs, remote_env=self.remote_env)
+
class NetNSProcess(Process):
    '''Process executed inside a local network namespace, via the netns
    exec helper script run through sudo.'''
    NETNS_EXEC_BIN = 'osmo-gsm-tester_netns_exec.sh'
    def __init__(self, name, run_dir, netns, popen_args, **popen_kwargs):
        super().__init__(name, run_dir, popen_args, **popen_kwargs)
        self.netns = netns

        self.popen_args = ['sudo', self.NETNS_EXEC_BIN, self.netns] + list(popen_args)
        self.dbg(self.popen_args, dir=self.run_dir, conf=self.popen_kwargs)

    # HACK: Since we run under sudo, only way to kill root-owned process is to kill as root...
    # This function is overwritten from Process.
    def send_signal(self, sig):
        if sig == signal.SIGKILL:
            # if we kill sudo, its children (bash running NETNS_EXEC_BIN +
            # tcpdump under it) are kept alive. Let's instead tell the script to
            # kill tcpdump:
            sig = signal.SIGUSR1
        # Run a root-owned 'kill' inside the same netns so the root-owned
        # target actually receives the signal.
        kill_cmd = ('kill', '-%d' % int(sig), str(self.process_obj.pid))
        run_local_netns_sync(self.run_dir, self.name()+"-kill"+str(sig), self.netns, kill_cmd)
+
class RemoteNetNSProcess(RemoteProcess):
    '''RemoteProcess wrapped to run inside a network namespace on the
    remote host, via the netns exec helper script run through sudo.'''
    NETNS_EXEC_BIN = 'osmo-gsm-tester_netns_exec.sh'
    def __init__(self, name, run_dir, remote_user, remote_host, remote_cwd, netns, popen_args, **popen_kwargs):
        self.netns = netns
        wrapped_args = ['sudo', self.NETNS_EXEC_BIN, self.netns]
        wrapped_args.extend(popen_args)
        super().__init__(name, run_dir, remote_user, remote_host, remote_cwd, wrapped_args, **popen_kwargs)
+
def run_local_sync(run_dir, name, popen_args):
    '''Run popen_args as a local Process in a new subdir of run_dir named
    after the process; block until it finishes and return the Process.'''
    child_dir = run_dir.new_dir(name)
    proc = Process(name, child_dir, popen_args)
    proc.launch_sync()
    return proc
+
def run_local_netns_sync(run_dir, name, netns, popen_args):
    '''Run popen_args inside the given netns as a local NetNSProcess in a
    new subdir of run_dir; block until it finishes and return the process.'''
    child_dir = run_dir.new_dir(name)
    proc = NetNSProcess(name, child_dir, netns, popen_args)
    proc.launch_sync()
    return proc
+# vim: expandtab tabstop=4 shiftwidth=4
diff --git a/src/osmo_gsm_tester/core/remote.py b/src/osmo_gsm_tester/core/remote.py
new file mode 100644
index 0000000..95b8967
--- /dev/null
+++ b/src/osmo_gsm_tester/core/remote.py
@@ -0,0 +1,193 @@
+# osmo_gsm_tester: specifics for remote nodes
+#
+# Copyright (C) 2020 by sysmocom - s.f.m.c. GmbH
+#
+# Author: Pau Espin Pedrol <pespin@sysmocom.de>
+#
+# This program is free software: you can redistribute it and/or modify
+# it under the terms of the GNU General Public License as
+# published by the Free Software Foundation, either version 3 of the
+# License, or (at your option) any later version.
+#
+# This program is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+# GNU General Public License for more details.
+#
+# You should have received a copy of the GNU General Public License
+# along with this program. If not, see <http://www.gnu.org/licenses/>.
+
+import stat
+import os
+import re
+import pprint
+
+from . import log, util, config, template, process
+
class RemoteHost(log.Origin):
    '''Execute commands on a remote host via ssh and copy files via scp.

    All helper processes get their run dirs under this host's run_dir.'''

    WRAPPER_SCRIPT = 'ssh_sigkiller.sh'

    def __init__(self, run_dir, remote_user = 'root', remote_host = 'localhost', remote_cwd=None):
        super().__init__(log.C_RUN, 'host-' + remote_user + '@' + remote_host)
        self.run_dir = util.Dir(run_dir.new_dir(self.name()))
        self.remote_user = remote_user
        self.remote_host = remote_host
        self.remote_cwd = remote_cwd
        self.remote_env = {}

    def user(self):
        '''Return the remote user name.'''
        return self.remote_user

    def host(self):
        '''Return the remote host name or address.'''
        return self.remote_host

    def cwd(self):
        '''Return the remote working directory (may be None).'''
        return self.remote_cwd

    def set_remote_env(self, remote_env_dict):
        '''Replace the env dict applied to remote commands.'''
        self.remote_env = remote_env_dict

    def get_remote_env(self):
        '''Return the env dict applied to remote commands.'''
        return self.remote_env

    # NOTE(review): mutable default argument remote_env={}; harmless while
    # never mutated here, but remote_env=None with a fallback would be safer.
    def RemoteProcess(self, name, popen_args, remote_env={}, **popen_kwargs):
        '''Build (not launch) a process.RemoteProcess on this host, with its
        run dir under our run_dir.'''
        run_dir = self.run_dir.new_dir(name)
        return process.RemoteProcess(name, run_dir, self.user(), self.host(), self.cwd(), popen_args, remote_env=remote_env, **popen_kwargs)

    def generate_wrapper_script(self):
        '''Write a local shell wrapper that runs its arguments in the
        background and forwards SIGTERM/SIGINT/SIGHUP to the child (killing
        with -9 after a grace period); mark it executable and return its
        path.'''
        wrapper_script = self.run_dir.new_file(RemoteHost.WRAPPER_SCRIPT)
        with open(wrapper_script, 'w') as f:
            r = """#!/bin/bash
        LOGFILE=/dev/null
        kill_pid(){
            mypid=$1
            kill $mypid
            if ! kill -0 $mypid; then
                return
            fi
            echo "sleeping some time waiting for child to die..." >>$LOGFILE
            sleep 5
            if ! kill -0 $mypid; then
                return
            fi
            echo "kill -9 the process and wait!" >>$LOGFILE
            kill -9 $mypid
            wait $mypid
        }
        prep_sighandler() {
            unset term_child_pid
            unset term_kill_needed
            trap 'sign_handler SIGTERM' SIGTERM
            trap 'sign_handler SIGINT' SIGINT
            trap 'sign_handler SIGHUP' SIGHUP
            echo "script started, traps set" >$LOGFILE
        }
        sign_handler() {
            sig=$1
            echo "$sig -> ${term_child_pid}" >>$LOGFILE
            echo "received signal handler $sig, killing ${term_child_pid}" >>$LOGFILE
            kill_pid ${term_child_pid}
        }
        wait_sighandler()
        {
            term_child_pid=$!
            if [ "${term_kill_needed}" ]; then
                kill_pid "${term_child_pid}"
            fi
            echo "waiting for ${term_child_pid}" >>$LOGFILE
            wait ${term_child_pid}
            echo "process ${term_child_pid} finished" >>$LOGFILE
        }
        prep_sighandler
        $@ &
        wait_sighandler
        """
            f.write(r)
        st = os.stat(wrapper_script)
        os.chmod(wrapper_script, st.st_mode | stat.S_IEXEC)
        return wrapper_script

    # NOTE(review): mutable default argument remote_env={} (see above).
    def RemoteProcessFixIgnoreSIGHUP(self, name, remote_dir, popen_args, remote_env={}, **popen_kwargs):
        '''Build a RemoteProcess whose remote command runs under the signal
        wrapper script, copied into remote_dir first.'''
        # Run remotely through ssh. We need to run binary under a wrapper
        # script since osmo-trx ignores SIGHUP and will keep running after
        # we close local ssh session. The wrapper script catches SIGHUP and
        # sends SIGINT to it.
        self.create_remote_dir(remote_dir)

        wrapper_script = self.generate_wrapper_script()
        remote_wrapper_script = remote_dir.child(RemoteHost.WRAPPER_SCRIPT)
        self.scp('scp-wrapper-to-remote', wrapper_script, remote_wrapper_script)

        args = (remote_wrapper_script,) + popen_args
        return self.RemoteProcess(name, args, remote_env, **popen_kwargs)

    def RemoteNetNSProcess(self, name, netns, popen_args, **popen_kwargs):
        '''Build (not launch) a process.RemoteNetNSProcess on this host.'''
        run_dir = self.run_dir.new_dir(name)
        return process.RemoteNetNSProcess(name, run_dir, self.user(), self.host(), self.cwd(), netns, popen_args, **popen_kwargs)

    def run_remote_sync(self, name, popen_args):
        '''Run popen_args remotely, block until done, return the process.'''
        proc = self.RemoteProcess(name, popen_args, remote_env=self.remote_env)
        proc.launch_sync()
        return proc

    def rm_remote_dir(self, remote_dir):
        '''Remove remote_dir on the remote host if it exists.'''
        remote_dir_str = str(remote_dir)
        self.run_remote_sync('rm-remote-dir', ('test', '!', '-d', remote_dir_str, '||', 'rm', '-rf', remote_dir_str))

    def create_remote_dir(self, remote_dir):
        '''Create remote_dir (and parents) on the remote host.'''
        remote_dir_str = str(remote_dir)
        self.run_remote_sync('mk-remote-dir', ('mkdir', '-p', remote_dir_str))

    def recreate_remote_dir(self, remote_dir):
        '''Remove and re-create remote_dir on the remote host.'''
        self.rm_remote_dir(remote_dir)
        self.create_remote_dir(remote_dir)

    def inst_compatible_for_remote(self):
        '''Return True when the remote machine reports an x86_64 arch.'''
        proc = self.run_remote_sync('uname-m', ('uname', '-m'))
        if "x86_64" in (proc.get_stdout() or ''):
            return True
        return False

    def scp(self, name, local_path, remote_path):
        '''Copy local_path recursively to remote_path on this host.'''
        process.run_local_sync(self.run_dir, name, ('scp', '-r', local_path, '%s@%s:%s' % (self.user(), self.host(), remote_path)))

    def scpfrom(self, name, remote_path, local_path):
        '''Copy remote_path recursively from this host to local_path.'''
        process.run_local_sync(self.run_dir, name, ('scp', '-r', '%s@%s:%s' % (self.user(), self.host(), remote_path), local_path))

    def setcap_net_admin(self, binary_path):
        '''
        This functionality requires specific setup on the host running
        osmo-gsm-tester. See osmo-gsm-tester manual for more information.
        '''
        SETCAP_NET_ADMIN_BIN = 'osmo-gsm-tester_setcap_net_admin.sh'
        self.run_remote_sync('setcap-netadm', ('sudo', SETCAP_NET_ADMIN_BIN, binary_path))

    def setcap_netsys_admin(self, binary_path):
        '''
        This functionality requires specific setup on the host running
        osmo-gsm-tester. See osmo-gsm-tester manual for more information.
        '''
        SETCAP_NETSYS_ADMIN_BIN = 'osmo-gsm-tester_setcap_netsys_admin.sh'
        self.run_remote_sync('setcap-netsysadm', ('sudo', SETCAP_NETSYS_ADMIN_BIN, binary_path))

    def create_netns(self, netns):
        '''
        It creates the netns if it doesn't already exist.
        '''
        NETNS_SETUP_BIN = 'osmo-gsm-tester_netns_setup.sh'
        self.run_remote_sync('create_netns', ('sudo', NETNS_SETUP_BIN, netns))

    def change_elf_rpath(self, binary_path, paths):
        '''
        Change RPATH field in ELF executable binary.
        This feature can be used to tell the loader to load the trial libraries, as
        LD_LIBRARY_PATH is disabled for paths with modified capabilities.
        '''
        patchelf_bin = self.remote_env.get('PATCHELF_BIN', None)
        if not patchelf_bin:
            patchelf_bin = 'patchelf'
        else:
            # NOTE(review): self.dbg does not %-format its arguments (see
            # log compose_message); '%s' is logged literally next to
            # patchelf_bin -- consider self.dbg('Using patchelf', patchelf_bin).
            self.dbg('Using specific patchelf from %s', patchelf_bin)

        self.run_remote_sync('patchelf', (patchelf_bin, '--set-rpath', paths, binary_path))
diff --git a/src/osmo_gsm_tester/core/schema.py b/src/osmo_gsm_tester/core/schema.py
new file mode 100644
index 0000000..d343bef
--- /dev/null
+++ b/src/osmo_gsm_tester/core/schema.py
@@ -0,0 +1,246 @@
+# osmo_gsm_tester: validate dict structures
+#
+# Copyright (C) 2016-2017 by sysmocom - s.f.m.c. GmbH
+#
+# Author: Neels Hofmeyr <neels@hofmeyr.de>
+#
+# This program is free software: you can redistribute it and/or modify
+# it under the terms of the GNU General Public License as
+# published by the Free Software Foundation, either version 3 of the
+# License, or (at your option) any later version.
+#
+# This program is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+# GNU General Public License for more details.
+#
+# You should have received a copy of the GNU General Public License
+# along with this program. If not, see <http://www.gnu.org/licenses/>.
+
+import re
+
+from . import log
+from .util import is_dict, is_list, str2bool, ENUM_OSMO_AUTH_ALGO
+
# Validation regexes for config leaf values.
KEY_RE = re.compile('[a-zA-Z][a-zA-Z0-9_]*')
# The dot must be escaped: an unescaped '.' matches any character, so e.g.
# a 13-digit run like '1921681011111' would previously pass this pattern.
IPV4_RE = re.compile(r'([0-9]{1,3}\.){3}[0-9]{1,3}')
HWADDR_RE = re.compile('([0-9a-fA-F]{2}:){5}[0-9a-fA-F]{2}')
IMSI_RE = re.compile('[0-9]{6,15}')
KI_RE = re.compile('[0-9a-fA-F]{32}')
MSISDN_RE = re.compile('[0-9]{1,15}')
+
def match_re(name, regex, val):
    '''Validate that val is a string fully matching regex.

    name is used only in the error message. Raises ValueError when val is
    not a str or does not fully match the pattern.'''
    # Replaces the original while-True/break control flow (with stray
    # semicolons) by a direct condition.
    if isinstance(val, str) and regex.fullmatch(val):
        return
    raise ValueError('Invalid %s: %r' % (name, val))
+
def band(val):
    '''Validate a GSM band name; raise ValueError if unknown.'''
    known_bands = ('GSM-900', 'GSM-1800', 'GSM-1900')
    if val not in known_bands:
        raise ValueError('Unknown GSM band: %r' % val)
+
def ipv4(val):
    '''Validate a dotted-quad IPv4 address string; raise ValueError if
    malformed or any octet exceeds 255.'''
    match_re('IPv4 address', IPV4_RE, val)
    octets = [int(part) for part in val.split('.')]
    if not all(0 <= octet <= 255 for octet in octets):
        raise ValueError('Invalid IPv4 address: %r' % val)
+
def hwaddr(val):
    '''Validate a colon-separated MAC address (6 hex octets).'''
    match_re('hardware address', HWADDR_RE, val)
+
def imsi(val):
    '''Validate an IMSI: 6 to 15 decimal digits.'''
    match_re('IMSI', IMSI_RE, val)
+
def ki(val):
    '''Validate a KI: exactly 32 hex digits.'''
    match_re('KI', KI_RE, val)
+
def msisdn(val):
    '''Validate an MSISDN: 1 to 15 decimal digits.'''
    match_re('MSISDN', MSISDN_RE, val)
+
def auth_algo(val):
    '''Validate an authentication algorithm name against ENUM_OSMO_AUTH_ALGO.'''
    if val not in ENUM_OSMO_AUTH_ALGO:
        raise ValueError('Unknown Authentication Algorithm: %r' % val)
+
def uint(val):
    '''Validate that val parses as a non-negative integer.'''
    parsed = int(val)
    if parsed < 0:
        raise ValueError('Positive value expected instead of %d' % parsed)
+
def uint8(val):
    '''Validate that val parses as an integer within [0, 255].'''
    parsed = int(val)
    if parsed < 0:
        raise ValueError('Positive value expected instead of %d' % parsed)
    if parsed > 255: # 2^8 - 1
        raise ValueError('Value %d too big, max value is 255' % parsed)
+
def uint16(val):
    '''Validate that val parses as an integer within [0, 65535].'''
    parsed = int(val)
    if parsed < 0:
        raise ValueError('Positive value expected instead of %d' % parsed)
    if parsed > 65535: # 2^16 - 1
        raise ValueError('Value %d too big, max value is 65535' % parsed)
+
def times(val):
    '''Validate that val parses as an integer greater than zero.'''
    parsed = int(val)
    if parsed < 1:
        raise ValueError('Positive value >0 expected instead of %d' % parsed)
+
def cipher(val):
    '''Validate an A5 cipher identifier (a5_0 .. a5_7).'''
    valid = tuple('a5_%d' % i for i in range(8))
    if val not in valid:
        raise ValueError('Unknown Cipher value: %r' % val)
+
def modem_feature(val):
    '''Validate a modem feature keyword; raise ValueError if unknown.'''
    features = ('sms', 'gprs', 'voice', 'ussd', 'sim', '2g', '3g', '4g')
    if val not in features:
        raise ValueError('Unknown Modem Feature: %r' % val)
+
def phy_channel_config(val):
    '''Validate a BTS physical channel configuration name.'''
    configs = ('CCCH', 'CCCH+SDCCH4', 'TCH/F', 'TCH/H', 'SDCCH8', 'PDCH',
               'TCH/F_PDCH', 'CCCH+SDCCH4+CBCH', 'SDCCH8+CBCH', 'TCH/F_TCH/H_PDCH')
    if val not in configs:
        raise ValueError('Unknown Physical channel config: %r' % val)
+
def channel_allocator(val):
    '''Validate a channel allocator policy; raise ValueError if unknown.'''
    if val != 'ascending' and val != 'descending':
        raise ValueError('Unknown Channel Allocator Policy %r' % val)
+
def gprs_mode(val):
    '''Validate a GPRS mode; raise ValueError if unknown.'''
    modes = ('none', 'gprs', 'egprs')
    if val not in modes:
        raise ValueError('Unknown GPRS mode %r' % val)
+
def codec(val):
    '''Validate a speech codec identifier; raise ValueError if unknown.'''
    half_rate = ('hr1', 'hr2', 'hr3')
    full_rate = ('fr1', 'fr2', 'fr3')
    if val not in half_rate and val not in full_rate:
        raise ValueError('Unknown Codec value: %r' % val)
+
def osmo_trx_clock_ref(val):
    '''Validate an OsmoTRX clock reference source; raise ValueError if unknown.'''
    sources = ('internal', 'external', 'gspdo')
    if val not in sources:
        raise ValueError('Unknown OsmoTRX clock reference value: %r' % val)
+
def lte_transmission_mode(val):
    '''Validate an LTE transmission mode: accepts any integer <= 4.

    NOTE(review): values below 1 (including negatives) also pass; if only
    modes 1..4 are intended, a lower bound is missing -- TODO confirm.'''
    n = int(val)
    if n <= 4:
        return
    raise ValueError('LTE Transmission Mode %d not in expected range' % n)
+
def duration(val):
    '''Validate a duration string: plain decimal, or suffixed with 'm'
    (minutes) or 'h' (hours).'''
    if val.isdecimal() or val.endswith(('m', 'h')):
        return
    raise ValueError('Invalid duration value: %r' % val)
+
# Schema type identifiers; each maps to its validator in SCHEMA_TYPES below.
INT = 'int'
STR = 'str'
UINT = 'uint'
BOOL_STR = 'bool_str'
BAND = 'band'
IPV4 = 'ipv4'
HWADDR = 'hwaddr'
IMSI = 'imsi'
KI = 'ki'
MSISDN = 'msisdn'
AUTH_ALGO = 'auth_algo'
TIMES = 'times'
CIPHER = 'cipher'
MODEM_FEATURE = 'modem_feature'
PHY_CHAN = 'chan'
CHAN_ALLOCATOR = 'chan_allocator'
GPRS_MODE = 'gprs_mode'
CODEC = 'codec'
OSMO_TRX_CLOCK_REF = 'osmo_trx_clock_ref'
LTE_TRANSMISSION_MODE = 'lte_transmission_mode'
DURATION = 'duration'

# Maps each type identifier to a callable that raises ValueError on
# invalid input (int/str double as converters for the basic types).
SCHEMA_TYPES = {
        INT: int,
        STR: str,
        UINT: uint,
        BOOL_STR: str2bool,
        BAND: band,
        IPV4: ipv4,
        HWADDR: hwaddr,
        IMSI: imsi,
        KI: ki,
        MSISDN: msisdn,
        AUTH_ALGO: auth_algo,
        TIMES: times,
        CIPHER: cipher,
        MODEM_FEATURE: modem_feature,
        PHY_CHAN: phy_channel_config,
        CHAN_ALLOCATOR: channel_allocator,
        GPRS_MODE: gprs_mode,
        CODEC: codec,
        OSMO_TRX_CLOCK_REF: osmo_trx_clock_ref,
        LTE_TRANSMISSION_MODE: lte_transmission_mode,
        DURATION: duration,
    }
+
def validate(config, schema):
    '''Make sure the given config dict adheres to the schema.
    The schema is a dict of 'dict paths' in dot-notation with permitted
    value type. All leaf nodes are validated, nesting dicts are implicit.

    validate( { 'a': 123, 'b': { 'b1': 'foo', 'b2': [ 1, 2, 3 ] } },
              { 'a': int,
                'b.b1': str,
                'b.b2[]': int } )

    Raise a ValueError in case the schema is violated.
    '''

    def validate_item(path, value, schema):
        # Validate one leaf (or recurse into dicts/lists) at dot-path 'path'.
        want_type = schema.get(path)

        # A list's schema entry is keyed with a '[]' suffix.
        if is_list(value):
            if want_type:
                raise ValueError('config item is a list, should be %r: %r' % (want_type, path))
            path = path + '[]'
            want_type = schema.get(path)

        if not want_type:
            # No direct schema entry: only valid for nesting dicts or for
            # lists whose element path is known.
            if is_dict(value):
                nest(path, value, schema)
                return
            if is_list(value) and value:
                for list_v in value:
                    validate_item(path, list_v, schema)
                return
            raise ValueError('config item not known: %r' % path)

        if want_type not in SCHEMA_TYPES:
            raise ValueError('unknown type %r at %r' % (want_type, path))

        if is_dict(value):
            raise ValueError('config item is dict but should be a leaf node of type %r: %r'
                             % (want_type, path))

        # Validate each list element against the same (suffixed) path.
        if is_list(value):
            for list_v in value:
                validate_item(path, list_v, schema)
            return

        # Record the path as log context, then run the type's validator
        # (which raises ValueError on invalid values).
        log.ctx(path)
        type_validator = SCHEMA_TYPES.get(want_type)
        type_validator(value)

    def nest(parent_path, config, schema):
        # Walk one dict level, extending the dot-path per key.
        if parent_path:
            parent_path = parent_path + '.'
        else:
            parent_path = ''
        for k,v in config.items():
            if not KEY_RE.fullmatch(k):
                raise ValueError('invalid config key: %r' % k)
            path = parent_path + k
            validate_item(path, v, schema)

    nest(None, config, schema)
+
+# vim: expandtab tabstop=4 shiftwidth=4
diff --git a/src/osmo_gsm_tester/core/template.py b/src/osmo_gsm_tester/core/template.py
new file mode 100644
index 0000000..2bf4fed
--- /dev/null
+++ b/src/osmo_gsm_tester/core/template.py
@@ -0,0 +1,62 @@
+# osmo_gsm_tester: automated cellular network hardware tests
+# Proxy to templating engine to handle files
+#
+# Copyright (C) 2016-2017 by sysmocom - s.f.m.c. GmbH
+#
+# Author: Neels Hofmeyr <neels@hofmeyr.de>
+#
+# This program is free software: you can redistribute it and/or modify
+# it under the terms of the GNU General Public License as
+# published by the Free Software Foundation, either version 3 of the
+# License, or (at your option) any later version.
+#
+# This program is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+# GNU General Public License for more details.
+#
+# You should have received a copy of the GNU General Public License
+# along with this program. If not, see <http://www.gnu.org/licenses/>.
+
+import os
+from mako.lookup import TemplateLookup, Template
+
+from . import log
+from .util import dict2obj
+
# Shared mako TemplateLookup; created lazily by set_templates_dir().
_lookup = None
# Module log origin; replaced once a templates dir has been configured.
_logger = log.Origin(log.C_CNF, 'no templates dir set')
+
def set_templates_dir(*templates_dirs):
    '''Configure the directories templates are looked up in.
    Without arguments, defaults to the 'templates' dir one level above this
    package. Raises RuntimeError if any given path is not a directory.'''
    global _lookup
    global _logger
    if not templates_dirs:
        # default templates dir is relative to this source file
        templates_dirs = [os.path.join(os.path.dirname(os.path.dirname(__file__)), 'templates')]
    for d in templates_dirs:
        if not os.path.isdir(d):
            raise RuntimeError('templates dir is not a dir: %r'
                               % os.path.abspath(d))
    _lookup = TemplateLookup(directories=templates_dirs)
    _logger = log.Origin(log.C_CNF, 'Templates')
+
def render(name, values):
    '''feed values dict into template and return rendered result.
    ".tmpl" is added to the name to look it up in the templates dir.'''
    global _lookup
    # Lazily fall back to the default templates dir on first use.
    if _lookup is None:
        set_templates_dir()
    tmpl_name = name + '.tmpl'
    log.ctx(tmpl_name)
    template = _lookup.get_template(tmpl_name)
    _logger.dbg('rendering', tmpl_name)

    # dict2obj lets templates use attribute access on the values dict.
    return template.render(**dict2obj(values))
+
def render_strbuf_inline(strbuf, values):
    '''Receive a string containing template syntax, and generate output using
    passed values.'''
    mytemplate = Template(strbuf)
    # dict2obj lets the template use attribute access on the values dict.
    return mytemplate.render(**dict2obj(values))
+
+# vim: expandtab tabstop=4 shiftwidth=4
diff --git a/src/osmo_gsm_tester/core/util.py b/src/osmo_gsm_tester/core/util.py
new file mode 100644
index 0000000..a5b2bbf
--- /dev/null
+++ b/src/osmo_gsm_tester/core/util.py
@@ -0,0 +1,433 @@
+# osmo_gsm_tester: language snippets
+#
+# Copyright (C) 2016-2017 by sysmocom - s.f.m.c. GmbH
+#
+# Author: Neels Hofmeyr <neels@hofmeyr.de>
+#
+# This program is free software: you can redistribute it and/or modify
+# it under the terms of the GNU General Public License as
+# published by the Free Software Foundation, either version 3 of the
+# License, or (at your option) any later version.
+#
+# This program is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+# GNU General Public License for more details.
+#
+# You should have received a copy of the GNU General Public License
+# along with this program. If not, see <http://www.gnu.org/licenses/>.
+
+import os
+import sys
+import time
+import fcntl
+import hashlib
+import tempfile
+import shutil
+import atexit
+import threading
+import importlib.util
+import subprocess
+
# This mirrors enum osmo_auth_algo in libosmocore/include/osmocom/crypt/auth.h
# so that the index within the tuple matches the enum value.
OSMO_AUTH_ALGO_NONE = 'none'
ENUM_OSMO_AUTH_ALGO = (OSMO_AUTH_ALGO_NONE, 'comp128v1', 'comp128v2', 'comp128v3', 'xor', 'milenage')

def osmo_auth_algo_by_name(algo_str):
    '''Return the numeric enum osmo_auth_algo value as defined by libosmocore;
    matching is case-insensitive, unknown names raise ValueError.'''
    lowered = algo_str.lower()
    return ENUM_OSMO_AUTH_ALGO.index(lowered)
+
def prepend_library_path(path):
    '''Return path prepended (colon-separated) to the current LD_LIBRARY_PATH;
    just path when LD_LIBRARY_PATH is unset or empty.'''
    current = os.getenv('LD_LIBRARY_PATH')
    if current:
        return '%s:%s' % (path, current)
    return path
+
def change_elf_rpath(binary, paths, run_dir):
    '''
    Change the RPATH field of an ELF executable binary (using patchelf).
    This can be used to tell the loader to load the trial libraries, since
    LD_LIBRARY_PATH is disabled for paths with modified capabilities.
    '''
    from .process import Process
    patchelf_cmd = ['patchelf', '--set-rpath', paths, binary]
    Process('patchelf', run_dir, patchelf_cmd).launch_sync()
+
def ip_to_iface(ip):
    '''Return the name of the local network interface configured with the
    given IP address, or None if it cannot be determined (best effort).'''
    needle = ' ' + ip + '/'
    try:
        for iface in os.listdir('/sys/class/net'):
            proc = subprocess.Popen(['ip', 'addr', 'show', 'dev', iface],
                                    stdout=subprocess.PIPE, universal_newlines=True)
            for line in proc.stdout.readlines():
                # e.g. "inet 10.0.0.1/24 ... <ifname>"; iface name is last
                if 'inet' in line and needle in line:
                    return line.split()[-1]
    except Exception:
        # best effort: on any failure we simply don't know
        pass
    return None
+
def dst_ip_get_local_bind(ip):
    '''Retrieve default IP addr to bind to in order to route traffic to dst
    addr ip, or None if it cannot be determined (best effort).'''
    try:
        proc = subprocess.Popen(['ip', 'route', 'get', ip],
                                stdout=subprocess.PIPE, universal_newlines=True)
        words = proc.stdout.readlines()[0].split()
        # the source address follows the 'src' token in the route output;
        # index() raising ValueError when absent is caught below
        return words[words.index('src') + 1]
    except Exception:
        pass
    return None
+
def setcap_net_raw(binary, run_dir):
    '''
    Grant CAP_NET_RAW to binary through a sudo helper script.
    This functionality requires specific setup on the host running
    osmo-gsm-tester. See osmo-gsm-tester manual for more information.
    '''
    from .process import Process
    helper = 'osmo-gsm-tester_setcap_net_raw.sh'
    Process(helper, run_dir, ['sudo', helper, binary]).launch_sync()
+
def setcap_net_admin(binary, run_dir):
    '''
    Grant CAP_NET_ADMIN to binary through a sudo helper script.
    This functionality requires specific setup on the host running
    osmo-gsm-tester. See osmo-gsm-tester manual for more information.
    '''
    from .process import Process
    helper = 'osmo-gsm-tester_setcap_net_admin.sh'
    Process(helper, run_dir, ['sudo', helper, binary]).launch_sync()
+
def setcap_netsys_admin(binary, run_dir):
    '''
    Grant CAP_NET_ADMIN + CAP_SYS_ADMIN to binary through a sudo helper script.
    This functionality requires specific setup on the host running
    osmo-gsm-tester. See osmo-gsm-tester manual for more information.

    Fix: dropped the stray 'self' first parameter. This is a module-level
    function like its setcap_net_raw()/setcap_net_admin() siblings; the
    bogus parameter made every call with (binary, run_dir) raise TypeError.
    '''
    from .process import Process
    SETCAP_NETSYS_ADMIN_BIN = 'osmo-gsm-tester_setcap_netsys_admin.sh'
    proc = Process(SETCAP_NETSYS_ADMIN_BIN, run_dir, ['sudo', SETCAP_NETSYS_ADMIN_BIN, binary])
    proc.launch_sync()
+
def create_netns(netns, run_dir):
    '''
    Create the named network namespace if it doesn't already exist.

    Fix: pass run_dir to Process and give the command as a list, matching
    the Process(name, run_dir, popen_args) signature used by
    move_iface_to_netns(). The original passed the command tuple in the
    run_dir position and silently ignored its run_dir parameter.
    '''
    from .process import Process
    NETNS_SETUP_BIN = 'osmo-gsm-tester_netns_setup.sh'
    proc = Process('create_netns', run_dir, ['sudo', NETNS_SETUP_BIN, netns])
    proc.launch_sync()
+
def move_iface_to_netns(ifname, netns, run_dir):
    '''
    Move interface ifname into network namespace netns, creating the netns
    first if it does not exist yet.
    '''
    from .process import Process
    setup_script = 'osmo-gsm-tester_netns_setup.sh'
    cmd = ['sudo', setup_script, netns, ifname]
    Process('move_netns', run_dir, cmd).launch_sync()
+
def import_path_prepend(pathname):
    '''Put the resolved pathname at the front of sys.path, unless it is
    already present somewhere in it.'''
    real = os.path.realpath(pathname)
    if real not in sys.path:
        sys.path.insert(0, real)
+
def import_path_remove(pathname):
    '''Drop the resolved pathname from sys.path, if present (first
    occurrence only, mirroring list.remove()).'''
    real = os.path.realpath(pathname)
    if real in sys.path:
        sys.path.remove(real)
+
class listdict(dict):
    'a dict of lists { "a": [1, 2, 3], "b": [1, 2] }'

    def add(self, name, item):
        'Append item to the list stored under name, creating it on first use.'
        existing = self.get(name)
        if not existing:
            existing = []
            self[name] = existing
        existing.append(item)
        return existing

    def add_dict(self, d):
        'Merge dict d by add()ing each of its key/value pairs.'
        for key, value in d.items():
            self.add(key, value)
+
class DictProxy:
    '''
    Allow accessing dict entries like object members.
    Syntactical sugar, adapted from http://stackoverflow.com/a/31569634,
    so that e.g. templates can do ${bts.member} instead of ${bts['member']}.
    '''
    def __init__(self, obj):
        self.obj = obj

    def __getitem__(self, key):
        # recursively wrap nested containers on access
        return dict2obj(self.obj[key])

    def __getattr__(self, key):
        'provide error information to know which template item was missing'
        try:
            return dict2obj(getattr(self.obj, key))
        except AttributeError:
            # fall back to item access; report the key on failure
            try:
                return self[key]
            except KeyError:
                raise AttributeError(key)
+
def dict2obj(value):
    '''Wrap dicts and lists/tuples in a DictProxy for attribute-style
    access; return any other value unchanged.'''
    return DictProxy(value) if is_list(value) or is_dict(value) else value
+
+
class FileLock:
    '''Exclusive advisory lock on a file (flock), writing the owner's name
    into the file while held. Usable as a context manager or through
    lock()/unlock().'''

    def __init__(self, path, owner):
        self.path = path
        self.owner = owner
        self.f = None   # retained for backward compat; no longer used
        # None while unlocked; the open file descriptor while locked.
        self.fd = None

    def __enter__(self):
        # Fix: the original guard tested self.f, which was never assigned
        # anywhere, so re-entering would re-open and re-lock the file.
        # Guard on the actual file descriptor instead.
        if self.fd is not None:
            return
        self.fd = os.open(self.path, os.O_CREAT | os.O_WRONLY)
        fcntl.flock(self.fd, fcntl.LOCK_EX)
        os.truncate(self.fd, 0)
        os.write(self.fd, str(self.owner).encode('utf-8'))
        os.fsync(self.fd)

    def __exit__(self, *exc_info):
        # Fix: exiting without a prior lock raised AttributeError on
        # self.fd; now it is a no-op.
        if self.fd is None:
            return
        # closing the fd implicitly releases the flock
        os.truncate(self.fd, 0)
        os.fsync(self.fd)
        os.close(self.fd)
        self.fd = None

    def lock(self):
        self.__enter__()

    def unlock(self):
        self.__exit__()
+
+
class Dir():
    '''Convenience wrapper around a filesystem directory: path composition,
    creation, uniquely-named children and locking.'''

    LOCK_FILE = 'lock'

    def __init__(self, path):
        self.path = path
        self.lock_path = os.path.join(self.path, Dir.LOCK_FILE)

    def lock(self, origin_id):
        '''
        return lock context, usage:

          with my_dir.lock(origin):
              read_from(my_dir.child('foo.txt'))
              write_to(my_dir.child('bar.txt'))
        '''
        self.mkdir()
        return FileLock(self.lock_path, origin_id)

    @staticmethod
    def ensure_abs_dir_exists(*path_elements):
        'Create the directory (and parents) joined from path_elements.'
        if not path_elements:
            raise RuntimeError('Cannot create empty path')
        if len(path_elements) == 1:
            path = path_elements[0]
        else:
            path = os.path.join(*path_elements)
        if not os.path.isdir(path):
            os.makedirs(path)

    def child(self, *rel_path):
        'Join rel_path below this dir; with no args, return the dir itself.'
        if not rel_path:
            return self.path
        return os.path.join(self.path, *rel_path)

    def mk_parentdir(self, *rel_path):
        'Ensure the parent dir of the given child exists; return child path.'
        child = self.child(*rel_path)
        Dir.ensure_abs_dir_exists(os.path.dirname(child))
        return child

    def mkdir(self, *rel_path):
        'Ensure the given child dir exists; return its path.'
        child = self.child(*rel_path)
        Dir.ensure_abs_dir_exists(child)
        return child

    def children(self):
        'List the names of the entries directly below this dir.'
        return os.listdir(self.path)

    def exists(self, *rel_path):
        return os.path.exists(self.child(*rel_path))

    def isdir(self, *rel_path):
        return os.path.isdir(self.child(*rel_path))

    def isfile(self, *rel_path):
        return os.path.isfile(self.child(*rel_path))

    def new_child(self, *rel_path):
        '''Return a child path that does not exist yet, inserting _2, _3, ...
        before the extension as needed; parent dirs are created.'''
        prefix, suffix = os.path.splitext(self.child(*rel_path))
        attempt = 1
        while True:
            infix = ('_%d' % attempt) if attempt > 1 else ''
            path = prefix + infix + suffix
            if not os.path.exists(path):
                break
            attempt += 1
        Dir.ensure_abs_dir_exists(os.path.dirname(path))
        return path

    def rel_path(self, path):
        'Return path expressed relative to this dir.'
        return os.path.relpath(path, self.path)

    def touch(self, *rel_path):
        'Create (or leave untouched) an empty file at the child path.'
        touch_file(self.child(*rel_path))

    def new_file(self, *rel_path):
        'Create an empty file at a not-yet-existing child path; return it.'
        path = self.new_child(*rel_path)
        touch_file(path)
        return path

    def new_dir(self, *rel_path):
        'Create a dir at a not-yet-existing child path; return it.'
        path = self.new_child(*rel_path)
        Dir.ensure_abs_dir_exists(path)
        return path

    def __str__(self):
        return self.path

    def __repr__(self):
        return self.path
+
def touch_file(path):
    'Ensure a file exists at path (like the "touch" shell command).'
    # opening in append mode creates the file without clobbering content
    with open(path, 'a'):
        pass
+
def is_dict(l):
    'True when l is a dict (or dict subclass) instance.'
    return isinstance(l, dict)

def is_list(l):
    'True when l is a list or tuple (or subclass) instance.'
    return isinstance(l, (list, tuple))
+
+
def dict_add(a, *b, **c):
    'Update dict a in-place with all dicts in b, then kwargs c; return a.'
    for extra in b:
        a.update(extra)
    a.update(c)
    return a
+
def _hash_recurse(acc, obj, ignore_keys):
    'Fold obj (nested dicts/lists/scalars) into the hash accumulator acc.'
    if is_dict(obj):
        # deterministic order; keys in ignore_keys are skipped entirely
        for key, val in sorted(obj.items()):
            if key not in ignore_keys:
                _hash_recurse(acc, val, ignore_keys)
    elif is_list(obj):
        for item in obj:
            _hash_recurse(acc, item, ignore_keys)
    else:
        acc.update(str(obj).encode('utf-8'))

def hash_obj(obj, *ignore_keys):
    '''Return a stable sha1 hex digest of a nested dict/list structure,
    ignoring any dict keys named in ignore_keys.'''
    acc = hashlib.sha1()
    _hash_recurse(acc, obj, ignore_keys)
    return acc.hexdigest()
+
+
def md5(of_content):
    'Return the md5 hex digest of of_content (str is utf-8 encoded first).'
    data = of_content.encode('utf-8') if isinstance(of_content, str) else of_content
    return hashlib.md5(data).hexdigest()

def md5_of_file(path):
    'Return the md5 hex digest of the file contents at path.'
    with open(path, 'rb') as f:
        return md5(f.read())
+
# lazily-created, process-wide temporary directory (see get_tempdir())
_tempdir = None

def get_tempdir(remove_on_exit=True):
    '''Return a per-process temporary directory, created on first call.
    remove_on_exit only has an effect on that first, creating call.'''
    global _tempdir
    if _tempdir is None:
        _tempdir = tempfile.mkdtemp()
        if remove_on_exit:
            atexit.register(lambda: shutil.rmtree(_tempdir))
    return _tempdir
+
+
if hasattr(importlib.util, 'module_from_spec'):
    def run_python_file(module_name, path):
        'Execute the python file at path as a module named module_name.'
        spec = importlib.util.spec_from_file_location(module_name, path)
        module = importlib.util.module_from_spec(spec)
        spec.loader.exec_module(module)
else:
    # fallback for older python without module_from_spec
    from importlib.machinery import SourceFileLoader

    def run_python_file(module_name, path):
        'Execute the python file at path as a module named module_name.'
        SourceFileLoader(module_name, path).load_module()
+
def msisdn_inc(msisdn_str):
    'Add 1 to the MSISDN, preserving its length with leading zeros.'
    width = len(msisdn_str)
    return '%0*d' % (width, int(msisdn_str) + 1)
+
class InputThread(threading.Thread):
    '''Thread that blocks on input() so the caller can keep doing other
    work (e.g. polling) while waiting for the user; see input_polling().'''

    def __init__(self, prompt):
        super().__init__()
        self.prompt = prompt
        # set to the entered line once the user hits return
        self.result = None

    def run(self):
        self.result = input(self.prompt)
+
def input_polling(prompt, poll_func):
    '''Prompt the user for a line of input, calling poll_func() once per
    second while waiting; return the entered line.'''
    worker = InputThread(prompt)
    worker.start()

    while worker.is_alive():
        poll_func()
        time.sleep(1)

    worker.join()
    return worker.result
+
def str2bool(val):
    '''Map a config string to bool: None/empty/false/no/off -> False,
    true/yes/on -> True (case-insensitive); anything else raises ValueError.'''
    if not val:
        # covers None and the empty string
        return False
    upper = val.upper()
    if upper in ('FALSE', 'NO', 'OFF'):
        return False
    if upper in ('TRUE', 'YES', 'ON'):
        return True
    raise ValueError('Invalid BOOL field: %r' % val)
+
def list_validate_same_elem_type(li):
    '''
    Check that all elements in list li share one type and return that type.
    Return None for an empty list; raise ValueError on the first element
    whose type differs from the first element's type.
    '''
    if not li:
        return None
    first_type = type(li[0])
    for elem in li:
        if type(elem) is not first_type:
            raise ValueError('List contains elements of different types: %r vs %r'
                             % (first_type, type(elem)))
    return first_type
+
def empty_instance_type(t):
    '''Return a new empty instance of type t; only dict, list and tuple are
    supported, anything else raises ValueError.'''
    factories = {dict: dict, list: list, tuple: tuple}
    factory = factories.get(t)
    if factory is None:
        raise ValueError('type %r not supported!' % t)
    # call the type itself to get a fresh empty instance
    return factory()
+
def encryption2osmovty(val):
    "Map an 'a5_N' encryption setting to the 'a5 N' VTY syntax."
    assert val.startswith('a5_')
    return 'a5 ' + val[3:]
+
+# vim: expandtab tabstop=4 shiftwidth=4