| # -*- coding: utf-8 -*- |
| |
| """ pySim: Bluetooth rSAP transport link |
| """ |
| |
| # |
| # Copyright (C) 2021 Gabriel K. Gegenhuber <ggegenhuber@sba-research.org> |
| # |
| # This program is free software: you can redistribute it and/or modify |
| # it under the terms of the GNU General Public License as published by |
| # the Free Software Foundation, either version 2 of the License, or |
| # (at your option) any later version. |
| # |
| # This program is distributed in the hope that it will be useful, |
| # but WITHOUT ANY WARRANTY; without even the implied warranty of |
| # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the |
| # GNU General Public License for more details. |
| # |
| # You should have received a copy of the GNU General Public License |
| # along with this program. If not, see <http://www.gnu.org/licenses/>. |
| # |
| |
| import time |
| import struct |
| import logging |
| import bluetooth |
| |
| from pySim.exceptions import ReaderError, NoCardError, ProtocolError |
| from pySim.transport import LinkBase |
| from pySim.utils import b2h, h2b, rpad |
| |
| logger = logging.getLogger(__name__) |
| |
| |
| # thx to osmocom/softsim |
| # SAP table 5.16 |
| SAP_CONNECTION_STATUS = { |
| 0x00: "OK, Server can fulfill requirements", |
| 0x01: "Error, Server unable to establish connection", |
| 0x02: "Error, Server does not support maximum message size", |
| 0x03: "Error, maximum message size by Client is too small", |
| 0x04: "OK, ongoing call" |
| } |
| |
| # SAP table 5.18 |
| SAP_RESULT_CODE = { |
| 0x00: "OK, request processed correctly", |
| 0x01: "Error, no reason defined", |
| 0x02: "Error, card not accessible", |
| 0x03: "Error, card (already) powered off", |
| 0x04: "Error, card removed", |
| 0x05: "Error, card already powered on", |
| 0x06: "Error, data not available", |
| 0x07: "Error, not supported" |
| } |
| |
| # SAP table 5.19 |
| SAP_STATUS_CHANGE = { |
| 0x00: "Unknown Error", |
| 0x01: "Card reset", |
| 0x02: "Card not accessible", |
| 0x03: "Card removed", |
| 0x04: "Card inserted", |
| 0x05: "Card recovered" |
| } |
| |
| # SAP table 5.15 |
| SAP_PARAMETERS = [ |
| { |
| 'name': "MaxMsgSize", |
| 'length': 2, |
| 'id': 0x00 |
| }, |
| { |
| 'name': "ConnectionStatus", |
| 'length': 1, |
| 'id': 0x01 |
| }, |
| { |
| 'name': "ResultCode", |
| 'length': 1, |
| 'id': 0x02 |
| }, |
| { |
| 'name': "DisconnectionType", |
| 'length': 1, |
| 'id': 0x03 |
| }, |
| { |
| 'name': "CommandAPDU", |
| 'length': None, |
| 'id': 0x04 |
| }, |
| { |
| 'name': "ResponseAPDU", |
| 'length': None, |
| 'id': 0x05 |
| }, |
| { |
| 'name': "ATR", |
| 'length': None, |
| 'id': 0x06 |
| }, |
| { |
| 'name': "CardReaderdStatus", |
| 'length': 1, |
| 'id': 0x07 |
| }, |
| { |
| 'name': "StatusChange", |
| 'length': 1, |
| 'id': 0x08 |
| }, |
| { |
| 'name': "TransportProtocol", |
| 'length': 1, |
| 'id': 0x09 |
| }, |
| { |
| 'name': "CommandAPDU7816", |
| 'length': 2, |
| 'id': 0x10 |
| } |
| ] |
| |
| # SAP table 5.1 |
| SAP_MESSAGES = [ |
| { |
| 'name': 'CONNECT_REQ', |
| 'client_to_server': True, |
| 'id': 0x00, |
| 'parameters': [(0x00, True)] |
| }, |
| { |
| 'name': 'CONNECT_RESP', |
| 'client_to_server': False, |
| 'id': 0x01, |
| 'parameters': [(0x01, True), (0x00, False)] |
| }, |
| { |
| 'name': 'DISCONNECT_REQ', |
| 'client_to_server': True, |
| 'id': 0x02, |
| 'parameters': [] |
| }, |
| { |
| 'name': 'DISCONNECT_RESP', |
| 'client_to_server': False, |
| 'id': 0x03, |
| 'parameters': [] |
| }, |
| { |
| 'name': 'DISCONNECT_IND', |
| 'client_to_server': False, |
| 'id': 0x04, |
| 'parameters': [(0x03, True)] |
| }, |
| { |
| 'name': 'TRANSFER_APDU_REQ', |
| 'client_to_server': True, |
| 'id': 0x05, |
| 'parameters': [(0x04, False), (0x10, False)] |
| }, |
| { |
| 'name': 'TRANSFER_APDU_RESP', |
| 'client_to_server': False, |
| 'id': 0x06, |
| 'parameters': [(0x02, True), (0x05, False)] |
| }, |
| { |
| 'name': 'TRANSFER_ATR_REQ', |
| 'client_to_server': True, |
| 'id': 0x07, |
| 'parameters': [] |
| }, |
| { |
| 'name': 'TRANSFER_ATR_RESP', |
| 'client_to_server': False, |
| 'id': 0x08, |
| 'parameters': [(0x02, True), (0x06, False)] |
| }, |
| { |
| 'name': 'POWER_SIM_OFF_REQ', |
| 'client_to_server': True, |
| 'id': 0x09, |
| 'parameters': [] |
| }, |
| { |
| 'name': 'POWER_SIM_OFF_RESP', |
| 'client_to_server': False, |
| 'id': 0x0A, |
| 'parameters': [(0x02, True)] |
| }, |
| { |
| 'name': 'POWER_SIM_ON_REQ', |
| 'client_to_server': True, |
| 'id': 0x0B, |
| 'parameters': [] |
| }, |
| { |
| 'name': 'POWER_SIM_ON_RESP', |
| 'client_to_server': False, |
| 'id': 0x0C, |
| 'parameters': [(0x02, True)] |
| }, |
| { |
| 'name': 'RESET_SIM_REQ', |
| 'client_to_server': True, |
| 'id': 0x0D, |
| 'parameters': [] |
| }, |
| { |
| 'name': 'RESET_SIM_RESP', |
| 'client_to_server': False, |
| 'id': 0x0E, |
| 'parameters': [(0x02, True)] |
| }, |
| { |
| 'name': 'TRANSFER_CARD_READER_STATUS_REQ', |
| 'client_to_server': True, |
| 'id': 0x0F, |
| 'parameters': [] |
| }, |
| { |
| 'name': 'TRANSFER_CARD_READER_STATUS_RESP', |
| 'client_to_server': False, |
| 'id': 0x10, |
| 'parameters': [(0x02, True), (0x07, False)] |
| }, |
| { |
| 'name': 'STATUS_IND', |
| 'client_to_server': False, |
| 'id': 0x11, |
| 'parameters': [(0x08, True)] |
| }, |
| |
| { |
| 'name': 'ERROR_RESP', |
| 'client_to_server': False, |
| 'id': 0x12, |
| 'parameters': [] |
| }, |
| { |
| 'name': 'SET_TRANSPORT_PROTOCOL_REQ', |
| 'client_to_server': True, |
| 'id': 0x13, |
| 'parameters': [(0x09, True)] |
| }, |
| { |
| 'name': 'SET_TRANSPORT_PROTOCOL_RESP', |
| 'client_to_server': False, |
| 'id': 0x14, |
| 'parameters': [(0x02, True)] |
| }, |
| |
| ] |
| |
| |
| class BluetoothSapSimLink(LinkBase): |
| # UUID for SIM Access Service |
| UUID_SIM_ACCESS = '0000112d-0000-1000-8000-00805f9b34fb' |
| SAP_MAX_MSG_SIZE = 0xffff |
| |
| def __init__(self, bt_mac_addr, **kwargs): |
| super().__init__(**kwargs) |
| self._bt_mac_addr = bt_mac_addr |
| self._max_msg_size = self.SAP_MAX_MSG_SIZE |
| self._atr = None |
| self.connected = False |
| # at first try to find the bluetooth device |
| if not bluetooth.find_service(address=bt_mac_addr): |
| raise ReaderError(f"Cannot find bluetooth device [{bt_mac_addr}]") |
| # then check for rSAP support |
| self._sim_service = next(iter(bluetooth.find_service( |
| uuid=self.UUID_SIM_ACCESS, address=bt_mac_addr)), None) |
| if not self._sim_service: |
| raise ReaderError( |
| f"Bluetooth device [{bt_mac_addr}] does not support SIM Access service") |
| |
| def __del__(self): |
| # TODO: do something here |
| pass |
| |
| def wait_for_card(self, timeout=None, newcardonly=False): |
| self.connect() |
| |
| def connect(self): |
| try: |
| self._sock = bluetooth.BluetoothSocket(bluetooth.RFCOMM) |
| self._sock.connect( |
| (self._sim_service['host'], self._sim_service['port'])) |
| self.connected = True |
| self.establish_sim_connection() |
| self.retrieve_atr() |
| except: |
| raise ReaderError("Cannot connect to SIM Access service") |
| |
| def get_atr(self): |
| return self._atr |
| |
| def disconnect(self): |
| if self.connected: |
| self.send_sap_message("DISCONNECT_REQ") |
| self._sock.close() |
| self.connected = False |
| |
| def reset_card(self): |
| if self.connected: |
| self.send_sap_message("RESET_SIM_REQ") |
| msg_name, param_list = self._recv_sap_response('RESET_SIM_RESP') |
| connection_status = next( |
| (x[1] for x in param_list if x[0] == 'ConnectionStatus'), 0x01) |
| if connection_status == 0x00: |
| logger.info("SIM Reset successful") |
| return 1 |
| else: |
| self.disconnect() |
| self.connect() |
| return 1 |
| |
| def send_sap_message(self, msg_name, param_list=[]): |
| # maby check for idle state before sending? |
| message = self.craft_sap_message(msg_name, param_list) |
| return self._sock.send(message) |
| |
| def _recv_sap_message(self): |
| resp = self._sock.recv(self._max_msg_size) |
| msg_name, param_list = self.parse_sap_message(resp) |
| return msg_name, param_list |
| |
| def _recv_sap_response(self, waiting_msg_name): |
| while self.connected: |
| msg_name, param_list = self._recv_sap_message() |
| self.handle_sap_response_generic(msg_name, param_list) |
| if msg_name == waiting_msg_name: |
| return msg_name, param_list |
| |
| def establish_sim_connection(self, retries=5): |
| self.send_sap_message( |
| "CONNECT_REQ", [("MaxMsgSize", self._max_msg_size)]) |
| msg_name, param_list = self._recv_sap_response('CONNECT_RESP') |
| |
| connection_status = next( |
| (x[1] for x in param_list if x[0] == 'ConnectionStatus'), 0x01) |
| if connection_status == 0x00: |
| logger.info("Successfully connected to rSAP server") |
| return |
| elif connection_status == 0x02: # invalid max size |
| self._max_msg_size = next( |
| (x[1] for x in param_list if x[0] == 'MaxMsgSize'), self._max_msg_size) |
| return self.establish_sim_connection(retries) |
| else: |
| logger.info( |
| "Wait some seconds and make another connection attempt...") |
| time.sleep(5) |
| return self.establish_sim_connection(retries-1) |
| |
| def retrieve_atr(self): |
| self.send_sap_message("TRANSFER_ATR_REQ") |
| msg_name, param_list = self._recv_sap_response('TRANSFER_ATR_RESP') |
| result_code = next( |
| (x[1] for x in param_list if x[0] == 'ResultCode'), 0x01) |
| if result_code == 0x00: |
| atr = next((x[1] for x in param_list if x[0] == 'ATR'), None) |
| self._atr = atr |
| logger.debug(f"Recieved ATR from server: {b2h(atr)}") |
| |
| def handle_sap_response_generic(self, msg_name, param_list): |
| # print stuff |
| logger.debug( |
| f"Recieved sap message from server: {(msg_name, param_list)}") |
| for param in param_list: |
| param_name, param_value = param |
| if param_name == 'ConnectionStatus': |
| new_status = SAP_CONNECTION_STATUS.get(param_value) |
| logger.debug(f"Connection Status: {new_status}") |
| elif param_name == 'StatusChange': |
| new_status = SAP_STATUS_CHANGE.get(param_value) |
| logger.debug(f"SIM Status: {new_status}") |
| elif param_name == 'ResultCode': |
| response_code = SAP_RESULT_CODE.get(param_value) |
| logger.debug(f"ResultCode: {response_code}") |
| |
| # handle some important stuff: |
| if msg_name == 'DISCONNECT_IND': |
| # graceful disconnect --> technically could still send some apdus |
| # however, we just make it short and sweet and directly disconnect |
| self.send_sap_message("DISCONNECT_REQ") |
| elif msg_name == 'DISCONNECT_RESP': |
| self.connected = False |
| logger.info(f"Client disconnected") |
| |
| # if msg_name == 'CONNECT_RESP': |
| # elif msg_name == 'DISCONNECT_RESP': |
| # elif msg_name == 'DISCONNECT_IND': |
| # elif msg_name == 'TRANSFER_APDU_RESP': |
| # elif msg_name == 'TRANSFER_ATR_RESP': |
| # elif msg_name == 'POWER_SIM_OFF_RESP': |
| # elif msg_name == 'POWER_SIM_ON_RESP': |
| # elif msg_name == 'RESET_SIM_RESP': |
| # elif msg_name == 'TRANSFER_CARD_READER_STATUS_RESP': |
| # elif msg_name == 'STATUS_IND': |
| # elif msg_name == 'ERROR_RESP': |
| # elif msg_name == 'SET_TRANSPORT_PROTOCOL_RESP': |
| # else: |
| # logger.error("Unknown message...") |
| |
| def craft_sap_message(self, msg_name, param_list=[]): |
| msg_info = next( |
| (x for x in SAP_MESSAGES if x.get('name') == msg_name), None) |
| if not msg_info: |
| raise ProtocolError(f"Unknown SAP message name ({msg_name})") |
| |
| msg_id = msg_info.get('id') |
| msg_params = msg_info.get('parameters') |
| # msg_direction = msg_info.get('client_to_server') |
| |
| param_cnt = len(param_list) |
| |
| msg_bytes = struct.pack( |
| '!BBH', |
| msg_id, |
| param_cnt, |
| 0 |
| ) |
| |
| allowed_params = (x[0] for x in msg_params) |
| mandatory_params = (x[0] for x in msg_params if x[1] == True) |
| |
| collected_param_ids = [] |
| |
| for p in param_list: |
| param_name = p[0] |
| param_value = p[1] |
| |
| param_id = next( |
| (x.get('id') for x in SAP_PARAMETERS if x.get('name') == param_name), None) |
| if param_id is None: |
| raise ProtocolError(f"Unknown SAP param name ({param_name})") |
| if param_id not in allowed_params: |
| raise ProtocolError( |
| f"Parameter {param_name} not allowed in message {msg_name}") |
| |
| collected_param_ids.append(param_id) |
| msg_bytes += self.craft_sap_parameter(param_name, param_value) |
| |
| if not set(mandatory_params).issubset(collected_param_ids): |
| raise ProtocolError( |
| f"Missing mandatory parameter for message {msg_name} (mandatory: {*mandatory_params,}, present: {*collected_param_ids,})") |
| |
| return msg_bytes |
| |
| def calc_padding_len(self, length, blocksize=4): |
| extra = length % blocksize |
| if extra > 0: |
| return blocksize-extra |
| return 0 |
| |
| def pad_bytes(self, b, blocksize=4): |
| padding_len = self.calc_padding_len(len(b), blocksize) |
| return b + bytearray(padding_len) |
| |
| def craft_sap_parameter(self, param_name, param_value): |
| param_info = next( |
| (x for x in SAP_PARAMETERS if x.get('name') == param_name), None) |
| param_id = param_info.get('id') |
| param_len = param_info.get('length') |
| |
| if isinstance(param_value, str): |
| param_value = h2b(param_value) |
| |
| if isinstance(param_value, int): |
| # TODO: when param len is not set we have a problem :X |
| param_value = (param_value).to_bytes(param_len, byteorder='big') |
| |
| if param_len is None: |
| # just assume param length from bytearray |
| param_len = len(param_value) |
| elif param_len != len(param_value): |
| raise ProtocolError( |
| f"Invalid param length (epected {param_len} but got {len(param_value)} bytes)") |
| |
| param_bytes = struct.pack( |
| f'!BBH{param_len}s', |
| param_id, |
| 0, # reserved |
| param_len, |
| param_value |
| ) |
| param_bytes = self.pad_bytes(param_bytes) |
| return param_bytes |
| |
| def parse_sap_message(self, msg_bytes): |
| header_struct = struct.Struct('!BBH') |
| msg_id, param_cnt, reserved = header_struct.unpack_from(msg_bytes) |
| msg_bytes = msg_bytes[header_struct.size:] |
| |
| msg_info = next( |
| (x for x in SAP_MESSAGES if x.get('id') == msg_id), None) |
| |
| msg_name = msg_info.get('name') |
| msg_params = msg_info.get('parameters') |
| # msg_direction = msg_info.get('client_to_server') |
| |
| # TODO: check if params allowed etc |
| # allowed_params = (x[0] for x in msg_params) |
| # mandatory_params = (x[0] for x in msg_params if x[1] == True) |
| |
| param_list = [] |
| |
| for x in range(param_cnt): |
| param_name, param_value, total_len = self.parse_sap_parameter( |
| msg_bytes) |
| param_list.append((param_name, param_value)) |
| msg_bytes = msg_bytes[total_len:] |
| |
| return msg_name, param_list |
| |
| def parse_sap_parameter(self, param_bytes): |
| header_struct = struct.Struct('!BBH') |
| total_len = header_struct.size |
| param_id, reserved, param_len = header_struct.unpack_from(param_bytes) |
| padding_len = self.calc_padding_len(param_len) |
| paramval_struct = struct.Struct(f'!{param_len}s{padding_len}s') |
| param_value, padding = paramval_struct.unpack_from( |
| param_bytes[total_len:]) |
| total_len += paramval_struct.size |
| |
| param_info = next( |
| (x for x in SAP_PARAMETERS if x.get('id') == param_id), None) |
| # TODO: check if param found, length plausible, ... |
| param_name = param_info.get('name') |
| |
| # if it is set then value was int, otherwise it is byte array |
| if param_info.get('length') is not None: |
| param_value = int.from_bytes(param_value, "big") |
| # param_len = param_info.get('length') |
| return param_name, param_value, total_len |
| |
| def _send_apdu_raw(self, pdu): |
| if isinstance(pdu, str): |
| pdu = h2b(pdu) |
| self.send_sap_message("TRANSFER_APDU_REQ", [("CommandAPDU", pdu)]) |
| |
| msg_name, param_list = self._recv_sap_response('TRANSFER_APDU_RESP') |
| result_code = next( |
| (x[1] for x in param_list if x[0] == 'ResultCode'), 0x01) |
| if result_code == 0x00: |
| response = next( |
| (x[1] for x in param_list if x[0] == 'ResponseAPDU'), None) |
| sw = response[-2:] |
| data = response[0:-2] |
| return b2h(data), b2h(sw) |
| return None, None |