core implementation
code bomb implementing the bulk of the osmo-gsm-tester
Change-Id: I53610becbf643ed51b90cfd9debc6992fe211ec9
diff --git a/src/osmo_gsm_tester/config.py b/src/osmo_gsm_tester/config.py
index 18b209e..0c820c3 100644
--- a/src/osmo_gsm_tester/config.py
+++ b/src/osmo_gsm_tester/config.py
@@ -1,4 +1,4 @@
-# osmo_gsm_tester: read and validate config files
+# osmo_gsm_tester: read and manage config files and global config
#
# Copyright (C) 2016-2017 by sysmocom - s.f.m.c. GmbH
#
@@ -28,35 +28,124 @@
#
# JSON has too much braces and quotes to be easy to type
#
-# YAML formatting is lean, but too powerful. The normal load() allows arbitrary
-# code execution. There is safe_load(). But YAML also allows several
-# alternative ways of formatting, better to have just one authoritative style.
-# Also it would be better to receive every setting as simple string rather than
-# e.g. an IMSI as an integer.
+# YAML formatting is lean, but:
+# - too powerful. The normal load() allows arbitrary code execution. There is
+# safe_load().
+# - allows several alternative ways of formatting, better to have just one
+# authoritative style.
+# - tries to detect types. It would be better to receive every setting as
+# simple string rather than e.g. an IMSI as an integer.
+# - e.g. an IMSI starting with a zero is interpreted as octal value, resulting
+# in super confusing error messages if the user merely forgets to quote it.
+# - does not tell me which line a config item came from, so no detailed error
+# message is possible.
#
-# The Python ConfigParserShootout page has numerous contestants, but it we want
-# to use widely used, standardized parsing code without re-inventing the wheel.
+# The Python ConfigParserShootout page has numerous contestants, but many of
+# those seem to be not widely used / standardized or even tested.
# https://wiki.python.org/moin/ConfigParserShootout
#
# The optimum would be a stripped down YAML format.
# In the lack of that, we shall go with yaml.load_safe() + a round trip
# (feeding back to itself), converting keys to lowercase and values to string.
+# There is no solution for octal interpretations nor config file source lines
+# unless, apparently, we implement our own config parser.
import yaml
-import re
import os
-from . import log
+from . import log, schema, util
+from .util import is_dict, is_list, Dir, get_tempdir
-def read(path, schema=None):
+ENV_PREFIX = 'OSMO_GSM_TESTER_'
+ENV_CONF = os.getenv(ENV_PREFIX + 'CONF')
+
+DEFAULT_CONFIG_LOCATIONS = [
+ '.',
+ os.path.join(os.getenv('HOME'), '.config', 'osmo_gsm_tester'),
+ '/usr/local/etc/osmo_gsm_tester',
+ '/etc/osmo_gsm_tester'
+ ]
+
+PATHS_CONF = 'paths.conf'
+PATH_STATE_DIR = 'state_dir'
+PATH_SUITES_DIR = 'suites_dir'
+PATH_SCENARIOS_DIR = 'scenarios_dir'
+PATHS_SCHEMA = {
+ PATH_STATE_DIR: schema.STR,
+ PATH_SUITES_DIR: schema.STR,
+ PATH_SCENARIOS_DIR: schema.STR,
+ }
+
+PATHS_TEMPDIR_STR = '$TEMPDIR'
+
+PATHS = None
+
+def get_config_file(basename, fail_if_missing=True):
+ if ENV_CONF:
+ locations = [ ENV_CONF ]
+ else:
+ locations = DEFAULT_CONFIG_LOCATIONS
+
+ for l in locations:
+ p = os.path.join(l, basename)
+ if os.path.isfile(p):
+ return p
+ if not fail_if_missing:
+ return None
+ raise RuntimeError('configuration file not found: %r in %r' % (basename,
+ [os.path.abspath(p) for p in locations]))
+
+def read_config_file(basename, validation_schema=None, if_missing_return=False):
+ fail_if_missing = True
+ if if_missing_return is not False:
+ fail_if_missing = False
+ path = get_config_file(basename, fail_if_missing=fail_if_missing)
+ return read(path, validation_schema=validation_schema, if_missing_return=if_missing_return)
+
+def get_configured_path(label, allow_unset=False):
+ global PATHS
+
+ env_name = ENV_PREFIX + label.upper()
+ env_path = os.getenv(env_name)
+ if env_path:
+ return env_path
+
+ if PATHS is None:
+ paths_file = get_config_file(PATHS_CONF)
+ PATHS = read(paths_file, PATHS_SCHEMA)
+ p = PATHS.get(label)
+ if p is None and not allow_unset:
+ raise RuntimeError('missing configuration in %s: %r' % (PATHS_CONF, label))
+
+ if p.startswith(PATHS_TEMPDIR_STR):
+ p = os.path.join(get_tempdir(), p[len(PATHS_TEMPDIR_STR):])
+ return p
+
+def get_state_dir():
+ return Dir(get_configured_path(PATH_STATE_DIR))
+
+def get_suites_dir():
+ return Dir(get_configured_path(PATH_SUITES_DIR))
+
+def get_scenarios_dir():
+ return Dir(get_configured_path(PATH_SCENARIOS_DIR))
+
+def read(path, validation_schema=None, if_missing_return=False):
with log.Origin(path):
+ if not os.path.isfile(path) and if_missing_return is not False:
+ return if_missing_return
with open(path, 'r') as f:
config = yaml.safe_load(f)
config = _standardize(config)
- if schema:
- validate(config, schema)
+ if validation_schema:
+ schema.validate(config, validation_schema)
return config
+def write(path, config):
+ with log.Origin(path):
+ with open(path, 'w') as f:
+ f.write(tostr(config))
+
def tostr(config):
return _tostr(_standardize(config))
@@ -74,88 +163,84 @@
config = yaml.safe_load(_tostr(_standardize_item(config)))
return config
+def get_defaults(for_kind):
+ defaults = read_config_file('default.conf', if_missing_return={})
+ return defaults.get(for_kind, {})
-KEY_RE = re.compile('[a-zA-Z][a-zA-Z0-9_]*')
+def get_scenario(name, validation_schema=None):
+ scenarios_dir = get_scenarios_dir()
+ if not name.endswith('.conf'):
+ name = name + '.conf'
+ path = scenarios_dir.child(name)
+ if not os.path.isfile(path):
+ raise RuntimeError('No such scenario file: %r' % path)
+ return read(path, validation_schema=validation_schema)
-def band(val):
- if val in ('GSM-1800', 'GSM-1900'):
+def add(dest, src):
+ if is_dict(dest):
+ if not is_dict(src):
+ raise ValueError('cannot add to dict a value of type: %r' % type(src))
+
+ for key, val in src.items():
+ dest_val = dest.get(key)
+ if dest_val is None:
+ dest[key] = val
+ else:
+ with log.Origin(key=key):
+ add(dest_val, val)
return
- raise ValueError('Unknown GSM band: %r' % val)
+ if is_list(dest):
+ if not is_list(src):
+ raise ValueError('cannot add to list a value of type: %r' % type(src))
+ dest.extend(src)
+ return
+ if dest == src:
+ return
+ raise ValueError('cannot add dicts, conflicting items (values %r and %r)'
+ % (dest, src))
-INT = 'int'
-STR = 'str'
-BAND = 'band'
-SCHEMA_TYPES = {
- INT: int,
- STR: str,
- BAND: band,
- }
+def combine(dest, src):
+ if is_dict(dest):
+ if not is_dict(src):
+ raise ValueError('cannot combine dict with a value of type: %r' % type(src))
-def is_dict(l):
- return isinstance(l, dict)
+ for key, val in src.items():
+ dest_val = dest.get(key)
+ if dest_val is None:
+ dest[key] = val
+ else:
+ with log.Origin(key=key):
+ combine(dest_val, val)
+ return
+ if is_list(dest):
+ if not is_list(src):
+ raise ValueError('cannot combine list with a value of type: %r' % type(src))
+ for i in range(len(src)):
+ with log.Origin(idx=i):
+ combine(dest[i], src[i])
+ return
+ if dest == src:
+ return
+ raise ValueError('cannot combine dicts, conflicting items (values %r and %r)'
+ % (dest, src))
-def is_list(l):
- return isinstance(l, (list, tuple))
+def overlay(dest, src):
+ if is_dict(dest):
+ if not is_dict(src):
+ raise ValueError('cannot combine dict with a value of type: %r' % type(src))
-def validate(config, schema):
- '''Make sure the given config dict adheres to the schema.
- The schema is a dict of 'dict paths' in dot-notation with permitted
- value type. All leaf nodes are validated, nesting dicts are implicit.
-
- validate( { 'a': 123, 'b': { 'b1': 'foo', 'b2': [ 1, 2, 3 ] } },
- { 'a': int,
- 'b.b1': str,
- 'b.b2[]': int } )
-
- Raise a ValueError in case the schema is violated.
- '''
-
- def validate_item(path, value, schema):
- want_type = schema.get(path)
-
- if is_list(value):
- if want_type:
- raise ValueError('config item is a list, should be %r: %r' % (want_type, path))
- path = path + '[]'
- want_type = schema.get(path)
-
- if not want_type:
- if is_dict(value):
- nest(path, value, schema)
- return
- if is_list(value) and value:
- for list_v in value:
- validate_item(path, list_v, schema)
- return
- raise ValueError('config item not known: %r' % path)
-
- if want_type not in SCHEMA_TYPES:
- raise ValueError('unknown type %r at %r' % (want_type, path))
-
- if is_dict(value):
- raise ValueError('config item is dict but should be a leaf node of type %r: %r'
- % (want_type, path))
-
- if is_list(value):
- for list_v in value:
- validate_item(path, list_v, schema)
- return
-
- with log.Origin(item=path):
- type_validator = SCHEMA_TYPES.get(want_type)
- type_validator(value)
-
- def nest(parent_path, config, schema):
- if parent_path:
- parent_path = parent_path + '.'
- else:
- parent_path = ''
- for k,v in config.items():
- if not KEY_RE.fullmatch(k):
- raise ValueError('invalid config key: %r' % k)
- path = parent_path + k
- validate_item(path, v, schema)
-
- nest(None, config, schema)
+ for key, val in src.items():
+ dest_val = dest.get(key)
+ with log.Origin(key=key):
+ dest[key] = overlay(dest_val, val)
+ return dest
+ if is_list(dest):
+ if not is_list(src):
+ raise ValueError('cannot combine list with a value of type: %r' % type(src))
+ for i in range(len(src)):
+ with log.Origin(idx=i):
+ dest[i] = overlay(dest[i], src[i])
+ return dest
+ return src
# vim: expandtab tabstop=4 shiftwidth=4