blob: 97af235a0ea16e92bfd63323df36545ca04b5640 [file] [log] [blame]
Harald Weltec91085e2022-02-10 18:05:45 +01001from construct.lib.containers import Container, ListContainer
2from construct.core import EnumIntegerString
Harald Welte07c7b1f2021-05-28 22:01:29 +02003import typing
Harald Weltee0f9ef12021-04-10 17:22:35 +02004from construct import *
Vadim Yanitskiy05d30eb2022-08-29 20:24:44 +07005from construct.core import evaluate, BitwisableString
Harald Welted0519e02022-02-11 18:05:48 +01006from construct.lib import integertypes
Harald Welte2db5cfb2021-04-10 19:05:37 +02007from pySim.utils import b2h, h2b, swap_nibbles
Robert Falkenbergb07a3e92021-05-07 15:23:20 +02008import gsm0338
Harald Weltee0f9ef12021-04-10 17:22:35 +02009
10"""Utility code related to the integration of the 'construct' declarative parser."""
11
Harald Welted0519e02022-02-11 18:05:48 +010012# (C) 2021-2022 by Harald Welte <laforge@osmocom.org>
Harald Weltee0f9ef12021-04-10 17:22:35 +020013#
14# This program is free software: you can redistribute it and/or modify
15# it under the terms of the GNU General Public License as published by
16# the Free Software Foundation, either version 2 of the License, or
17# (at your option) any later version.
18#
19# This program is distributed in the hope that it will be useful,
20# but WITHOUT ANY WARRANTY; without even the implied warranty of
21# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
22# GNU General Public License for more details.
23#
24# You should have received a copy of the GNU General Public License
25# along with this program. If not, see <http://www.gnu.org/licenses/>.
26
27
class HexAdapter(Adapter):
    """Adapter between a bytes() value and its string of hex nibbles.

    When parsing, the value of the wrapped subconstruct is passed through
    b2h(); when building, the user-supplied value is passed through h2b()
    before it is handed to the subconstruct.
    """

    def _decode(self, obj, context, path):
        hex_str = b2h(obj)
        return hex_str

    def _encode(self, obj, context, path):
        raw = h2b(obj)
        return raw
36
Harald Weltec91085e2022-02-10 18:05:45 +010037
class BcdAdapter(Adapter):
    """Adapter between a bytes() value and its string of BCD nibbles.

    When parsing, the bytes are turned into a hex string with b2h() and the
    result is passed through swap_nibbles(); building applies
    swap_nibbles() first and then h2b().
    """

    def _decode(self, obj, context, path):
        hex_str = b2h(obj)
        return swap_nibbles(hex_str)

    def _encode(self, obj, context, path):
        swapped = swap_nibbles(obj)
        return h2b(swapped)
46
class InvertAdapter(Adapter):
    """Adapter applying inverse logic (False -> True, True -> False).

    Every non-private entry of the dict-like value exchanged with the
    subconstruct is flipped, in both the parse and the build direction.
    """

    @staticmethod
    def _invert_bool_in_obj(obj):
        """Flip the boolean entries of the dict-like *obj* in place.

        Keys starting with '_' are considered private and left untouched.
        Values are matched by equality, so the integers 0 and 1 are also
        flipped (to True and False respectively); every other value is
        kept as it is.

        Returns:
            the very same (now modified) object
        """
        for k, v in obj.items():
            # skip all private entries
            if k.startswith('_'):
                continue
            # only existing keys are re-assigned, so the size of the
            # mapping does not change while we iterate over it
            if v == False:
                obj[k] = True
            elif v == True:
                obj[k] = False
        return obj

    def _decode(self, obj, context, path):
        """Invert the value handed up by the subconstruct."""
        return self._invert_bool_in_obj(obj)

    def _encode(self, obj, context, path):
        """Return an inverted version of *obj* for the subconstruct.

        The inversion is performed on a shallow copy: *obj* belongs to the
        caller, and flipping it in place would make a second build of the
        same object produce the opposite result.
        """
        return self._invert_bool_in_obj(obj.copy())
Harald Weltec91085e2022-02-10 18:05:45 +010066
class Rpad(Adapter):
    """
    Right-padding adapter for byte strings of a fixed target size.

    Encoder appends padding bytes (b'\\xff') up to target size.
    Decoder removes trailing padding bytes.

    Parameters:
        subcon: Subconstruct as defined by construct library
        pattern: set padding pattern (default: b'\\xff')
    """

    def __init__(self, subcon, pattern=b'\xff'):
        super().__init__(subcon)
        self.pattern = pattern

    def _decode(self, obj, context, path):
        # strip the padding from the right-hand end only
        stripped = obj.rstrip(self.pattern)
        return stripped

    def _encode(self, obj, context, path):
        target = self.sizeof()
        actual = len(obj)
        if actual > target:
            raise SizeofError("Input ({}) exceeds target size ({})".format(
                actual, target))
        # fill the remainder with the padding pattern
        missing = target - actual
        return obj + self.pattern * missing
89
Harald Weltec91085e2022-02-10 18:05:45 +010090
class GsmStringAdapter(Adapter):
    """Convert GSM 03.38 encoded bytes to a string.

    Parameters:
        subcon: Subconstruct delivering / accepting the encoded bytes
        codec: name of the codec passed to bytes.decode() / str.encode()
            (default: 'gsm03.38'; presumably registered by the gsm0338
            module imported by this file)
        err: error handling scheme passed to the codec (default: 'strict')
    """

    def __init__(self, subcon, codec='gsm03.38', err='strict'):
        super().__init__(subcon)
        self.codec = codec
        self.err = err

    def _decode(self, obj, context, path):
        # honour the configured error handling in this direction as well,
        # so that both directions treat invalid input the same way
        return obj.decode(self.codec, self.err)

    def _encode(self, obj, context, path):
        return obj.encode(self.codec, self.err)
104
Harald Weltec91085e2022-02-10 18:05:45 +0100105
def filter_dict(d, exclude_prefix='_'):
    """filter the input dict to ensure no keys starting with 'exclude_prefix' remain.

    Nested dicts (including instances of dict subclasses) are filtered
    recursively using the same prefix.  Values of any other type, e.g.
    lists, are copied over by reference without being inspected.

    Parameters:
        d: the dict to filter; anything that is not a dict is returned as-is
        exclude_prefix: keys starting with this string are dropped

    Returns:
        a new plain dict holding only the remaining entries
    """
    if not isinstance(d, dict):
        return d
    res = {}
    for (key, value) in d.items():
        # only string keys can carry the prefix; others are always kept
        if isinstance(key, str) and key.startswith(exclude_prefix):
            continue
        if isinstance(value, dict):
            # hand the prefix down, so nested levels follow the same rule
            res[key] = filter_dict(value, exclude_prefix)
        else:
            res[key] = value
    return res
119
Harald Welte07c7b1f2021-05-28 22:01:29 +0200120
def normalize_construct(c):
    """Turn a construct-specific value into a related base type.

    This is mostly useful so the result can be serialized.  Dict-like
    values become plain dicts, list-like values become plain lists (both
    recursively), an EnumIntegerString becomes a str and anything else is
    returned unchanged.
    """
    # filter_dict() has to be applied first, as we otherwise get elements
    # like '_io': <_io.BytesIO object at 0x7fdb64e05860> in the dict, which
    # we cannot json-serialize
    filtered = filter_dict(c)
    if isinstance(filtered, (Container, dict)):
        return {key: normalize_construct(val) for (key, val) in filtered.items()}
    if isinstance(filtered, (ListContainer, list)):
        return [normalize_construct(item) for item in filtered]
    if isinstance(filtered, EnumIntegerString):
        return str(filtered)
    return filtered
138
Harald Weltec91085e2022-02-10 18:05:45 +0100139
def parse_construct(c, raw_bin_data: bytes, length: typing.Optional[int] = None, exclude_prefix: str = '_'):
    """Parse binary data with a construct and normalize the result.

    Parameters:
        c: the construct used for parsing
        raw_bin_data: the binary data to parse
        length: value handed to the construct as 'total_len'; when it is
            None or 0, the length of raw_bin_data is used instead
        exclude_prefix: accepted, but not referenced by this function;
            the filtering is done inside normalize_construct()

    Returns:
        the parse result after passing through normalize_construct()
    """
    total_len = length if length else len(raw_bin_data)
    return normalize_construct(c.parse(raw_bin_data, total_len=total_len))
146
Harald Weltec91085e2022-02-10 18:05:45 +0100147
# here we collect some shared / common definitions of data types

# Length-Value: a one-byte length prefix (Int8ub) followed by the value
# bytes, which are exposed as a hex string by means of HexAdapter
LV = Prefixed(Int8ub, HexAdapter(GreedyBytes))

# Default value for Reserved for Future Use (RFU) bits/bytes
# See TS 31.101 Sec. "3.4 Coding Conventions"
__RFU_VALUE = 0

# Field that packs Reserved for Future Use (RFU) bit
FlagRFU = Default(Flag, __RFU_VALUE)

# Field that packs Reserved for Future Use (RFU) byte
ByteRFU = Default(Byte, __RFU_VALUE)

# Field that packs all remaining Reserved for Future Use (RFU) bytes
# (its default is the empty byte string rather than __RFU_VALUE)
GreedyBytesRFU = Default(GreedyBytes, b'')
163
Harald Weltec91085e2022-02-10 18:05:45 +0100164
def BitsRFU(n=1):
    """
    Build a field that packs Reserved for Future Use (RFU) bit(s)
    as defined in TS 31.101 Sec. "3.4 Coding Conventions"

    Intended for (currently) unused/reserved bits: their contents
    should be initialized automatically, but (unlike padding) should
    not be cleared in the future or when restoring read data.

    Parameters:
        n (Integer): Number of bits (default: 1)
    """
    rfu_bits = BitsInteger(n)
    return Default(rfu_bits, __RFU_VALUE)
178
Harald Weltec91085e2022-02-10 18:05:45 +0100179
def BytesRFU(n=1):
    """
    Build a field that packs Reserved for Future Use (RFU) byte(s)
    as defined in TS 31.101 Sec. "3.4 Coding Conventions"

    Intended for (currently) unused/reserved bytes: their contents
    should be initialized automatically, but (unlike padding) should
    not be cleared in the future or when restoring read data.

    Parameters:
        n (Integer): Number of bytes (default: 1)
    """
    rfu_bytes = Bytes(n)
    return Default(rfu_bytes, __RFU_VALUE)
Robert Falkenbergb07a3e92021-05-07 15:23:20 +0200193
Harald Weltec91085e2022-02-10 18:05:45 +0100194
def GsmString(n):
    """
    Build a field for a GSM 03.38 encoded byte string of fixed length n.

    The encoder appends padding bytes (b'\\xff') to maintain the
    length; the decoder removes those trailing bytes again.

    Exceptions are raised for invalid characters
    and length excess.

    Parameters:
        n (Integer): Fixed length of the encoded byte string
    """
    # fixed-size byte field, right-padded with 0xff
    padded = Rpad(Bytes(n), pattern=b'\xff')
    return GsmStringAdapter(padded, codec='gsm03.38')
Harald Welted0519e02022-02-11 18:05:48 +0100208
class GreedyInteger(Construct):
    """A variable-length integer implementation, think of combining GreedyBytes with BytesInteger.

    Parsing reads the entire remaining stream and interprets it as one
    big-endian integer.  Building emits the smallest number of bytes able
    to hold the value, but never fewer than 'minlen' bytes.

    Parameters:
        signed: interpret the parsed bytes as a signed value; building
            signed values is not implemented
        swapped: swap the bytes before parsing / after building; may be a
            context lambda, as it is passed through evaluate()
        minlen: minimum number of bytes emitted when building
    """

    def __init__(self, signed=False, swapped=False, minlen=0):
        super().__init__()
        self.signed = signed
        self.swapped = swapped
        self.minlen = minlen

    def _parse(self, stream, context, path):
        data = stream_read_entire(stream, path)
        if evaluate(self.swapped, context):
            data = swapbytes(data)
        try:
            return int.from_bytes(data, byteorder='big', signed=self.signed)
        except ValueError as e:
            raise IntegerError(str(e), path=path) from e

    def __bytes_required(self, i, minlen=0):
        """Return the number of bytes needed to encode the unsigned integer i.

        The result is at least 1 and at least 'minlen'.

        Raises:
            NotImplementedError: if this instance is configured as signed
        """
        if self.signed:
            raise NotImplementedError("FIXME: Implement support for encoding signed integer")

        # compute how many bytes we need; bit_length() terminates for any
        # integer, whereas repeatedly shifting a negative value to the right
        # never reaches zero
        nbytes = max(1, (i.bit_length() + 7) // 8)

        # round up to the minimum number
        # of bytes we anticipate
        return max(nbytes, minlen)

    def _build(self, obj, stream, context, path):
        if not isinstance(obj, integertypes):
            raise IntegerError(f"value {obj} is not an integer", path=path)
        if obj < 0 and not self.signed:
            # an unsigned field cannot represent this value
            raise IntegerError(f"value {obj} is negative, but field is not signed", path=path)
        length = self.__bytes_required(obj, self.minlen)
        try:
            data = obj.to_bytes(length, byteorder='big', signed=self.signed)
        except (ValueError, OverflowError) as e:
            # int.to_bytes() reports a value that does not fit as OverflowError
            raise IntegerError(str(e), path=path) from e
        if evaluate(self.swapped, context):
            data = swapbytes(data)
        stream_write(stream, data, length, path)
        return obj
Harald Weltebc0e2092022-02-13 10:54:58 +0100258
# merged definitions of 24.008 + 23.040

# Type of Number: 3-bit enumeration
TypeOfNumber = Enum(BitsInteger(3), unknown=0, international=1, national=2, network_specific=3,
                    short_code=4, alphanumeric=5, abbreviated=6, reserved_for_extension=7)
# Numbering Plan identification: 4-bit enumeration
NumberingPlan = Enum(BitsInteger(4), unknown=0, isdn_e164=1, data_x121=3, telex_f69=4,
                     sc_specific_5=5, sc_specific_6=6, national=8, private=9,
                     ermes=10, reserved_cts=11, reserved_for_extension=15)
# Combination of extension flag (1 bit), type of number (3 bits) and
# numbering plan (4 bits), in this order
TonNpi = BitStruct('ext'/Flag, 'type_of_number'/TypeOfNumber, 'numbering_plan_id'/NumberingPlan)