Harald Welte | 75a58d1 | 2022-07-31 15:51:19 +0200 | [diff] [blame] | 1 | """Code related to SIM/UICC OTA according to TS 102 225 + TS 31.115.""" |
| 2 | |
Harald Welte | 219a5f3 | 2023-05-31 18:07:48 +0200 | [diff] [blame] | 3 | # (C) 2021-2023 by Harald Welte <laforge@osmocom.org> |
Harald Welte | 75a58d1 | 2022-07-31 15:51:19 +0200 | [diff] [blame] | 4 | # |
| 5 | # This program is free software: you can redistribute it and/or modify |
| 6 | # it under the terms of the GNU General Public License as published by |
| 7 | # the Free Software Foundation, either version 2 of the License, or |
| 8 | # (at your option) any later version. |
| 9 | # |
| 10 | # This program is distributed in the hope that it will be useful, |
| 11 | # but WITHOUT ANY WARRANTY; without even the implied warranty of |
| 12 | # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the |
| 13 | # GNU General Public License for more details. |
| 14 | # |
| 15 | # You should have received a copy of the GNU General Public License |
| 16 | # along with this program. If not, see <http://www.gnu.org/licenses/>. |
| 17 | |
| 18 | from pySim.construct import * |
| 19 | from pySim.utils import b2h |
| 20 | from pySim.sms import UserDataHeader |
| 21 | from construct import * |
| 22 | import zlib |
| 23 | import abc |
| 24 | import struct |
| 25 | from typing import Optional |
| 26 | |
# ETSI TS 102 225 gives the general command structure and the dialects for CAT_TP, TCP/IP and HTTPS
| 28 | # 3GPP TS 31.115 gives the dialects for SMS-PP, SMS-CB, USSD and HTTP |
| 29 | |
| 30 | # CPI CPL CHI CHL SPI KIc KID TAR CNTR PCNTR RC/CC/DS data |
| 31 | |
| 32 | # CAT_TP TCP/IP SMS |
| 33 | # CPI 0x01 0x01 =IEIa=70,len=0 |
| 34 | # CHI NULL NULL NULL |
| 35 | # CPI, CPL and CHL included in RC/CC/DS true true |
| 36 | # RPI 0x02 0x02 =IEIa=71,len=0 |
| 37 | # RHI NULL NULL |
| 38 | # RPI, RPL and RHL included in RC/CC/DS true true |
| 39 | # packet-id 0-bf,ff 0-bf,ff |
| 40 | # identification packet false 102 225 tbl 6 |
| 41 | |
| 42 | # KVN 1..f; KI1=KIc, KI2=KID, KI3=DEK |
| 43 | |
# ETSI TS 102 225 Table 5 + 3GPP TS 31.115 Section 7
# Response status code, encoded as a single unsigned octet.
ResponseStatus = Enum(
    Int8ub,
    por_ok=0,
    rc_cc_ds_failed=1,
    cntr_low=2,
    cntr_high=3,
    cntr_blocked=4,
    ciphering_error=5,
    undefined_security_error=6,
    insufficient_memory=7,
    more_time_needed=8,
    tar_unknown=9,
    insufficient_security_level=0x0A,
    actual_response_sms_submit=0x0B,
    actual_response_ussd=0x0C,
)
| 51 | |
# ETSI TS 102 226 Section 5.1.2
# One octet command count, two octets status word, remaining octets are the
# response data of the last command; both byte fields are exposed as hex.
CompactRemoteResp = Struct(
    'number_of_commands' / Int8ub,
    'last_status_word' / HexAdapter(Bytes(2)),
    'last_response_data' / HexAdapter(GreedyBytes),
)

# Two-bit selector for the kind of integrity protection (none, RC, CC or DS).
RC_CC_DS = Enum(
    BitsInteger(2),
    no_rc_cc_ds=0,
    rc=1,
    cc=2,
    ds=3,
)
| 58 | |
# TS 102 225 Section 5.1.1 + TS 31.115 Section 4.2
# Two octets, described most significant bit first.
SPI = BitStruct(
    # --- first octet ---
    Padding(3),
    'counter' / Enum(
        BitsInteger(2),
        no_counter=0,
        counter_no_replay_or_seq=1,
        counter_must_be_higher=2,
        counter_must_be_lower=3,
    ),
    'ciphering' / Flag,
    'rc_cc_ds' / RC_CC_DS,
    # --- second octet ---
    Padding(2),
    'por_in_submit' / Flag,
    'por_shall_be_ciphered' / Flag,
    'por_rc_cc_ds' / RC_CC_DS,
    'por' / Enum(
        BitsInteger(2),
        no_por=0,
        por_required=1,
        por_only_when_error=2,
    ),
)
| 74 | |
# TS 102 225 Section 5.1.2
# One octet: upper nibble is the key index, lower nibble the ciphering algorithm.
KIC = BitStruct(
    'key' / BitsInteger(4),
    'algo' / Enum(
        BitsInteger(4),
        implicit=0,
        single_des=1,
        triple_des_cbc2=5,
        triple_des_cbc3=9,
        aes_cbc=2,
    ),
)

# TS 102 225 Section 5.1.3.1
# KID as used together with a cryptographic checksum (CC).
KID_CC = BitStruct(
    'key' / BitsInteger(4),
    'algo' / Enum(
        BitsInteger(4),
        implicit=0,
        single_des=1,
        triple_des_cbc2=5,
        triple_des_cbc3=9,
        aes_cmac=2,
    ),
)

# TS 102 225 Section 5.1.3.2
# KID as used together with a redundancy check (RC).
KID_RC = BitStruct(
    'key' / BitsInteger(4),
    'algo' / Enum(
        BitsInteger(4),
        implicit=0,
        crc16=1,
        crc32=5,
        proprietary=3,
    ),
)
| 91 | |
# Command packet as found in the SMS user data: length fields, SPI, KIc, KID
# and TAR; everything after the TAR is kept as opaque bytes.
SmsCommandPacket = Struct(
    'cmd_pkt_len' / Int16ub,
    'cmd_hdr_len' / Int8ub,
    'spi' / SPI,
    'kic' / KIC,
    # the KID layout is selected by the rc_cc_ds setting of the SPI
    'kid' / Switch(this.spi.rc_cc_ds, {'cc': KID_CC, 'rc': KID_RC}),
    'tar' / Bytes(3),
    'secured_data' / GreedyBytes,
)
| 99 | |
class OtaKeyset:
    """The OTA related data (key material, counter) to be used in encrypt/decrypt."""

    def __init__(self, algo_crypt: str, kic_idx: int, kic: bytes,
                 algo_auth: str, kid_idx: int, kid: bytes, cntr: int = 0):
        """Remember algorithm names, key indexes, keys and the counter.

        Both keys are copied into immutable `bytes` objects.
        """
        # ciphering side
        self.algo_crypt, self.kic_idx = algo_crypt, kic_idx
        self.kic = bytes(kic)
        # authentication side
        self.algo_auth, self.kid_idx = algo_auth, kid_idx
        self.kid = bytes(kid)
        # counter value placed into outgoing command packets
        self.cntr = cntr

    @property
    def auth(self):
        """Return an instance of the matching OtaAlgoAuth."""
        algo = OtaAlgoAuth.fromKeyset(self)
        return algo

    @property
    def crypt(self):
        """Return an instance of the matching OtaAlgoCrypt."""
        algo = OtaAlgoCrypt.fromKeyset(self)
        return algo
| 121 | |
class OtaCheckError(Exception):
    """Raised when a check on a received OTA packet fails (e.g. CC mismatch)."""
| 124 | |
class OtaDialect(abc.ABC):
    """Base Class for OTA dialects such as SMS, BIP, ..."""

    def _compute_sig_len(self, spi: dict) -> int:
        """Return the number of RC/CC/DS octets implied by the given SPI.

        Args:
            spi: decoded SPI; only its 'rc_cc_ds' member is consulted.
        Returns:
            0 without RC/CC/DS, 4 for RC, 8 for CC.
        Raises:
            ValueError: for any other 'rc_cc_ds' value (including 'ds').
        """
        mode = spi['rc_cc_ds']
        if mode == 'no_rc_cc_ds':
            return 0
        if mode == 'rc':  # CRC-32
            return 4
        if mode == 'cc':  # Cryptographic Checksum (CC)
            # TODO: this is not entirely correct, as in AES case it could be 4 or 8
            return 8
        raise ValueError("Invalid rc_cc_ds: %s" % mode)

    @abc.abstractmethod
    def encode_cmd(self, otak: "OtaKeyset", tar: bytes, spi: dict, apdu: bytes) -> bytes:
        """Encode the given APDU(s) into a secured command packet.

        Args:
            otak: key material and counter to use.
            tar: the 3-byte Toolkit Application Reference.
            spi: decoded SPI selecting ciphering / integrity protection.
            apdu: the payload to be secured.
        Returns:
            the encoded command packet.
        """

    @abc.abstractmethod
    def decode_resp(self, otak: "OtaKeyset", spi: dict, apdu: bytes) -> (object, Optional["CompactRemoteResp"]):
        """Decode a response into a response packet and, if indicated (by a
        response status of `"por_ok"`) a decoded response.

        The response packet's common characteristics are not fully determined,
        and (so far) completely proprietary per dialect."""
| 151 | |
| 152 | |
Harald Welte | d75fa3f | 2023-05-31 20:47:55 +0200 | [diff] [blame] | 153 | from Cryptodome.Cipher import DES, DES3, AES |
| 154 | from Cryptodome.Hash import CMAC |
Harald Welte | 75a58d1 | 2022-07-31 15:51:19 +0200 | [diff] [blame] | 155 | |
class OtaAlgo(abc.ABC):
    """Common base of the OTA ciphering and authentication algorithm classes.

    Derived classes set `blocksize` (block size in octets used for padding and
    for the IV length) and `enum_name` (the algorithm name that is compared
    against the algorithm name stored in the keyset).
    """
    blocksize = None
    enum_name = None

    @property
    def iv(self) -> bytes:
        """All-zero initialization vector of `blocksize` octets."""
        return bytes(self.blocksize)

    @staticmethod
    def _get_padding(in_len: int, multiple: int, padding: int = 0) -> bytes:
        """Return padding bytes towards multiple of N.

        Args:
            in_len: current length of the data in octets.
            multiple: the length shall become a multiple of this value.
            padding: value (0..255) of each padding octet.
        Returns:
            the padding octets to append; empty if already aligned.
        """
        remainder = in_len % multiple
        if remainder == 0:
            return b''
        # honour the requested padding octet rather than always using 0x00
        return bytes([padding]) * (multiple - remainder)

    @staticmethod
    def _pad_to_multiple(indat: bytes, multiple: int, padding: int = 0) -> bytes:
        """Pad input bytes to multiple of N."""
        return indat + OtaAlgo._get_padding(len(indat), multiple, padding)

    def pad_to_blocksize(self, indat: bytes, padding: int = 0) -> bytes:
        """Pad the given input data to multiple of the cipher block size."""
        return self._pad_to_multiple(indat, self.blocksize, padding)

    def __init__(self, otak: "OtaKeyset"):
        """Remember the keyset whose key material this algorithm shall use."""
        self.otak = otak

    def __str__(self):
        return self.__class__.__name__
| 183 | |
class OtaAlgoCrypt(OtaAlgo, abc.ABC):
    """Base class of the OTA ciphering algorithms.

    Concrete algorithms implement `_encrypt` / `_decrypt` and are looked up by
    their `enum_name` via `fromKeyset`.
    """

    def __init__(self, otak: OtaKeyset):
        """Bind the algorithm to a keyset.

        Raises:
            ValueError: if the keyset's algo_crypt differs from this class' enum_name.
        """
        if self.enum_name != otak.algo_crypt:
            raise ValueError('Cannot use algorithm %s with key for %s' % (self.enum_name, otak.algo_crypt))
        super().__init__(otak)

    def encrypt(self, data: bytes) -> bytes:
        """Encrypt given input bytes using the key material given in constructor.

        The input is padded to a multiple of the cipher block size first.
        """
        padded_data = self.pad_to_blocksize(data)
        # hand the *padded* data to the cipher, not the raw input
        return self._encrypt(padded_data)

    def decrypt(self, data: bytes) -> bytes:
        """Decrypt given input bytes using the key material given in constructor."""
        return self._decrypt(data)

    @abc.abstractmethod
    def _encrypt(self, data: bytes) -> bytes:
        """Actual implementation, to be implemented by derived class."""

    @abc.abstractmethod
    def _decrypt(self, data: bytes) -> bytes:
        """Actual implementation, to be implemented by derived class."""

    @classmethod
    def fromKeyset(cls, otak: OtaKeyset) -> 'OtaAlgoCrypt':
        """Resolve the class for the encryption algorithm of otak and instantiate it.

        Raises:
            ValueError: if no direct subclass declares a matching enum_name.
        """
        for subc in cls.__subclasses__():
            if subc.enum_name == otak.algo_crypt:
                return subc(otak)
        # report the ciphering algorithm that was searched for
        raise ValueError('No implementation for crypt algorithm %s' % otak.algo_crypt)
| 216 | |
class OtaAlgoAuth(OtaAlgo, abc.ABC):
    """Base class of the OTA authentication (checksum) algorithms.

    Concrete algorithms implement `_sign` and are looked up by their
    `enum_name` via `fromKeyset`.
    """

    def __init__(self, otak: OtaKeyset):
        """Bind the algorithm to a keyset.

        Raises:
            ValueError: if the keyset's algo_auth differs from this class' enum_name.
        """
        if self.enum_name != otak.algo_auth:
            # report the authentication algorithm of the keyset, which is what was compared
            raise ValueError('Cannot use algorithm %s with key for %s' % (self.enum_name, otak.algo_auth))
        super().__init__(otak)

    def sign(self, data: bytes) -> bytes:
        """Compute the CC/CR check bytes for the input data using key material
        given in constructor.

        The input is padded to a multiple of the block size first."""
        padded_data = self.pad_to_blocksize(data)
        return self._sign(padded_data)

    def check_sig(self, data: bytes, cc_received: bytes):
        """Compute the CC/CR check bytes for the input data and compare against cc_received.

        Raises:
            OtaCheckError: if the computed and the received check bytes differ.
        """
        cc = self.sign(data)
        if cc_received != cc:
            raise OtaCheckError('Received CC (%s) != Computed CC (%s)' % (b2h(cc_received), b2h(cc)))

    @abc.abstractmethod
    def _sign(self, data: bytes) -> bytes:
        """Actual implementation, to be implemented by derived class."""

    @classmethod
    def fromKeyset(cls, otak: OtaKeyset) -> 'OtaAlgoAuth':
        """Resolve the class for the authentication algorithm of otak and instantiate it.

        Raises:
            ValueError: if no direct subclass declares a matching enum_name.
        """
        for subc in cls.__subclasses__():
            if subc.enum_name == otak.algo_auth:
                return subc(otak)
        raise ValueError('No implementation for auth algorithm %s' % otak.algo_auth)
| 248 | |
class OtaAlgoCryptDES(OtaAlgoCrypt):
    """DES is insecure. For backwards compatibility with pre-Rel8"""
    name = 'DES'
    enum_name = 'single_des'
    blocksize = 8

    def _encrypt(self, data: bytes) -> bytes:
        """DES-CBC encrypt `data` with the keyset's KIc and the all-zero IV."""
        return DES.new(self.otak.kic, DES.MODE_CBC, self.iv).encrypt(data)

    def _decrypt(self, data: bytes) -> bytes:
        """DES-CBC decrypt `data` with the keyset's KIc and the all-zero IV."""
        return DES.new(self.otak.kic, DES.MODE_CBC, self.iv).decrypt(data)
| 261 | |
class OtaAlgoAuthDES(OtaAlgoAuth):
    """DES is insecure. For backwards compatibility with pre-Rel8"""
    name = 'DES'
    enum_name = 'single_des'
    blocksize = 8

    def _sign(self, data: bytes) -> bytes:
        """Return the last 8 octets of the DES-CBC encryption of `data` under KID."""
        encrypted = DES.new(self.otak.kid, DES.MODE_CBC, self.iv).encrypt(data)
        return encrypted[-8:]
| 271 | |
class OtaAlgoCryptDES3(OtaAlgoCrypt):
    """Triple-DES in CBC mode, registered under the name 'triple_des_cbc2'."""
    name = '3DES'
    enum_name = 'triple_des_cbc2'
    blocksize = 8

    def _encrypt(self, data: bytes) -> bytes:
        """3DES-CBC encrypt `data` with the keyset's KIc and the all-zero IV."""
        return DES3.new(self.otak.kic, DES3.MODE_CBC, self.iv).encrypt(data)

    def _decrypt(self, data: bytes) -> bytes:
        """3DES-CBC decrypt `data` with the keyset's KIc and the all-zero IV."""
        return DES3.new(self.otak.kic, DES3.MODE_CBC, self.iv).decrypt(data)
| 283 | |
class OtaAlgoAuthDES3(OtaAlgoAuth):
    """Triple-DES based checksum, registered under the name 'triple_des_cbc2'."""
    name = '3DES'
    enum_name = 'triple_des_cbc2'
    blocksize = 8

    def _sign(self, data: bytes) -> bytes:
        """Return the last 8 octets of the 3DES-CBC encryption of `data` under KID."""
        encrypted = DES3.new(self.otak.kid, DES3.MODE_CBC, self.iv).encrypt(data)
        return encrypted[-8:]
| 292 | |
class OtaAlgoCryptAES(OtaAlgoCrypt):
    """AES in CBC mode, registered under the name 'aes_cbc'."""
    name = 'AES'
    enum_name = 'aes_cbc'
    blocksize = 16  # TODO: is this needed?

    def _encrypt(self, data: bytes) -> bytes:
        """AES-CBC encrypt `data` with the keyset's KIc and the all-zero IV."""
        return AES.new(self.otak.kic, AES.MODE_CBC, self.iv).encrypt(data)

    def _decrypt(self, data: bytes) -> bytes:
        """AES-CBC decrypt `data` with the keyset's KIc and the all-zero IV."""
        return AES.new(self.otak.kic, AES.MODE_CBC, self.iv).decrypt(data)
| 304 | |
class OtaAlgoAuthAES(OtaAlgoAuth):
    """AES-CMAC based checksum, registered under the name 'aes_cmac'."""
    name = 'AES'
    enum_name = 'aes_cmac'
    blocksize = 1  # AES CMAC doesn't need any padding by us

    def _sign(self, data: bytes) -> bytes:
        """Return the 8-octet AES-CMAC of `data` computed with the keyset's KID."""
        mac = CMAC.new(self.otak.kid, ciphermod=AES, mac_len=8)
        mac.update(data)
        digest = mac.digest()
        return digest[-8:]
| 314 | |
| 315 | |
| 316 | |
class OtaDialectSms(OtaDialect):
    """OTA dialect for SMS based transport, as described in 3GPP TS 31.115."""

    # Response packet as found behind the User Data Header; the length of the
    # CC/RC field is derived from RHL (RHL minus the 10 fixed header octets).
    SmsResponsePacket = Struct('rpl'/Int16ub,
                               'rhl'/Int8ub,
                               'tar'/Bytes(3),
                               'cntr'/Bytes(5),
                               'pcntr'/Int8ub,
                               'response_status'/ResponseStatus,
                               'cc_rc'/Bytes(this.rhl-10),
                               'secured_data'/GreedyBytes)

    def encode_cmd(self, otak: OtaKeyset, tar: bytes, spi: dict, apdu: bytes) -> bytes:
        """Encode an APDU into a secured command packet for SMS transport.

        Args:
            otak: key material, key indexes, algorithm names and counter.
            tar: the 3-byte Toolkit Application Reference.
            spi: decoded SPI; 'ciphering' and 'rc_cc_ds' are evaluated here.
            apdu: the payload to be secured.
        Returns:
            CPL + CHL + SPI + KIc + KID + TAR followed by the (optionally
            ciphered) CNTR + PCNTR + RC/CC + payload.
        Raises:
            ValueError: if spi['rc_cc_ds'] is not one of no_rc_cc_ds / rc / cc.
        """
        # length of signature in octets
        len_sig = self._compute_sig_len(spi)
        pad_cnt = 0
        if spi['ciphering']:  # ciphering is requested
            # append padding bytes to end up with blocksize;
            # 6 == CNTR(5) + PCNTR(1), which are ciphered together with RC/CC and data
            len_cipher = 6 + len_sig + len(apdu)
            padding = otak.crypt._get_padding(len_cipher, otak.crypt.blocksize)
            pad_cnt = len(padding)
            apdu += padding

        kic = {'key': otak.kic_idx, 'algo': otak.algo_crypt}
        kid = {'key': otak.kid_idx, 'algo': otak.algo_auth}

        # CHL = number of octets from (and including) SPI to the end of RC/CC/DS
        # 13 == SPI(2) + KIc(1) + KId(1) + TAR(3) + CNTR(5) + PCNTR(1)
        chl = 13 + len_sig

        # CHL + SPI (+ KIC + KID)
        c = Struct('chl'/Int8ub, 'spi'/SPI, 'kic'/KIC, 'kid'/KID_CC, 'tar'/Bytes(3))
        part_head = c.build({'chl': chl, 'spi': spi, 'kic': kic, 'kid': kid, 'tar': tar})

        # CNTR + PCNTR (CNTR not used)
        part_cnt = otak.cntr.to_bytes(5, 'big') + pad_cnt.to_bytes(1, 'big')

        envelope_data = part_head + part_cnt + apdu

        # 2-byte CPL: counts from CHL to the end of the secured data, including
        # RC/CC/DS and any padding for ciphering. It is part of the signed data.
        cpl = len(envelope_data) + len_sig
        envelope_data = cpl.to_bytes(2, 'big') + envelope_data

        # compute the RC/CC over the data assembled so far and build the
        # part of the packet that follows the (unciphered) header
        if spi['rc_cc_ds'] == 'cc':
            cc = otak.auth.sign(envelope_data)
            envelope_data = part_cnt + cc + apdu
        elif spi['rc_cc_ds'] == 'rc':
            # CRC32
            crc32 = zlib.crc32(envelope_data) & 0xffffffff
            envelope_data = part_cnt + crc32.to_bytes(4, 'big') + apdu
        elif spi['rc_cc_ds'] == 'no_rc_cc_ds':
            envelope_data = part_cnt + apdu
        else:
            raise ValueError("Invalid rc_cc_ds: %s" % spi['rc_cc_ds'])

        # encrypt as needed; the header (CHL..TAR) always stays in plain text
        if spi['ciphering']:  # ciphering is requested
            envelope_data = part_head + otak.crypt.encrypt(envelope_data)
        else:
            envelope_data = part_head + envelope_data

        # prefix with the CPL in both the ciphered and the plain-text case, so
        # the result always starts with the length field SmsCommandPacket expects
        cpl = len(envelope_data)
        return cpl.to_bytes(2, 'big') + envelope_data

    def decode_resp(self, otak: OtaKeyset, spi: dict, data: bytes) -> ("OtaDialectSms.SmsResponsePacket", Optional["CompactRemoteResp"]):
        """Decode (and if needed decipher / verify) a response packet.

        Args:
            otak: key material used for deciphering and CC verification.
            spi: decoded SPI of the command; 'por_shall_be_ciphered' and
                 'por_rc_cc_ds' are evaluated here.
            data: user data including the User Data Header, as bytes or as
                  a hex string.
        Returns:
            tuple of the parsed SmsResponsePacket and, if the response status
            is 'por_ok', the parsed CompactRemoteResp (otherwise None).
        Raises:
            ValueError: on unexpected first octet or missing RPI 0x71 in the UDH.
            OtaCheckError: on CC mismatch, on unexpected CC/RC octets or if
                           'por_rc_cc_ds' is neither 'no_rc_cc_ds' nor 'cc'.
        """
        if isinstance(data, str):
            # accept hex-string input; stdlib conversion avoids depending on a helper import
            data = bytes.fromhex(data)
        # plain-text POR: 027100000e0ab000110000000000000001612f
        # UDHL RPI IEDLa RPL RHL TAR CNTR PCNTR STS
        # 02 71 00 000e 0a b00011 0000000000 00 00 01 612f
        # POR with CC: 027100001612b000110000000000000055f47118381175fb01612f
        # POR with CC+CIPH: 027100001c12b000119660ebdb81be189b5e4389e9e7ab2bc0954f963ad869ed7c
        if data[0] != 0x02:
            raise ValueError('Unexpected UDL=0x%02x' % data[0])
        udhd, remainder = UserDataHeader.fromBytes(data)
        if not udhd.has_ie(0x71):
            raise ValueError('RPI 0x71 not found in UDH')
        rph_rhl_tar = remainder[:6]  # RPH+RHL+TAR; not ciphered
        res = self.SmsResponsePacket.parse(remainder)

        if spi['por_shall_be_ciphered']:
            # decrypt everything behind the plain-text RPL+RHL+TAR and parse again
            ciphered_part = remainder[6:]
            deciph = otak.crypt.decrypt(ciphered_part)
            temp_data = rph_rhl_tar + deciph
            res = self.SmsResponsePacket.parse(temp_data)
            # remove specified number of padding bytes, if any
            if res['pcntr'] != 0:
                # this conditional is needed as python [:-0] renders an empty return!
                res['secured_data'] = res['secured_data'][:-res['pcntr']]
            remainder = temp_data

        # is there a CC/RC present?
        len_sig = res['rhl'] - 10
        if spi['por_rc_cc_ds'] == 'no_rc_cc_ds':
            if len_sig:
                raise OtaCheckError('No RC/CC/DS requested, but len_sig=%u' % len_sig)
        elif spi['por_rc_cc_ds'] == 'cc':
            # verify signature
            # UDH is part of CC/RC!
            udh = data[:3]
            # RPL, RHL, TAR, CNTR, PCNTR and STS are part of CC/RC
            rpl_rhl_tar_cntr_pcntr_sts = remainder[:13]
            # remove the CC/RC bytes
            temp_data = udh + rpl_rhl_tar_cntr_pcntr_sts + remainder[13+len_sig:]
            otak.auth.check_sig(temp_data, res['cc_rc'])
        # TODO: CRC
        else:
            raise OtaCheckError('Unknown por_rc_cc_ds: %s' % spi['por_rc_cc_ds'])

        # TODO: ExpandedRemoteResponse according to TS 102 226 5.2.2
        if res.response_status == 'por_ok':
            dec = CompactRemoteResp.parse(res['secured_data'])
        else:
            dec = None
        return (res, dec)