blob: f78adfe8b731b7fa2fa705d98c21ab7499eb4fca [file] [log] [blame]
Harald Weltec91085e2022-02-10 18:05:45 +01001from construct.lib.containers import Container, ListContainer
2from construct.core import EnumIntegerString
Harald Welte07c7b1f2021-05-28 22:01:29 +02003import typing
Harald Weltee0f9ef12021-04-10 17:22:35 +02004from construct import *
Vadim Yanitskiy05d30eb2022-08-29 20:24:44 +07005from construct.core import evaluate, BitwisableString
Harald Welted0519e02022-02-11 18:05:48 +01006from construct.lib import integertypes
Harald Welte2db5cfb2021-04-10 19:05:37 +02007from pySim.utils import b2h, h2b, swap_nibbles
Robert Falkenbergb07a3e92021-05-07 15:23:20 +02008import gsm0338
Philipp Maier791f80a2023-07-26 17:01:37 +02009import codecs
Harald Welte6e9ae8a2023-12-08 14:57:19 +010010import ipaddress
Harald Weltee0f9ef12021-04-10 17:22:35 +020011
12"""Utility code related to the integration of the 'construct' declarative parser."""
13
Harald Welted0519e02022-02-11 18:05:48 +010014# (C) 2021-2022 by Harald Welte <laforge@osmocom.org>
Harald Weltee0f9ef12021-04-10 17:22:35 +020015#
16# This program is free software: you can redistribute it and/or modify
17# it under the terms of the GNU General Public License as published by
18# the Free Software Foundation, either version 2 of the License, or
19# (at your option) any later version.
20#
21# This program is distributed in the hope that it will be useful,
22# but WITHOUT ANY WARRANTY; without even the implied warranty of
23# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
24# GNU General Public License for more details.
25#
26# You should have received a copy of the GNU General Public License
27# along with this program. If not, see <http://www.gnu.org/licenses/>.
28
29
class HexAdapter(Adapter):
    """Adapter presenting a bytes() value as a string of hex nibbles."""

    def _encode(self, obj, context, path):
        # hex-nibble string -> bytes handed to the wrapped subcon
        raw = h2b(obj)
        return raw

    def _decode(self, obj, context, path):
        # bytes produced by the wrapped subcon -> hex-nibble string
        hex_str = b2h(obj)
        return hex_str
38
class Utf8Adapter(Adapter):
    """Adapter presenting UTF-8 encoded bytes() as human readable text."""

    def _encode(self, obj, context, path):
        encoded = codecs.encode(obj, "utf-8")
        return encoded

    def _decode(self, obj, context, path):
        # A value made up exclusively of 0xff bytes is interpreted as an
        # empty string instead of being run through the UTF-8 decoder.
        all_ff = b'\xff' * len(obj)
        if obj != all_ff:
            return codecs.decode(obj, "utf-8")
        return ""
50
Harald Weltec91085e2022-02-10 18:05:45 +010051
class BcdAdapter(Adapter):
    """Adapter presenting a bytes() value as a string of BCD nibbles."""

    def _encode(self, obj, context, path):
        # swap the nibbles of every digit pair, then turn the hex string into bytes
        swapped = swap_nibbles(obj)
        return h2b(swapped)

    def _decode(self, obj, context, path):
        # bytes -> hex string, then swap the nibbles of every byte
        hex_str = b2h(obj)
        return swap_nibbles(hex_str)
60
class PlmnAdapter(BcdAdapter):
    """Convert a bytes(3) PLMN to a 'MCC-MNC' string like 262-02 or 262-002.

    After the nibble swap performed by BcdAdapter the six digits are laid
    out as ``MCC1 MCC2 MCC3 MNC3 MNC1 MNC2``: the third MNC digit is stored
    *before* the first two, and it is the filler 'f' for a 2-digit MNC.
    """

    def _decode(self, obj, context, path):
        bcd = super()._decode(obj, context, path)
        mcc, mnc_digit3, mnc_digits12 = bcd[:3], bcd[3], bcd[4:]
        if mnc_digit3 == 'f':
            # 2-digit MNC: drop the filler nibble
            return '-'.join([mcc, mnc_digits12])
        # 3-digit MNC: move the third digit back to the end
        return '-'.join([mcc, mnc_digits12 + mnc_digit3])

    def _encode(self, obj, context, path):
        parts = obj.split('-')
        mcc, mnc = parts[0], parts[1]
        if len(mnc) == 2:
            # 2-digit MNC: the slot of the third MNC digit carries the filler
            bcd = mcc + 'f' + mnc
        else:
            # 3-digit MNC: the third digit is stored ahead of the first two
            bcd = mcc + mnc[2:] + mnc[:2]
        return super()._encode(bcd, context, path)
77
class InvertAdapter(Adapter):
    """Adapter applying inverse logic (false->true, true->false) to the values of a mapping."""

    @staticmethod
    def _invert_bool_in_obj(obj):
        """Flip the truth value of all non-private entries of obj in place and return obj."""
        for key, value in obj.items():
            if key.startswith('_'):
                # private entries are left untouched
                continue
            # Equality (not identity) comparison: values comparing equal to
            # False/True, such as 0/1, are flipped as well.
            if value == False:
                obj[key] = True
            elif value == True:
                obj[key] = False
        return obj

    def _encode(self, obj, context, path):
        return self._invert_bool_in_obj(obj)

    def _decode(self, obj, context, path):
        return self._invert_bool_in_obj(obj)
Harald Weltec91085e2022-02-10 18:05:45 +010097
class Rpad(Adapter):
    """
    Right-padding adapter.

    When encoding, the value is extended with the padding pattern until the
    target size is reached; when decoding, trailing padding is stripped.

    Parameters:
        subcon: Subconstruct as defined by construct library
        pattern: set padding pattern (default: b'\\xff')
        num_per_byte: number of 'elements' per byte. E.g. for hex nibbles: 2
    """

    def __init__(self, subcon, pattern=b'\xff', num_per_byte=1):
        super().__init__(subcon)
        self.num_per_byte = num_per_byte
        self.pattern = pattern

    def _encode(self, obj, context, path):
        # size of the subcon, expressed in elements rather than bytes
        capacity = self.sizeof() * self.num_per_byte
        missing = capacity - len(obj)
        if missing < 0:
            raise SizeofError("Input ({}) exceeds target size ({})".format(
                len(obj), capacity))
        return obj + self.pattern * missing

    def _decode(self, obj, context, path):
        stripped = obj.rstrip(self.pattern)
        return stripped
Robert Falkenbergb07a3e92021-05-07 15:23:20 +0200123
class MultiplyAdapter(Adapter):
    """
    Decoder multiplies by multiplicator
    Encoder divides by multiplicator (integer division)

    Parameters:
        subcon: Subconstruct as defined by construct library
        multiplicator: Multiplier to apply to raw encoded value
    """

    def __init__(self, subcon, multiplicator):
        super().__init__(subcon)
        self.multiplicator = multiplicator

    def _decode(self, obj, context, path):
        # use the configured factor rather than a hard-coded constant
        return obj * self.multiplicator

    def _encode(self, obj, context, path):
        # floor division: any remainder is dropped
        return obj // self.multiplicator
143
Harald Weltec91085e2022-02-10 18:05:45 +0100144
class GsmStringAdapter(Adapter):
    """Adapter converting between GSM 03.38 encoded bytes and a string.

    Parameters:
        subcon: Subconstruct as defined by construct library
        codec: name of the codec used for both directions (default: 'gsm03.38')
        err: error handling scheme handed to str.encode() (default: 'strict')
    """

    def __init__(self, subcon, codec='gsm03.38', err='strict'):
        super().__init__(subcon)
        self.err = err
        self.codec = codec

    def _encode(self, obj, context, path):
        encoded = obj.encode(self.codec, self.err)
        return encoded

    def _decode(self, obj, context, path):
        # only the codec is handed over here; 'err' applies to encoding only
        text = obj.decode(self.codec)
        return text
158
class Ipv4Adapter(Adapter):
    """
    Decoder converts the parsed value to its string representation (A.B.C.D).
    Encoder converts an address (e.g. string A.B.C.D) to its packed bytes.
    """

    def _encode(self, obj, context, path):
        return ipaddress.IPv4Address(obj).packed

    def _decode(self, obj, context, path):
        return ipaddress.IPv4Address(obj).compressed
171
class Ipv6Adapter(Adapter):
    """
    Decoder converts the parsed value to its compressed string representation.
    Encoder converts an address (e.g. its string form) to its packed bytes.
    """

    def _encode(self, obj, context, path):
        return ipaddress.IPv6Address(obj).packed

    def _decode(self, obj, context, path):
        return ipaddress.IPv6Address(obj).compressed
184
Harald Weltec91085e2022-02-10 18:05:45 +0100185
def filter_dict(d, exclude_prefix='_'):
    """Return a copy of dict d without the keys starting with 'exclude_prefix'.

    Nested plain dicts are filtered recursively using the same prefix.
    Anything that is not a dict is returned unchanged.
    """
    if not isinstance(d, dict):
        return d
    res = {}
    for key, value in d.items():
        if key.startswith(exclude_prefix):
            continue
        # Only values whose exact type is dict are recursed into; instances
        # of dict subclasses are passed through as they are.
        if type(value) is dict:
            # hand the prefix down so nested levels honour a non-default one
            res[key] = filter_dict(value, exclude_prefix)
        else:
            res[key] = value
    return res
199
Harald Welte07c7b1f2021-05-28 22:01:29 +0200200
def normalize_construct(c):
    """Recursively turn construct specific types into related base types,
    mostly so that the result can be serialized."""
    # Filtering is required first, as the parse result otherwise contains
    # entries such as '_io': <_io.BytesIO object at 0x...>, which cannot be
    # json-serialized.
    c = filter_dict(c)
    if isinstance(c, (Container, dict)):
        return {key: normalize_construct(val) for (key, val) in c.items()}
    if isinstance(c, (ListContainer, list)):
        return [normalize_construct(item) for item in c]
    if isinstance(c, EnumIntegerString):
        return str(c)
    # everything else is passed through as-is
    return c
218
Harald Weltec91085e2022-02-10 18:05:45 +0100219
def parse_construct(c, raw_bin_data: bytes, length: typing.Optional[int] = None, exclude_prefix: str = '_', context: dict = {}):
    """Parse raw_bin_data using construct c and return the normalized result.

    'total_len' is handed to the parser as a context variable; a falsy
    'length' (None or 0) makes it default to len(raw_bin_data). The entries
    of 'context' are forwarded as additional context variables.
    'exclude_prefix' is accepted but not used by this function.
    """
    total_len = length if length else len(raw_bin_data)
    result = c.parse(raw_bin_data, total_len=total_len, **context)
    return normalize_construct(result)
226
def build_construct(c, decoded_data, context: dict = {}):
    """Build decoded_data using construct c, with total_len=None and the
    entries of 'context' passed along as context variables."""
    encoded = c.build(decoded_data, total_len=None, **context)
    return encoded
Harald Weltec91085e2022-02-10 18:05:45 +0100230
# Shared / common data type definitions are collected below.

# Length-Value: an Int8ub length prefix followed by the value as hex string
LV = Prefixed(Int8ub, HexAdapter(GreedyBytes))

# Default value for Reserved for Future Use (RFU) bits/bytes
# See TS 31.101 Sec. "3.4 Coding Conventions"
__RFU_VALUE = 0

# Single RFU bit
FlagRFU = Default(Flag, __RFU_VALUE)

# Single RFU byte
ByteRFU = Default(Byte, __RFU_VALUE)

# All remaining bytes are RFU
GreedyBytesRFU = Default(GreedyBytes, b'')
246
Harald Weltec91085e2022-02-10 18:05:45 +0100247
def BitsRFU(n=1):
    '''
    Field that packs Reserved for Future Use (RFU) bit(s)
    as defined in TS 31.101 Sec. "3.4 Coding Conventions"

    Intended for (currently) unused/reserved bits: their contents are
    initialized automatically, but (unlike padding) they must not be
    cleared in the future or when restoring read data.

    Parameters:
        n (Integer): Number of bits (default: 1)
    '''
    bits_field = BitsInteger(n)
    return Default(bits_field, __RFU_VALUE)
261
Harald Weltec91085e2022-02-10 18:05:45 +0100262
def BytesRFU(n=1):
    '''
    Field that packs Reserved for Future Use (RFU) byte(s)
    as defined in TS 31.101 Sec. "3.4 Coding Conventions"

    Intended for (currently) unused/reserved bytes: their contents are
    initialized automatically, but (unlike padding) they must not be
    cleared in the future or when restoring read data.

    Parameters:
        n (Integer): Number of bytes (default: 1)
    '''
    bytes_field = Bytes(n)
    return Default(bytes_field, __RFU_VALUE)
Robert Falkenbergb07a3e92021-05-07 15:23:20 +0200276
Harald Weltec91085e2022-02-10 18:05:45 +0100277
def GsmString(n):
    '''
    GSM 03.38 encoded byte string of fixed length n.

    When encoding, padding bytes (b'\\xff') are appended to reach the
    fixed length; when decoding, those trailing bytes are removed.

    Exceptions are raised for invalid characters
    and length excess.

    Parameters:
        n (Integer): Fixed length of the encoded byte string
    '''
    padded_bytes = Rpad(Bytes(n), pattern=b'\xff')
    return GsmStringAdapter(padded_bytes, codec='gsm03.38')
Harald Welted0519e02022-02-11 18:05:48 +0100291
class GreedyInteger(Construct):
    """A variable-length integer implementation, think of combining GreedyBytes with BytesInteger.

    Parsing consumes the entire remaining stream and interprets it as a
    big-endian integer (after an optional byte swap). Building emits the
    smallest number of bytes able to hold the value, but never fewer than
    'minlen' bytes.

    Parameters:
        signed: interpret the parsed bytes as a signed integer; building
                signed integers is not implemented
        swapped: whether to swap the byte order; evaluated against the context
        minlen: minimum number of bytes to emit when building
    """

    def __init__(self, signed=False, swapped=False, minlen=0):
        super().__init__()
        self.signed = signed
        self.swapped = swapped
        self.minlen = minlen

    def _parse(self, stream, context, path):
        data = stream_read_entire(stream, path)
        if evaluate(self.swapped, context):
            data = swapbytes(data)
        try:
            return int.from_bytes(data, byteorder='big', signed=self.signed)
        except ValueError as e:
            raise IntegerError(str(e), path=path)

    def __bytes_required(self, i, minlen=0):
        """Return the number of bytes needed to encode the integer i (at least minlen)."""
        if self.signed:
            raise NotImplementedError("FIXME: Implement support for encoding signed integer")

        # At least one byte, even for 0. bit_length() always terminates,
        # also for negative values, where shifting right until the value
        # becomes 0 would loop forever.
        nbytes = max(1, (i.bit_length() + 7) // 8)

        # round up to the minimum number of bytes we anticipate
        return max(nbytes, minlen)

    def _build(self, obj, stream, context, path):
        if not isinstance(obj, integertypes):
            raise IntegerError(f"value {obj} is not an integer", path=path)
        length = self.__bytes_required(obj, self.minlen)
        try:
            data = obj.to_bytes(length, byteorder='big', signed=self.signed)
        except (ValueError, OverflowError) as e:
            # to_bytes() raises OverflowError e.g. for a negative value in
            # unsigned mode; report it like any other integer problem
            raise IntegerError(str(e), path=path) from e
        if evaluate(self.swapped, context):
            data = swapbytes(data)
        stream_write(stream, data, length, path)
        return obj
Harald Weltebc0e2092022-02-13 10:54:58 +0100341
# Type-of-number / numbering-plan definitions, merged from 24.008 + 23.040
TypeOfNumber = Enum(
    BitsInteger(3),
    unknown=0,
    international=1,
    national=2,
    network_specific=3,
    short_code=4,
    alphanumeric=5,
    abbreviated=6,
    reserved_for_extension=7,
)
NumberingPlan = Enum(
    BitsInteger(4),
    unknown=0,
    isdn_e164=1,
    data_x121=3,
    telex_f69=4,
    sc_specific_5=5,
    sc_specific_6=6,
    national=8,
    private=9,
    ermes=10,
    reserved_cts=11,
    reserved_for_extension=15,
)
# One octet: extension flag, 3 bit type of number, 4 bit numbering plan
TonNpi = BitStruct(
    'ext'/Flag,
    'type_of_number'/TypeOfNumber,
    'numbering_plan_id'/NumberingPlan,
)