blob: 88ee27a14b410ad11f211c3c78760c96dceb0742 [file] [log] [blame]
Harald Welte75a58d12022-07-31 15:51:19 +02001"""Code related to SMS Encoding/Decoding"""
2# simplistic SMS T-PDU code, as unfortunately nobody bothered to port the python smspdu
3# module to python3, and I gave up after >= 3 hours of trying and failing to do so
4
5# (C) 2022 by Harald Welte <laforge@osmocom.org>
6#
7# This program is free software: you can redistribute it and/or modify
8# it under the terms of the GNU General Public License as published by
9# the Free Software Foundation, either version 2 of the License, or
10# (at your option) any later version.
11#
12# This program is distributed in the hope that it will be useful,
13# but WITHOUT ANY WARRANTY; without even the implied warranty of
14# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
15# GNU General Public License for more details.
16#
17# You should have received a copy of the GNU General Public License
18# along with this program. If not, see <http://www.gnu.org/licenses/>.
19
20import typing
Harald Welte0b327252022-08-11 17:37:46 +020021import abc
22from bidict import bidict
23from construct import Int8ub, Byte, Bytes, Bit, Flag, BitsInteger, Flag
24from construct import Struct, Enum, Tell, BitStruct, this, Padding
25from construct import Prefixed, GreedyRange, GreedyBytes
Harald Welte75a58d12022-07-31 15:51:19 +020026
Harald Welte0b327252022-08-11 17:37:46 +020027from pySim.construct import HexAdapter, BcdAdapter, TonNpi
Harald Welte75a58d12022-07-31 15:51:19 +020028from pySim.utils import Hexstr, h2b, b2h
29
Harald Welte0b327252022-08-11 17:37:46 +020030from smpp.pdu import pdu_types, operations
31
Harald Welte75a58d12022-07-31 15:51:19 +020032BytesOrHex = typing.Union[Hexstr, bytes]
33
34class UserDataHeader:
35 # a single IE in the user data header
Harald Welte0b327252022-08-11 17:37:46 +020036 ie_c = Struct('iei'/Int8ub, 'length'/Int8ub, 'value'/Bytes(this.length))
Harald Welte75a58d12022-07-31 15:51:19 +020037 # parser for the full UDH: Length octet followed by sequence of IEs
Harald Welte0b327252022-08-11 17:37:46 +020038 _construct = Struct('ies'/Prefixed(Int8ub, GreedyRange(ie_c)),
39 'data'/GreedyBytes)
Harald Welte75a58d12022-07-31 15:51:19 +020040
41 def __init__(self, ies=[]):
42 self.ies = ies
43
44 def __repr__(self) -> str:
45 return 'UDH(%r)' % self.ies
46
47 def has_ie(self, iei:int) -> bool:
48 for ie in self.ies:
49 if ie['iei'] == iei:
50 return True
51 return False
52
53 @classmethod
54 def fromBytes(cls, inb: BytesOrHex) -> typing.Tuple['UserDataHeader', bytes]:
55 if isinstance(inb, str):
56 inb = h2b(inb)
57 res = cls._construct.parse(inb)
Harald Welte0b327252022-08-11 17:37:46 +020058 return cls(res['ies']), res['data']
59
60 def toBytes(self) -> bytes:
61 return self._construct.build({'ies':self.ies, 'data':b''})
62
63
64def smpp_dcs_is_8bit(dcs: pdu_types.DataCoding) -> bool:
65 """Determine if the given SMPP data coding scheme is 8-bit or not."""
66 if dcs == pdu_types.DataCoding(pdu_types.DataCodingScheme.DEFAULT,
67 pdu_types.DataCodingDefault.OCTET_UNSPECIFIED):
68 return True
69 if dcs == pdu_types.DataCoding(pdu_types.DataCodingScheme.DEFAULT,
70 pdu_types.DataCodingDefault.OCTET_UNSPECIFIED_COMMON):
71 return True
Harald Welte985ff312023-06-27 09:23:06 +020072 # pySim/sms.py:72:21: E1101: Instance of 'DataCodingScheme' has no 'GSM_MESSAGE_CLASS' member (no-member)
73 # pylint: disable=no-member
Harald Welte0b327252022-08-11 17:37:46 +020074 if dcs.scheme == pdu_types.DataCodingScheme.GSM_MESSAGE_CLASS and dcs.schemeData['msgCoding'] == pdu_types.DataCodingGsmMsgCoding.DATA_8BIT:
75 return True
76 else:
77 return False
78
79def ensure_smpp_is_8bit(dcs: pdu_types.DataCoding):
80 """Assert if given SMPP data coding scheme is not 8-bit."""
81 if not smpp_dcs_is_8bit(dcs):
82 raise ValueError('We only support 8bit coded SMS for now')
83
84class AddressField:
85 """Representation of an address field as used in SMS T-PDU."""
86 _construct = Struct('addr_len'/Int8ub,
87 'type_of_addr'/TonNpi,
88 'digits'/BcdAdapter(Bytes(this.addr_len//2 + this.addr_len%2)),
89 'tell'/Tell)
90 smpp_map_npi = bidict({
91 'UNKNOWN': 'unknown',
92 'ISDN': 'isdn_e164',
93 'DATA': 'data_x121',
94 'TELEX': 'telex_f69',
95 'LAND_MOBILE': 'sc_specific6',
96 'NATIONAL': 'national',
97 'PRIVATE': 'private',
98 'ERMES': 'ermes',
99 })
100 smpp_map_ton = bidict({
101 'UNKNOWN': 'unknown',
102 'INTERNATIONAL': 'international',
103 'NATIONAL': 'national',
104 'NETWORK_SPECIFIC': 'network_specific',
105 'SUBSCRIBER_NUMBER': 'short_code',
106 'ALPHANUMERIC': 'alphanumeric',
107 'ABBREVIATED': 'abbreviated',
108 })
109
110
111 def __init__(self, digits, ton='unknown', npi='unknown'):
112 self.ton = ton
113 self.npi = npi
114 self.digits = digits
115
116 def __str__(self):
117 return 'AddressField(TON=%s, NPI=%s, %s)' % (self.ton, self.npi, self.digits)
118
119 @classmethod
120 def fromBytes(cls, inb: BytesOrHex) -> typing.Tuple['AddressField', bytes]:
121 """Construct an AddressField instance from the binary T-PDU address format."""
122 if isinstance(inb, str):
123 inb = h2b(inb)
124 res = cls._construct.parse(inb)
125 #print("size: %s" % cls._construct.sizeof())
126 ton = res['type_of_addr']['type_of_number']
127 npi = res['type_of_addr']['numbering_plan_id']
128 # return resulting instance + remainder bytes
129 return cls(res['digits'][:res['addr_len']], ton, npi), inb[res['tell']:]
130
131 @classmethod
132 def fromSmpp(cls, addr, ton, npi) -> 'AddressField':
133 """Construct an AddressField from {source,dest}_addr_{,ton,npi} attributes of smpp.pdu."""
134 # return the resulting instance
135 return cls(addr.decode('ascii'), AddressField.smpp_map_ton[ton.name], AddressField.smpp_map_npi[npi.name])
136
137 def toSmpp(self):
138 """Return smpp.pdo.*.source,dest}_addr_{,ton,npi} attributes for given AddressField."""
139 return (self.digits, self.smpp_map_ton.inverse[self.ton], self.smpp_map_npi.inverse[self.npi])
140
141 def toBytes(self) -> bytes:
142 """Encode the AddressField into the binary representation as used in T-PDU."""
143 num_digits = len(self.digits)
144 if num_digits % 2:
145 self.digits += 'f'
146 d = {
147 'addr_len': num_digits,
148 'type_of_addr': {
149 'ext': True,
150 'type_of_number': self.ton,
151 'numbering_plan_id': self.npi,
152 },
153 'digits': self.digits,
154 }
155 return self._construct.build(d)
156
157
158class SMS_TPDU(abc.ABC):
159 """Base class for a SMS T-PDU."""
160 def __init__(self, **kwargs):
161 self.tp_mti = kwargs.get('tp_mti', None)
162 self.tp_rp = kwargs.get('tp_rp', False)
163 self.tp_udhi = kwargs.get('tp_udhi', False)
164 self.tp_pid = kwargs.get('tp_pid', None)
165 self.tp_dcs = kwargs.get('tp_dcs', None)
166 self.tp_udl = kwargs.get('tp_udl', None)
167 self.tp_ud = kwargs.get('tp_ud', None)
168
169
170
171class SMS_DELIVER(SMS_TPDU):
172 """Representation of a SMS-DELIVER T-PDU. This is the Network to MS/UE (downlink) direction."""
173 flags_construct = BitStruct('tp_rp'/Flag, 'tp_udhi'/Flag, 'tp_rp'/Flag, 'tp_sri'/Flag,
174 Padding(1), 'tp_mms'/Flag, 'tp_mti'/BitsInteger(2))
175 def __init__(self, **kwargs):
176 kwargs['tp_mti'] = 0
177 super().__init__(**kwargs)
178 self.tp_lp = kwargs.get('tp_lp', False)
179 self.tp_mms = kwargs.get('tp_mms', False)
180 self.tp_oa = kwargs.get('tp_oa', None)
181 self.tp_scts = kwargs.get('tp_scts', None)
182 self.tp_sri = kwargs.get('tp_sri', False)
183
184 def __repr__(self):
185 return '%s(MTI=%s, MMS=%s, LP=%s, RP=%s, UDHI=%s, SRI=%s, OA=%s, PID=%2x, DCS=%x, SCTS=%s, UDL=%u, UD=%s)' % (self.__class__.__name__, self.tp_mti, self.tp_mms, self.tp_lp, self.tp_rp, self.tp_udhi, self.tp_sri, self.tp_oa, self.tp_pid, self.tp_dcs, self.tp_scts, self.tp_udl, self.tp_ud)
186
187 @classmethod
188 def fromBytes(cls, inb: BytesOrHex) -> 'SMS_DELIVER':
189 """Construct a SMS_DELIVER instance from the binary encoded format as used in T-PDU."""
190 if isinstance(inb, str):
191 inb = h2b(inb)
192 flags = inb[0]
193 d = SMS_DELIVER.flags_construct.parse(inb)
194 oa, remainder = AddressField.fromBytes(inb[1:])
195 d['tp_oa'] = oa
196 offset = 0
197 d['tp_pid'] = remainder[offset]
198 offset += 1
199 d['tp_dcs'] = remainder[offset]
200 offset += 1
201 # TODO: further decode
202 d['tp_scts'] = remainder[offset:offset+7]
203 offset += 7
204 d['tp_udl'] = remainder[offset]
205 offset += 1
206 d['tp_ud'] = remainder[offset:]
207 return cls(**d)
208
209 def toBytes(self) -> bytes:
210 """Encode a SMS_DELIVER instance to the binary encoded format as used in T-PDU."""
211 outb = bytearray()
212 d = {
213 'tp_mti': self.tp_mti, 'tp_mms': self.tp_mms, 'tp_lp': self.tp_lp,
214 'tp_rp': self.tp_rp, 'tp_udhi': self.tp_udhi, 'tp_sri': self.tp_sri,
215 }
216 flags = SMS_DELIVER.flags_construct.build(d)
217 outb.extend(flags)
218 outb.extend(self.tp_oa.toBytes())
219 outb.append(self.tp_pid)
220 outb.append(self.tp_dcs)
221 outb.extend(self.tp_scts)
222 outb.append(self.tp_udl)
223 outb.extend(self.tp_ud)
224
225 return outb
226
227 @classmethod
228 def fromSmpp(cls, smpp_pdu) -> 'SMS_DELIVER':
229 """Construct a SMS_DELIVER instance from the deliver format used by smpp.pdu."""
230 if smpp_pdu.id == pdu_types.CommandId.submit_sm:
231 return cls.fromSmppSubmit(smpp_pdu)
232 else:
233 raise ValueError('Unsupported SMPP commandId %s' % smpp_pdu.id)
234
235 @classmethod
236 def fromSmppSubmit(cls, smpp_pdu) -> 'SMS_DELIVER':
237 """Construct a SMS_DELIVER instance from the submit format used by smpp.pdu."""
238 ensure_smpp_is_8bit(smpp_pdu.params['data_coding'])
239 tp_oa = AddressField.fromSmpp(smpp_pdu.params['source_addr'],
240 smpp_pdu.params['source_addr_ton'],
241 smpp_pdu.params['source_addr_npi'])
242 tp_ud = smpp_pdu.params['short_message']
243 d = {
244 'tp_lp': False,
245 'tp_mms': False,
246 'tp_oa': tp_oa,
247 'tp_scts': h2b('22705200000000'), # FIXME
248 'tp_sri': False,
249 'tp_rp': False,
250 'tp_udhi': pdu_types.EsmClassGsmFeatures.UDHI_INDICATOR_SET in smpp_pdu.params['esm_class'].gsmFeatures,
251 'tp_pid': smpp_pdu.params['protocol_id'],
252 'tp_dcs': 0xF6, # we only deal with binary SMS here
253 'tp_udl': len(tp_ud),
254 'tp_ud': tp_ud,
255 }
256 return cls(**d)
257
258
259
260class SMS_SUBMIT(SMS_TPDU):
261 """Representation of a SMS-SUBMIT T-PDU. This is the MS/UE -> network (uplink) direction."""
262 flags_construct = BitStruct('tp_srr'/Flag, 'tp_udhi'/Flag, 'tp_rp'/Flag,
263 'tp_vpf'/Enum(BitsInteger(2), none=0, relative=2, enhanced=1, absolute=3),
264 'tp_rd'/Flag, 'tp_mti'/BitsInteger(2))
265 def __init__(self, **kwargs):
266 kwargs['tp_mti'] = 1
267 super().__init__(**kwargs)
268 self.tp_rd = kwargs.get('tp_rd', False)
269 self.tp_vpf = kwargs.get('tp_vpf', 'none')
270 self.tp_srr = kwargs.get('tp_srr', False)
271 self.tp_mr = kwargs.get('tp_mr', None)
272 self.tp_da = kwargs.get('tp_da', None)
273 self.tp_vp = kwargs.get('tp_vp', None)
274
275 def __repr__(self):
276 return '%s(MTI=%s, RD=%s, VPF=%u, RP=%s, UDHI=%s, SRR=%s, DA=%s, PID=%2x, DCS=%x, VP=%s, UDL=%u, UD=%s)' % (self.__class__.__name__, self.tp_mti, self.tp_rd, self.tp_vpf, self.tp_rp, self.tp_udhi, self.tp_srr, self.tp_da, self.tp_pid, self.tp_dcs, self.tp_vp, self.tp_udl, self.tp_ud)
277
278 @classmethod
279 def fromBytes(cls, inb:BytesOrHex) -> 'SMS_SUBMIT':
280 """Construct a SMS_SUBMIT instance from the binary encoded format as used in T-PDU."""
281 offset = 0
282 if isinstance(inb, str):
283 inb = h2b(inb)
284 d = SMS_SUBMIT.flags_construct.parse(inb)
285 offset += 1
286 d['tp_mr']= inb[offset]
287 offset += 1
288 da, remainder = AddressField.fromBytes(inb[2:])
289 d['tp_da'] = da
290
291 offset = 0
292 d['tp_pid'] = remainder[offset]
293 offset += 1
294 d['tp_dcs'] = remainder[offset]
295 offset += 1
296 if d['tp_vpf'] == 'none':
297 pass
298 elif d['tp_vpf'] == 'relative':
299 # TODO: further decode
300 d['tp_vp'] = remainder[offset:offset+1]
301 offset += 1
302 elif d['tp_vpf'] == 'enhanced':
303 # TODO: further decode
304 d['tp_vp'] = remainder[offset:offset+7]
305 offset += 7
306 pass
307 elif d['tp_vpf'] == 'absolute':
308 # TODO: further decode
309 d['tp_vp'] = remainder[offset:offset+7]
310 offset += 7
311 pass
312 else:
313 raise ValueError('Invalid VPF: %s' % d['tp_vpf'])
314 d['tp_udl'] = remainder[offset]
315 offset += 1
316 d['tp_ud'] = remainder[offset:]
317 return cls(**d)
318
319 def toBytes(self) -> bytes:
320 """Encode a SMS_SUBMIT instance to the binary encoded format as used in T-PDU."""
321 outb = bytearray()
322 d = {
323 'tp_mti': self.tp_mti, 'tp_rd': self.tp_rd, 'tp_vpf': self.tp_vpf,
324 'tp_rp': self.tp_rp, 'tp_udhi': self.tp_udhi, 'tp_srr': self.tp_srr,
325 }
326 flags = SMS_SUBMIT.flags_construct.build(d)
327 outb.extend(flags)
328 outb.append(self.tp_mr)
329 outb.extend(self.tp_da.toBytes())
330 outb.append(self.tp_pid)
331 outb.append(self.tp_dcs)
332 if self.tp_vpf != 'none':
333 outb.extend(self.tp_vp)
334 outb.append(self.tp_udl)
335 outb.extend(self.tp_ud)
336 return outb
337
338 @classmethod
339 def fromSmpp(cls, smpp_pdu) -> 'SMS_SUBMIT':
340 """Construct a SMS_SUBMIT instance from the format used by smpp.pdu."""
341 if smpp_pdu.id == pdu_types.CommandId.submit_sm:
342 return cls.fromSmppSubmit(smpp_pdu)
343 else:
344 raise ValueError('Unsupported SMPP commandId %s' % smpp_pdu.id)
345
346 @classmethod
347 def fromSmppSubmit(cls, smpp_pdu) -> 'SMS_SUBMIT':
348 """Construct a SMS_SUBMIT instance from the submit format used by smpp.pdu."""
349 ensure_smpp_is_8bit(smpp_pdu.params['data_coding'])
350 tp_da = AddressField.fromSmpp(smpp_pdu.params['destination_addr'],
351 smpp_pdu.params['dest_addr_ton'],
352 smpp_pdu.params['dest_addr_npi'])
353 tp_ud = smpp_pdu.params['short_message']
354 #vp_smpp = smpp_pdu.params['validity_period']
355 #if not vp_smpp:
356 # vpf = 'none'
357 d = {
358 'tp_rd': True if smpp_pdu.params['replace_if_present_flag'].name == 'REPLACE' else False,
359 'tp_vpf': None, # vpf,
360 'tp_rp': False, # related to ['registered_delivery'] ?
361 'tp_udhi': pdu_types.EsmClassGsmFeatures.UDHI_INDICATOR_SET in smpp_pdu.params['esm_class'].gsmFeatures,
362 'tp_srr': True if smpp_pdu.params['registered_delivery'] else False,
363 'tp_mr': 0, # FIXME: sm_default_msg_id ?
364 'tp_da': tp_da,
365 'tp_pid': smpp_pdu.params['protocol_id'],
366 'tp_dcs': 0xF6, # FIXME: we only deal with binary SMS here
367 'tp_vp': None, # FIXME: implement VPF conversion
368 'tp_udl': len(tp_ud),
369 'tp_ud': tp_ud,
370 }
371 return cls(**d)
372
373 def toSmpp(self) -> pdu_types.PDU:
374 """Translate a SMS_SUBMIT instance to a smpp.pdu.operations.SubmitSM instance."""
375 esm_class = pdu_types.EsmClass(pdu_types.EsmClassMode.DEFAULT, pdu_types.EsmClassType.DEFAULT)
376 reg_del = pdu_types.RegisteredDelivery(pdu_types.RegisteredDeliveryReceipt.NO_SMSC_DELIVERY_RECEIPT_REQUESTED)
377 if self.tp_rp:
378 repl_if = pdu_types.ReplaceIfPresentFlag.REPLACE
379 else:
380 repl_if = pdu_types.ReplaceIfPresentFlag.DO_NOT_REPLACE
381 # we only deal with binary SMS here:
382 if self.tp_dcs != 0xF6:
383 raise ValueError('Unsupported DCS: We only support DCS=0xF6 for now')
384 dc = pdu_types.DataCoding(pdu_types.DataCodingScheme.DEFAULT, pdu_types.DataCodingDefault.OCTET_UNSPECIFIED)
385 (daddr, ton, npi) = self.tp_da.toSmpp()
386 return operations.SubmitSM(service_type='',
387 source_addr_ton=pdu_types.AddrTon.ALPHANUMERIC,
388 source_addr_npi=pdu_types.AddrNpi.UNKNOWN,
389 source_addr='simcard',
390 dest_addr_ton=ton,
391 dest_addr_npi=npi,
392 destination_addr=daddr,
393 esm_class=esm_class,
394 protocol_id=self.tp_pid,
395 priority_flag=pdu_types.PriorityFlag.LEVEL_0,
396 #schedule_delivery_time,
397 #validity_period,
398 registered_delivery=reg_del,
399 replace_if_present_flag=repl_if,
400 data_coding=dc,
401 #sm_default_msg_id,
402 short_message=self.tp_ud)