blob: af96b49eab9c30f90eb4649c280fcff50bdd9172 [file] [log] [blame]
Harald Weltec91085e2022-02-10 18:05:45 +01001from construct.lib.containers import Container, ListContainer
2from construct.core import EnumIntegerString
Harald Welte07c7b1f2021-05-28 22:01:29 +02003import typing
Harald Weltee0f9ef12021-04-10 17:22:35 +02004from construct import *
Vadim Yanitskiy05d30eb2022-08-29 20:24:44 +07005from construct.core import evaluate, BitwisableString
Harald Welted0519e02022-02-11 18:05:48 +01006from construct.lib import integertypes
Harald Welte2db5cfb2021-04-10 19:05:37 +02007from pySim.utils import b2h, h2b, swap_nibbles
Robert Falkenbergb07a3e92021-05-07 15:23:20 +02008import gsm0338
Philipp Maier791f80a2023-07-26 17:01:37 +02009import codecs
Harald Weltee0f9ef12021-04-10 17:22:35 +020010
11"""Utility code related to the integration of the 'construct' declarative parser."""
12
Harald Welted0519e02022-02-11 18:05:48 +010013# (C) 2021-2022 by Harald Welte <laforge@osmocom.org>
Harald Weltee0f9ef12021-04-10 17:22:35 +020014#
15# This program is free software: you can redistribute it and/or modify
16# it under the terms of the GNU General Public License as published by
17# the Free Software Foundation, either version 2 of the License, or
18# (at your option) any later version.
19#
20# This program is distributed in the hope that it will be useful,
21# but WITHOUT ANY WARRANTY; without even the implied warranty of
22# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
23# GNU General Public License for more details.
24#
25# You should have received a copy of the GNU General Public License
26# along with this program. If not, see <http://www.gnu.org/licenses/>.
27
28
29class HexAdapter(Adapter):
30 """convert a bytes() type to a string of hex nibbles."""
Harald Weltec91085e2022-02-10 18:05:45 +010031
Harald Weltee0f9ef12021-04-10 17:22:35 +020032 def _decode(self, obj, context, path):
33 return b2h(obj)
Harald Weltec91085e2022-02-10 18:05:45 +010034
Harald Weltee0f9ef12021-04-10 17:22:35 +020035 def _encode(self, obj, context, path):
36 return h2b(obj)
37
Philipp Maier791f80a2023-07-26 17:01:37 +020038class Utf8Adapter(Adapter):
39 """convert a bytes() type that contains utf8 encoded text to human readable text."""
40
41 def _decode(self, obj, context, path):
42 # In case the string contains only 0xff bytes we interpret it as an empty string
43 if obj == b'\xff' * len(obj):
44 return ""
45 return codecs.decode(obj, "utf-8")
46
47 def _encode(self, obj, context, path):
48 return codecs.encode(obj, "utf-8")
49
Harald Weltec91085e2022-02-10 18:05:45 +010050
Harald Welte2db5cfb2021-04-10 19:05:37 +020051class BcdAdapter(Adapter):
52 """convert a bytes() type to a string of BCD nibbles."""
Harald Weltec91085e2022-02-10 18:05:45 +010053
Harald Welte2db5cfb2021-04-10 19:05:37 +020054 def _decode(self, obj, context, path):
55 return swap_nibbles(b2h(obj))
Harald Weltec91085e2022-02-10 18:05:45 +010056
Harald Welte2db5cfb2021-04-10 19:05:37 +020057 def _encode(self, obj, context, path):
58 return h2b(swap_nibbles(obj))
59
Harald Weltebc0e2092022-02-13 10:54:58 +010060class InvertAdapter(Adapter):
61 """inverse logic (false->true, true->false)."""
62 @staticmethod
63 def _invert_bool_in_obj(obj):
64 for k,v in obj.items():
65 # skip all private entries
66 if k.startswith('_'):
67 continue
68 if v == False:
69 obj[k] = True
70 elif v == True:
71 obj[k] = False
72 return obj
73
74 def _decode(self, obj, context, path):
75 return self._invert_bool_in_obj(obj)
76
77 def _encode(self, obj, context, path):
78 return self._invert_bool_in_obj(obj)
Harald Weltec91085e2022-02-10 18:05:45 +010079
Robert Falkenbergb07a3e92021-05-07 15:23:20 +020080class Rpad(Adapter):
81 """
Harald Weltef9a5ba52023-06-09 09:17:05 +020082 Encoder appends padding bytes (b'\\xff') or characters up to target size.
83 Decoder removes trailing padding bytes/characters.
Robert Falkenbergb07a3e92021-05-07 15:23:20 +020084
85 Parameters:
86 subcon: Subconstruct as defined by construct library
87 pattern: set padding pattern (default: b'\\xff')
Harald Weltef9a5ba52023-06-09 09:17:05 +020088 num_per_byte: number of 'elements' per byte. E.g. for hex nibbles: 2
Robert Falkenbergb07a3e92021-05-07 15:23:20 +020089 """
90
Harald Weltef9a5ba52023-06-09 09:17:05 +020091 def __init__(self, subcon, pattern=b'\xff', num_per_byte=1):
Robert Falkenbergb07a3e92021-05-07 15:23:20 +020092 super().__init__(subcon)
93 self.pattern = pattern
Harald Weltef9a5ba52023-06-09 09:17:05 +020094 self.num_per_byte = num_per_byte
Robert Falkenbergb07a3e92021-05-07 15:23:20 +020095
96 def _decode(self, obj, context, path):
97 return obj.rstrip(self.pattern)
98
99 def _encode(self, obj, context, path):
Harald Weltef9a5ba52023-06-09 09:17:05 +0200100 target_size = self.sizeof() * self.num_per_byte
101 if len(obj) > target_size:
Harald Weltec91085e2022-02-10 18:05:45 +0100102 raise SizeofError("Input ({}) exceeds target size ({})".format(
Harald Weltef9a5ba52023-06-09 09:17:05 +0200103 len(obj), target_size))
104 return obj + self.pattern * (target_size - len(obj))
Robert Falkenbergb07a3e92021-05-07 15:23:20 +0200105
Harald Welte954ce952023-05-27 20:08:09 +0200106class MultiplyAdapter(Adapter):
107 """
108 Decoder multiplies by multiplicator
109 Encoder divides by multiplicator
110
111 Parameters:
112 subcon: Subconstruct as defined by construct library
113 multiplier: Multiplier to apply to raw encoded value
114 """
115
116 def __init__(self, subcon, multiplicator):
117 super().__init__(subcon)
118 self.multiplicator = multiplicator
119
120 def _decode(self, obj, context, path):
121 return obj * 8
122
123 def _encode(self, obj, context, path):
124 return obj // 8
125
Harald Weltec91085e2022-02-10 18:05:45 +0100126
Robert Falkenbergb07a3e92021-05-07 15:23:20 +0200127class GsmStringAdapter(Adapter):
128 """Convert GSM 03.38 encoded bytes to a string."""
129
130 def __init__(self, subcon, codec='gsm03.38', err='strict'):
131 super().__init__(subcon)
132 self.codec = codec
133 self.err = err
134
135 def _decode(self, obj, context, path):
136 return obj.decode(self.codec)
137
138 def _encode(self, obj, context, path):
139 return obj.encode(self.codec, self.err)
140
Harald Weltec91085e2022-02-10 18:05:45 +0100141
Harald Weltee0f9ef12021-04-10 17:22:35 +0200142def filter_dict(d, exclude_prefix='_'):
143 """filter the input dict to ensure no keys starting with 'exclude_prefix' remain."""
Harald Welte7fca85b2021-05-29 21:27:46 +0200144 if not isinstance(d, dict):
145 return d
Harald Weltee0f9ef12021-04-10 17:22:35 +0200146 res = {}
147 for (key, value) in d.items():
148 if key.startswith(exclude_prefix):
149 continue
150 if type(value) is dict:
151 res[key] = filter_dict(value)
152 else:
153 res[key] = value
154 return res
155
Harald Welte07c7b1f2021-05-28 22:01:29 +0200156
157def normalize_construct(c):
158 """Convert a construct specific type to a related base type, mostly useful
159 so we can serialize it."""
160 # we need to include the filter_dict as we otherwise get elements like this
161 # in the dict: '_io': <_io.BytesIO object at 0x7fdb64e05860> which we cannot json-serialize
162 c = filter_dict(c)
163 if isinstance(c, Container) or isinstance(c, dict):
Harald Weltec91085e2022-02-10 18:05:45 +0100164 r = {k: normalize_construct(v) for (k, v) in c.items()}
Harald Welte07c7b1f2021-05-28 22:01:29 +0200165 elif isinstance(c, ListContainer):
166 r = [normalize_construct(x) for x in c]
167 elif isinstance(c, list):
168 r = [normalize_construct(x) for x in c]
169 elif isinstance(c, EnumIntegerString):
170 r = str(c)
171 else:
172 r = c
173 return r
174
Harald Weltec91085e2022-02-10 18:05:45 +0100175
176def parse_construct(c, raw_bin_data: bytes, length: typing.Optional[int] = None, exclude_prefix: str = '_'):
Harald Welte07c7b1f2021-05-28 22:01:29 +0200177 """Helper function to wrap around normalize_construct() and filter_dict()."""
178 if not length:
179 length = len(raw_bin_data)
180 parsed = c.parse(raw_bin_data, total_len=length)
181 return normalize_construct(parsed)
182
Harald Weltec91085e2022-02-10 18:05:45 +0100183
Harald Weltee0f9ef12021-04-10 17:22:35 +0200184# here we collect some shared / common definitions of data types
185LV = Prefixed(Int8ub, HexAdapter(GreedyBytes))
Robert Falkenberg9d16fbc2021-04-12 11:43:22 +0200186
187# Default value for Reserved for Future Use (RFU) bits/bytes
188# See TS 31.101 Sec. "3.4 Coding Conventions"
189__RFU_VALUE = 0
190
191# Field that packs Reserved for Future Use (RFU) bit
192FlagRFU = Default(Flag, __RFU_VALUE)
193
194# Field that packs Reserved for Future Use (RFU) byte
195ByteRFU = Default(Byte, __RFU_VALUE)
196
197# Field that packs all remaining Reserved for Future Use (RFU) bytes
198GreedyBytesRFU = Default(GreedyBytes, b'')
199
Harald Weltec91085e2022-02-10 18:05:45 +0100200
Robert Falkenberg9d16fbc2021-04-12 11:43:22 +0200201def BitsRFU(n=1):
202 '''
203 Field that packs Reserved for Future Use (RFU) bit(s)
204 as defined in TS 31.101 Sec. "3.4 Coding Conventions"
205
206 Use this for (currently) unused/reserved bits whose contents
207 should be initialized automatically but should not be cleared
208 in the future or when restoring read data (unlike padding).
209
210 Parameters:
211 n (Integer): Number of bits (default: 1)
212 '''
213 return Default(BitsInteger(n), __RFU_VALUE)
214
Harald Weltec91085e2022-02-10 18:05:45 +0100215
Robert Falkenberg9d16fbc2021-04-12 11:43:22 +0200216def BytesRFU(n=1):
217 '''
218 Field that packs Reserved for Future Use (RFU) byte(s)
219 as defined in TS 31.101 Sec. "3.4 Coding Conventions"
220
221 Use this for (currently) unused/reserved bytes whose contents
222 should be initialized automatically but should not be cleared
223 in the future or when restoring read data (unlike padding).
224
225 Parameters:
226 n (Integer): Number of bytes (default: 1)
227 '''
228 return Default(Bytes(n), __RFU_VALUE)
Robert Falkenbergb07a3e92021-05-07 15:23:20 +0200229
Harald Weltec91085e2022-02-10 18:05:45 +0100230
Robert Falkenbergb07a3e92021-05-07 15:23:20 +0200231def GsmString(n):
232 '''
233 GSM 03.38 encoded byte string of fixed length n.
234 Encoder appends padding bytes (b'\\xff') to maintain
235 length. Decoder removes those trailing bytes.
236
237 Exceptions are raised for invalid characters
238 and length excess.
239
240 Parameters:
241 n (Integer): Fixed length of the encoded byte string
242 '''
243 return GsmStringAdapter(Rpad(Bytes(n), pattern=b'\xff'), codec='gsm03.38')
Harald Welted0519e02022-02-11 18:05:48 +0100244
245class GreedyInteger(Construct):
246 """A variable-length integer implementation, think of combining GrredyBytes with BytesInteger."""
Philipp Maier541a9152022-06-01 18:21:17 +0200247 def __init__(self, signed=False, swapped=False, minlen=0):
Harald Welted0519e02022-02-11 18:05:48 +0100248 super().__init__()
249 self.signed = signed
250 self.swapped = swapped
Philipp Maier541a9152022-06-01 18:21:17 +0200251 self.minlen = minlen
Harald Welted0519e02022-02-11 18:05:48 +0100252
253 def _parse(self, stream, context, path):
254 data = stream_read_entire(stream, path)
255 if evaluate(self.swapped, context):
256 data = swapbytes(data)
257 try:
Vadim Yanitskiy05d30eb2022-08-29 20:24:44 +0700258 return int.from_bytes(data, byteorder='big', signed=self.signed)
Harald Welted0519e02022-02-11 18:05:48 +0100259 except ValueError as e:
260 raise IntegerError(str(e), path=path)
261
Philipp Maier541a9152022-06-01 18:21:17 +0200262 def __bytes_required(self, i, minlen=0):
Harald Welted0519e02022-02-11 18:05:48 +0100263 if self.signed:
264 raise NotImplementedError("FIXME: Implement support for encoding signed integer")
Philipp Maier541a9152022-06-01 18:21:17 +0200265
266 # compute how many bytes we need
Harald Welted0519e02022-02-11 18:05:48 +0100267 nbytes = 1
268 while True:
269 i = i >> 8
270 if i == 0:
Philipp Maier541a9152022-06-01 18:21:17 +0200271 break
Harald Welted0519e02022-02-11 18:05:48 +0100272 else:
273 nbytes = nbytes + 1
Philipp Maier541a9152022-06-01 18:21:17 +0200274
275 # round up to the minimum number
276 # of bytes we anticipate
277 if nbytes < minlen:
278 nbytes = minlen
279
280 return nbytes
Harald Welted0519e02022-02-11 18:05:48 +0100281
282 def _build(self, obj, stream, context, path):
283 if not isinstance(obj, integertypes):
284 raise IntegerError(f"value {obj} is not an integer", path=path)
Philipp Maier541a9152022-06-01 18:21:17 +0200285 length = self.__bytes_required(obj, self.minlen)
Harald Welted0519e02022-02-11 18:05:48 +0100286 try:
Vadim Yanitskiy05d30eb2022-08-29 20:24:44 +0700287 data = obj.to_bytes(length, byteorder='big', signed=self.signed)
Harald Welted0519e02022-02-11 18:05:48 +0100288 except ValueError as e:
289 raise IntegerError(str(e), path=path)
290 if evaluate(self.swapped, context):
291 data = swapbytes(data)
292 stream_write(stream, data, length, path)
293 return obj
Harald Weltebc0e2092022-02-13 10:54:58 +0100294
295# merged definitions of 24.008 + 23.040
296TypeOfNumber = Enum(BitsInteger(3), unknown=0, international=1, national=2, network_specific=3,
297 short_code=4, alphanumeric=5, abbreviated=6, reserved_for_extension=7)
298NumberingPlan = Enum(BitsInteger(4), unknown=0, isdn_e164=1, data_x121=3, telex_f69=4,
299 sc_specific_5=5, sc_specific_6=6, national=8, private=9,
300 ermes=10, reserved_cts=11, reserved_for_extension=15)
301TonNpi = BitStruct('ext'/Flag, 'type_of_number'/TypeOfNumber, 'numbering_plan_id'/NumberingPlan)