blob: 5955a084cf27be3798683f0df17564424d9a1bb5 [file] [log] [blame]
"""Code related to SIM/UICC OTA according to TS 102 225 + TS 31.115."""
2
# (C) 2021-2023 by Harald Welte <laforge@osmocom.org>
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 2 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
17
18from pySim.construct import *
19from pySim.utils import b2h
20from pySim.sms import UserDataHeader
21from construct import *
22import zlib
23import abc
24import struct
25from typing import Optional
26
# ETSI TS 102 225 gives the general command structure and the dialects for CAT_TP, TCP/IP and HTTPS
# 3GPP TS 31.115 gives the dialects for SMS-PP, SMS-CB, USSD and HTTP

# CPI CPL CHI CHL SPI KIc KID TAR CNTR PCNTR RC/CC/DS data

#                                       CAT_TP   TCP/IP          SMS
# CPI                                   0x01     0x01            =IEIa=70,len=0
# CHI                                   NULL     NULL            NULL
# CPI, CPL and CHL included in RC/CC/DS true     true
# RPI                                   0x02     0x02            =IEIa=71,len=0
# RHI                                   NULL     NULL
# RPI, RPL and RHL included in RC/CC/DS true     true
# packet-id                             0-bf,ff  0-bf,ff
# identification packet                 false    102 225 tbl 6

# KVN 1..f; KI1=KIc, KI2=KID, KI3=DEK
43
# ETSI TS 102 225 Table 5 + 3GPP TS 31.115 Section 7
# One-octet response status code as carried in the 'response_status' field
# of a response packet.
ResponseStatus = Enum(
    Int8ub,
    por_ok=0x00,
    rc_cc_ds_failed=0x01,
    cntr_low=0x02,
    cntr_high=0x03,
    cntr_blocked=0x04,
    ciphering_error=0x05,
    undefined_security_error=0x06,
    insufficient_memory=0x07,
    more_time_needed=0x08,
    tar_unknown=0x09,
    insufficient_security_level=0x0A,
    actual_response_sms_submit=0x0B,
    actual_response_ussd=0x0C,
)
51
# ETSI TS 102 226 Section 5.1.2
# Compact format of a remote response: a command count, the two-octet status
# word of the last command and whatever response data follows (may be empty).
CompactRemoteResp = Struct(
    'number_of_commands' / Int8ub,
    'last_status_word' / HexAdapter(Bytes(2)),
    'last_response_data' / HexAdapter(GreedyBytes),
)
56
# Two-bit selector of the integrity mechanism; shared by both octets of the
# SPI below (command direction and proof-of-receipt direction).
RC_CC_DS = Enum(
    BitsInteger(2),
    no_rc_cc_ds=0,
    rc=1,
    cc=2,
    ds=3,
)
58
# TS 102 225 Section 5.1.1 + TS 31.115 Section 4.2
# Security Parameter Indicator: two octets, parsed bit-wise, MSB first.
SPI = BitStruct(
    # --- first octet: security applied to the command packet ---
    Padding(3),
    'counter' / Enum(
        BitsInteger(2),
        no_counter=0,
        counter_no_replay_or_seq=1,
        counter_must_be_higher=2,
        counter_must_be_lower=3,
    ),
    'ciphering' / Flag,
    'rc_cc_ds' / RC_CC_DS,
    # --- second octet: how the proof of receipt (PoR) shall be sent ---
    Padding(2),
    'por_in_submit' / Flag,
    'por_shall_be_ciphered' / Flag,
    'por_rc_cc_ds' / RC_CC_DS,
    'por' / Enum(
        BitsInteger(2),
        no_por=0,
        por_required=1,
        por_only_when_error=2,
    ),
)
74
# TS 102 225 Section 5.1.2
# KIc octet: upper nibble is the key indication, lower nibble selects the
# ciphering algorithm.
KIC = BitStruct(
    'key' / BitsInteger(4),
    'algo' / Enum(
        BitsInteger(4),
        implicit=0,
        single_des=1,
        aes_cbc=2,
        triple_des_cbc2=5,
        triple_des_cbc3=9,
    ),
)
80
# TS 102 225 Section 5.1.3.1
# KID octet when a Cryptographic Checksum (CC) is used: upper nibble is the
# key indication, lower nibble selects the checksum algorithm.
KID_CC = BitStruct(
    'key' / BitsInteger(4),
    'algo' / Enum(
        BitsInteger(4),
        implicit=0,
        single_des=1,
        aes_cmac=2,
        triple_des_cbc2=5,
        triple_des_cbc3=9,
    ),
)
86
# TS 102 225 Section 5.1.3.2
# KID octet when a Redundancy Check (RC) is used: upper nibble is the key
# indication, lower nibble selects the redundancy-check algorithm.
KID_RC = BitStruct(
    'key' / BitsInteger(4),
    'algo' / Enum(
        BitsInteger(4),
        implicit=0,
        crc16=1,
        proprietary=3,
        crc32=5,
    ),
)
91
# Outer (unciphered) framing of a command packet as sent via SMS.  The layout
# of the KID octet depends on the integrity mechanism announced in the SPI;
# everything following the TAR is kept as opaque 'secured_data'.
SmsCommandPacket = Struct(
    'cmd_pkt_len' / Int16ub,
    'cmd_hdr_len' / Int8ub,
    'spi' / SPI,
    'kic' / KIC,
    'kid' / Switch(this.spi.rc_cc_ds, {'cc': KID_CC, 'rc': KID_RC }),
    'tar' / Bytes(3),
    'secured_data' / GreedyBytes,
)
99
class OtaKeyset:
    """The OTA related data (key material, counter) to be used in encrypt/decrypt.

    Attributes:
        algo_crypt: name of the ciphering algorithm (matched against
            `OtaAlgoCrypt` sub-class `enum_name`).
        kic_idx / kic: index and value of the ciphering key.
        algo_auth: name of the integrity algorithm (matched against
            `OtaAlgoAuth` sub-class `enum_name`).
        kid_idx / kid: index and value of the integrity key.
        cntr: counter value placed into outgoing command packets.
    """

    def __init__(self, algo_crypt: str, kic_idx: int, kic: bytes,
                 algo_auth: str, kid_idx: int, kid: bytes, cntr: int = 0):
        # ciphering side
        self.algo_crypt, self.kic_idx = algo_crypt, kic_idx
        self.kic = bytes(kic)       # take an immutable copy of the key
        # integrity side
        self.algo_auth, self.kid_idx = algo_auth, kid_idx
        self.kid = bytes(kid)       # take an immutable copy of the key
        self.cntr = cntr

    @property
    def auth(self):
        """Return an instance of the matching OtaAlgoAuth."""
        algo = OtaAlgoAuth.fromKeyset(self)
        return algo

    @property
    def crypt(self):
        """Return an instance of the matching OtaAlgoCrypt."""
        algo = OtaAlgoCrypt.fromKeyset(self)
        return algo
121
class OtaCheckError(Exception):
    """Raised when an integrity / consistency check of an OTA packet fails."""
124
class OtaDialect(abc.ABC):
    """Base Class for OTA dialects such as SMS, BIP, ..."""

    def _compute_sig_len(self, spi: dict) -> int:
        """Return the number of RC/CC/DS octets implied by the given SPI.

        Args:
            spi: decoded SPI; only its 'rc_cc_ds' member is consulted.
        Returns:
            0 for no integrity protection, 4 for RC, 8 for CC.
        Raises:
            ValueError: for any other 'rc_cc_ds' value (including 'ds').
        """
        sig_len_by_mode = {
            'no_rc_cc_ds': 0,
            'rc': 4,    # CRC-32
            # TODO: this is not entirely correct, as in AES case it could be 4 or 8
            'cc': 8,    # Cryptographic Checksum (CC)
        }
        mode = spi['rc_cc_ds']
        if mode not in sig_len_by_mode:
            raise ValueError("Invalid rc_cc_ds: %s" % mode)
        return sig_len_by_mode[mode]

    # The signatures below mirror those of the concrete dialect(s): the SPI
    # decides about ciphering / integrity protection and hence must be passed.

    @abc.abstractmethod
    def encode_cmd(self, otak: 'OtaKeyset', tar: bytes, spi: dict, apdu: bytes) -> bytes:
        """Encode a command (`apdu`) addressed to `tar` into a secured packet,
        applying the security described by `spi` using the keys in `otak`."""
        pass

    @abc.abstractmethod
    def decode_resp(self, otak: 'OtaKeyset', spi: dict, apdu: bytes) -> (object, Optional["CompactRemoteResp"]):
        """Decode a response into a response packet and, if indicated (by a
        response status of `"por_ok"`) a decoded response.

        The response packet's common characteristics are not fully determined,
        and (so far) completely proprietary per dialect."""
        pass
151
152
Harald Welted75fa3f2023-05-31 20:47:55 +0200153from Cryptodome.Cipher import DES, DES3, AES
154from Cryptodome.Hash import CMAC
Harald Welte75a58d12022-07-31 15:51:19 +0200155
class OtaAlgo(abc.ABC):
    """Common base of the OTA ciphering and integrity algorithm classes.

    Holds the key set and offers zero-IV and padding helpers that depend on
    the `blocksize` declared by the concrete algorithm class.
    """
    # cipher block size in octets; to be set by the concrete algorithm class
    blocksize = None
    # algorithm name as used in OtaKeyset.algo_crypt / OtaKeyset.algo_auth;
    # to be set by the concrete algorithm class
    enum_name = None

    def __init__(self, otak: 'OtaKeyset'):
        self.otak = otak

    @property
    def iv(self) -> bytes:
        """All-zero initialization vector of one cipher block."""
        return bytes(self.blocksize)

    @staticmethod
    def _get_padding(in_len: int, multiple: int, padding: int = 0):
        """Return padding bytes towards multiple of N.

        Args:
            in_len: length of the data that is to be padded.
            multiple: the padded length shall be a multiple of this value.
            padding: value (0..255) of each padding octet.
        Returns:
            The padding only (possibly empty), not the padded data.
        """
        remainder = in_len % multiple
        if remainder == 0:
            return b''
        # honour the requested padding octet instead of hard-coding 0x00
        return bytes([padding]) * (multiple - remainder)

    @staticmethod
    def _pad_to_multiple(indat: bytes, multiple: int, padding: int = 0):
        """Pad input bytes to multiple of N."""
        return indat + OtaAlgo._get_padding(len(indat), multiple, padding)

    def pad_to_blocksize(self, indat: bytes, padding: int = 0):
        """Pad the given input data to multiple of the cipher block size."""
        return self._pad_to_multiple(indat, self.blocksize, padding)

    def __str__(self):
        return self.__class__.__name__
183
class OtaAlgoCrypt(OtaAlgo, abc.ABC):
    """Base class of the OTA ciphering algorithms (using the KIc key)."""

    def __init__(self, otak: OtaKeyset):
        """Bind the algorithm to a key set.

        Raises:
            ValueError: if the key set is meant for a different ciphering algorithm.
        """
        if self.enum_name != otak.algo_crypt:
            raise ValueError('Cannot use algorithm %s with key for %s' % (self.enum_name, otak.algo_crypt))
        super().__init__(otak)

    def encrypt(self, data:bytes) -> bytes:
        """Encrypt given input bytes using the key material given in constructor.

        The input is zero-padded to a multiple of the cipher block size first;
        input that is already block-aligned is passed on unchanged."""
        padded_data = self.pad_to_blocksize(data)
        # hand the *padded* data to the cipher; the un-padded input would be
        # rejected by the block cipher whenever it is not block-aligned
        return self._encrypt(padded_data)

    def decrypt(self, data:bytes) -> bytes:
        """Decrypt given input bytes using the key material given in constructor.

        Any padding contained in the plaintext is *not* removed here."""
        return self._decrypt(data)

    @abc.abstractmethod
    def _encrypt(self, data:bytes) -> bytes:
        """Actual implementation, to be implemented by derived class."""
        pass

    @abc.abstractmethod
    def _decrypt(self, data:bytes) -> bytes:
        """Actual implementation, to be implemented by derived class."""
        pass

    @classmethod
    def fromKeyset(cls, otak: OtaKeyset) -> 'OtaAlgoCrypt':
        """Resolve the class for the encryption algorithm of otak and instantiate it.

        Only direct sub-classes of this class are considered.

        Raises:
            ValueError: if no sub-class implements `otak.algo_crypt`.
        """
        for subc in cls.__subclasses__():
            if subc.enum_name == otak.algo_crypt:
                return subc(otak)
        # report the ciphering algorithm that was looked up (not the auth one)
        raise ValueError('No implementation for crypt algorithm %s' % otak.algo_crypt)
216
class OtaAlgoAuth(OtaAlgo, abc.ABC):
    """Base class of the OTA integrity algorithms (using the KID key)."""

    def __init__(self, otak: OtaKeyset):
        """Bind the algorithm to a key set.

        Raises:
            ValueError: if the key set is meant for a different integrity algorithm.
        """
        if self.enum_name != otak.algo_auth:
            # report the auth algorithm of the key set (not the crypt one)
            raise ValueError('Cannot use algorithm %s with key for %s' % (self.enum_name, otak.algo_auth))
        super().__init__(otak)

    def sign(self, data:bytes) -> bytes:
        """Compute the CC/RC check bytes for the input data using key material
        given in constructor.

        The input is zero-padded to a multiple of the algorithm's block size
        before it is handed to the actual implementation."""
        return self._sign(self.pad_to_blocksize(data))

    def check_sig(self, data:bytes, cc_received:bytes):
        """Compute the CC/RC check bytes for the input data and compare against cc_received.

        Raises:
            OtaCheckError: if the received and the computed check bytes differ.
        """
        cc = self.sign(data)
        if cc_received != cc:
            raise OtaCheckError('Received CC (%s) != Computed CC (%s)' % (b2h(cc_received), b2h(cc)))

    @abc.abstractmethod
    def _sign(self, data:bytes) -> bytes:
        """Actual implementation, to be implemented by derived class."""
        pass

    @classmethod
    def fromKeyset(cls, otak: OtaKeyset) -> 'OtaAlgoAuth':
        """Resolve the class for the authentication algorithm of otak and instantiate it.

        Only direct sub-classes of this class are considered.

        Raises:
            ValueError: if no sub-class implements `otak.algo_auth`.
        """
        for subc in cls.__subclasses__():
            if subc.enum_name == otak.algo_auth:
                return subc(otak)
        raise ValueError('No implementation for auth algorithm %s' % otak.algo_auth)
248
class OtaAlgoCryptDES(OtaAlgoCrypt):
    """DES is insecure. For backwards compatibility with pre-Rel8"""
    name = 'DES'
    enum_name = 'single_des'
    blocksize = 8

    def _new_cipher(self):
        """Fresh DES-CBC cipher object keyed with KIc, starting from the zero IV."""
        return DES.new(self.otak.kic, DES.MODE_CBC, self.iv)

    def _encrypt(self, data:bytes) -> bytes:
        return self._new_cipher().encrypt(data)

    def _decrypt(self, data:bytes) -> bytes:
        return self._new_cipher().decrypt(data)
261
class OtaAlgoAuthDES(OtaAlgoAuth):
    """DES is insecure. For backwards compatibility with pre-Rel8"""
    name = 'DES'
    enum_name = 'single_des'
    blocksize = 8

    def _sign(self, data:bytes) -> bytes:
        # DES-CBC over the data with KID and a zero IV; the trailing
        # 8 octets of the ciphertext are the checksum
        encrypted = DES.new(self.otak.kid, DES.MODE_CBC, self.iv).encrypt(data)
        return encrypted[-8:]
271
class OtaAlgoCryptDES3(OtaAlgoCrypt):
    """Triple-DES in CBC mode, registered under the name 'triple_des_cbc2'."""
    name = '3DES'
    enum_name = 'triple_des_cbc2'
    blocksize = 8

    def _new_cipher(self):
        """Fresh 3DES-CBC cipher object keyed with KIc, starting from the zero IV."""
        return DES3.new(self.otak.kic, DES3.MODE_CBC, self.iv)

    def _encrypt(self, data:bytes) -> bytes:
        return self._new_cipher().encrypt(data)

    def _decrypt(self, data:bytes) -> bytes:
        return self._new_cipher().decrypt(data)
283
class OtaAlgoAuthDES3(OtaAlgoAuth):
    """Triple-DES based checksum, registered under the name 'triple_des_cbc2'."""
    name = '3DES'
    enum_name = 'triple_des_cbc2'
    blocksize = 8

    def _sign(self, data:bytes) -> bytes:
        # 3DES-CBC over the data with KID and a zero IV; the trailing
        # 8 octets of the ciphertext are the checksum
        encrypted = DES3.new(self.otak.kid, DES3.MODE_CBC, self.iv).encrypt(data)
        return encrypted[-8:]
292
class OtaAlgoCryptAES(OtaAlgoCrypt):
    """AES in CBC mode, registered under the name 'aes_cbc'."""
    name = 'AES'
    enum_name = 'aes_cbc'
    blocksize = 16 # TODO: is this needed?

    def _new_cipher(self):
        """Fresh AES-CBC cipher object keyed with KIc, starting from the zero IV."""
        return AES.new(self.otak.kic, AES.MODE_CBC, self.iv)

    def _encrypt(self, data:bytes) -> bytes:
        return self._new_cipher().encrypt(data)

    def _decrypt(self, data:bytes) -> bytes:
        return self._new_cipher().decrypt(data)
304
class OtaAlgoAuthAES(OtaAlgoAuth):
    """AES-CMAC based checksum, registered under the name 'aes_cmac'."""
    name = 'AES'
    enum_name = 'aes_cmac'
    blocksize = 1 # AES CMAC doesn't need any padding by us

    def _sign(self, data:bytes) -> bytes:
        # CMAC keyed with KID, requested with an 8-octet MAC length
        mac = CMAC.new(self.otak.kid, ciphermod=AES, mac_len=8)
        mac.update(data)
        return mac.digest()[-8:]
314
315
316
class OtaDialectSms(OtaDialect):
    """OTA dialect for SMS based transport, as described in 3GPP TS 31.115."""
    # Response packet as it follows the User Data Header.  The length of the
    # RC/CC/DS field is whatever the response header length (RHL) leaves after
    # TAR(3) + CNTR(5) + PCNTR(1) + status(1) = 10 octets.
    SmsResponsePacket = Struct('rpl'/Int16ub,
                               'rhl'/Int8ub,
                               'tar'/Bytes(3),
                               'cntr'/Bytes(5),
                               'pcntr'/Int8ub,
                               'response_status'/ResponseStatus,
                               'cc_rc'/Bytes(this.rhl-10),
                               'secured_data'/GreedyBytes)

    def encode_cmd(self, otak: OtaKeyset, tar: bytes, spi: dict, apdu: bytes) -> bytes:
        """Build a secured command packet.

        Args:
            otak: key set providing keys, key indices, algorithm names and counter.
            tar: 3-octet Toolkit Application Reference the command is addressed to.
            spi: SPI as dict (see `SPI`); 'ciphering' and 'rc_cc_ds' select the
                security that is applied.
            apdu: the command data to be secured; never modified.
        Returns:
            CPL + CHL + SPI + KIc + KID + TAR followed by the (optionally
            ciphered) CNTR + PCNTR + RC/CC + data.
        Raises:
            ValueError: if spi['rc_cc_ds'] is not one of no_rc_cc_ds / rc / cc.
        """
        # length of signature in octets
        len_sig = self._compute_sig_len(spi)
        ciphering = spi['ciphering']

        pad_cnt = 0
        if ciphering:
            # everything from CNTR onwards is ciphered and hence must add up
            # to a multiple of the cipher block size:
            # 6 == CNTR(5) + PCNTR(1)
            crypt = otak.crypt
            len_cipher = 6 + len_sig + len(apdu)
            padding = crypt._get_padding(len_cipher, crypt.blocksize)
            pad_cnt = len(padding)
            # build a new object; `+=` would modify a caller's bytearray in place
            apdu = bytes(apdu) + padding

        kic = {'key': otak.kic_idx, 'algo': otak.algo_crypt}
        kid = {'key': otak.kid_idx, 'algo': otak.algo_auth}

        # CHL = number of octets from (and including) SPI to the end of RC/CC/DS
        # 13 == SPI(2) + KIc(1) + KId(1) + TAR(3) + CNTR(5) + PCNTR(1)
        chl = 13 + len_sig

        # CHL + SPI + KIC + KID + TAR: the part that is never ciphered
        hdr = Struct('chl'/Int8ub, 'spi'/SPI, 'kic'/KIC, 'kid'/KID_CC, 'tar'/Bytes(3))
        part_head = hdr.build({'chl': chl, 'spi': spi, 'kic': kic, 'kid': kid, 'tar': tar})

        # CNTR + PCNTR (number of padding octets)
        part_cnt = otak.cntr.to_bytes(5, 'big') + pad_cnt.to_bytes(1, 'big')

        # CPL: number of octets following the CPL field up to the end of the
        # secured data, including the RC/CC/DS and any padding for ciphering
        cpl = len(part_head) + len(part_cnt) + len_sig + len(apdu)

        # the RC/CC is computed over the complete packet *without* the RC/CC field
        to_be_signed = cpl.to_bytes(2, 'big') + part_head + part_cnt + apdu
        rc_cc_ds = spi['rc_cc_ds']
        if rc_cc_ds == 'cc':
            check = otak.auth.sign(to_be_signed)
        elif rc_cc_ds == 'rc':
            # CRC32
            check = (zlib.crc32(to_be_signed) & 0xffffffff).to_bytes(4, 'big')
        elif rc_cc_ds == 'no_rc_cc_ds':
            check = b''
        else:
            raise ValueError("Invalid rc_cc_ds: %s" % rc_cc_ds)

        secured = part_cnt + check + apdu
        if ciphering:
            secured = otak.crypt.encrypt(secured)

        # The command packet starts with the CPL no matter whether ciphering
        # is used or not (see also SmsCommandPacket).
        packet = part_head + secured
        return len(packet).to_bytes(2, 'big') + packet

    def decode_resp(self, otak: OtaKeyset, spi: dict, data: bytes) -> ("OtaDialectSms.SmsResponsePacket", Optional["CompactRemoteResp"]):
        """Decode (and verify) a response packet including its User Data Header.

        Args:
            otak: key set used to decipher / verify the response.
            spi: SPI of the related command; 'por_shall_be_ciphered' and
                'por_rc_cc_ds' tell which security to expect.
            data: UDH + response packet, as bytes or as hex-string.
        Returns:
            Tuple of the parsed SmsResponsePacket and, if the response status
            is 'por_ok', the parsed CompactRemoteResp (otherwise None).
        Raises:
            ValueError: if the UDH is not the expected one.
            OtaCheckError: if the RC/CC/DS does not match the expectation.
        """
        if isinstance(data, str):
            data = bytes.fromhex(data)
        # plain-text POR:   027100000e0ab000110000000000000001612f
        #       UDHL RPI IEDLa RPL  RHL TAR    CNTR       PCNTR STS
        #       02   71  00    000e 0a  b00011 0000000000 00    00 01 612f
        # POR with CC:      027100001612b000110000000000000055f47118381175fb01612f
        # POR with CC+CIPH: 027100001c12b000119660ebdb81be189b5e4389e9e7ab2bc0954f963ad869ed7c
        if data[0] != 0x02:
            raise ValueError('Unexpected UDL=0x%02x' % data[0])
        udhd, remainder = UserDataHeader.fromBytes(data)
        if not udhd.has_ie(0x71):
            raise ValueError('RPI 0x71 not found in UDH')

        ciphered = spi['por_shall_be_ciphered']
        if ciphered:
            # RPL + RHL + TAR (6 octets) are not ciphered, the rest is;
            # decipher before parsing so only plain text gets interpreted
            remainder = remainder[:6] + otak.crypt.decrypt(remainder[6:])
        res = self.SmsResponsePacket.parse(remainder)
        # remove specified number of padding bytes, if any
        # (the conditional is needed as python [:-0] renders an empty return!)
        if ciphered and res['pcntr'] != 0:
            res['secured_data'] = res['secured_data'][:-res['pcntr']]

        # is there a CC/RC present?
        len_sig = res['rhl'] - 10
        por_rc_cc_ds = spi['por_rc_cc_ds']
        if por_rc_cc_ds == 'no_rc_cc_ds':
            if len_sig:
                raise OtaCheckError('No RC/CC/DS requested, but len_sig=%u' % len_sig)
        elif por_rc_cc_ds == 'cc':
            # verify signature
            # UDH is part of CC/RC!
            udh = data[:3]
            # RPL, RHL, TAR, CNTR, PCNTR and STS are part of CC/RC
            rpl_rhl_tar_cntr_pcntr_sts = remainder[:13]
            # skip the CC/RC bytes themselves
            signed_data = udh + rpl_rhl_tar_cntr_pcntr_sts + remainder[13+len_sig:]
            otak.auth.check_sig(signed_data, res['cc_rc'])
        # TODO: CRC
        else:
            raise OtaCheckError('Unknown por_rc_cc_ds: %s' % por_rc_cc_ds)

        # TODO: ExpandedRemoteResponse according to TS 102 226 5.2.2
        if res.response_status == 'por_ok':
            dec = CompactRemoteResp.parse(res['secured_data'])
        else:
            dec = None
        return (res, dec)