schema: Allow objects registering their own schema types
Change-Id: I998c8674a55531909bfeac420064c3f238cea126
diff --git a/src/osmo_gsm_tester/core/schema.py b/src/osmo_gsm_tester/core/schema.py
index 9055c5b..70b4c8c 100644
--- a/src/osmo_gsm_tester/core/schema.py
+++ b/src/osmo_gsm_tester/core/schema.py
@@ -36,12 +36,12 @@
break;
if not regex.fullmatch(val):
break;
- return
+ return True
raise ValueError('Invalid %s: %r' % (name, val))
def band(val):
if val in ('GSM-900', 'GSM-1800', 'GSM-1900'):
- return
+ return True
raise ValueError('Unknown GSM band: %r' % val)
def ipv4(val):
@@ -49,27 +49,30 @@
els = [int(el) for el in val.split('.')]
if not all([el >= 0 and el <= 255 for el in els]):
raise ValueError('Invalid IPv4 address: %r' % val)
+ return True
def hwaddr(val):
- match_re('hardware address', HWADDR_RE, val)
+ return match_re('hardware address', HWADDR_RE, val)
def imsi(val):
- match_re('IMSI', IMSI_RE, val)
+ return match_re('IMSI', IMSI_RE, val)
def ki(val):
- match_re('KI', KI_RE, val)
+ return match_re('KI', KI_RE, val)
def msisdn(val):
- match_re('MSISDN', MSISDN_RE, val)
+ return match_re('MSISDN', MSISDN_RE, val)
def auth_algo(val):
if val not in util.ENUM_OSMO_AUTH_ALGO:
raise ValueError('Unknown Authentication Algorithm: %r' % val)
+ return True
def uint(val):
n = int(val)
if n < 0:
raise ValueError('Positive value expected instead of %d' % n)
+ return True
def uint8(val):
n = int(val)
@@ -77,6 +80,7 @@
raise ValueError('Positive value expected instead of %d' % n)
if n > 255: # 2^8 - 1
raise ValueError('Value %d too big, max value is 255' % n)
+ return True
def uint16(val):
n = int(val)
@@ -84,57 +88,64 @@
raise ValueError('Positive value expected instead of %d' % n)
if n > 65535: # 2^16 - 1
raise ValueError('Value %d too big, max value is 65535' % n)
+ return True
+
+def bool_str(val):
+ # str2bool will raise an exception if unable to parse it
+ util.str2bool(val)
+ return True
def times(val):
n = int(val)
if n < 1:
raise ValueError('Positive value >0 expected instead of %d' % n)
+ return True
def cipher(val):
if val in ('a5_0', 'a5_1', 'a5_2', 'a5_3', 'a5_4', 'a5_5', 'a5_6', 'a5_7'):
- return
+ return True
raise ValueError('Unknown Cipher value: %r' % val)
def modem_feature(val):
if val in ('sms', 'gprs', 'voice', 'ussd', 'sim', '2g', '3g', '4g'):
- return
+ return True
raise ValueError('Unknown Modem Feature: %r' % val)
def phy_channel_config(val):
if val in ('CCCH', 'CCCH+SDCCH4', 'TCH/F', 'TCH/H', 'SDCCH8', 'PDCH',
'TCH/F_PDCH', 'CCCH+SDCCH4+CBCH', 'SDCCH8+CBCH','TCH/F_TCH/H_PDCH'):
- return
+ return True
raise ValueError('Unknown Physical channel config: %r' % val)
def channel_allocator(val):
if val in ('ascending', 'descending'):
- return
+ return True
raise ValueError('Unknown Channel Allocator Policy %r' % val)
def gprs_mode(val):
if val in ('none', 'gprs', 'egprs'):
- return
+ return True
raise ValueError('Unknown GPRS mode %r' % val)
def codec(val):
if val in ('hr1', 'hr2', 'hr3', 'fr1', 'fr2', 'fr3'):
- return
+ return True
raise ValueError('Unknown Codec value: %r' % val)
def osmo_trx_clock_ref(val):
if val in ('internal', 'external', 'gspdo'):
- return
+ return True
raise ValueError('Unknown OsmoTRX clock reference value: %r' % val)
def lte_transmission_mode(val):
n = int(val)
if n <= 4:
- return
+ return True
raise ValueError('LTE Transmission Mode %d not in expected range' % n)
def duration(val):
if val.isdecimal() or val.endswith('m') or val.endswith('h'):
- return
+ return True
raise ValueError('Invalid duration value: %r' % val)
INT = 'int'
@@ -163,7 +174,7 @@
INT: int,
STR: str,
UINT: uint,
- BOOL_STR: util.str2bool,
+ BOOL_STR: bool_str,
BAND: band,
IPV4: ipv4,
HWADDR: hwaddr,
@@ -310,7 +321,9 @@
log.ctx(path)
type_validator = SCHEMA_TYPES.get(want_type)
- type_validator(value)
+ valid = type_validator(value)
+ if not valid:
+ raise ValueError('Invalid value %r for schema type \'%s\' (validator: %s)' % (value, want_type, type_validator.__name__))
def nest(parent_path, config, schema):
if parent_path:
@@ -369,6 +382,13 @@
_WANT_SCHEMA = None
_ALL_SCHEMA = None
+def register_schema_types(schema_type_attr):
+ """Register schema types to be used by schema attributes.
+ For instance: register_resource_schema_attributes({ 'fruit': lambda val: val in ('banana', 'apple') })
+ """
+ global SCHEMA_TYPES
+ combine(SCHEMA_TYPES, schema_type_attr)
+
def register_resource_schema(obj_class_str, obj_attr_dict):
"""Register schema attributes for a resource type.
For instance: register_resource_schema_attributes('modem', {'type': schema.STR, 'ki': schema.KI})