blob: cef9557ee19743759f9a17851a4d32e82b69d3d3 [file] [log] [blame]
Harald Weltec91085e2022-02-10 18:05:45 +01001from construct.lib.containers import Container, ListContainer
2from construct.core import EnumIntegerString
Harald Welte07c7b1f2021-05-28 22:01:29 +02003import typing
Harald Weltee0f9ef12021-04-10 17:22:35 +02004from construct import *
Vadim Yanitskiy05d30eb2022-08-29 20:24:44 +07005from construct.core import evaluate, BitwisableString
Harald Welted0519e02022-02-11 18:05:48 +01006from construct.lib import integertypes
Harald Welte2db5cfb2021-04-10 19:05:37 +02007from pySim.utils import b2h, h2b, swap_nibbles
Robert Falkenbergb07a3e92021-05-07 15:23:20 +02008import gsm0338
Philipp Maier791f80a2023-07-26 17:01:37 +02009import codecs
Harald Welte6e9ae8a2023-12-08 14:57:19 +010010import ipaddress
Harald Weltee0f9ef12021-04-10 17:22:35 +020011
12"""Utility code related to the integration of the 'construct' declarative parser."""
13
Harald Welted0519e02022-02-11 18:05:48 +010014# (C) 2021-2022 by Harald Welte <laforge@osmocom.org>
Harald Weltee0f9ef12021-04-10 17:22:35 +020015#
16# This program is free software: you can redistribute it and/or modify
17# it under the terms of the GNU General Public License as published by
18# the Free Software Foundation, either version 2 of the License, or
19# (at your option) any later version.
20#
21# This program is distributed in the hope that it will be useful,
22# but WITHOUT ANY WARRANTY; without even the implied warranty of
23# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
24# GNU General Public License for more details.
25#
26# You should have received a copy of the GNU General Public License
27# along with this program. If not, see <http://www.gnu.org/licenses/>.
28
29
30class HexAdapter(Adapter):
31 """convert a bytes() type to a string of hex nibbles."""
Harald Weltec91085e2022-02-10 18:05:45 +010032
Harald Weltee0f9ef12021-04-10 17:22:35 +020033 def _decode(self, obj, context, path):
34 return b2h(obj)
Harald Weltec91085e2022-02-10 18:05:45 +010035
Harald Weltee0f9ef12021-04-10 17:22:35 +020036 def _encode(self, obj, context, path):
37 return h2b(obj)
38
Philipp Maier791f80a2023-07-26 17:01:37 +020039class Utf8Adapter(Adapter):
40 """convert a bytes() type that contains utf8 encoded text to human readable text."""
41
42 def _decode(self, obj, context, path):
43 # In case the string contains only 0xff bytes we interpret it as an empty string
44 if obj == b'\xff' * len(obj):
45 return ""
46 return codecs.decode(obj, "utf-8")
47
48 def _encode(self, obj, context, path):
49 return codecs.encode(obj, "utf-8")
50
Harald Weltec91085e2022-02-10 18:05:45 +010051
Harald Welte2db5cfb2021-04-10 19:05:37 +020052class BcdAdapter(Adapter):
53 """convert a bytes() type to a string of BCD nibbles."""
Harald Weltec91085e2022-02-10 18:05:45 +010054
Harald Welte2db5cfb2021-04-10 19:05:37 +020055 def _decode(self, obj, context, path):
56 return swap_nibbles(b2h(obj))
Harald Weltec91085e2022-02-10 18:05:45 +010057
Harald Welte2db5cfb2021-04-10 19:05:37 +020058 def _encode(self, obj, context, path):
59 return h2b(swap_nibbles(obj))
60
Harald Weltebc0e2092022-02-13 10:54:58 +010061class InvertAdapter(Adapter):
62 """inverse logic (false->true, true->false)."""
63 @staticmethod
64 def _invert_bool_in_obj(obj):
65 for k,v in obj.items():
66 # skip all private entries
67 if k.startswith('_'):
68 continue
69 if v == False:
70 obj[k] = True
71 elif v == True:
72 obj[k] = False
73 return obj
74
75 def _decode(self, obj, context, path):
76 return self._invert_bool_in_obj(obj)
77
78 def _encode(self, obj, context, path):
79 return self._invert_bool_in_obj(obj)
Harald Weltec91085e2022-02-10 18:05:45 +010080
Robert Falkenbergb07a3e92021-05-07 15:23:20 +020081class Rpad(Adapter):
82 """
Harald Weltef9a5ba52023-06-09 09:17:05 +020083 Encoder appends padding bytes (b'\\xff') or characters up to target size.
84 Decoder removes trailing padding bytes/characters.
Robert Falkenbergb07a3e92021-05-07 15:23:20 +020085
86 Parameters:
87 subcon: Subconstruct as defined by construct library
88 pattern: set padding pattern (default: b'\\xff')
Harald Weltef9a5ba52023-06-09 09:17:05 +020089 num_per_byte: number of 'elements' per byte. E.g. for hex nibbles: 2
Robert Falkenbergb07a3e92021-05-07 15:23:20 +020090 """
91
Harald Weltef9a5ba52023-06-09 09:17:05 +020092 def __init__(self, subcon, pattern=b'\xff', num_per_byte=1):
Robert Falkenbergb07a3e92021-05-07 15:23:20 +020093 super().__init__(subcon)
94 self.pattern = pattern
Harald Weltef9a5ba52023-06-09 09:17:05 +020095 self.num_per_byte = num_per_byte
Robert Falkenbergb07a3e92021-05-07 15:23:20 +020096
97 def _decode(self, obj, context, path):
98 return obj.rstrip(self.pattern)
99
100 def _encode(self, obj, context, path):
Harald Weltef9a5ba52023-06-09 09:17:05 +0200101 target_size = self.sizeof() * self.num_per_byte
102 if len(obj) > target_size:
Harald Weltec91085e2022-02-10 18:05:45 +0100103 raise SizeofError("Input ({}) exceeds target size ({})".format(
Harald Weltef9a5ba52023-06-09 09:17:05 +0200104 len(obj), target_size))
105 return obj + self.pattern * (target_size - len(obj))
Robert Falkenbergb07a3e92021-05-07 15:23:20 +0200106
Harald Welte954ce952023-05-27 20:08:09 +0200107class MultiplyAdapter(Adapter):
108 """
109 Decoder multiplies by multiplicator
110 Encoder divides by multiplicator
111
112 Parameters:
113 subcon: Subconstruct as defined by construct library
114 multiplier: Multiplier to apply to raw encoded value
115 """
116
117 def __init__(self, subcon, multiplicator):
118 super().__init__(subcon)
119 self.multiplicator = multiplicator
120
121 def _decode(self, obj, context, path):
122 return obj * 8
123
124 def _encode(self, obj, context, path):
125 return obj // 8
126
Harald Weltec91085e2022-02-10 18:05:45 +0100127
Robert Falkenbergb07a3e92021-05-07 15:23:20 +0200128class GsmStringAdapter(Adapter):
129 """Convert GSM 03.38 encoded bytes to a string."""
130
131 def __init__(self, subcon, codec='gsm03.38', err='strict'):
132 super().__init__(subcon)
133 self.codec = codec
134 self.err = err
135
136 def _decode(self, obj, context, path):
137 return obj.decode(self.codec)
138
139 def _encode(self, obj, context, path):
140 return obj.encode(self.codec, self.err)
141
Harald Welte6e9ae8a2023-12-08 14:57:19 +0100142class Ipv4Adapter(Adapter):
143 """
144 Encoder converts from 4 bytes to string representation (A.B.C.D).
145 Decoder converts from string representation (A.B.C.D) to four bytes.
146 """
147 def _decode(self, obj, context, path):
148 ia = ipaddress.IPv4Address(obj)
149 return ia.compressed
150
151 def _encode(self, obj, context, path):
152 ia = ipaddress.IPv4Address(obj)
153 return ia.packed
154
155class Ipv6Adapter(Adapter):
156 """
157 Encoder converts from 16 bytes to string representation.
158 Decoder converts from string representation to 16 bytes.
159 """
160 def _decode(self, obj, context, path):
161 ia = ipaddress.IPv6Address(obj)
162 return ia.compressed
163
164 def _encode(self, obj, context, path):
165 ia = ipaddress.IPv6Address(obj)
166 return ia.packed
167
Harald Weltec91085e2022-02-10 18:05:45 +0100168
Harald Weltee0f9ef12021-04-10 17:22:35 +0200169def filter_dict(d, exclude_prefix='_'):
170 """filter the input dict to ensure no keys starting with 'exclude_prefix' remain."""
Harald Welte7fca85b2021-05-29 21:27:46 +0200171 if not isinstance(d, dict):
172 return d
Harald Weltee0f9ef12021-04-10 17:22:35 +0200173 res = {}
174 for (key, value) in d.items():
175 if key.startswith(exclude_prefix):
176 continue
177 if type(value) is dict:
178 res[key] = filter_dict(value)
179 else:
180 res[key] = value
181 return res
182
Harald Welte07c7b1f2021-05-28 22:01:29 +0200183
184def normalize_construct(c):
185 """Convert a construct specific type to a related base type, mostly useful
186 so we can serialize it."""
187 # we need to include the filter_dict as we otherwise get elements like this
188 # in the dict: '_io': <_io.BytesIO object at 0x7fdb64e05860> which we cannot json-serialize
189 c = filter_dict(c)
190 if isinstance(c, Container) or isinstance(c, dict):
Harald Weltec91085e2022-02-10 18:05:45 +0100191 r = {k: normalize_construct(v) for (k, v) in c.items()}
Harald Welte07c7b1f2021-05-28 22:01:29 +0200192 elif isinstance(c, ListContainer):
193 r = [normalize_construct(x) for x in c]
194 elif isinstance(c, list):
195 r = [normalize_construct(x) for x in c]
196 elif isinstance(c, EnumIntegerString):
197 r = str(c)
198 else:
199 r = c
200 return r
201
Harald Weltec91085e2022-02-10 18:05:45 +0100202
Harald Weltecaef0df2023-12-17 10:07:01 +0100203def parse_construct(c, raw_bin_data: bytes, length: typing.Optional[int] = None, exclude_prefix: str = '_', context: dict = {}):
Harald Welte07c7b1f2021-05-28 22:01:29 +0200204 """Helper function to wrap around normalize_construct() and filter_dict()."""
205 if not length:
206 length = len(raw_bin_data)
Harald Weltecaef0df2023-12-17 10:07:01 +0100207 parsed = c.parse(raw_bin_data, total_len=length, **context)
Harald Welte07c7b1f2021-05-28 22:01:29 +0200208 return normalize_construct(parsed)
209
Harald Weltecaef0df2023-12-17 10:07:01 +0100210def build_construct(c, decoded_data, context: dict = {}):
211 """Helper function to handle total_len."""
212 return c.build(decoded_data, total_len=None, **context)
Harald Weltec91085e2022-02-10 18:05:45 +0100213
Harald Weltee0f9ef12021-04-10 17:22:35 +0200214# here we collect some shared / common definitions of data types
215LV = Prefixed(Int8ub, HexAdapter(GreedyBytes))
Robert Falkenberg9d16fbc2021-04-12 11:43:22 +0200216
217# Default value for Reserved for Future Use (RFU) bits/bytes
218# See TS 31.101 Sec. "3.4 Coding Conventions"
219__RFU_VALUE = 0
220
221# Field that packs Reserved for Future Use (RFU) bit
222FlagRFU = Default(Flag, __RFU_VALUE)
223
224# Field that packs Reserved for Future Use (RFU) byte
225ByteRFU = Default(Byte, __RFU_VALUE)
226
227# Field that packs all remaining Reserved for Future Use (RFU) bytes
228GreedyBytesRFU = Default(GreedyBytes, b'')
229
Harald Weltec91085e2022-02-10 18:05:45 +0100230
Robert Falkenberg9d16fbc2021-04-12 11:43:22 +0200231def BitsRFU(n=1):
232 '''
233 Field that packs Reserved for Future Use (RFU) bit(s)
234 as defined in TS 31.101 Sec. "3.4 Coding Conventions"
235
236 Use this for (currently) unused/reserved bits whose contents
237 should be initialized automatically but should not be cleared
238 in the future or when restoring read data (unlike padding).
239
240 Parameters:
241 n (Integer): Number of bits (default: 1)
242 '''
243 return Default(BitsInteger(n), __RFU_VALUE)
244
Harald Weltec91085e2022-02-10 18:05:45 +0100245
Robert Falkenberg9d16fbc2021-04-12 11:43:22 +0200246def BytesRFU(n=1):
247 '''
248 Field that packs Reserved for Future Use (RFU) byte(s)
249 as defined in TS 31.101 Sec. "3.4 Coding Conventions"
250
251 Use this for (currently) unused/reserved bytes whose contents
252 should be initialized automatically but should not be cleared
253 in the future or when restoring read data (unlike padding).
254
255 Parameters:
256 n (Integer): Number of bytes (default: 1)
257 '''
258 return Default(Bytes(n), __RFU_VALUE)
Robert Falkenbergb07a3e92021-05-07 15:23:20 +0200259
Harald Weltec91085e2022-02-10 18:05:45 +0100260
Robert Falkenbergb07a3e92021-05-07 15:23:20 +0200261def GsmString(n):
262 '''
263 GSM 03.38 encoded byte string of fixed length n.
264 Encoder appends padding bytes (b'\\xff') to maintain
265 length. Decoder removes those trailing bytes.
266
267 Exceptions are raised for invalid characters
268 and length excess.
269
270 Parameters:
271 n (Integer): Fixed length of the encoded byte string
272 '''
273 return GsmStringAdapter(Rpad(Bytes(n), pattern=b'\xff'), codec='gsm03.38')
Harald Welted0519e02022-02-11 18:05:48 +0100274
275class GreedyInteger(Construct):
276 """A variable-length integer implementation, think of combining GrredyBytes with BytesInteger."""
Philipp Maier541a9152022-06-01 18:21:17 +0200277 def __init__(self, signed=False, swapped=False, minlen=0):
Harald Welted0519e02022-02-11 18:05:48 +0100278 super().__init__()
279 self.signed = signed
280 self.swapped = swapped
Philipp Maier541a9152022-06-01 18:21:17 +0200281 self.minlen = minlen
Harald Welted0519e02022-02-11 18:05:48 +0100282
283 def _parse(self, stream, context, path):
284 data = stream_read_entire(stream, path)
285 if evaluate(self.swapped, context):
286 data = swapbytes(data)
287 try:
Vadim Yanitskiy05d30eb2022-08-29 20:24:44 +0700288 return int.from_bytes(data, byteorder='big', signed=self.signed)
Harald Welted0519e02022-02-11 18:05:48 +0100289 except ValueError as e:
290 raise IntegerError(str(e), path=path)
291
Philipp Maier541a9152022-06-01 18:21:17 +0200292 def __bytes_required(self, i, minlen=0):
Harald Welted0519e02022-02-11 18:05:48 +0100293 if self.signed:
294 raise NotImplementedError("FIXME: Implement support for encoding signed integer")
Philipp Maier541a9152022-06-01 18:21:17 +0200295
296 # compute how many bytes we need
Harald Welted0519e02022-02-11 18:05:48 +0100297 nbytes = 1
298 while True:
299 i = i >> 8
300 if i == 0:
Philipp Maier541a9152022-06-01 18:21:17 +0200301 break
Harald Welted0519e02022-02-11 18:05:48 +0100302 else:
303 nbytes = nbytes + 1
Philipp Maier541a9152022-06-01 18:21:17 +0200304
305 # round up to the minimum number
306 # of bytes we anticipate
307 if nbytes < minlen:
308 nbytes = minlen
309
310 return nbytes
Harald Welted0519e02022-02-11 18:05:48 +0100311
312 def _build(self, obj, stream, context, path):
313 if not isinstance(obj, integertypes):
314 raise IntegerError(f"value {obj} is not an integer", path=path)
Philipp Maier541a9152022-06-01 18:21:17 +0200315 length = self.__bytes_required(obj, self.minlen)
Harald Welted0519e02022-02-11 18:05:48 +0100316 try:
Vadim Yanitskiy05d30eb2022-08-29 20:24:44 +0700317 data = obj.to_bytes(length, byteorder='big', signed=self.signed)
Harald Welted0519e02022-02-11 18:05:48 +0100318 except ValueError as e:
319 raise IntegerError(str(e), path=path)
320 if evaluate(self.swapped, context):
321 data = swapbytes(data)
322 stream_write(stream, data, length, path)
323 return obj
Harald Weltebc0e2092022-02-13 10:54:58 +0100324
325# merged definitions of 24.008 + 23.040
326TypeOfNumber = Enum(BitsInteger(3), unknown=0, international=1, national=2, network_specific=3,
327 short_code=4, alphanumeric=5, abbreviated=6, reserved_for_extension=7)
328NumberingPlan = Enum(BitsInteger(4), unknown=0, isdn_e164=1, data_x121=3, telex_f69=4,
329 sc_specific_5=5, sc_specific_6=6, national=8, private=9,
330 ermes=10, reserved_cts=11, reserved_for_extension=15)
331TonNpi = BitStruct('ext'/Flag, 'type_of_number'/TypeOfNumber, 'numbering_plan_id'/NumberingPlan)