blob: a15bbcb8e7751601a3a9ea6872438d1f63dd6c1d [file] [log] [blame]
Harald Welte75a58d12022-07-31 15:51:19 +02001"""Code related to SMS Encoding/Decoding"""
2# simplistic SMS T-PDU code, as unfortunately nobody bothered to port the python smspdu
3# module to python3, and I gave up after >= 3 hours of trying and failing to do so
4
5# (C) 2022 by Harald Welte <laforge@osmocom.org>
6#
7# This program is free software: you can redistribute it and/or modify
8# it under the terms of the GNU General Public License as published by
9# the Free Software Foundation, either version 2 of the License, or
10# (at your option) any later version.
11#
12# This program is distributed in the hope that it will be useful,
13# but WITHOUT ANY WARRANTY; without even the implied warranty of
14# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
15# GNU General Public License for more details.
16#
17# You should have received a copy of the GNU General Public License
18# along with this program. If not, see <http://www.gnu.org/licenses/>.
19
20import typing
Harald Welte0b327252022-08-11 17:37:46 +020021import abc
22from bidict import bidict
23from construct import Int8ub, Byte, Bytes, Bit, Flag, BitsInteger, Flag
24from construct import Struct, Enum, Tell, BitStruct, this, Padding
25from construct import Prefixed, GreedyRange, GreedyBytes
Harald Welte75a58d12022-07-31 15:51:19 +020026
Harald Welte0b327252022-08-11 17:37:46 +020027from pySim.construct import HexAdapter, BcdAdapter, TonNpi
Harald Welte75a58d12022-07-31 15:51:19 +020028from pySim.utils import Hexstr, h2b, b2h
29
Harald Welte0b327252022-08-11 17:37:46 +020030from smpp.pdu import pdu_types, operations
31
# Input type accepted by the fromBytes() constructors in this module: either raw
# bytes or a Hexstr; str inputs are converted with h2b() before parsing.
BytesOrHex = typing.Union[Hexstr, bytes]
33
class UserDataHeader:
    """User Data Header (UDH) of an SMS, held as a list of information elements (IEs).

    Each IE is a mapping with the keys 'iei', 'length' and 'value', matching the
    fields of `ie_c`.
    """
    # a single IE in the user data header
    ie_c = Struct('iei'/Int8ub, 'length'/Int8ub, 'value'/Bytes(this.length))
    # parser for the full UDH: Length octet followed by sequence of IEs
    _construct = Struct('ies'/Prefixed(Int8ub, GreedyRange(ie_c)),
                        'data'/GreedyBytes)

    def __init__(self, ies=None):
        """Create a UDH from a list of IEs; an omitted argument means 'no IEs'."""
        # Build a fresh list per instance: a literal [] default is evaluated only
        # once and would be shared by every header created without an argument.
        self.ies = [] if ies is None else ies

    def __repr__(self) -> str:
        return 'UDH(%r)' % self.ies

    def has_ie(self, iei:int) -> bool:
        """Return True if at least one IE with the given identifier is present."""
        return any(ie['iei'] == iei for ie in self.ies)

    @classmethod
    def fromBytes(cls, inb: BytesOrHex) -> typing.Tuple['UserDataHeader', bytes]:
        """Parse a UDH from the start of `inb`.

        Returns a tuple of the UserDataHeader instance and the remaining bytes
        that follow the header.
        """
        if isinstance(inb, str):
            inb = h2b(inb)
        res = cls._construct.parse(inb)
        return cls(res['ies']), res['data']

    def toBytes(self) -> bytes:
        """Encode the header (length octet followed by all IEs), without any trailing data."""
        return self._construct.build({'ies':self.ies, 'data':b''})
62
63
def smpp_dcs_is_8bit(dcs: pdu_types.DataCoding) -> bool:
    """Tell whether an SMPP data coding value denotes 8-bit (binary) user data.

    True is returned for the two 'octet unspecified' codings of the default
    scheme, and for the GSM message class scheme when its message coding is
    8-bit data. Everything else yields False.
    """
    octet_codings = (
        pdu_types.DataCodingDefault.OCTET_UNSPECIFIED,
        pdu_types.DataCodingDefault.OCTET_UNSPECIFIED_COMMON,
    )
    for coding in octet_codings:
        if dcs == pdu_types.DataCoding(pdu_types.DataCodingScheme.DEFAULT, coding):
            return True

    # schemeData is only consulted once the scheme is known to be GSM message class
    is_gsm_msg_class = dcs.scheme == pdu_types.DataCodingScheme.GSM_MESSAGE_CLASS
    if is_gsm_msg_class and dcs.schemeData['msgCoding'] == pdu_types.DataCodingGsmMsgCoding.DATA_8BIT:
        return True
    return False
76
def ensure_smpp_is_8bit(dcs: pdu_types.DataCoding):
    """Raise ValueError unless the given SMPP data coding scheme is an 8-bit one."""
    if smpp_dcs_is_8bit(dcs):
        return
    raise ValueError('We only support 8bit coded SMS for now')
81
class AddressField:
    """Representation of an address field as used in SMS T-PDU."""
    # addr_len counts digits (semi-octets), hence the rounding up to whole bytes;
    # 'tell' records how many input bytes the address consumed.
    _construct = Struct('addr_len'/Int8ub,
                        'type_of_addr'/TonNpi,
                        'digits'/BcdAdapter(Bytes(this.addr_len//2 + this.addr_len%2)),
                        'tell'/Tell)
    # smpp.pdu NPI enum member name <-> numbering plan name used by this class
    smpp_map_npi = bidict({
        'UNKNOWN': 'unknown',
        'ISDN': 'isdn_e164',
        'DATA': 'data_x121',
        'TELEX': 'telex_f69',
        'LAND_MOBILE': 'sc_specific6',
        'NATIONAL': 'national',
        'PRIVATE': 'private',
        'ERMES': 'ermes',
        })
    # smpp.pdu TON enum member name <-> type-of-number name used by this class
    smpp_map_ton = bidict({
        'UNKNOWN': 'unknown',
        'INTERNATIONAL': 'international',
        'NATIONAL': 'national',
        'NETWORK_SPECIFIC': 'network_specific',
        'SUBSCRIBER_NUMBER': 'short_code',
        'ALPHANUMERIC': 'alphanumeric',
        'ABBREVIATED': 'abbreviated',
        })

    def __init__(self, digits, ton='unknown', npi='unknown'):
        """Create an address from its digit string, type of number and numbering plan."""
        self.ton = ton
        self.npi = npi
        self.digits = digits

    def __str__(self):
        return 'AddressField(TON=%s, NPI=%s, %s)' % (self.ton, self.npi, self.digits)

    @classmethod
    def fromBytes(cls, inb: BytesOrHex) -> typing.Tuple['AddressField', bytes]:
        """Construct an AddressField instance from the binary T-PDU address format.

        Returns a tuple of the instance and the bytes remaining after the address.
        """
        if isinstance(inb, str):
            inb = h2b(inb)
        res = cls._construct.parse(inb)
        ton = res['type_of_addr']['type_of_number']
        npi = res['type_of_addr']['numbering_plan_id']
        # cut the decoded digits down to addr_len (drops the pad nibble of odd-length
        # numbers); return resulting instance + remainder bytes
        return cls(res['digits'][:res['addr_len']], ton, npi), inb[res['tell']:]

    @classmethod
    def fromSmpp(cls, addr, ton, npi) -> 'AddressField':
        """Construct an AddressField from {source,dest}_addr_{,ton,npi} attributes of smpp.pdu."""
        # return the resulting instance
        return cls(addr.decode('ascii'), AddressField.smpp_map_ton[ton.name], AddressField.smpp_map_npi[npi.name])

    def toSmpp(self):
        """Return the {source,dest}_addr_{,ton,npi} values for smpp.pdu as a tuple (addr, ton, npi).

        ton and npi are the smpp.pdu enum member names looked up through the inverse maps.
        """
        return (self.digits, self.smpp_map_ton.inverse[self.ton], self.smpp_map_npi.inverse[self.npi])

    def toBytes(self) -> bytes:
        """Encode the AddressField into the binary representation as used in T-PDU."""
        # Work on a local copy: padding self.digits in place would leave a stray 'f'
        # on the instance and make a second call encode a wrong addr_len.
        digits = self.digits
        num_digits = len(digits)
        if num_digits % 2:
            # odd number of digits: fill the unused half of the last octet
            digits += 'f'
        d = {
            'addr_len': num_digits,
            'type_of_addr': {
                'ext': True,
                'type_of_number': self.ton,
                'numbering_plan_id': self.npi,
                },
            'digits': digits,
            }
        return self._construct.build(d)
154
155
class SMS_TPDU(abc.ABC):
    """Common base of all SMS T-PDU classes; stores the fields they share."""
    def __init__(self, **kwargs):
        # (attribute name, value used when the keyword argument is absent)
        shared_fields = (
            ('tp_mti', None),
            ('tp_rp', False),
            ('tp_udhi', False),
            ('tp_pid', None),
            ('tp_dcs', None),
            ('tp_udl', None),
            ('tp_ud', None),
        )
        for attr_name, fallback in shared_fields:
            setattr(self, attr_name, kwargs.get(attr_name, fallback))
166
167
168
class SMS_DELIVER(SMS_TPDU):
    """Representation of a SMS-DELIVER T-PDU. This is the Network to MS/UE (downlink) direction."""
    # First octet, most significant bit first: TP-RP (bit 7), TP-UDHI (6), TP-SRI (5),
    # unused (4), TP-LP (3), TP-MMS (2), TP-MTI (1..0). Every field name appears exactly
    # once so that building and parsing are symmetric.
    flags_construct = BitStruct('tp_rp'/Flag, 'tp_udhi'/Flag, 'tp_sri'/Flag, Padding(1),
                                'tp_lp'/Flag, 'tp_mms'/Flag, 'tp_mti'/BitsInteger(2))
    def __init__(self, **kwargs):
        """Create a SMS-DELIVER from keyword arguments named after the TP fields; TP-MTI is forced to 0."""
        kwargs['tp_mti'] = 0
        super().__init__(**kwargs)
        self.tp_lp = kwargs.get('tp_lp', False)
        self.tp_mms = kwargs.get('tp_mms', False)
        self.tp_oa = kwargs.get('tp_oa', None)
        self.tp_scts = kwargs.get('tp_scts', None)
        self.tp_sri = kwargs.get('tp_sri', False)

    def __repr__(self):
        return '%s(MTI=%s, MMS=%s, LP=%s, RP=%s, UDHI=%s, SRI=%s, OA=%s, PID=%2x, DCS=%x, SCTS=%s, UDL=%u, UD=%s)' % (
            self.__class__.__name__, self.tp_mti, self.tp_mms, self.tp_lp, self.tp_rp,
            self.tp_udhi, self.tp_sri, self.tp_oa, self.tp_pid, self.tp_dcs, self.tp_scts,
            self.tp_udl, self.tp_ud)

    @classmethod
    def fromBytes(cls, inb: BytesOrHex) -> 'SMS_DELIVER':
        """Construct a SMS_DELIVER instance from the binary encoded format as used in T-PDU."""
        if isinstance(inb, str):
            inb = h2b(inb)
        # first octet: flags; the originating address starts right after it
        d = SMS_DELIVER.flags_construct.parse(inb)
        oa, remainder = AddressField.fromBytes(inb[1:])
        d['tp_oa'] = oa
        # fixed-size fields following the address: PID (1), DCS (1), SCTS (7), UDL (1)
        offset = 0
        d['tp_pid'] = remainder[offset]
        offset += 1
        d['tp_dcs'] = remainder[offset]
        offset += 1
        # TODO: further decode
        d['tp_scts'] = remainder[offset:offset+7]
        offset += 7
        d['tp_udl'] = remainder[offset]
        offset += 1
        # everything that is left is the user data
        d['tp_ud'] = remainder[offset:]
        return cls(**d)

    def toBytes(self) -> bytes:
        """Encode a SMS_DELIVER instance to the binary encoded format as used in T-PDU."""
        outb = bytearray()
        d = {
            'tp_mti': self.tp_mti, 'tp_mms': self.tp_mms, 'tp_lp': self.tp_lp,
            'tp_rp': self.tp_rp, 'tp_udhi': self.tp_udhi, 'tp_sri': self.tp_sri,
        }
        flags = SMS_DELIVER.flags_construct.build(d)
        outb.extend(flags)
        outb.extend(self.tp_oa.toBytes())
        outb.append(self.tp_pid)
        outb.append(self.tp_dcs)
        outb.extend(self.tp_scts)
        outb.append(self.tp_udl)
        outb.extend(self.tp_ud)

        return outb

    @classmethod
    def fromSmpp(cls, smpp_pdu) -> 'SMS_DELIVER':
        """Construct a SMS_DELIVER instance from the deliver format used by smpp.pdu.

        Only submit_sm PDUs are handled; any other command id raises ValueError.
        """
        if smpp_pdu.id == pdu_types.CommandId.submit_sm:
            return cls.fromSmppSubmit(smpp_pdu)
        else:
            raise ValueError('Unsupported SMPP commandId %s' % smpp_pdu.id)

    @classmethod
    def fromSmppSubmit(cls, smpp_pdu) -> 'SMS_DELIVER':
        """Construct a SMS_DELIVER instance from the submit format used by smpp.pdu.

        Raises ValueError if the PDU's data coding is not an 8-bit one.
        """
        ensure_smpp_is_8bit(smpp_pdu.params['data_coding'])
        tp_oa = AddressField.fromSmpp(smpp_pdu.params['source_addr'],
                                      smpp_pdu.params['source_addr_ton'],
                                      smpp_pdu.params['source_addr_npi'])
        tp_ud = smpp_pdu.params['short_message']
        d = {
            'tp_lp': False,
            'tp_mms': False,
            'tp_oa': tp_oa,
            'tp_scts': h2b('22705200000000'), # FIXME
            'tp_sri': False,
            'tp_rp': False,
            'tp_udhi': pdu_types.EsmClassGsmFeatures.UDHI_INDICATOR_SET in smpp_pdu.params['esm_class'].gsmFeatures,
            'tp_pid': smpp_pdu.params['protocol_id'],
            'tp_dcs': 0xF6, # we only deal with binary SMS here
            'tp_udl': len(tp_ud),
            'tp_ud': tp_ud,
        }
        return cls(**d)
255
256
257
class SMS_SUBMIT(SMS_TPDU):
    """Representation of a SMS-SUBMIT T-PDU. This is the MS/UE -> network (uplink) direction."""
    # First octet, most significant bit first: TP-RP (bit 7), TP-UDHI (6), TP-SRR (5),
    # TP-VPF (4..3), TP-RD (2), TP-MTI (1..0).
    flags_construct = BitStruct('tp_rp'/Flag, 'tp_udhi'/Flag, 'tp_srr'/Flag,
                                'tp_vpf'/Enum(BitsInteger(2), none=0, relative=2, enhanced=1, absolute=3),
                                'tp_rd'/Flag, 'tp_mti'/BitsInteger(2))
    def __init__(self, **kwargs):
        """Create a SMS-SUBMIT from keyword arguments named after the TP fields; TP-MTI is forced to 1."""
        kwargs['tp_mti'] = 1
        super().__init__(**kwargs)
        self.tp_rd = kwargs.get('tp_rd', False)
        self.tp_vpf = kwargs.get('tp_vpf', 'none')
        self.tp_srr = kwargs.get('tp_srr', False)
        self.tp_mr = kwargs.get('tp_mr', None)
        self.tp_da = kwargs.get('tp_da', None)
        self.tp_vp = kwargs.get('tp_vp', None)

    def __repr__(self):
        # tp_vpf holds the symbolic name ('none', 'relative', ...), so it is printed
        # with %s; a numeric conversion raises TypeError for a plain string.
        return '%s(MTI=%s, RD=%s, VPF=%s, RP=%s, UDHI=%s, SRR=%s, DA=%s, PID=%2x, DCS=%x, VP=%s, UDL=%u, UD=%s)' % (
            self.__class__.__name__, self.tp_mti, self.tp_rd, self.tp_vpf, self.tp_rp,
            self.tp_udhi, self.tp_srr, self.tp_da, self.tp_pid, self.tp_dcs, self.tp_vp,
            self.tp_udl, self.tp_ud)

    @classmethod
    def fromBytes(cls, inb:BytesOrHex) -> 'SMS_SUBMIT':
        """Construct a SMS_SUBMIT instance from the binary encoded format as used in T-PDU.

        Raises ValueError if the validity period format cannot be interpreted.
        """
        if isinstance(inb, str):
            inb = h2b(inb)
        # octet 0: flags, octet 1: message reference, then the destination address
        d = SMS_SUBMIT.flags_construct.parse(inb)
        d['tp_mr'] = inb[1]
        da, remainder = AddressField.fromBytes(inb[2:])
        d['tp_da'] = da

        # from here on, offsets are relative to the bytes following the address
        offset = 0
        d['tp_pid'] = remainder[offset]
        offset += 1
        d['tp_dcs'] = remainder[offset]
        offset += 1
        # the validity period is absent, 1 octet or 7 octets long depending on TP-VPF
        if d['tp_vpf'] == 'none':
            pass
        elif d['tp_vpf'] == 'relative':
            # TODO: further decode
            d['tp_vp'] = remainder[offset:offset+1]
            offset += 1
        elif d['tp_vpf'] == 'enhanced':
            # TODO: further decode
            d['tp_vp'] = remainder[offset:offset+7]
            offset += 7
        elif d['tp_vpf'] == 'absolute':
            # TODO: further decode
            d['tp_vp'] = remainder[offset:offset+7]
            offset += 7
        else:
            raise ValueError('Invalid VPF: %s' % d['tp_vpf'])
        d['tp_udl'] = remainder[offset]
        offset += 1
        # everything that is left is the user data
        d['tp_ud'] = remainder[offset:]
        return cls(**d)

    def toBytes(self) -> bytes:
        """Encode a SMS_SUBMIT instance to the binary encoded format as used in T-PDU."""
        outb = bytearray()
        d = {
            'tp_mti': self.tp_mti, 'tp_rd': self.tp_rd, 'tp_vpf': self.tp_vpf,
            'tp_rp': self.tp_rp, 'tp_udhi': self.tp_udhi, 'tp_srr': self.tp_srr,
        }
        flags = SMS_SUBMIT.flags_construct.build(d)
        outb.extend(flags)
        outb.append(self.tp_mr)
        outb.extend(self.tp_da.toBytes())
        outb.append(self.tp_pid)
        outb.append(self.tp_dcs)
        # the validity period octets are only present when a format is indicated
        if self.tp_vpf != 'none':
            outb.extend(self.tp_vp)
        outb.append(self.tp_udl)
        outb.extend(self.tp_ud)
        return outb

    @classmethod
    def fromSmpp(cls, smpp_pdu) -> 'SMS_SUBMIT':
        """Construct a SMS_SUBMIT instance from the format used by smpp.pdu.

        Only submit_sm PDUs are handled; any other command id raises ValueError.
        """
        if smpp_pdu.id == pdu_types.CommandId.submit_sm:
            return cls.fromSmppSubmit(smpp_pdu)
        else:
            raise ValueError('Unsupported SMPP commandId %s' % smpp_pdu.id)

    @classmethod
    def fromSmppSubmit(cls, smpp_pdu) -> 'SMS_SUBMIT':
        """Construct a SMS_SUBMIT instance from the submit format used by smpp.pdu.

        Raises ValueError if the PDU's data coding is not an 8-bit one.
        """
        ensure_smpp_is_8bit(smpp_pdu.params['data_coding'])
        tp_da = AddressField.fromSmpp(smpp_pdu.params['destination_addr'],
                                      smpp_pdu.params['dest_addr_ton'],
                                      smpp_pdu.params['dest_addr_npi'])
        tp_ud = smpp_pdu.params['short_message']
        d = {
            'tp_rd': smpp_pdu.params['replace_if_present_flag'].name == 'REPLACE',
            # No validity period is converted yet, so say so explicitly: the enum
            # name 'none' is what flags_construct and toBytes() expect (None is not
            # a valid value for either of them).
            'tp_vpf': 'none',
            'tp_rp': False, # related to ['registered_delivery'] ?
            'tp_udhi': pdu_types.EsmClassGsmFeatures.UDHI_INDICATOR_SET in smpp_pdu.params['esm_class'].gsmFeatures,
            'tp_srr': True if smpp_pdu.params['registered_delivery'] else False,
            'tp_mr': 0, # FIXME: sm_default_msg_id ?
            'tp_da': tp_da,
            'tp_pid': smpp_pdu.params['protocol_id'],
            'tp_dcs': 0xF6, # FIXME: we only deal with binary SMS here
            'tp_vp': None, # FIXME: implement VPF conversion
            'tp_udl': len(tp_ud),
            'tp_ud': tp_ud,
        }
        return cls(**d)

    def toSmpp(self) -> pdu_types.PDU:
        """Translate a SMS_SUBMIT instance to a smpp.pdu.operations.SubmitSM instance.

        Raises ValueError if TP-DCS is anything but 0xF6.
        """
        esm_class = pdu_types.EsmClass(pdu_types.EsmClassMode.DEFAULT, pdu_types.EsmClassType.DEFAULT)
        reg_del = pdu_types.RegisteredDelivery(pdu_types.RegisteredDeliveryReceipt.NO_SMSC_DELIVERY_RECEIPT_REQUESTED)
        # mirror of fromSmppSubmit(), which maps replace_if_present_flag to TP-RD
        if self.tp_rd:
            repl_if = pdu_types.ReplaceIfPresentFlag.REPLACE
        else:
            repl_if = pdu_types.ReplaceIfPresentFlag.DO_NOT_REPLACE
        # we only deal with binary SMS here:
        if self.tp_dcs != 0xF6:
            raise ValueError('Unsupported DCS: We only support DCS=0xF6 for now')
        dc = pdu_types.DataCoding(pdu_types.DataCodingScheme.DEFAULT, pdu_types.DataCodingDefault.OCTET_UNSPECIFIED)
        (daddr, ton, npi) = self.tp_da.toSmpp()
        return operations.SubmitSM(service_type='',
                                   source_addr_ton=pdu_types.AddrTon.ALPHANUMERIC,
                                   source_addr_npi=pdu_types.AddrNpi.UNKNOWN,
                                   source_addr='simcard',
                                   dest_addr_ton=ton,
                                   dest_addr_npi=npi,
                                   destination_addr=daddr,
                                   esm_class=esm_class,
                                   protocol_id=self.tp_pid,
                                   priority_flag=pdu_types.PriorityFlag.LEVEL_0,
                                   #schedule_delivery_time,
                                   #validity_period,
                                   registered_delivery=reg_del,
                                   replace_if_present_flag=repl_if,
                                   data_coding=dc,
                                   #sm_default_msg_id,
                                   short_message=self.tp_ud)
400 short_message=self.tp_ud)