| """Code related to SIM/UICC OTA according to TS 102 225 + TS 31.115.""" |
| |
| # (C) 2021-2023 by Harald Welte <laforge@osmocom.org> |
| # |
| # This program is free software: you can redistribute it and/or modify |
| # it under the terms of the GNU General Public License as published by |
| # the Free Software Foundation, either version 2 of the License, or |
| # (at your option) any later version. |
| # |
| # This program is distributed in the hope that it will be useful, |
| # but WITHOUT ANY WARRANTY; without even the implied warranty of |
| # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the |
| # GNU General Public License for more details. |
| # |
| # You should have received a copy of the GNU General Public License |
| # along with this program. If not, see <http://www.gnu.org/licenses/>. |
| |
| from pySim.construct import * |
| from pySim.utils import b2h |
| from pySim.sms import UserDataHeader |
| from construct import * |
| import zlib |
| import abc |
| import struct |
| from typing import Optional |
| |
| # ETSI TS 102 225 gives the general command structure and the dialects for CAT_TP, TCP/IP and HTTPS |
| # 3GPP TS 31.115 gives the dialects for SMS-PP, SMS-CB, USSD and HTTP |
| |
| # CPI CPL CHI CHL SPI KIc KID TAR CNTR PCNTR RC/CC/DS data |
| |
| # CAT_TP TCP/IP SMS |
| # CPI 0x01 0x01 =IEIa=70,len=0 |
| # CHI NULL NULL NULL |
| # CPI, CPL and CHL included in RC/CC/DS true true |
| # RPI 0x02 0x02 =IEIa=71,len=0 |
| # RHI NULL NULL |
| # RPI, RPL and RHL included in RC/CC/DS true true |
| # packet-id 0-bf,ff 0-bf,ff |
| # identification packet false 102 225 tbl 6 |
| |
| # KVN 1..f; KI1=KIc, KI2=KID, KI3=DEK |
| |
# ETSI TS 102 225 Table 5 + 3GPP TS 31.115 Section 7
# Response status code: a single unsigned octet mapped to the symbolic
# names listed below.
ResponseStatus = Enum(Int8ub, por_ok=0, rc_cc_ds_failed=1, cntr_low=2, cntr_high=3,
                      cntr_blocked=4, ciphering_error=5, undefined_security_error=6,
                      insufficient_memory=7, more_time_needed=8, tar_unknown=9,
                      insufficient_security_level=0x0A,
                      actual_response_sms_submit=0x0B,
                      actual_response_ussd=0x0C)
| |
# ETSI TS 102 226 Section 5.1.2
# Compact remote response: a one-octet command count, a two-octet status
# word and all remaining bytes as response data.  The latter two fields
# are wrapped in HexAdapter.
CompactRemoteResp = Struct('number_of_commands'/Int8ub,
                           'last_status_word'/HexAdapter(Bytes(2)),
                           'last_response_data'/HexAdapter(GreedyBytes))
| |
# Two-bit selector for the kind of integrity protection: none, RC, CC or DS.
# It is used for both the `rc_cc_ds` and the `por_rc_cc_ds` field of SPI.
RC_CC_DS = Enum(BitsInteger(2), no_rc_cc_ds=0, rc=1, cc=2, ds=3)
| |
# TS 102 225 Section 5.1.1 + TS 31.115 Section 4.2
# Two-octet SPI bit field.  Fields are listed from the most significant bit
# of each octet downwards; the Padding() entries skip the unused top bits.
SPI = BitStruct(  # first octet
        Padding(3),
        'counter'/Enum(BitsInteger(2), no_counter=0, counter_no_replay_or_seq=1,
                       counter_must_be_higher=2, counter_must_be_lower=3),
        'ciphering'/Flag,
        'rc_cc_ds'/RC_CC_DS,
        # second octet
        Padding(2),
        'por_in_submit'/Flag,
        'por_shall_be_ciphered'/Flag,
        'por_rc_cc_ds'/RC_CC_DS,
        'por'/Enum(BitsInteger(2), no_por=0,
                   por_required=1, por_only_when_error=2)
        )
| |
# TS 102 225 Section 5.1.2
# One-octet KIc field: upper nibble is the key index, lower nibble selects
# the ciphering algorithm.
KIC = BitStruct('key'/BitsInteger(4),
                'algo'/Enum(BitsInteger(4), implicit=0, single_des=1, triple_des_cbc2=5, triple_des_cbc3=9,
                            aes_cbc=2)
                )
| |
# TS 102 225 Section 5.1.3.1
# One-octet KID field as used with a cryptographic checksum (CC): upper
# nibble is the key index, lower nibble selects the CC algorithm.
KID_CC = BitStruct('key'/BitsInteger(4),
                   'algo'/Enum(BitsInteger(4), implicit=0, single_des=1, triple_des_cbc2=5, triple_des_cbc3=9,
                               aes_cmac=2)
                   )
| |
# TS 102 225 Section 5.1.3.2
# One-octet KID field as used with a redundancy check (RC): upper nibble is
# the key index, lower nibble selects the checksum algorithm.
KID_RC = BitStruct('key'/BitsInteger(4),
                   'algo'/Enum(BitsInteger(4), implicit=0, crc16=1, crc32=5, proprietary=3)
                   )
| |
# Leading part of an SMS command packet: packet length, header length, SPI,
# KIc, KID and TAR; everything after the TAR is kept as opaque bytes.
# The KID layout depends on whether the SPI selects CC or RC.
# NOTE(review): the Switch has no case for 'no_rc_cc_ds' or 'ds' -- confirm
# how the KID octet is meant to be consumed for those SPI values.
SmsCommandPacket = Struct('cmd_pkt_len'/Int16ub,
                          'cmd_hdr_len'/Int8ub,
                          'spi'/SPI,
                          'kic'/KIC,
                          'kid'/Switch(this.spi.rc_cc_ds, {'cc': KID_CC, 'rc': KID_RC }),
                          'tar'/Bytes(3),
                          'secured_data'/GreedyBytes)
| |
class OtaKeyset:
    """Bundle of OTA key material and counter used for ciphering and signing.

    Attributes:
        algo_crypt: name of the ciphering algorithm (matched against the
            ``enum_name`` of the OtaAlgoCrypt subclasses).
        kic / kic_idx: ciphering key (stored as ``bytes``) and its index.
        algo_auth: name of the authentication algorithm (matched against the
            ``enum_name`` of the OtaAlgoAuth subclasses).
        kid / kid_idx: authentication key (stored as ``bytes``) and its index.
        cntr: counter value, defaults to 0.
    """

    def __init__(self, algo_crypt: str, kic_idx: int, kic: bytes,
                 algo_auth: str, kid_idx: int, kid: bytes, cntr: int = 0):
        # ciphering side; the key is copied into an immutable bytes object
        self.algo_crypt, self.kic, self.kic_idx = algo_crypt, bytes(kic), kic_idx
        # authentication side; same treatment for the key
        self.algo_auth, self.kid, self.kid_idx = algo_auth, bytes(kid), kid_idx
        self.cntr = cntr

    @property
    def auth(self):
        """A freshly created OtaAlgoAuth instance matching ``algo_auth``."""
        return OtaAlgoAuth.fromKeyset(self)

    @property
    def crypt(self):
        """A freshly created OtaAlgoCrypt instance matching ``algo_crypt``."""
        return OtaAlgoCrypt.fromKeyset(self)
| |
class OtaCheckError(Exception):
    """Raised when a received OTA message fails a signature or consistency check."""
| |
class OtaDialect(abc.ABC):
    """Abstract base for the transport-specific OTA dialects (SMS, BIP, ...)."""

    def _compute_sig_len(self, spi:SPI):
        """Return the RC/CC/DS length in octets selected by ``spi['rc_cc_ds']``.

        Raises:
            ValueError: for any value other than 'no_rc_cc_ds', 'rc' or 'cc'.
        """
        mode = spi['rc_cc_ds']
        if mode == 'cc':  # Cryptographic Checksum (CC)
            # TODO: this is not entirely correct, as in AES case it could be 4 or 8
            return 8
        if mode == 'rc':  # CRC-32
            return 4
        if mode == 'no_rc_cc_ds':
            return 0
        raise ValueError("Invalid rc_cc_ds: %s" % mode)

    @abc.abstractmethod
    def encode_cmd(self, otak: OtaKeyset, tar: bytes, apdu: bytes) -> bytes:
        """Encode ``apdu`` for the given TAR into a command packet.

        NOTE(review): the SMS dialect in this file implements this with an
        additional ``spi`` argument -- confirm which signature is intended.
        """
        pass

    @abc.abstractmethod
    def decode_resp(self, otak: OtaKeyset, apdu: bytes) -> (object, Optional["CompactRemoteResp"]):
        """Decode a response into a response packet and, if indicated (by a
        response status of `"por_ok"`), a decoded response.

        The response packet's common characteristics are not fully determined,
        and (so far) completely proprietary per dialect."""
        pass
| |
| |
| from Cryptodome.Cipher import DES, DES3, AES |
| from Cryptodome.Hash import CMAC |
| |
class OtaAlgo(abc.ABC):
    """Common base of the OTA ciphering and authentication algorithms.

    Subclasses set ``blocksize`` (cipher block size in octets) and
    ``enum_name`` (the algorithm name an OtaKeyset must carry to select them).
    """
    # all-zero initialisation vector of one cipher block
    iv = property(lambda self: bytes([0] * self.blocksize))
    blocksize = None
    enum_name = None

    @staticmethod
    def _get_padding(in_len: int, multiple: int, padding: int = 0):
        """Return the padding bytes needed to extend in_len to a multiple of N.

        Args:
            in_len: current length in octets.
            multiple: the length shall become a multiple of this value.
            padding: value (0..255) of each padding octet; defaults to 0.

        Returns:
            ``bytes`` of length 0..multiple-1; empty if already aligned.
        """
        # -in_len % multiple is 0 when aligned, else the distance to the
        # next multiple
        pad_cnt = -in_len % multiple
        return bytes([padding]) * pad_cnt

    @staticmethod
    def _pad_to_multiple(indat: bytes, multiple: int, padding: int = 0):
        """Pad input bytes to multiple of N, using ``padding`` as fill octet."""
        return indat + OtaAlgo._get_padding(len(indat), multiple, padding)

    def pad_to_blocksize(self, indat: bytes, padding: int = 0):
        """Pad the given input data to multiple of the cipher block size."""
        return self._pad_to_multiple(indat, self.blocksize, padding)

    def __init__(self, otak: OtaKeyset):
        """Remember the key set whose key material the algorithm operates on."""
        self.otak = otak

    def __str__(self):
        return self.__class__.__name__
| |
class OtaAlgoCrypt(OtaAlgo, abc.ABC):
    """Base class of the OTA ciphering algorithms.

    Concrete subclasses provide ``_encrypt`` / ``_decrypt`` and set
    ``enum_name`` to the ``algo_crypt`` value they implement.
    """

    def __init__(self, otak: OtaKeyset):
        """Bind the algorithm to a key set.

        Raises:
            ValueError: if the key set is meant for a different ciphering
                algorithm than this class implements.
        """
        if self.enum_name != otak.algo_crypt:
            raise ValueError('Cannot use algorithm %s with key for %s' % (self.enum_name, otak.algo_crypt))
        super().__init__(otak)

    def encrypt(self, data:bytes) -> bytes:
        """Encrypt given input bytes using the key material given in constructor.

        The input is padded to a multiple of the cipher block size first.
        """
        padded_data = self.pad_to_blocksize(data)
        # encrypt the padded data, not the raw input
        return self._encrypt(padded_data)

    def decrypt(self, data:bytes) -> bytes:
        """Decrypt given input bytes using the key material given in constructor."""
        return self._decrypt(data)

    @abc.abstractmethod
    def _encrypt(self, data:bytes) -> bytes:
        """Actual implementation, to be implemented by derived class."""
        pass

    @abc.abstractmethod
    def _decrypt(self, data:bytes) -> bytes:
        """Actual implementation, to be implemented by derived class."""
        pass

    @classmethod
    def fromKeyset(cls, otak: OtaKeyset) -> 'OtaAlgoCrypt':
        """Resolve the class for the encryption algorithm of otak and instantiate it.

        Only direct subclasses of the class this is called on are considered.

        Raises:
            ValueError: if no subclass implements ``otak.algo_crypt``.
        """
        for subc in cls.__subclasses__():
            if subc.enum_name == otak.algo_crypt:
                return subc(otak)
        # report the ciphering algorithm that could not be resolved
        raise ValueError('No implementation for crypt algorithm %s' % otak.algo_crypt)
| |
class OtaAlgoAuth(OtaAlgo, abc.ABC):
    """Base class of the OTA authentication (checksum) algorithms.

    Concrete subclasses provide ``_sign`` and set ``enum_name`` to the
    ``algo_auth`` value they implement.
    """

    def __init__(self, otak: OtaKeyset):
        """Bind the algorithm to a key set.

        Raises:
            ValueError: if the key set is meant for a different authentication
                algorithm than this class implements.
        """
        if self.enum_name != otak.algo_auth:
            # report the authentication algorithm of the key set
            raise ValueError('Cannot use algorithm %s with key for %s' % (self.enum_name, otak.algo_auth))
        super().__init__(otak)

    def sign(self, data:bytes) -> bytes:
        """Compute the RC/CC check bytes for the input data using key material
        given in constructor.

        The input is padded to a multiple of the block size before signing."""
        padded_data = self.pad_to_blocksize(data)
        sig = self._sign(padded_data)
        return sig

    def check_sig(self, data:bytes, cc_received:bytes):
        """Compute the RC/CC check bytes for the input data and compare against cc_received.

        Raises:
            OtaCheckError: if the computed and the received value differ.
        """
        cc = self.sign(data)
        if cc_received != cc:
            raise OtaCheckError('Received CC (%s) != Computed CC (%s)' % (b2h(cc_received), b2h(cc)))

    @abc.abstractmethod
    def _sign(self, data:bytes) -> bytes:
        """Actual implementation, to be implemented by derived class."""
        pass

    @classmethod
    def fromKeyset(cls, otak: OtaKeyset) -> 'OtaAlgoAuth':
        """Resolve the class for the authentication algorithm of otak and instantiate it.

        Only direct subclasses of the class this is called on are considered.

        Raises:
            ValueError: if no subclass implements ``otak.algo_auth``.
        """
        for subc in cls.__subclasses__():
            if subc.enum_name == otak.algo_auth:
                return subc(otak)
        raise ValueError('No implementation for auth algorithm %s' % otak.algo_auth)
| |
class OtaAlgoCryptDES(OtaAlgoCrypt):
    """Single DES in CBC mode, keyed with KIc and the all-zero IV.

    DES is insecure. For backwards compatibility with pre-Rel8."""
    name = 'DES'
    enum_name = 'single_des'
    blocksize = 8

    def _new_cipher(self):
        """Create a fresh CBC cipher object; one is needed per operation."""
        return DES.new(self.otak.kic, DES.MODE_CBC, self.iv)

    def _encrypt(self, data:bytes) -> bytes:
        return self._new_cipher().encrypt(data)

    def _decrypt(self, data:bytes) -> bytes:
        return self._new_cipher().decrypt(data)
| |
class OtaAlgoAuthDES(OtaAlgoAuth):
    """Checksum computed as the tail of a single-DES CBC encryption with KID.

    DES is insecure. For backwards compatibility with pre-Rel8."""
    name = 'DES'
    enum_name = 'single_des'
    blocksize = 8

    def _sign(self, data:bytes) -> bytes:
        encrypted = DES.new(self.otak.kid, DES.MODE_CBC, self.iv).encrypt(data)
        # the checksum is the last (up to) 8 octets of the ciphertext
        return encrypted[-8:]
| |
class OtaAlgoCryptDES3(OtaAlgoCrypt):
    """Triple DES in CBC mode, keyed with KIc and the all-zero IV."""
    name = '3DES'
    enum_name = 'triple_des_cbc2'
    blocksize = 8

    def _new_cipher(self):
        """Create a fresh CBC cipher object; one is needed per operation."""
        return DES3.new(self.otak.kic, DES3.MODE_CBC, self.iv)

    def _encrypt(self, data:bytes) -> bytes:
        return self._new_cipher().encrypt(data)

    def _decrypt(self, data:bytes) -> bytes:
        return self._new_cipher().decrypt(data)
| |
class OtaAlgoAuthDES3(OtaAlgoAuth):
    """Checksum computed as the tail of a triple-DES CBC encryption with KID."""
    name = '3DES'
    enum_name = 'triple_des_cbc2'
    blocksize = 8

    def _sign(self, data:bytes) -> bytes:
        encrypted = DES3.new(self.otak.kid, DES3.MODE_CBC, self.iv).encrypt(data)
        # the checksum is the last (up to) 8 octets of the ciphertext
        return encrypted[-8:]
| |
class OtaAlgoCryptAES(OtaAlgoCrypt):
    """AES in CBC mode, keyed with KIc and the all-zero IV."""
    name = 'AES'
    enum_name = 'aes_cbc'
    blocksize = 16 # TODO: is this needed?

    def _new_cipher(self):
        """Create a fresh CBC cipher object; one is needed per operation."""
        return AES.new(self.otak.kic, AES.MODE_CBC, self.iv)

    def _encrypt(self, data:bytes) -> bytes:
        return self._new_cipher().encrypt(data)

    def _decrypt(self, data:bytes) -> bytes:
        return self._new_cipher().decrypt(data)
| |
class OtaAlgoAuthAES(OtaAlgoAuth):
    """Checksum computed as an AES CMAC over the data, keyed with KID."""
    name = 'AES'
    enum_name = 'aes_cmac'
    blocksize = 1 # AES CMAC doesn't need any padding by us

    def _sign(self, data:bytes) -> bytes:
        mac = CMAC.new(self.otak.kid, ciphermod=AES, mac_len=8)
        mac.update(data)
        digest = mac.digest()
        # keep the last (up to) 8 octets of the digest
        return digest[-8:]
| |
| |
| |
class OtaDialectSms(OtaDialect):
    """OTA dialect for SMS based transport, as described in 3GPP TS 31.115."""
    # Layout of a response packet once the user data header has been removed
    # (and, for a ciphered PoR, after deciphering).  The length of the RC/CC
    # field is derived from RHL: RHL minus the 10 fixed header octets
    # (TAR + CNTR + PCNTR + status).
    SmsResponsePacket = Struct('rpl'/Int16ub,
                               'rhl'/Int8ub,
                               'tar'/Bytes(3),
                               'cntr'/Bytes(5),
                               'pcntr'/Int8ub,
                               'response_status'/ResponseStatus,
                               'cc_rc'/Bytes(this.rhl-10),
                               'secured_data'/GreedyBytes)

    def encode_cmd(self, otak: OtaKeyset, tar: bytes, spi: dict, apdu: bytes) -> bytes:
        """Encode an APDU into a secured command packet.

        Args:
            otak: key set providing key indices, algorithms, keys and counter.
            tar: the 3-octet TAR.
            spi: SPI as a dict suitable for building with the ``SPI`` construct;
                ``ciphering`` and ``rc_cc_ds`` select the processing done here.
            apdu: the data to secure.  The caller's object is never modified.

        Returns:
            With ciphering: CPL + header (CHL..TAR) + ciphered remainder.
            Without ciphering: header + CNTR + PCNTR + [RC/CC] + apdu.

        Raises:
            ValueError: if ``spi['rc_cc_ds']`` is not 'no_rc_cc_ds', 'rc' or 'cc'.
        """
        # length of signature in octets
        len_sig = self._compute_sig_len(spi)
        pad_cnt = 0
        if spi['ciphering']: # ciphering is requested
            # append padding bytes to end up with blocksize; the ciphered
            # part consists of CNTR(5) + PCNTR(1) + RC/CC + data
            len_cipher = 6 + len_sig + len(apdu)
            padding = otak.crypt._get_padding(len_cipher, otak.crypt.blocksize)
            pad_cnt = len(padding)
            # build a new object: `apdu += padding` would extend a
            # caller-supplied bytearray in place
            apdu = bytes(apdu) + padding

        kic = {'key': otak.kic_idx, 'algo': otak.algo_crypt}
        kid = {'key': otak.kid_idx, 'algo': otak.algo_auth}

        # CHL = number of octets from (and including) SPI to the end of RC/CC/DS
        # 13 == SPI(2) + KIc(1) + KId(1) + TAR(3) + CNTR(5) + PCNTR(1)
        chl = 13 + len_sig

        # CHL + SPI (+ KIC + KID)
        c = Struct('chl'/Int8ub, 'spi'/SPI, 'kic'/KIC, 'kid'/KID_CC, 'tar'/Bytes(3))
        part_head = c.build({'chl': chl, 'spi':spi, 'kic':kic, 'kid':kid, 'tar':tar})

        # CNTR + PCNTR (CNTR not used)
        part_cnt = otak.cntr.to_bytes(5, 'big') + pad_cnt.to_bytes(1, 'big')

        envelope_data = part_head + part_cnt + apdu

        # 2-byte CPL: counts from CHL to the end of the secured data, including
        # the (not yet inserted) RC/CC/DS and any padding for ciphering.  It is
        # prepended here so that it is covered by the RC/CC computation.
        cpl = len(envelope_data) + len_sig
        envelope_data = cpl.to_bytes(2, 'big') + envelope_data

        # from here on envelope_data only holds the part following the header
        if spi['rc_cc_ds'] == 'cc':
            cc = otak.auth.sign(envelope_data)
            envelope_data = part_cnt + cc + apdu
        elif spi['rc_cc_ds'] == 'rc':
            # CRC32
            crc32 = zlib.crc32(envelope_data) & 0xffffffff
            envelope_data = part_cnt + crc32.to_bytes(4, 'big') + apdu
        elif spi['rc_cc_ds'] == 'no_rc_cc_ds':
            envelope_data = part_cnt + apdu
        else:
            raise ValueError("Invalid rc_cc_ds: %s" % spi['rc_cc_ds'])

        # encrypt as needed
        if spi['ciphering']: # ciphering is requested
            ciph = otak.crypt.encrypt(envelope_data)
            envelope_data = part_head + ciph
            # prefix with another CPL
            cpl = len(envelope_data)
            envelope_data = cpl.to_bytes(2, 'big') + envelope_data
        else:
            # NOTE(review): unlike the ciphered branch, no CPL is prepended
            # here -- confirm that this is intended.
            envelope_data = part_head + envelope_data

        return envelope_data

    def decode_resp(self, otak: OtaKeyset, spi: dict, data: bytes) -> ("OtaDialectSms.SmsResponsePacket", Optional["CompactRemoteResp"]):
        """Decode a response (PoR) as received in the SMS user data.

        Args:
            otak: key set used for deciphering and for verifying the CC.
            spi: the SPI of the command; ``por_shall_be_ciphered`` and
                ``por_rc_cc_ds`` select the processing done here.
            data: user data including the user data header, as bytes or as a
                hex string.

        Returns:
            Tuple of the parsed ``SmsResponsePacket`` and, if its status is
            'por_ok', the parsed ``CompactRemoteResp`` (otherwise ``None``).

        Raises:
            ValueError: if the first octet is not 0x02 or the user data header
                carries no RPI (IEI 0x71).
            OtaCheckError: if the CC does not verify, if RC/CC bytes are
                present although none were requested, or if ``por_rc_cc_ds``
                is anything but 'no_rc_cc_ds' or 'cc' (RC is not implemented).
        """
        if isinstance(data, str):
            # accept hex strings; stdlib conversion, no project helper needed
            data = bytes.fromhex(data)
        # plain-text POR: 027100000e0ab000110000000000000001612f
        #                 UDHL RPI IEDLa RPL  RHL TAR    CNTR       PCNTR STS
        #                 02   71  00    000e 0a  b00011 0000000000 00    00  01 612f
        # POR with CC:      027100001612b000110000000000000055f47118381175fb01612f
        # POR with CC+CIPH: 027100001c12b000119660ebdb81be189b5e4389e9e7ab2bc0954f963ad869ed7c
        if data[0] != 0x02:
            raise ValueError('Unexpected UDL=0x%02x' % data[0])
        udhd, remainder = UserDataHeader.fromBytes(data)
        if not udhd.has_ie(0x71):
            raise ValueError('RPI 0x71 not found in UDH')
        rpl_rhl_tar = remainder[:6] # RPL+RHL+TAR; not ciphered
        res = self.SmsResponsePacket.parse(remainder)

        if spi['por_shall_be_ciphered']:
            # decrypt everything behind the clear-text RPL+RHL+TAR and parse
            # the packet again from the deciphered data
            ciphered_part = remainder[6:]
            deciph = otak.crypt.decrypt(ciphered_part)
            temp_data = rpl_rhl_tar + deciph
            res = self.SmsResponsePacket.parse(temp_data)
            # remove specified number of padding bytes, if any
            if res['pcntr'] != 0:
                # this conditional is needed as python [:-0] renders an empty return!
                res['secured_data'] = res['secured_data'][:-res['pcntr']]
            remainder = temp_data

        # is there a CC/RC present?
        len_sig = res['rhl'] - 10
        if spi['por_rc_cc_ds'] == 'no_rc_cc_ds':
            if len_sig:
                raise OtaCheckError('No RC/CC/DS requested, but len_sig=%u' % len_sig)
        elif spi['por_rc_cc_ds'] == 'cc':
            # verify signature
            # UDH is part of CC/RC!
            udh = data[:3]
            # RPL, RHL, TAR, CNTR, PCNTR and STS are part of CC/RC
            rpl_rhl_tar_cntr_pcntr_sts = remainder[:13]
            # remove the CC/RC bytes
            temp_data = udh + rpl_rhl_tar_cntr_pcntr_sts + remainder[13+len_sig:]
            otak.auth.check_sig(temp_data, res['cc_rc'])
        # TODO: CRC
        else:
            raise OtaCheckError('Unknown por_rc_cc_ds: %s' % spi['por_rc_cc_ds'])

        # TODO: ExpandedRemoteResponse according to TS 102 226 5.2.2
        if res.response_status == 'por_ok':
            dec = CompactRemoteResp.parse(res['secured_data'])
        else:
            dec = None
        return (res, dec)