blob: 20a6946b65b448360b69599ed63bf8ff2d5d7906 [file] [log] [blame]
Harald Weltec91085e2022-02-10 18:05:45 +01001from construct.lib.containers import Container, ListContainer
2from construct.core import EnumIntegerString
Harald Welte07c7b1f2021-05-28 22:01:29 +02003import typing
Harald Weltee0f9ef12021-04-10 17:22:35 +02004from construct import *
Vadim Yanitskiy05d30eb2022-08-29 20:24:44 +07005from construct.core import evaluate, BitwisableString
Harald Welted0519e02022-02-11 18:05:48 +01006from construct.lib import integertypes
Harald Welte2db5cfb2021-04-10 19:05:37 +02007from pySim.utils import b2h, h2b, swap_nibbles
Robert Falkenbergb07a3e92021-05-07 15:23:20 +02008import gsm0338
Harald Weltee0f9ef12021-04-10 17:22:35 +02009
10"""Utility code related to the integration of the 'construct' declarative parser."""
11
Harald Welted0519e02022-02-11 18:05:48 +010012# (C) 2021-2022 by Harald Welte <laforge@osmocom.org>
Harald Weltee0f9ef12021-04-10 17:22:35 +020013#
14# This program is free software: you can redistribute it and/or modify
15# it under the terms of the GNU General Public License as published by
16# the Free Software Foundation, either version 2 of the License, or
17# (at your option) any later version.
18#
19# This program is distributed in the hope that it will be useful,
20# but WITHOUT ANY WARRANTY; without even the implied warranty of
21# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
22# GNU General Public License for more details.
23#
24# You should have received a copy of the GNU General Public License
25# along with this program. If not, see <http://www.gnu.org/licenses/>.
26
27
class HexAdapter(Adapter):
    """Adapter translating between raw bytes and a string of hex nibbles.

    Parsing turns the bytes produced by the subcon into a hex string;
    building turns a hex string back into bytes for the subcon.
    """

    def _decode(self, obj, context, path):
        # bytes -> hex string (pySim.utils.b2h)
        hex_string = b2h(obj)
        return hex_string

    def _encode(self, obj, context, path):
        # hex string -> bytes (pySim.utils.h2b)
        raw_bytes = h2b(obj)
        return raw_bytes
36
Harald Weltec91085e2022-02-10 18:05:45 +010037
class BcdAdapter(Adapter):
    """Adapter translating between raw bytes and a string of BCD nibbles.

    The conversion is the hex conversion of HexAdapter with swap_nibbles()
    applied to the hex string in both directions.
    """

    def _decode(self, obj, context, path):
        # bytes -> hex string -> nibble-swapped string
        hex_string = b2h(obj)
        return swap_nibbles(hex_string)

    def _encode(self, obj, context, path):
        # nibble-swapped string -> hex string -> bytes
        hex_string = swap_nibbles(obj)
        return h2b(hex_string)
46
class InvertAdapter(Adapter):
    """inverse logic (false->true, true->false).

    Operates on the dict-like object handled by the subcon: every public
    entry whose value equals False becomes True and vice versa.  Entries
    whose key starts with an underscore are considered private and are
    left untouched.
    """

    @staticmethod
    def _invert_bool_in_obj(obj):
        """Return a shallow copy of obj with all public boolean values flipped.

        The input object itself is never modified, so a dict handed to
        build() by the caller keeps its original values.
        """
        import copy

        # copy.copy() keeps the concrete mapping class of the input
        inverted = copy.copy(obj)
        for key, value in obj.items():
            # skip all private entries
            if key.startswith('_'):
                continue
            # Equality (not identity) is deliberate: values comparing equal
            # to False/True (such as 0 and 1) are flipped as well.
            if value == False:
                inverted[key] = True
            elif value == True:
                inverted[key] = False
        return inverted

    def _decode(self, obj, context, path):
        return self._invert_bool_in_obj(obj)

    def _encode(self, obj, context, path):
        return self._invert_bool_in_obj(obj)
Harald Weltec91085e2022-02-10 18:05:45 +010066
class Rpad(Adapter):
    """
    Right-padding adapter for byte strings.

    When building, the value is extended with the padding pattern until it
    reaches the size reported by sizeof(); when parsing, trailing padding
    is stripped again.

    Parameters:
        subcon: Subconstruct as defined by construct library
        pattern: set padding pattern (default: b'\\xff')

    Raises:
        SizeofError: when the value to build is longer than the target size
    """

    def __init__(self, subcon, pattern=b'\xff'):
        super().__init__(subcon)
        self.pattern = pattern

    def _decode(self, obj, context, path):
        stripped = obj.rstrip(self.pattern)
        return stripped

    def _encode(self, obj, context, path):
        target_size = self.sizeof()
        missing = target_size - len(obj)
        if missing < 0:
            raise SizeofError("Input ({}) exceeds target size ({})".format(
                len(obj), target_size))
        padding = self.pattern * missing
        return obj + padding
89
class MultiplyAdapter(Adapter):
    """
    Decoder multiplies by multiplicator
    Encoder divides by multiplicator

    Parameters:
        subcon: Subconstruct as defined by construct library
        multiplicator: Multiplier to apply to raw encoded value
    """

    def __init__(self, subcon, multiplicator):
        super().__init__(subcon)
        self.multiplicator = multiplicator

    def _decode(self, obj, context, path):
        # use the configured factor rather than a fixed constant
        return obj * self.multiplicator

    def _encode(self, obj, context, path):
        # floor division: values that are not a multiple of the factor are
        # rounded down when encoded
        return obj // self.multiplicator
109
Harald Weltec91085e2022-02-10 18:05:45 +0100110
class GsmStringAdapter(Adapter):
    """Convert GSM 03.38 encoded bytes to a string.

    Parameters:
        subcon: Subconstruct as defined by construct library
        codec: name of the codec used for decoding/encoding (default: 'gsm03.38')
        err: codec error handling scheme, applied in both directions
             (default: 'strict')
    """

    def __init__(self, subcon, codec='gsm03.38', err='strict'):
        super().__init__(subcon)
        self.codec = codec
        self.err = err

    def _decode(self, obj, context, path):
        # honour the configured error policy here as well, not only in _encode
        return obj.decode(self.codec, self.err)

    def _encode(self, obj, context, path):
        return obj.encode(self.codec, self.err)
124
Harald Weltec91085e2022-02-10 18:05:45 +0100125
def filter_dict(d, exclude_prefix='_'):
    """filter the input dict to ensure no keys starting with 'exclude_prefix' remain.

    Parameters:
        d: object to filter; anything that is not a dict is returned unchanged
        exclude_prefix: keys starting with this string are dropped

    Returns:
        a new dict without the excluded keys.  Values that are plain dicts
        are filtered recursively with the same prefix; the input is not
        modified.
    """
    if not isinstance(d, dict):
        return d
    res = {}
    for key, value in d.items():
        # only string keys can carry the prefix; other keys are kept as-is
        if isinstance(key, str) and key.startswith(exclude_prefix):
            continue
        # exact type check: only plain dicts are descended into, dict
        # subclasses are passed through untouched
        if type(value) is dict:
            value = filter_dict(value, exclude_prefix)
        res[key] = value
    return res
139
Harald Welte07c7b1f2021-05-28 22:01:29 +0200140
def normalize_construct(c):
    """Turn a construct-specific value into a plain base type.

    Containers/dicts become dicts, list containers/lists become lists and
    enum strings become str, each applied recursively; anything else is
    returned as-is.  Mostly useful so the result can be serialized.
    """
    # Filtering first removes entries such as
    # '_io': <_io.BytesIO object at 0x7fdb64e05860>, which cannot be
    # json-serialized.
    c = filter_dict(c)
    if isinstance(c, (Container, dict)):
        return {key: normalize_construct(val) for key, val in c.items()}
    if isinstance(c, (ListContainer, list)):
        return [normalize_construct(item) for item in c]
    if isinstance(c, EnumIntegerString):
        return str(c)
    return c
158
Harald Weltec91085e2022-02-10 18:05:45 +0100159
def parse_construct(c, raw_bin_data: bytes, length: typing.Optional[int] = None, exclude_prefix: str = '_'):
    """Parse raw_bin_data with construct c and return the normalized result.

    Parameters:
        c: construct used for parsing
        raw_bin_data: the binary data to parse
        length: handed to the parser as 'total_len'; when missing or zero
                the length of raw_bin_data is used instead
        exclude_prefix: accepted but not used by this function

    Returns:
        the parse result after passing it through normalize_construct()
    """
    total_len = length if length else len(raw_bin_data)
    return normalize_construct(c.parse(raw_bin_data, total_len=total_len))
166
Harald Weltec91085e2022-02-10 18:05:45 +0100167
# here we collect some shared / common definitions of data types

# Length-Value: the payload is prefixed by a single Int8ub length field and
# is exposed as a hex string via HexAdapter
LV = Prefixed(Int8ub, HexAdapter(GreedyBytes))

# Default value for Reserved for Future Use (RFU) bits/bytes
# See TS 31.101 Sec. "3.4 Coding Conventions"
__RFU_VALUE = 0

# Field that packs Reserved for Future Use (RFU) bit
# (falls back to __RFU_VALUE when no value is given while building)
FlagRFU = Default(Flag, __RFU_VALUE)

# Field that packs Reserved for Future Use (RFU) byte
# (falls back to __RFU_VALUE when no value is given while building)
ByteRFU = Default(Byte, __RFU_VALUE)

# Field that packs all remaining Reserved for Future Use (RFU) bytes
# (falls back to an empty byte string when no value is given while building)
GreedyBytesRFU = Default(GreedyBytes, b'')
183
Harald Weltec91085e2022-02-10 18:05:45 +0100184
def BitsRFU(n=1):
    '''
    Build a field for n Reserved for Future Use (RFU) bit(s),
    as defined in TS 31.101 Sec. "3.4 Coding Conventions".

    Intended for (currently) unused/reserved bits: their contents are
    initialized automatically, but - unlike padding - they are not
    cleared in the future or when restoring read data.

    Parameters:
        n (Integer): Number of bits (default: 1)
    '''
    rfu_bits = BitsInteger(n)
    return Default(rfu_bits, __RFU_VALUE)
198
Harald Weltec91085e2022-02-10 18:05:45 +0100199
def BytesRFU(n=1):
    '''
    Build a field for n Reserved for Future Use (RFU) byte(s),
    as defined in TS 31.101 Sec. "3.4 Coding Conventions".

    Intended for (currently) unused/reserved bytes: their contents are
    initialized automatically, but - unlike padding - they are not
    cleared in the future or when restoring read data.

    Parameters:
        n (Integer): Number of bytes (default: 1)
    '''
    rfu_bytes = Bytes(n)
    return Default(rfu_bytes, __RFU_VALUE)
Robert Falkenbergb07a3e92021-05-07 15:23:20 +0200213
Harald Weltec91085e2022-02-10 18:05:45 +0100214
def GsmString(n):
    '''
    Build a field for a GSM 03.38 encoded byte string of fixed length n.

    When encoding, padding bytes (b'\\xff') are appended to maintain the
    length; when decoding, those trailing bytes are removed again.

    Exceptions are raised for invalid characters
    and length excess.

    Parameters:
        n (Integer): Fixed length of the encoded byte string
    '''
    padded_bytes = Rpad(Bytes(n), pattern=b'\xff')
    return GsmStringAdapter(padded_bytes, codec='gsm03.38')
Harald Welted0519e02022-02-11 18:05:48 +0100228
class GreedyInteger(Construct):
    """A variable-length integer implementation, think of combining GreedyBytes with BytesInteger.

    Parsing reads the entire remaining stream and interprets it as a single
    big-endian integer.  Building emits as many bytes as the value needs,
    but never fewer than minlen.

    Parameters:
        signed: interpret the parsed bytes as a signed integer; building
                signed integers is not implemented
        swapped: swap the byte order; passed through evaluate(), so it may
                 depend on the context
        minlen: minimum number of bytes to emit when building
    """
    def __init__(self, signed=False, swapped=False, minlen=0):
        super().__init__()
        self.signed = signed
        self.swapped = swapped
        self.minlen = minlen

    def _parse(self, stream, context, path):
        data = stream_read_entire(stream, path)
        if evaluate(self.swapped, context):
            data = swapbytes(data)
        try:
            return int.from_bytes(data, byteorder='big', signed=self.signed)
        except ValueError as e:
            raise IntegerError(str(e), path=path) from e

    def __bytes_required(self, i, minlen=0):
        """Return the number of bytes needed to encode i, at least minlen (and at least 1)."""
        if self.signed:
            raise NotImplementedError("FIXME: Implement support for encoding signed integer")

        # compute how many bytes we need; bit_length() ignores the sign, so
        # (unlike shifting right until zero is reached) this also terminates
        # for negative input, which is rejected later by to_bytes()
        nbytes = max(1, (i.bit_length() + 7) // 8)

        # round up to the minimum number
        # of bytes we anticipate
        return max(nbytes, minlen)

    def _build(self, obj, stream, context, path):
        if not isinstance(obj, integertypes):
            raise IntegerError(f"value {obj} is not an integer", path=path)
        length = self.__bytes_required(obj, self.minlen)
        try:
            data = obj.to_bytes(length, byteorder='big', signed=self.signed)
        except (ValueError, OverflowError) as e:
            # OverflowError is what to_bytes() raises for a negative value
            # when encoding unsigned
            raise IntegerError(str(e), path=path) from e
        if evaluate(self.swapped, context):
            data = swapbytes(data)
        stream_write(stream, data, length, path)
        return obj
Harald Weltebc0e2092022-02-13 10:54:58 +0100278
# merged definitions of 24.008 + 23.040
# Type of Number: 3-bit enumeration
TypeOfNumber = Enum(BitsInteger(3), unknown=0, international=1, national=2, network_specific=3,
                    short_code=4, alphanumeric=5, abbreviated=6, reserved_for_extension=7)
# Numbering Plan identification: 4-bit enumeration (not every value has a name)
NumberingPlan = Enum(BitsInteger(4), unknown=0, isdn_e164=1, data_x121=3, telex_f69=4,
                     sc_specific_5=5, sc_specific_6=6, national=8, private=9,
                     ermes=10, reserved_cts=11, reserved_for_extension=15)
# Combined TON/NPI structure: 'ext' flag, then type of number, then numbering plan
TonNpi = BitStruct('ext'/Flag, 'type_of_number'/TypeOfNumber, 'numbering_plan_id'/NumberingPlan)
285TonNpi = BitStruct('ext'/Flag, 'type_of_number'/TypeOfNumber, 'numbering_plan_id'/NumberingPlan)