initial import

The original osmo-gsm-tester was an internal development at sysmocom, mostly by
D. Laszlo Sitzer <>, of which this public osmo-gsm-tester
is a refactoring / rewrite.

This imports an early state of the refactoring and is not functional yet. Bits
from the earlier osmo-gsm-tester will be added as needed. The earlier commit
history is not imported.
diff --git a/src/osmo_gsm_tester/ b/src/osmo_gsm_tester/
new file mode 100644
index 0000000..6b6b46e
--- /dev/null
+++ b/src/osmo_gsm_tester/
@@ -0,0 +1,29 @@
+# osmo_gsm_tester: automated cellular network hardware tests
+# Copyright (C) 2016-2017 by sysmocom - s.f.m.c. GmbH
+# Authors: D. Lazlo Sitzer <>
+#          Neels Hofmeyr <>
+# This program is free software: you can redistribute it and/or modify
+# it under the terms of the GNU Affero General Public License as
+# published by the Free Software Foundation, either version 3 of the
+# License, or (at your option) any later version.
+# This program is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# GNU Affero General Public License for more details.
+# You should have received a copy of the GNU Affero General Public License
+# along with this program.  If not, see <>.
+__version__ = 'UNKNOWN'
+    from ._version import _version
+    __version__ = _version
+    pass
+# vim: expandtab tabstop=4 shiftwidth=4
diff --git a/src/osmo_gsm_tester/ b/src/osmo_gsm_tester/
new file mode 100644
index 0000000..18b209e
--- /dev/null
+++ b/src/osmo_gsm_tester/
@@ -0,0 +1,161 @@
+# osmo_gsm_tester: read and validate config files
+# discussion for choice of config file format:
+# Python syntax is insane, because it allows the config file to run arbitrary
+# python commands.
+# INI file format is nice and simple, but it doesn't allow having the same
+# section numerous times (e.g. to define several modems or BTS models) and does
+# not support nesting.
+# JSON has too much braces and quotes to be easy to type
+# YAML formatting is lean, but too powerful. The normal load() allows arbitrary
+# code execution. There is safe_load(). But YAML also allows several
+# alternative ways of formatting, better to have just one authoritative style.
+# Also it would be better to receive every setting as simple string rather than
+# e.g. an IMSI as an integer.
+# The Python ConfigParserShootout page has numerous contestants, but it we want
+# to use widely used, standardized parsing code without re-inventing the wheel.
+# The optimum would be a stripped down YAML format.
+# In the lack of that, we shall go with yaml.load_safe() + a round trip
+# (feeding back to itself), converting keys to lowercase and values to string.
+import yaml
+import re
+import os
+from . import log
+def read(path, schema=None):
+    with log.Origin(path):
+        with open(path, 'r') as f:
+            config = yaml.safe_load(f)
+        config = _standardize(config)
+        if schema:
+            validate(config, schema)
+        return config
+def tostr(config):
+    return _tostr(_standardize(config))
+def _tostr(config):
+    return yaml.dump(config, default_flow_style=False)
+def _standardize_item(item):
+    if isinstance(item, (tuple, list)):
+        return [_standardize_item(i) for i in item]
+    if isinstance(item, dict):
+        return dict([(key.lower(), _standardize_item(val)) for key,val in item.items()])
+    return str(item)
+def _standardize(config):
+    config = yaml.safe_load(_tostr(_standardize_item(config)))
+    return config
+KEY_RE = re.compile('[a-zA-Z][a-zA-Z0-9_]*')
+def band(val):
+    if val in ('GSM-1800', 'GSM-1900'):
+        return
+    raise ValueError('Unknown GSM band: %r' % val)
+INT = 'int'
+STR = 'str'
+BAND = 'band'
+        INT: int,
+        STR: str,
+        BAND: band,
+    }
+def is_dict(l):
+    return isinstance(l, dict)
+def is_list(l):
+    return isinstance(l, (list, tuple))
+def validate(config, schema):
+    '''Make sure the given config dict adheres to the schema.
+       The schema is a dict of 'dict paths' in dot-notation with permitted
+       value type. All leaf nodes are validated, nesting dicts are implicit.
+       validate( { 'a': 123, 'b': { 'b1': 'foo', 'b2': [ 1, 2, 3 ] } },
+                 { 'a': int,
+                   'b.b1': str,
+                   'b.b2[]': int } )
+       Raise a ValueError in case the schema is violated.
+    '''
+    def validate_item(path, value, schema):
+        want_type = schema.get(path)
+        if is_list(value):
+            if want_type:
+                raise ValueError('config item is a list, should be %r: %r' % (want_type, path))
+            path = path + '[]'
+            want_type = schema.get(path)
+        if not want_type:
+            if is_dict(value):
+                nest(path, value, schema)
+                return
+            if is_list(value) and value:
+                for list_v in value:
+                    validate_item(path, list_v, schema)
+                return
+            raise ValueError('config item not known: %r' % path)
+        if want_type not in SCHEMA_TYPES:
+            raise ValueError('unknown type %r at %r' % (want_type, path))
+        if is_dict(value):
+            raise ValueError('config item is dict but should be a leaf node of type %r: %r'
+                             % (want_type, path))
+        if is_list(value):
+            for list_v in value:
+                validate_item(path, list_v, schema)
+            return
+        with log.Origin(item=path):
+            type_validator = SCHEMA_TYPES.get(want_type)
+            type_validator(value)
+    def nest(parent_path, config, schema):
+        if parent_path:
+            parent_path = parent_path + '.'
+        else:
+            parent_path = ''
+        for k,v in config.items():
+            if not KEY_RE.fullmatch(k):
+                raise ValueError('invalid config key: %r' % k)
+            path = parent_path + k
+            validate_item(path, v, schema)
+    nest(None, config, schema)
+# vim: expandtab tabstop=4 shiftwidth=4
diff --git a/src/osmo_gsm_tester/ b/src/osmo_gsm_tester/
new file mode 100644
index 0000000..27194a9
--- /dev/null
+++ b/src/osmo_gsm_tester/
@@ -0,0 +1,405 @@
+# osmo_gsm_tester: global logging
+import os
+import sys
+import time
+import traceback
+import contextlib
+from inspect import getframeinfo, stack
+L_ERR = 30
+L_LOG = 20
+L_DBG = 10
+C_NET = 'net'
+C_RUN = 'run'
+C_TST = 'tst'
+C_CNF = 'cnf'
+C_DEFAULT = '---'
+LONG_DATEFMT = '%Y-%m-%d_%H:%M:%S'
+DATEFMT = '%H:%M:%S'
+class LogTarget:
+    do_log_time = None
+    do_log_category = None
+    do_log_level = None
+    do_log_origin = None
+    do_log_traceback = None
+    do_log_src = None
+    origin_width = None
+    origin_fmt = None
+    # redirected by logging test
+    get_time_str = lambda self: time.strftime(self.log_time_fmt)
+    # sink that gets each complete logging line
+    log_sink = sys.stderr.write
+    category_levels = None
+    def __init__(self):
+        self.category_levels = {}
+    def style(self, time=True, time_fmt=DATEFMT, category=True, level=True, origin=True, origin_width=0, src=True, trace=False):
+        '''
+        set all logging format aspects, to defaults if not passed:
+        time: log timestamps;
+        time_fmt: format of timestamps;
+        category: print the logging category (three letters);
+        level: print the logging level, unless it is L_LOG;
+        origin: print which object(s) the message originated from;
+        origin_width: fill up the origin string with whitespace to this witdh;
+        src: log the source file and line number the log comes from;
+        trace: on exceptions, log the full stack trace;
+        '''
+        self.log_time_fmt = time_fmt
+        self.do_log_time = bool(time)
+        if not self.log_time_fmt:
+            self.do_log_time = False
+        self.do_log_category = bool(category)
+        self.do_log_level = bool(level)
+        self.do_log_origin = bool(origin)
+        self.origin_width = int(origin_width)
+        self.origin_fmt = '{:>%ds}' % self.origin_width
+        self.do_log_src = src
+        self.do_log_traceback = trace
+    def style_change(self, time=None, time_fmt=None, category=None, level=None, origin=None, origin_width=None, src=None, trace=None):
+        'modify only the given aspects of the logging format'
+            time=(time if time is not None else self.do_log_time),
+            time_fmt=(time_fmt if time_fmt is not None else self.log_time_fmt),
+            category=(category if category is not None else self.do_log_category),
+            level=(level if level is not None else self.do_log_level),
+            origin=(origin if origin is not None else self.do_log_origin),
+            origin_width=(origin_width if origin_width is not None else self.origin_width),
+            src=(src if src is not None else self.do_log_src),
+            trace=(trace if trace is not None else self.do_log_traceback),
+            )
+    def set_level(self, category, level):
+        'set global logging log.L_* level for a given log.C_* category'
+        self.category_levels[category] = level
+    def is_enabled(self, category, level):
+        if level == L_TRACEBACK:
+            return self.do_log_traceback
+        is_level = self.category_levels.get(category)
+        if is_level is None:
+            is_level = L_LOG
+        if level < is_level:
+            return False
+        return True
+    def log(self, origin, category, level, src, messages, named_items):
+        if category and len(category) != 3:
+            self.log_sink('WARNING: INVALID LOG SUBSYSTEM %r\n' % category)
+            self.log_sink('origin=%r category=%r level=%r\n' % (origin, category, level));
+        if not category:
+            category = C_DEFAULT
+        if not self.is_enabled(category, level):
+            return
+        log_pre = []
+        if self.do_log_time:
+            log_pre.append(self.get_time_str())
+        if self.do_log_category:
+            log_pre.append(category)
+        if self.do_log_origin:
+            if origin is None:
+                name = '-'
+            elif isinstance(origin, str):
+                name = origin or None
+            elif hasattr(origin, '_name'):
+                name = origin._name
+            if not name:
+                name = str(origin.__class__.__name__)
+            log_pre.append(self.origin_fmt.format(name))
+        if self.do_log_level and level != L_LOG:
+            log_pre.append(level_str(level) or ('loglevel=' + str(level)) )
+        log_line = [str(m) for m in messages]
+        if named_items:
+            # unfortunately needs to be sorted to get deterministic results
+            log_line.append('{%s}' %
+                            (', '.join(['%s=%r' % (k,v)
+                             for k,v in sorted(named_items.items())])))
+        if self.do_log_src and src:
+            log_line.append(' [%s]' % str(src))
+        log_str = '%s%s%s' % (' '.join(log_pre),
+                              ': ' if log_pre else '',
+                              ' '.join(log_line))
+        self.log_sink(log_str.strip() + '\n')
+targets = [ LogTarget() ]
+def level_str(level):
+    if level == L_TRACEBACK:
+        return L_TRACEBACK
+    if level <= L_DBG:
+        return 'DBG'
+    if level <= L_LOG:
+        return 'LOG'
+    return 'ERR'
+def _log_all_targets(origin, category, level, src, messages, named_items=None):
+    global targets
+    if isinstance(src, int):
+        src = get_src_from_caller(src + 1)
+    for target in targets:
+        target.log(origin, category, level, src, messages, named_items)
+def get_src_from_caller(levels_up=1):
+    caller = getframeinfo(stack()[levels_up][0])
+    return '%s:%d' % (os.path.basename(caller.filename), caller.lineno)
+def get_src_from_tb(tb, levels_up=1):
+    ftb = traceback.extract_tb(tb)
+    f,l,m,c = ftb[-levels_up]
+    f = os.path.basename(f)
+    return '%s:%s: %s' % (f, l, c)
+class Origin:
+    '''
+    Base class for all classes that want to log,
+    and to add an origin string to a code path:
+    with log.Origin('my name'):
+        raise Problem()
+    This will log 'my name' as an origin for the Problem.
+    '''
+    _log_category = None
+    _src = None
+    _name = None
+    _log_line_buf = None
+    _prev_stdout = None
+    _global_current_origin = None
+    _parent_origin = None
+    def __init__(self, *name_items, category=None, **detail_items):
+        self.set_log_category(category)
+        self.set_name(*name_items, **detail_items)
+    def set_name(self, *name_items, **detail_items):
+        if name_items:
+            name = '-'.join([str(i) for i in name_items])
+        elif not detail_items:
+            name = self.__class__.__name__
+        else:
+            name = ''
+        if detail_items:
+            details = '(%s)' % (', '.join([("%s=%r" % (k,v))
+                                           for k,v in sorted(detail_items.items())]))
+        else:
+            details = ''
+        self._name = name + details
+    def name(self):
+        return self._name
+    def set_log_category(self, category):
+        self._log_category = category
+    def _log(self, level, messages, named_items=None, src_levels_up=3, origins=None):
+        src = self._src or src_levels_up
+        origin = origins or self.gather_origins()
+        _log_all_targets(origin, self._log_category, level, src, messages, named_items)
+    def dbg(self, *messages, **named_items):
+        self._log(L_DBG, messages, named_items)
+    def log(self, *messages, **named_items):
+        self._log(L_LOG, messages, named_items)
+    def err(self, *messages, **named_items):
+        self._log(L_ERR, messages, named_items)
+    def log_exn(self, exc_info=None):
+        log_exn(self, self._log_category, exc_info)
+    def __enter__(self):
+        if self._parent_origin is not None:
+            return
+        if Origin._global_current_origin == self:
+            return
+        self._parent_origin, Origin._global_current_origin = Origin._global_current_origin, self
+    def __exit__(self, *exc_info):
+        rc = None
+        if exc_info[0] is not None:
+            rc = exn_add_info(exc_info, self)
+        Origin._global_current_origin, self._parent_origin = self._parent_origin, None
+        return rc
+    def redirect_stdout(self):
+        return contextlib.redirect_stdout(self)
+    def write(self, message):
+        'to redirect stdout to the log'
+        lines = message.splitlines()
+        if not lines:
+            return
+        if self._log_line_buf:
+            lines[0] = self._log_line_buf + lines[0]
+            self._log_line_buf = None
+        if not message.endswith('\n'):
+            self._log_line_buf = lines[-1]
+            lines = lines[:-1]
+        origins = self.gather_origins()
+        for line in lines:
+            self._log(L_LOG, (line,), origins=origins)
+    def flush(self):
+        pass
+    def gather_origins(self):
+        origins = Origins()
+        origin = self
+        while origin:
+            origins.add(origin)
+            origin = origin._parent_origin
+        return str(origins)
+def dbg(origin, category, *messages, **named_items):
+    _log_all_targets(origin, category, L_DBG, 2, messages, named_items)
+def log(origin, category, *messages, **named_items):
+    _log_all_targets(origin, category, L_LOG, 2, messages, named_items)
+def err(origin, category, *messages, **named_items):
+    _log_all_targets(origin, category, L_ERR, 2, messages, named_items)
+def trace(origin, category, exc_info):
+    _log_all_targets(origin, category, L_TRACEBACK, None,
+                     traceback.format_exception(*exc_info))
+def resolve_category(origin, category):
+    if category is not None:
+        return category
+    if not hasattr(origin, '_log_category'):
+        return None
+    return origin._log_category
+def exn_add_info(exc_info, origin, category=None):
+    etype, exception, tb = exc_info
+    if not hasattr(exception, 'origins'):
+ = Origins()
+    if not hasattr(exception, 'category'):
+        # only remember the deepest category
+        exception.category = resolve_category(origin, category)
+    if not hasattr(exception, 'src'):
+        exception.src = get_src_from_tb(tb)
+    return False
+def log_exn(origin=None, category=None, exc_info=None):
+    if not (exc_info is not None and len(exc_info) == 3):
+        exc_info = sys.exc_info()
+        if not (exc_info is not None and len(exc_info) == 3):
+            raise RuntimeError('invalid call to log_exn() -- no valid exception info')
+    etype, exception, tb = exc_info
+    # if there are origins recorded with the Exception, prefer that
+    if hasattr(exception, 'origins'):
+        origin = str(
+    # if there is a category recorded with the Exception, prefer that
+    if hasattr(exception, 'category'):
+        category = exception.category
+    if hasattr(exception, 'msg'):
+        msg = exception.msg
+    else:
+        msg = str(exception)
+    if hasattr(exception, 'src'):
+        src = exception.src
+    else:
+        src = 2
+    trace(origin, category, exc_info)
+    _log_all_targets(origin, category, L_ERR, src,
+                     ('%s:' % str(etype.__name__), msg))
+class Origins(list):
+    def __init__(self, origin=None):
+        if origin is not None:
+            self.add(origin)
+    def add(self, origin):
+        if hasattr(origin, '_name'):
+            origin_str = origin._name
+        else:
+            origin_str = str(origin)
+        self.insert(0, origin_str)
+    def __str__(self):
+        return '->'.join(self)
+def set_level(category, level):
+    global targets
+    for target in targets:
+        target.set_level(category, level)
+def style(**kwargs):
+    global targets
+    for target in targets:
+def style_change(**kwargs):
+    global targets
+    for target in targets:
+        target.style_change(**kwargs)
+class TestsTarget(LogTarget):
+    'LogTarget producing deterministic results for regression tests'
+    def __init__(self, out=sys.stdout):
+        super().__init__()
+, src=False)
+        self.log_sink = out.write
+def run_logging_exceptions(func, *func_args, return_on_failure=None, **func_kwargs):
+    try:
+        return func(*func_args, **func_kwargs)
+    except:
+        log_exn()
+        return return_on_failure
+# vim: expandtab tabstop=4 shiftwidth=4
diff --git a/src/osmo_gsm_tester/ b/src/osmo_gsm_tester/
new file mode 100644
index 0000000..2e0ff52
--- /dev/null
+++ b/src/osmo_gsm_tester/
@@ -0,0 +1,23 @@
+# vim: expandtab tabstop=4 shiftwidth=4
diff --git a/src/osmo_gsm_tester/ b/src/osmo_gsm_tester/
new file mode 100644
index 0000000..bebc82d
--- /dev/null
+++ b/src/osmo_gsm_tester/
@@ -0,0 +1,51 @@
+import os
+from . import log
+from . import config
+from .utils import listdict, FileLock
+class Resources(log.Origin):
+    def __init__(self, config_path, lock_dir):
+        self.config_path = config_path
+        self.lock_dir = lock_dir
+        self.set_name(conf=self.config_path, lock=self.lock_dir)
+    def ensure_lock_dir_exists(self):
+        if not os.path.isdir(self.lock_dir):
+            os.makedirs(self.lock_dir)
+global_resources = listdict()
+def register(kind, instance):
+    global global_resources
+    global_resources.add(kind, instance)
+def reserve(user, config):
+    asdf
+def read_conf(path):
+    with open(path, 'r') as f:
+        conf =
+# vim: expandtab tabstop=4 shiftwidth=4
diff --git a/src/osmo_gsm_tester/ b/src/osmo_gsm_tester/
new file mode 100644
index 0000000..fb7c34d
--- /dev/null
+++ b/src/osmo_gsm_tester/
@@ -0,0 +1,150 @@
+import os
+from . import config, log, template, utils
+class Suite(log.Origin):
+    '''A test suite reserves resources for a number of tests.
+       Each test requires a specific number of modems, BTSs etc., which are
+       reserved beforehand by a test suite. This way several test suites can be
+       scheduled dynamically without resource conflicts arising halfway through
+       the tests.'''
+    CONF_FILENAME = 'suite.conf'
+    CONF_SCHEMA = {
+            'resources.nitb_iface': config.INT,
+            'resources.nitb': config.INT,
+            'resources.bts': config.INT,
+            'resources.msisdn': config.INT,
+            'resources.modem': config.INT,
+            'defaults.timeout': config.STR,
+        }
+    class Results:
+        def __init__(self):
+            self.passed = []
+            self.failed = []
+            self.all_passed = None
+        def add_pass(self, test):
+            self.passed.append(test)
+        def add_fail(self, test):
+            self.failed.append(test)
+        def conclude(self):
+            self.all_passed = bool(self.passed) and not bool(self.failed)
+            return self
+    def __init__(self, suite_dir):
+        self.set_log_category(log.C_CNF)
+        self.suite_dir = suite_dir
+        self.set_name(os.path.basename(self.suite_dir))
+        self.read_conf()
+    def read_conf(self):
+        with self:
+            if not os.path.isdir(self.suite_dir):
+                raise RuntimeError('No such directory: %r' % self.suite_dir)
+            self.conf =,
+                                                 Suite.CONF_FILENAME),
+                                    Suite.CONF_SCHEMA)
+            self.load_tests()
+    def load_tests(self):
+        with self:
+            self.tests = []
+            for basename in os.listdir(self.suite_dir):
+                if not basename.endswith('.py'):
+                    continue
+                self.tests.append(Test(self, basename))
+    def add_test(self, test):
+        with self:
+            if not isinstance(test, Test):
+                raise ValueError('add_test(): pass a Test() instance, not %s' % type(test))
+            if test.suite is None:
+                test.suite = self
+            if test.suite is not self:
+                raise ValueError('add_test(): test already belongs to another suite')
+            self.tests.append(test)
+    def run_tests(self):
+        results = Suite.Results()
+        for test in self.tests:
+            self._run_test(test, results)
+        return results.conclude()
+    def run_tests_by_name(self, *names):
+        results = Suite.Results()
+        for name in names:
+            basename = name
+            if not basename.endswith('.py'):
+                basename = name + '.py'
+            for test in self.tests:
+                if basename == test.basename:
+                    self._run_test(test, results)
+                    break
+        return results.conclude()
+    def _run_test(self, test, results):
+        try:
+            with self:
+            results.add_pass(test)
+        except:
+            results.add_fail(test)
+            self.log_exn()
+class Test(log.Origin):
+    def __init__(self, suite, test_basename):
+        self.suite = suite
+        self.basename = test_basename
+        self.set_name(self.basename)
+        self.set_log_category(log.C_TST)
+        self.path = os.path.join(self.suite.suite_dir, self.basename)
+        with self:
+            with open(self.path, 'r') as f:
+                self.script =
+    def run(self):
+        with self:
+            self.code = compile(self.script, self.path, 'exec')
+            with self.redirect_stdout():
+                exec(self.code, self.test_globals())
+                self._success = True
+    def test_globals(self):
+        test_globals = {
+            'this': utils.dict2obj({
+                    'suite': self.suite.suite_dir,
+                    'test': self.basename,
+                }),
+            'resources': utils.dict2obj({
+                }),
+        }
+        return test_globals
+def load(suite_dir):
+    return Suite(suite_dir)
+# vim: expandtab tabstop=4 shiftwidth=4
diff --git a/src/osmo_gsm_tester/ b/src/osmo_gsm_tester/
new file mode 100644
index 0000000..434ab62
--- /dev/null
+++ b/src/osmo_gsm_tester/
@@ -0,0 +1,56 @@
+import os, sys
+from mako.template import Template
+from mako.lookup import TemplateLookup
+from . import log
+from .utils import dict2obj
+_lookup = None
+_logger = log.Origin('no templates dir set')
+def set_templates_dir(*templates_dirs):
+    global _lookup
+    global _logger
+    if not templates_dirs:
+        # default templates dir is relative to this source file
+        templates_dirs = [os.path.join(os.path.dirname(__file__), 'templates')]
+    for d in templates_dirs:
+        if not os.path.isdir(d):
+            raise RuntimeError('templates dir is not a dir: %r'
+                               % os.path.abspath(d))
+    _lookup = TemplateLookup(directories=templates_dirs)
+    _logger = log.Origin('Templates', category=log.C_CNF)
+def render(name, values):
+    '''feed values dict into template and return rendered result.
+       ".tmpl" is added to the name to look it up in the templates dir.'''
+    global _lookup
+    if _lookup is None:
+        set_templates_dir()
+    with _logger:
+        tmpl_name = name + '.tmpl'
+        template = _lookup.get_template(tmpl_name)
+        _logger.dbg('rendering', tmpl_name)
+        return template.render(**dict2obj(values))
+# vim: expandtab tabstop=4 shiftwidth=4
diff --git a/src/osmo_gsm_tester/templates/osmo-bts.cfg.tmpl b/src/osmo_gsm_tester/templates/osmo-bts.cfg.tmpl
new file mode 100644
index 0000000..20fa57f
--- /dev/null
+++ b/src/osmo_gsm_tester/templates/osmo-bts.cfg.tmpl
@@ -0,0 +1,21 @@
+! OsmoBTS () configuration saved from vty
+log stderr
+  logging color 1
+  logging timestamp 1
+  logging print extended-timestamp 1
+  logging print category 1
+  logging level all debug
+  logging level l1c info
+  logging level linp info
+phy 0
+ instance 0
+bts 0
+ band {band}
+ ipa unit-id {ipa_unit_id} 0
+ oml remote-ip {oml_remote_ip}
+ trx 0
+  phy 0 instance 0
diff --git a/src/osmo_gsm_tester/templates/osmo-nitb.cfg.tmpl b/src/osmo_gsm_tester/templates/osmo-nitb.cfg.tmpl
new file mode 100644
index 0000000..3404b7f
--- /dev/null
+++ b/src/osmo_gsm_tester/templates/osmo-nitb.cfg.tmpl
@@ -0,0 +1,87 @@
+! OpenBSC configuration saved from vty
+password foo
+log stderr
+ logging filter all 1
+ logging color 0
+ logging print category 0
+ logging print extended-timestamp 1
+ logging level all debug
+line vty
+ no login
+ bind ${vty_bind_ip}
+ e1_line 0 driver ipa
+ ipa bind ${abis_bind_ip}
+ network country code ${mcc}
+ mobile network code ${mnc}
+ short name ${net_name_short}
+ long name ${net_name_long}
+ auth policy ${net_auth_policy}
+ location updating reject cause 13
+ encryption a5 ${encryption}
+ neci 1
+ rrlp mode none
+ mm info 1
+ handover 0
+ handover window rxlev averaging 10
+ handover window rxqual averaging 1
+ handover window rxlev neighbor averaging 10
+ handover power budget interval 6
+ handover power budget hysteresis 3
+ handover maximum distance 9999
+ timer t3101 10
+ timer t3103 0
+ timer t3105 0
+ timer t3107 0
+ timer t3109 4
+ timer t3111 0
+ timer t3113 60
+ timer t3115 0
+ timer t3117 0
+ timer t3119 0
+ timer t3141 0
+ local-tcp-ip ${smpp_bind_ip} 2775
+ system-id test
+ policy closed
+ esme test
+  password test
+  default-route
+ bind ${ctrl_bind_ip}
+%for bts in bts_list:
+ bts ${loop.index}
+  type ${bts.type}
+  band ${}
+  cell_identity 0
+  location_area_code ${bts.location_area_code}
+  training_sequence_code 7
+  base_station_id_code ${bts.base_station_id_code}
+  ms max power 15
+  cell reselection hysteresis 4
+  rxlev access min 0
+  channel allocator ascending
+  rach tx integer 9
+  rach max transmission 7
+  ip.access unit_id ${bts.unit_id} 0
+  oml ip.access stream_id ${bts.stream_id} line 0
+  gprs mode none
+% for trx in bts.trx_list:
+  trx ${loop.index}
+   rf_locked 0
+   arfcn ${trx.arfcn}
+   nominal power 23
+   max_power_red ${trx.max_power_red}
+   rsl e1 tei 0
+%  for ts in trx.timeslot_list:
+   timeslot ${loop.index}
+    phys_chan_config ${ts.phys_chan_config}
+%  endfor
+% endfor
diff --git a/src/osmo_gsm_tester/templates/osmo-pcu.cfg.tmpl b/src/osmo_gsm_tester/templates/osmo-pcu.cfg.tmpl
new file mode 100644
index 0000000..b88e6e7
--- /dev/null
+++ b/src/osmo_gsm_tester/templates/osmo-pcu.cfg.tmpl
@@ -0,0 +1,6 @@
+ flow-control-interval 10
+ cs 2
+ alloc-algorithm dynamic
+ alpha 0
+ gamma 0
diff --git a/src/osmo_gsm_tester/templates/osmo-sgsn.cfg.tmpl b/src/osmo_gsm_tester/templates/osmo-sgsn.cfg.tmpl
new file mode 100644
index 0000000..4955983
--- /dev/null
+++ b/src/osmo_gsm_tester/templates/osmo-sgsn.cfg.tmpl
@@ -0,0 +1,26 @@
+! Osmocom SGSN configuration
+line vty
+ no login
+ gtp local-ip
+ ggsn 0 remote-ip
+ ggsn 0 gtp-version 1
+ timer tns-block 3
+ timer tns-block-retries 3
+ timer tns-reset 3
+ timer tns-reset-retries 3
+ timer tns-test 30
+ timer tns-alive 3
+ timer tns-alive-retries 10
+ encapsulation udp local-ip
+ encapsulation udp local-port 23000
+ encapsulation framerelay-gre enabled 0
diff --git a/src/osmo_gsm_tester/templates/sysmobts-mgr.cfg.tmpl b/src/osmo_gsm_tester/templates/sysmobts-mgr.cfg.tmpl
new file mode 100644
index 0000000..3b28d78
--- /dev/null
+++ b/src/osmo_gsm_tester/templates/sysmobts-mgr.cfg.tmpl
@@ -0,0 +1,24 @@
+! SysmoMgr ( configuration saved from vty
+log stderr
+  logging filter all 1
+  logging color 1
+  logging timestamp 0
+  logging level all everything
+  logging level temp info
+  logging level fw info
+  logging level find info
+  logging level lglobal notice
+  logging level llapd notice
+  logging level linp notice
+  logging level lmux notice
+  logging level lmi notice
+  logging level lmib notice
+  logging level lsms notice
+line vty
+ no login
diff --git a/src/osmo_gsm_tester/ b/src/osmo_gsm_tester/
new file mode 100644
index 0000000..fd5a640
--- /dev/null
+++ b/src/osmo_gsm_tester/
@@ -0,0 +1,43 @@
+import sys, os
+import pprint
+import inspect
+from . import suite as _suite
+from . import log
+from . import resource
+# load the configuration for the test
+suite = _suite.Suite(sys.path[0])
+test = _suite.Test(suite, os.path.basename(inspect.stack()[-1][1]))
+def test_except_hook(*exc_info):
+    log.exn_add_info(exc_info, test)
+    log.exn_add_info(exc_info, suite)
+    log.log_exn(exc_info=exc_info)
+sys.excepthook = test_except_hook
+orig_stdout, sys.stdout = sys.stdout, test
+resources = {}
+# vim: expandtab tabstop=4 shiftwidth=4
diff --git a/src/osmo_gsm_tester/ b/src/osmo_gsm_tester/
new file mode 100644
index 0000000..9992d44
--- /dev/null
+++ b/src/osmo_gsm_tester/
@@ -0,0 +1,118 @@
+import os
+import fcntl
+class listdict:
+    'a dict of lists { "a": [1, 2, 3],  "b": [1, 2] }'
+    def __getattr__(ld, name):
+        if name == 'add':
+            return ld.__getattribute__(name)
+        return ld.__dict__.__getattribute__(name)
+    def add(ld, name, item):
+        l = ld.__dict__.get(name)
+        if not l:
+            l = []
+            ld.__dict__[name] = l
+        l.append(item)
+        return l
+    def add_dict(ld, d):
+        for k,v in d.items():
+            ld.add(k, v)
+    def __setitem__(ld, name, val):
+        return ld.__dict__.__setitem__(name, val)
+    def __getitem__(ld, name):
+        return ld.__dict__.__getitem__(name)
+    def __str__(ld):
+        return ld.__dict__.__str__()
+class DictProxy:
+    '''
+    allow accessing dict entries like object members
+    syntactical sugar, adapted from
+    so that e.g. templates can do ${bts.member} instead of ${bts['member']}
+    '''
+    def __init__(self, obj):
+        self.obj = obj
+    def __getitem__(self, key):
+        return dict2obj(self.obj[key])
+    def __getattr__(self, key):
+        try:
+            return dict2obj(getattr(self.obj, key))
+        except AttributeError:
+            try:
+                return self[key]
+            except KeyError:
+                raise AttributeError(key)
+class ListProxy:
+    'allow nesting for DictProxy'
+    def __init__(self, obj):
+        self.obj = obj
+    def __getitem__(self, key):
+        return dict2obj(self.obj[key])
+def dict2obj(value):
+    if isinstance(value, dict):
+        return DictProxy(value)
+    if isinstance(value, (tuple, list)):
+        return ListProxy(value)
+    return value
+class FileLock:
+    def __init__(self, path, owner):
+        self.path = path
+        self.owner = owner
+        self.f = None
+    def __enter__(self):
+        if self.f is not None:
+            return
+        self.fd =, os.O_CREAT | os.O_WRONLY | os.O_TRUNC)
+        fcntl.flock(self.fd, fcntl.LOCK_EX)
+        os.truncate(self.fd, 0)
+        os.write(self.fd, str(self.owner).encode('utf-8'))
+        os.fsync(self.fd)
+    def __exit__(self, *exc_info):
+        #fcntl.flock(self.fd, fcntl.LOCK_UN)
+        os.truncate(self.fd, 0)
+        os.fsync(self.fd)
+        os.close(self.fd)
+        self.fd = -1
+    def lock(self):
+        self.__enter__()
+    def unlock(self):
+        self.__exit__()
+# vim: expandtab tabstop=4 shiftwidth=4
diff --git a/src/ b/src/
new file mode 100755
index 0000000..ff15204
--- /dev/null
+++ b/src/
@@ -0,0 +1,48 @@
+#!/usr/bin/env python3
+# osmo_gsm_tester: invoke a single test run
+'''osmo_gsm_tester: invoke a single test run.
+./ ~/path/to/test_package/
+Upon launch, a 'test_package/run-<date>' directory will be created.
+When complete, a symbolic link 'test_package/last_run' will point at this dir.
+The run dir then contains logs and test results.
+import osmo_gsm_tester
+if __name__ == '__main__':
+    import argparse
+    parser = argparse.ArgumentParser()
+    parser.add_argument('-V', '--version', action='store_true',
+            help='Show version')
+    parser.add_argument('test_package', nargs='*',
+            help='Directory containing binaries to test')
+    args = parser.parse_args()
+    if args.version:
+        print(osmo_gsm_tester.__version__)
+        exit(0)
+# vim: expandtab tabstop=4 shiftwidth=4