Generate schemas dynamically from pieces provided by each object class
This way we benefit from:
* knowing which attributes are used/required by each object class and
subclass
* Having validation function definitions near the class going to use them
Change-Id: I8fd6773c51d19405a585977af4ed72cad2b21db1
diff --git a/src/osmo_gsm_tester/core/config.py b/src/osmo_gsm_tester/core/config.py
index 9333601..6730807 100644
--- a/src/osmo_gsm_tester/core/config.py
+++ b/src/osmo_gsm_tester/core/config.py
@@ -54,7 +54,8 @@
import os
import copy
-from . import log, schema, util, template
+from . import log, util, template
+from . import schema
from .util import is_dict, is_list, Dir, get_tempdir
ENV_PREFIX = 'OSMO_GSM_TESTER_'
@@ -288,68 +289,6 @@
sc.read_from_file(validation_schema)
return sc
-def add(dest, src):
- if is_dict(dest):
- if not is_dict(src):
- raise ValueError('cannot add to dict a value of type: %r' % type(src))
-
- for key, val in src.items():
- dest_val = dest.get(key)
- if dest_val is None:
- dest[key] = val
- else:
- log.ctx(key=key)
- add(dest_val, val)
- return
- if is_list(dest):
- if not is_list(src):
- raise ValueError('cannot add to list a value of type: %r' % type(src))
- dest.extend(src)
- return
- if dest == src:
- return
- raise ValueError('cannot add dicts, conflicting items (values %r and %r)'
- % (dest, src))
-
-def combine(dest, src):
- if is_dict(dest):
- if not is_dict(src):
- raise ValueError('cannot combine dict with a value of type: %r' % type(src))
-
- for key, val in src.items():
- log.ctx(key=key)
- dest_val = dest.get(key)
- if dest_val is None:
- dest[key] = val
- else:
- combine(dest_val, val)
- return
- if is_list(dest):
- if not is_list(src):
- raise ValueError('cannot combine list with a value of type: %r' % type(src))
- # Validate that all elements in both lists are of the same type:
- t = util.list_validate_same_elem_type(src + dest)
- if t is None:
- return # both lists are empty, return
- # For lists of complex objects, we expect them to be sorted lists:
- if t in (dict, list, tuple):
- for i in range(len(dest)):
- log.ctx(idx=i)
- src_it = src[i] if i < len(src) else util.empty_instance_type(t)
- combine(dest[i], src_it)
- for i in range(len(dest), len(src)):
- log.ctx(idx=i)
- dest.append(src[i])
- else: # for lists of basic elements, we handle them as unsorted sets:
- for elem in src:
- if elem not in dest:
- dest.append(elem)
- return
- if dest == src:
- return
- raise ValueError('cannot combine dicts, conflicting items (values %r and %r)'
- % (dest, src))
-
def overlay(dest, src):
if is_dict(dest):
if not is_dict(src):
diff --git a/src/osmo_gsm_tester/core/schema.py b/src/osmo_gsm_tester/core/schema.py
index d343bef..588c432 100644
--- a/src/osmo_gsm_tester/core/schema.py
+++ b/src/osmo_gsm_tester/core/schema.py
@@ -18,9 +18,10 @@
# along with this program. If not, see <http://www.gnu.org/licenses/>.
import re
+import os
from . import log
-from .util import is_dict, is_list, str2bool, ENUM_OSMO_AUTH_ALGO
+from . import util
KEY_RE = re.compile('[a-zA-Z][a-zA-Z0-9_]*')
IPV4_RE = re.compile('([0-9]{1,3}.){3}[0-9]{1,3}')
@@ -62,7 +63,7 @@
match_re('MSISDN', MSISDN_RE, val)
def auth_algo(val):
- if val not in ENUM_OSMO_AUTH_ALGO:
+ if val not in util.ENUM_OSMO_AUTH_ALGO:
raise ValueError('Unknown Authentication Algorithm: %r' % val)
def uint(val):
@@ -162,7 +163,7 @@
INT: int,
STR: str,
UINT: uint,
- BOOL_STR: str2bool,
+ BOOL_STR: util.str2bool,
BAND: band,
IPV4: ipv4,
HWADDR: hwaddr,
@@ -182,6 +183,87 @@
DURATION: duration,
}
+def add(dest, src):
+ if util.is_dict(dest):
+ if not util.is_dict(src):
+ raise ValueError('cannot add to dict a value of type: %r' % type(src))
+
+ for key, val in src.items():
+ dest_val = dest.get(key)
+ if dest_val is None:
+ dest[key] = val
+ else:
+ log.ctx(key=key)
+ add(dest_val, val)
+ return
+ if util.is_list(dest):
+ if not util.is_list(src):
+ raise ValueError('cannot add to list a value of type: %r' % type(src))
+ dest.extend(src)
+ return
+ if dest == src:
+ return
+ raise ValueError('cannot add dicts, conflicting items (values %r and %r)'
+ % (dest, src))
+
+def combine(dest, src):
+ if util.is_dict(dest):
+ if not util.is_dict(src):
+ raise ValueError('cannot combine dict with a value of type: %r' % type(src))
+
+ for key, val in src.items():
+ log.ctx(key=key)
+ dest_val = dest.get(key)
+ if dest_val is None:
+ dest[key] = val
+ else:
+ combine(dest_val, val)
+ return
+ if util.is_list(dest):
+ if not util.is_list(src):
+ raise ValueError('cannot combine list with a value of type: %r' % type(src))
+ # Validate that all elements in both lists are of the same type:
+ t = util.list_validate_same_elem_type(src + dest)
+ if t is None:
+ return # both lists are empty, return
+ # For lists of complex objects, we expect them to be sorted lists:
+ if t in (dict, list, tuple):
+ for i in range(len(dest)):
+ log.ctx(idx=i)
+ src_it = src[i] if i < len(src) else util.empty_instance_type(t)
+ combine(dest[i], src_it)
+ for i in range(len(dest), len(src)):
+ log.ctx(idx=i)
+ dest.append(src[i])
+ else: # for lists of basic elements, we handle them as unsorted sets:
+ for elem in src:
+ if elem not in dest:
+ dest.append(elem)
+ return
+ if dest == src:
+ return
+ raise ValueError('cannot combine dicts, conflicting items (values %r and %r)'
+ % (dest, src))
+
+def replicate_times(d):
+ '''
+ replicate items that have a "times" > 1
+
+ 'd' is a dict matching WANT_SCHEMA, which is the same as
+ the RESOURCES_SCHEMA, except each entity that can be reserved has a 'times'
+ field added, to indicate how many of those should be reserved.
+ '''
+ d = copy.deepcopy(d)
+ for key, item_list in d.items():
+ idx = 0
+ while idx < len(item_list):
+ item = item_list[idx]
+ times = int(item.pop('times', 1))
+ for j in range(1, times):
+ item_list.insert(idx + j, copy.deepcopy(item))
+ idx += times
+ return d
+
def validate(config, schema):
'''Make sure the given config dict adheres to the schema.
The schema is a dict of 'dict paths' in dot-notation with permitted
@@ -198,17 +280,17 @@
def validate_item(path, value, schema):
want_type = schema.get(path)
- if is_list(value):
+ if util.is_list(value):
if want_type:
raise ValueError('config item is a list, should be %r: %r' % (want_type, path))
path = path + '[]'
want_type = schema.get(path)
if not want_type:
- if is_dict(value):
+ if util.is_dict(value):
nest(path, value, schema)
return
- if is_list(value) and value:
+ if util.is_list(value) and value:
for list_v in value:
validate_item(path, list_v, schema)
return
@@ -217,11 +299,11 @@
if want_type not in SCHEMA_TYPES:
raise ValueError('unknown type %r at %r' % (want_type, path))
- if is_dict(value):
+ if util.is_dict(value):
raise ValueError('config item is dict but should be a leaf node of type %r: %r'
% (want_type, path))
- if is_list(value):
+ if util.is_list(value):
for list_v in value:
validate_item(path, list_v, schema)
return
@@ -243,4 +325,73 @@
nest(None, config, schema)
+def generate_schemas():
+ "Generate supported schemas dynamically from objects"
+ obj_dir = '%s/../obj/' % os.path.dirname(os.path.abspath(__file__))
+ for filename in os.listdir(obj_dir):
+ if not filename.endswith(".py"):
+ continue
+ module_name = 'osmo_gsm_tester.obj.%s' % filename[:-3]
+ util.run_python_file_method(module_name, 'on_register_schemas', False)
+
+
+_RESOURCE_TYPES = ['ip_address', 'arfcn']
+
+_RESOURCES_SCHEMA = {
+ 'ip_address[].addr': IPV4,
+ 'arfcn[].arfcn': INT,
+ 'arfcn[].band': BAND,
+ }
+
+_CONFIG_SCHEMA = {}
+
+_WANT_SCHEMA = None
+_ALL_SCHEMA = None
+
+def register_resource_schema(obj_class_str, obj_attr_dict):
+ """Register schema attributes for a resource type.
+ For instance: register_resource_schema_attributes('modem', {'type': schema.STR, 'ki': schema.KI})
+ """
+ global _RESOURCES_SCHEMA
+ global _RESOURCE_TYPES
+ tmpdict = {}
+ for key, val in obj_attr_dict.items():
+ new_key = '%s[].%s' % (obj_class_str, key)
+ tmpdict[new_key] = val
+ combine(_RESOURCES_SCHEMA, tmpdict)
+ if obj_class_str not in _RESOURCE_TYPES:
+ _RESOURCE_TYPES.append(obj_class_str)
+
+def register_config_schema(obj_class_str, obj_attr_dict):
+ """Register schema attributes to configure all instances of an object class.
+ For instance: register_resource_schema_attributes('bsc', {'net.codec_list[]': schema.CODEC})
+ """
+ global _CONFIG_SCHEMA
+ tmpdict = {}
+ for key, val in obj_attr_dict.items():
+ new_key = '%s.%s' % (obj_class_str, key)
+ tmpdict[new_key] = val
+ combine(_CONFIG_SCHEMA, tmpdict)
+
+def get_resources_schema():
+ return _RESOURCES_SCHEMA;
+
+def get_want_schema():
+ global _WANT_SCHEMA
+ if _WANT_SCHEMA is None:
+ _WANT_SCHEMA = util.dict_add(
+ dict([('%s[].times' % r, TIMES) for r in _RESOURCE_TYPES]),
+ get_resources_schema())
+ return _WANT_SCHEMA
+
+def get_all_schema():
+ global _ALL_SCHEMA
+ if _ALL_SCHEMA is None:
+ want_schema = get_want_schema()
+ _ALL_SCHEMA = util.dict_add({ 'defaults.timeout': STR },
+ dict([('config.%s' % key, val) for key, val in _CONFIG_SCHEMA.items()]),
+ dict([('resources.%s' % key, val) for key, val in want_schema.items()]),
+ dict([('modifiers.%s' % key, val) for key, val in want_schema.items()]))
+ return _ALL_SCHEMA
+
# vim: expandtab tabstop=4 shiftwidth=4
diff --git a/src/osmo_gsm_tester/core/util.py b/src/osmo_gsm_tester/core/util.py
index a5b2bbf..4c7b1dd 100644
--- a/src/osmo_gsm_tester/core/util.py
+++ b/src/osmo_gsm_tester/core/util.py
@@ -370,6 +370,17 @@
def run_python_file(module_name, path):
SourceFileLoader(module_name, path).load_module()
+def run_python_file_method(module_name, func_name, fail_if_missing=True):
+ module_obj = __import__(module_name, globals(), locals(), [func_name])
+ try:
+ func = getattr(module_obj, func_name)
+ except AttributeError as e:
+ if fail_if_missing:
+ raise RuntimeError('function %s not found in %s (%s)' % (func_name, module_name))
+ else:
+ return None
+ return func()
+
def msisdn_inc(msisdn_str):
'add 1 and preserve leading zeros'
return ('%%0%dd' % len(msisdn_str)) % (int(msisdn_str) + 1)