construct: Add Construct for variable-length int 'GreedyInteger'
We have a number of integers with variable-length encoding, so
add a Construct for this. Naming inspired by GreedyBytes.
Related to https://github.com/construct/construct/issues/962
Change-Id: Ic6049b74ea3705fda24855f34b4a1d5f2c9327f7
diff --git a/pySim/construct.py b/pySim/construct.py
index e3f6a88..6daa66a 100644
--- a/pySim/construct.py
+++ b/pySim/construct.py
@@ -2,12 +2,14 @@
from construct.core import EnumIntegerString
import typing
from construct import *
+from construct.core import evaluate, bytes2integer, integer2bytes
+from construct.lib import integertypes
from pySim.utils import b2h, h2b, swap_nibbles
import gsm0338
"""Utility code related to the integration of the 'construct' declarative parser."""
-# (C) 2021 by Harald Welte <laforge@osmocom.org>
+# (C) 2021-2022 by Harald Welte <laforge@osmocom.org>
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
@@ -184,3 +186,45 @@
n (Integer): Fixed length of the encoded byte string
'''
return GsmStringAdapter(Rpad(Bytes(n), pattern=b'\xff'), codec='gsm03.38')
+
+class GreedyInteger(Construct):
+ """A variable-length integer implementation, think of combining GrredyBytes with BytesInteger."""
+ def __init__(self, signed=False, swapped=False):
+ super().__init__()
+ self.signed = signed
+ self.swapped = swapped
+
+ def _parse(self, stream, context, path):
+ data = stream_read_entire(stream, path)
+ if evaluate(self.swapped, context):
+ data = swapbytes(data)
+ try:
+ return bytes2integer(data, self.signed)
+ except ValueError as e:
+ raise IntegerError(str(e), path=path)
+
+ def __bytes_required(self, i):
+ if self.signed:
+ raise NotImplementedError("FIXME: Implement support for encoding signed integer")
+ nbytes = 1
+ while True:
+ i = i >> 8
+ if i == 0:
+ return nbytes
+ else:
+ nbytes = nbytes + 1
+ # this should never happen, above loop must return eventually...
+ raise IntegerError(f"value {i} is out of range")
+
+ def _build(self, obj, stream, context, path):
+ if not isinstance(obj, integertypes):
+ raise IntegerError(f"value {obj} is not an integer", path=path)
+ length = self.__bytes_required(obj)
+ try:
+ data = integer2bytes(obj, length, self.signed)
+ except ValueError as e:
+ raise IntegerError(str(e), path=path)
+ if evaluate(self.swapped, context):
+ data = swapbytes(data)
+ stream_write(stream, data, length, path)
+ return obj