blob: 80c823eb15e500a099dc37aa29f4ccd4e6f58e7f [file] [log] [blame]
Harald Welte75a58d12022-07-31 15:51:19 +02001"""Code related to SIM/UICC OTA according to TS 102 225 + TS 31.115."""
2
3# (C) 2021-2022 by Harald Welte <laforge@osmocom.org>
4#
5# This program is free software: you can redistribute it and/or modify
6# it under the terms of the GNU General Public License as published by
7# the Free Software Foundation, either version 2 of the License, or
8# (at your option) any later version.
9#
10# This program is distributed in the hope that it will be useful,
11# but WITHOUT ANY WARRANTY; without even the implied warranty of
12# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
13# GNU General Public License for more details.
14#
15# You should have received a copy of the GNU General Public License
16# along with this program. If not, see <http://www.gnu.org/licenses/>.
17
18from pySim.construct import *
19from pySim.utils import b2h
20from pySim.sms import UserDataHeader
21from construct import *
22import zlib
23import abc
24import struct
25from typing import Optional
26
27# ETS TS 102 225 gives the general command structure and the dialects for CAT_TP, TCP/IP and HTTPS
28# 3GPP TS 31.115 gives the dialects for SMS-PP, SMS-CB, USSD and HTTP
29
30# CPI CPL CHI CHL SPI KIc KID TAR CNTR PCNTR RC/CC/DS data
31
32# CAT_TP TCP/IP SMS
33# CPI 0x01 0x01 =IEIa=70,len=0
34# CHI NULL NULL NULL
35# CPI, CPL and CHL included in RC/CC/DS true true
36# RPI 0x02 0x02 =IEIa=71,len=0
37# RHI NULL NULL
38# RPI, RPL and RHL included in RC/CC/DS true true
39# packet-id 0-bf,ff 0-bf,ff
40# identification packet false 102 225 tbl 6
41
42# KVN 1..f; KI1=KIc, KI2=KID, KI3=DEK
43
44# ETSI TS 102 225 Table 5 + 3GPP TS 31.115 Section 7
45ResponseStatus = Enum(Int8ub, por_ok=0, rc_cc_ds_failed=1, cntr_low=2, cntr_high=3,
46 cntr_blocked=4, ciphering_error=5, undefined_security_error=6,
47 insufficient_memory=7, more_time_needed=8, tar_unknown=9,
48 insufficient_security_level=0x0A,
49 actual_response_sms_submit=0x0B,
50 actual_response_ussd=0x0C)
51
52# ETSI TS 102 226 Section 5.1.2
53CompactRemoteResp = Struct('number_of_commands'/Int8ub,
54 'last_status_word'/HexAdapter(Bytes(2)),
55 'last_response_data'/HexAdapter(GreedyBytes))
56
57RC_CC_DS = Enum(BitsInteger(2), no_rc_cc_ds=0, rc=1, cc=2, ds=3)
58
59# TS 102 225 Section 5.1.1 + TS 31.115 Section 4.2
60SPI = BitStruct( # first octet
61 Padding(3),
62 'counter'/Enum(BitsInteger(2), no_counter=0, counter_no_replay_or_seq=1,
63 counter_must_be_higher=2, counter_must_be_lower=3),
64 'ciphering'/Flag,
65 'rc_cc_ds'/RC_CC_DS,
66 # second octet
67 Padding(2),
68 'por_in_submit'/Flag,
69 'por_shall_be_ciphered'/Flag,
70 'por_rc_cc_ds'/RC_CC_DS,
71 'por'/Enum(BitsInteger(2), no_por=0,
72 por_required=1, por_only_when_error=2)
73)
74
75# TS 102 225 Section 5.1.2
76KIC = BitStruct('key'/BitsInteger(4),
77 'algo'/Enum(BitsInteger(4), implicit=0, single_des=1, triple_des_cbc2=5, triple_des_cbc3=9,
78 aes_cbc=2)
79 )
80
81# TS 102 225 Section 5.1.3.1
82KID_CC = BitStruct('key'/BitsInteger(4),
83 'algo'/Enum(BitsInteger(4), implicit=0, single_des=1, triple_des_cbc2=5, triple_des_cbc3=9,
84 aes_cmac=2)
85 )
86
87# TS 102 225 Section 5.1.3.2
88KID_RC = BitStruct('key'/BitsInteger(4),
89 'algo'/Enum(BitsInteger(4), implicit=0, crc16=1, crc32=5, proprietary=3)
90 )
91
92SmsCommandPacket = Struct('cmd_pkt_len'/Int16ub,
93 'cmd_hdr_len'/Int8ub,
94 'spi'/SPI,
95 'kic'/KIC,
96 'kid'/Switch(this.spi.rc_cc_ds, {'cc': KID_CC, 'rc': KID_RC }),
97 'tar'/Bytes(3),
98 'secured_data'/GreedyBytes)
99
100class OtaKeyset:
101 """The OTA related data (key material, counter) to be used in encrypt/decrypt."""
102 def __init__(self, algo_crypt: str, kic_idx: int, kic: bytes,
103 algo_auth: str, kid_idx: int, kid: bytes, cntr: int = 0):
104 self.algo_crypt = algo_crypt
105 self.kic = bytes(kic)
106 self.kic_idx = kic_idx
107 self.algo_auth = algo_auth
108 self.kid = bytes(kid)
109 self.kid_idx = kid_idx
110 self.cntr = cntr
111
112 @property
113 def auth(self):
114 """Return an instance of the matching OtaAlgoAuth."""
115 return OtaAlgoAuth.fromKeyset(self)
116
117 @property
118 def crypt(self):
119 """Return an instance of the matching OtaAlgoCrypt."""
120 return OtaAlgoCrypt.fromKeyset(self)
121
122class OtaCheckError(Exception):
123 pass
124
125class OtaDialect(abc.ABC):
126 """Base Class for OTA dialects such as SMS, BIP, ..."""
127
128 def _compute_sig_len(self, spi:SPI):
129 if spi['rc_cc_ds'] == 'no_rc_cc_ds':
130 return 0
131 elif spi['rc_cc_ds'] == 'rc': # CRC-32
132 return 4
133 elif spi['rc_cc_ds'] == 'cc': # Cryptographic Checksum (CC)
134 # TODO: this is not entirely correct, as in AES case it could be 4 or 8
135 return 8
136 else:
137 raise ValueError("Invalid rc_cc_ds: %s" % spi['rc_cc_ds'])
138
139 @abc.abstractmethod
140 def encode_cmd(self, otak: OtaKeyset, tar: bytes, apdu: bytes) -> bytes:
141 pass
142
143 @abc.abstractmethod
144 def decode_resp(self, otak: OtaKeyset, apdu: bytes) -> (object, Optional["CompactRemoteResp"]):
145 """Decode a response into a response packet and, if indicted (by a
146 response status of `"por_ok"`) a decoded response.
147
148 The response packet's common characteristics are not fully determined,
149 and (so far) completely proprietary per dialect."""
150 pass
151
152
153from Crypto.Cipher import DES, DES3, AES
154from Crypto.Hash import CMAC
155
156class OtaAlgo(abc.ABC):
Christian Amsüss5d263112022-11-25 04:00:55 +0100157 iv = property(lambda self: bytes([0] * self.blocksize))
Harald Welte75a58d12022-07-31 15:51:19 +0200158 blocksize = None
159 enum_name = None
160
161 @staticmethod
162 def _get_padding(in_len: int, multiple: int, padding: int = 0):
163 """Return padding bytes towards multiple of N."""
164 if in_len % multiple == 0:
165 return b''
166 pad_cnt = multiple - (in_len % multiple)
167 return b'\x00' * pad_cnt
168
169 @staticmethod
170 def _pad_to_multiple(indat: bytes, multiple: int, padding: int = 0):
171 """Pad input bytes to multiple of N."""
172 return indat + OtaAlgo._get_padding(len(indat), multiple, padding)
173
174 def pad_to_blocksize(self, indat: bytes, padding: int = 0):
175 """Pad the given input data to multiple of the cipher block size."""
176 return self._pad_to_multiple(indat, self.blocksize, padding)
177
178 def __init__(self, otak: OtaKeyset):
179 self.otak = otak
180
181 def __str__(self):
182 return self.__class__.__name__
183
184class OtaAlgoCrypt(OtaAlgo, abc.ABC):
185 def __init__(self, otak: OtaKeyset):
186 if self.enum_name != otak.algo_crypt:
187 raise ValueError('Cannot use algorithm %s with key for %s' % (self.enum_name, otak.algo_crypt))
188 super().__init__(otak)
189
190 def encrypt(self, data:bytes) -> bytes:
191 """Encrypt given input bytes using the key material given in constructor."""
192 padded_data = self.pad_to_blocksize(data)
193 return self._encrypt(data)
194
195 def decrypt(self, data:bytes) -> bytes:
196 """Decrypt given input bytes using the key material given in constructor."""
197 return self._decrypt(data)
198
199 @abc.abstractmethod
200 def _encrypt(self, data:bytes) -> bytes:
201 """Actual implementation, to be implemented by derived class."""
202 pass
203
204 @abc.abstractmethod
205 def _decrypt(self, data:bytes) -> bytes:
206 """Actual implementation, to be implemented by derived class."""
207 pass
208
209 @classmethod
210 def fromKeyset(cls, otak: OtaKeyset) -> 'OtaAlgoCrypt':
211 """Resolve the class for the encryption algorithm of otak and instantiate it."""
212 for subc in cls.__subclasses__():
213 if subc.enum_name == otak.algo_crypt:
214 return subc(otak)
215 raise ValueError('No implementation for crypt algorithm %s' % otak.algo_auth)
216
217class OtaAlgoAuth(OtaAlgo, abc.ABC):
218 def __init__(self, otak: OtaKeyset):
219 if self.enum_name != otak.algo_auth:
220 raise ValueError('Cannot use algorithm %s with key for %s' % (self.enum_name, otak.algo_crypt))
221 super().__init__(otak)
222
223 def sign(self, data:bytes) -> bytes:
224 """Compute the CC/CR check bytes for the input data using key material
225 given in constructor."""
226 padded_data = self.pad_to_blocksize(data)
227 sig = self._sign(padded_data)
228 return sig
229
230 def check_sig(self, data:bytes, cc_received:bytes):
231 """Compute the CC/CR check bytes for the input data and compare against cc_received."""
232 cc = self.sign(data)
233 if cc_received != cc:
234 raise OtaCheckError('Received CC (%s) != Computed CC (%s)' % (b2h(cc_received), b2h(cc)))
235
236 @abc.abstractmethod
237 def _sign(self, data:bytes) -> bytes:
238 """Actual implementation, to be implemented by derived class."""
239 pass
240
241 @classmethod
242 def fromKeyset(cls, otak: OtaKeyset) -> 'OtaAlgoAuth':
243 """Resolve the class for the authentication algorithm of otak and instantiate it."""
244 for subc in cls.__subclasses__():
245 if subc.enum_name == otak.algo_auth:
246 return subc(otak)
247 raise ValueError('No implementation for auth algorithm %s' % otak.algo_auth)
248
249class OtaAlgoCryptDES(OtaAlgoCrypt):
250 """DES is insecure. For backwards compatibility with pre-Rel8"""
251 name = 'DES'
252 enum_name = 'single_des'
253 blocksize = 8
254 def _encrypt(self, data:bytes) -> bytes:
255 cipher = DES.new(self.otak.kic, DES.MODE_CBC, self.iv)
256 return cipher.encrypt(data)
257
258 def _decrypt(self, data:bytes) -> bytes:
259 cipher = DES.new(self.otak.kic, DES.MODE_CBC, self.iv)
260 return cipher.decrypt(data)
261
262class OtaAlgoAuthDES(OtaAlgoAuth):
263 """DES is insecure. For backwards compatibility with pre-Rel8"""
264 name = 'DES'
265 enum_name = 'single_des'
266 blocksize = 8
267 def _sign(self, data:bytes) -> bytes:
268 cipher = DES.new(self.otak.kid, DES.MODE_CBC, self.iv)
269 ciph = cipher.encrypt(data)
270 return ciph[len(ciph) - 8:]
271
272class OtaAlgoCryptDES3(OtaAlgoCrypt):
273 name = '3DES'
274 enum_name = 'triple_des_cbc2'
275 blocksize = 8
276 def _encrypt(self, data:bytes) -> bytes:
277 cipher = DES3.new(self.otak.kic, DES3.MODE_CBC, self.iv)
278 return cipher.encrypt(data)
279
280 def _decrypt(self, data:bytes) -> bytes:
281 cipher = DES3.new(self.otak.kic, DES3.MODE_CBC, self.iv)
282 return cipher.decrypt(data)
283
284class OtaAlgoAuthDES3(OtaAlgoAuth):
285 name = '3DES'
286 enum_name = 'triple_des_cbc2'
287 blocksize = 8
288 def _sign(self, data:bytes) -> bytes:
289 cipher = DES3.new(self.otak.kid, DES3.MODE_CBC, self.iv)
290 ciph = cipher.encrypt(data)
291 return ciph[len(ciph) - 8:]
292
293class OtaAlgoCryptAES(OtaAlgoCrypt):
294 name = 'AES'
295 enum_name = 'aes_cbc'
296 blocksize = 16 # TODO: is this needed?
297 def _encrypt(self, data:bytes) -> bytes:
298 cipher = AES.new(self.otak.kic, AES.MODE_CBC, self.iv)
299 return cipher.encrypt(data)
300
301 def _decrypt(self, data:bytes) -> bytes:
302 cipher = AES.new(self.otak.kic, AES.MODE_CBC, self.iv)
303 return cipher.decrypt(data)
304
305class OtaAlgoAuthAES(OtaAlgoAuth):
306 name = 'AES'
307 enum_name = 'aes_cmac'
308 blocksize = 16 # TODO: is this needed?
309 def _sign(self, data:bytes) -> bytes:
310 cmac = CMAC.new(self.otak.kid, ciphermod=AES, mac_len=8)
311 cmac.update(data)
312 ciph = cmac.digest()
313 return ciph[len(ciph) - 8:]
314
315
316
317class OtaDialectSms(OtaDialect):
318 """OTA dialect for SMS based transport, as described in 3GPP TS 31.115."""
319 SmsResponsePacket = Struct('rpl'/Int16ub,
320 'rhl'/Int8ub,
321 'tar'/Bytes(3),
322 'cntr'/Bytes(5),
323 'pcntr'/Int8ub,
324 'response_status'/ResponseStatus,
325 'cc_rc'/Bytes(this.rhl-10),
326 'secured_data'/GreedyBytes)
327
328 def encode_cmd(self, otak: OtaKeyset, tar: bytes, spi: dict, apdu: bytes) -> bytes:
329 # length of signature in octets
330 len_sig = self._compute_sig_len(spi)
331 pad_cnt = 0
332 if spi['ciphering']: # ciphering is requested
333 # append padding bytes to end up with blocksize
334 len_cipher = 6 + len_sig + len(apdu)
335 padding = otak.crypt._get_padding(len_cipher, otak.crypt.blocksize)
336 pad_cnt = len(padding)
337 apdu += padding
338
339 kic = {'key': otak.kic_idx, 'algo': otak.algo_crypt}
340 kid = {'key': otak.kid_idx, 'algo': otak.algo_auth}
341
342 # CHL = number of octets from (and including) SPI to the end of RC/CC/DS
343 # 13 == SPI(2) + KIc(1) + KId(1) + TAR(3) + CNTR(5) + PCNTR(1)
344 chl = 13 + len_sig
345
346 # CHL + SPI (+ KIC + KID)
347 c = Struct('chl'/Int8ub, 'spi'/SPI, 'kic'/KIC, 'kid'/KID_CC, 'tar'/Bytes(3))
348 part_head = c.build({'chl': chl, 'spi':spi, 'kic':kic, 'kid':kid, 'tar':tar})
349 #print("part_head: %s" % b2h(part_head))
350
351 # CNTR + PCNTR (CNTR not used)
352 part_cnt = otak.cntr.to_bytes(5, 'big') + pad_cnt.to_bytes(1, 'big')
353 #print("part_cnt: %s" % b2h(part_cnt))
354
355 envelope_data = part_head + part_cnt + apdu
356 #print("envelope_data: %s" % b2h(envelope_data))
357
358 # 2-byte CPL. CPL is part of RC/CC/CPI to end of secured data, including any padding for ciphering
359 # CPL from and including CPI to end of secured data, including any padding for ciphering
360 cpl = len(envelope_data) + len_sig
361 envelope_data = cpl.to_bytes(2, 'big') + envelope_data
362 #print("envelope_data with cpl: %s" % b2h(envelope_data))
363
364 if spi['rc_cc_ds'] == 'cc':
365 cc = otak.auth.sign(envelope_data)
366 envelope_data = part_cnt + cc + apdu
367 elif spi['rc_cc_ds'] == 'rc':
368 # CRC32
369 crc32 = zlib.crc32(envelope_data) & 0xffffffff
370 envelope_data = part_cnt + crc32.to_bytes(4, 'big') + apdu
371 elif spi['rc_cc_ds'] == 'no_rc_cc_ds':
372 envelope_data = part_cnt + apdu
373 else:
374 raise ValueError("Invalid rc_cc_ds: %s" % spi['rc_cc_ds'])
375
376 #print("envelope_data with sig: %s" % b2h(envelope_data))
377
378 # encrypt as needed
379 if spi['ciphering']: # ciphering is requested
380 ciph = otak.crypt.encrypt(envelope_data)
381 envelope_data = part_head + ciph
382 # prefix with another CPL
383 cpl = len(envelope_data)
384 envelope_data = cpl.to_bytes(2, 'big') + envelope_data
385 else:
386 envelope_data = part_head + envelope_data
387
388 #print("envelope_data: %s" % b2h(envelope_data))
389
390 return envelope_data
391
392 def decode_resp(self, otak: OtaKeyset, spi: dict, data: bytes) -> ("OtaDialectSms.SmsResponsePacket", Optional["CompactRemoteResp"]):
393 if isinstance(data, str):
394 data = h2b(data)
395 # plain-text POR: 027100000e0ab000110000000000000001612f
396 # UDHL RPI IEDLa RPL RHL TAR CNTR PCNTR STS
397 # 02 71 00 000e 0a b00011 0000000000 00 00 01 612f
398 # POR with CC: 027100001612b000110000000000000055f47118381175fb01612f
399 # POR with CC+CIPH: 027100001c12b000119660ebdb81be189b5e4389e9e7ab2bc0954f963ad869ed7c
400 if data[0] != 0x02:
401 raise ValueError('Unexpected UDL=0x%02x' % data[0])
402 udhd, remainder = UserDataHeader.fromBytes(data)
403 if not udhd.has_ie(0x71):
404 raise ValueError('RPI 0x71 not found in UDH')
405 rph_rhl_tar = remainder[:6] # RPH+RHL+TAR; not ciphered
406 res = self.SmsResponsePacket.parse(remainder)
407
408 if spi['por_shall_be_ciphered']:
409 # decrypt
410 ciphered_part = remainder[6:]
411 deciph = otak.crypt.decrypt(ciphered_part)
412 temp_data = rph_rhl_tar + deciph
413 res = self.SmsResponsePacket.parse(temp_data)
414 # remove specified number of padding bytes, if any
415 if res['pcntr'] != 0:
416 # this conditional is needed as python [:-0] renders an empty return!
417 res['secured_data'] = res['secured_data'][:-res['pcntr']]
418 remainder = temp_data
419
420 # is there a CC/RC present?
421 len_sig = res['rhl'] - 10
422 if spi['por_rc_cc_ds'] == 'no_rc_cc_ds':
423 if len_sig:
424 raise OtaCheckError('No RC/CC/DS requested, but len_sig=%u' % len_sig)
425 elif spi['por_rc_cc_ds'] == 'cc':
426 # verify signature
427 # UDH is part of CC/RC!
428 udh = data[:3]
429 # RPL, RHL, TAR, CNTR, PCNTR and STSare part of CC/RC
430 rpl_rhl_tar_cntr_pcntr_sts = remainder[:13]
431 # remove the CC/RC bytes
432 temp_data = udh + rpl_rhl_tar_cntr_pcntr_sts + remainder[13+len_sig:]
433 otak.auth.check_sig(temp_data, res['cc_rc'])
434 # TODO: CRC
435 else:
436 raise OtaCheckError('Unknown por_rc_cc_ds: %s' % spi['por_rc_cc_ds'])
437
438 # TODO: ExpandedRemoteResponse according to TS 102 226 5.2.2
439 if res.response_status == 'por_ok':
440 dec = CompactRemoteResp.parse(res['secured_data'])
441 else:
442 dec = None
443 return (res, dec)