| #!/usr/bin/env python3 |
| # |
| # This program receive APDUs via the VPCD protocol of Frank Morgner's |
| # virtualsmartcard, encrypts them with OTA (over the air) keys and |
| # forwards them via SMPP to a SMSC (SMS service centre). |
| # |
| # In other words, you can use it as a poor man's OTA server, to enable |
| # you to use unmodified application software with PC/SC support to talk |
| # securely via OTA with a remote SMS card. |
| # |
| # This is very much a work in progress at this point. |
| |
| ####################################################################### |
| # twisted VPCD Library |
| ####################################################################### |
| |
| import logging |
| import struct |
| import abc |
| from typing import Union, Optional |
| from construct import Struct, Int8ub, Int16ub, If, Enum, Bytes, this, len_, Rebuild |
| from twisted.internet.protocol import Protocol, ReconnectingClientFactory |
| from pySim.utils import b2h, h2b |
| |
| logger = logging.getLogger(__name__) |
| |
| class VirtualCard(abc.ABC): |
| """Abstract base class for a virtual smart card.""" |
| def __init__(self, atr: Union[str, bytes]): |
| if isinstance(atr, str): |
| atr = h2b(atr) |
| self.atr = atr |
| |
| @abc.abstractmethod |
| def power_change(self, new_state: bool): |
| """Power the card on or off.""" |
| pass |
| |
| @abc.abstractmethod |
| def reset(self): |
| """Reset the card.""" |
| pass |
| |
| @abc.abstractmethod |
| def rx_c_apdu(self, apdu: bytes): |
| """Receive a C-APDU from the reader/application.""" |
| pass |
| |
| def tx_r_apdu(self, apdu: Union[str, bytes]): |
| if isinstance(apdu, str): |
| apdu = h2b(apdu) |
| logger.info("R-APDU: %s" % b2h(apdu)) |
| self.protocol.send_data(apdu) |
| |
| class VpcdProtocolBase(Protocol): |
| # Prefixed couldn't be used as the this.length wouldn't be available in this case |
| construct = Struct('length'/Rebuild(Int16ub, len_(this.data) + len_(this.ctrl)), |
| 'data'/If(this.length > 1, Bytes(this.length)), |
| 'ctrl'/If(this.length == 1, Enum(Int8ub, off=0, on=1, reset=2, atr=4))) |
| def __init__(self, vcard: VirtualCard): |
| self.recvBuffer = b'' |
| self.connectionCorrupted = False |
| self.pduReadTimer = None |
| self.pduReadTimerSecs = 10 |
| self.callLater = reactor.callLater |
| self.on = False |
| self.vcard = vcard |
| self.vcard.protocol = self |
| |
| def dataReceived(self, data: bytes): |
| """entry point where twisted tells us data was received.""" |
| #logger.debug('Data received: %s' % b2h(data)) |
| self.recvBuffer = self.recvBuffer + data |
| while True: |
| if self.connectionCorrupted: |
| return |
| msg = self.readMessage() |
| if msg is None: |
| break |
| self.endPDURead() |
| self.rawMessageReceived(msg) |
| |
| if len(self.recvBuffer) > 0: |
| self.incompletePDURead() |
| |
| def incompletePDURead(self): |
| """We have an incomplete PDU in readBuffer, schedule pduReadTimer""" |
| if self.pduReadTimer and self.pduReadTimer.active(): |
| return |
| self.pduReadTimer = self.callLater(self.pduReadTimerSecs, self.onPDUReadTimeout) |
| |
| def endPDURead(self): |
| """We completed reading a PDU, cancel the pduReadTimer.""" |
| if self.pduReadTimer and self.pduReadTimer.active(): |
| self.pduReadTimer.cancel() |
| |
| def readMessage(self) -> Optional[bytes]: |
| """read an entire [raw] message.""" |
| pduLen = self._getMessageLength() |
| if pduLen is None: |
| return None |
| return self._getMessage(pduLen) |
| |
| def _getMessageLength(self) -> Optional[int]: |
| if len(self.recvBuffer) < 2: |
| return None |
| return struct.unpack('!H', self.recvBuffer[:2])[0] |
| |
| def _getMessage(self, pduLen: int) -> Optional[bytes]: |
| if len(self.recvBuffer) < pduLen+2: |
| return None |
| |
| message = self.recvBuffer[:pduLen+2] |
| self.recvBuffer = self.recvBuffer[pduLen+2:] |
| return message |
| |
| def onPDUReadTimeout(self): |
| logger.error('PDU read timed out. Buffer is now considered corrupt') |
| #self.coruptDataReceived |
| |
| def rawMessageReceived(self, message: bytes): |
| """Called once a complete binary vpcd message has been received.""" |
| pdu = None |
| try: |
| pdu = VpcdProtocolBase.construct.parse(message) |
| except Exception as e: |
| logger.exception(e) |
| logger.critical('Received corrupt PDU %s' % b2h(message)) |
| #self.corupDataRecvd() |
| else: |
| self.PDUReceived(pdu) |
| |
| def PDUReceived(self, pdu): |
| logger.debug("Rx PDU: %s" % pdu) |
| if pdu['data']: |
| return self.on_rx_data(pdu) |
| else: |
| method = getattr(self, 'on_rx_' + pdu['ctrl']) |
| return method(pdu) |
| |
| def on_rx_atr(self, pdu): |
| self.send_data(self.vcard.atr) |
| |
| def on_rx_on(self, pdu): |
| if self.on: |
| return |
| else: |
| self.on = True |
| self.vcard.power_change(self.on) |
| |
| def on_rx_reset(self, pdu): |
| self.vcard.reset() |
| |
| def on_rx_off(self, pdu): |
| if not self.on: |
| return |
| else: |
| self.on = False |
| self.vcard.power_change(self.on) |
| |
| def on_rx_data(self, pdu): |
| self.vcard.rx_c_apdu(pdu['data']) |
| |
| def send_pdu(self, pdu): |
| logger.debug("Sending PDU: %s" % pdu) |
| encoded = VpcdProtocolBase.construct.build(pdu) |
| #logger.debug("Sending binary: %s" % b2h(encoded)) |
| self.transport.write(encoded) |
| |
| def send_data(self, data: Union[str, bytes]): |
| if isinstance(data, str): |
| data = h2b(data) |
| return self.send_pdu({'length': 0, 'ctrl': '', 'data': data}) |
| |
| def send_ctrl(self, ctrl: str): |
| return self.send_pdu({'length': 0, 'ctrl': ctrl, 'data': ''}) |
| |
| |
| class VpcdProtocolClient(VpcdProtocolBase): |
| pass |
| |
| |
| class VpcdClientFactory(ReconnectingClientFactory): |
| def __init__(self, vcard_class: VirtualCard): |
| self.vcard_class = vcard_class |
| |
| def startedConnecting(self, connector): |
| logger.debug('Started to connect') |
| |
| def buildProtocol(self, addr): |
| logger.info('Connection established to %s' % addr) |
| self.resetDelay() |
| return VpcdProtocolClient(vcard = self.vcard_class()) |
| |
| def clientConnectionLost(self, connector, reason): |
| logger.warning('Connection lost (reason: %s)' % reason) |
| super().clientConnectionLost(connector, reason) |
| |
| def clientConnectionFailed(self, connector, reason): |
| logger.warning('Connection failed (reason: %s)' % reason) |
| super().clientConnectionFailed(connector, reason) |
| |
| ####################################################################### |
| # Application |
| ####################################################################### |
| |
| from pprint import pprint as pp |
| |
| from twisted.internet.protocol import Protocol, ReconnectingClientFactory, ClientCreator |
| from twisted.internet import reactor |
| |
| from smpp.twisted.client import SMPPClientTransceiver, SMPPClientService |
| from smpp.twisted.protocol import SMPPClientProtocol |
| from smpp.twisted.config import SMPPClientConfig |
| from smpp.pdu.operations import SubmitSM, DeliverSM |
| from smpp.pdu import pdu_types |
| |
| from pySim.ota import OtaKeyset, OtaDialectSms |
| from pySim.utils import b2h, h2b |
| |
| |
| class MyVcard(VirtualCard): |
| def __init__(self, **kwargs): |
| super().__init__(atr='3B9F96801FC78031A073BE21136743200718000001A5', **kwargs) |
| self.smpp_client = None |
| # KIC1 + KID1 of 8988211000000467285 |
| KIC1 = h2b('D0FDA31990D8D64178601317191669B4') |
| KID1 = h2b('D24EB461799C5E035C77451FD9404463') |
| KIC3 = h2b('C21DD66ACAC13CB3BC8B331B24AFB57B') |
| KID3 = h2b('12110C78E678C25408233076AA033615') |
| self.ota_keyset = OtaKeyset(algo_crypt='triple_des_cbc2', kic_idx=3, kic=KIC3, |
| algo_auth='triple_des_cbc2', kid_idx=3, kid=KID3) |
| self.ota_dialect = OtaDialectSms() |
| self.tar = h2b('B00011') |
| self.spi = {'counter':'no_counter', 'ciphering':True, 'rc_cc_ds': 'cc', 'por_in_submit':False, |
| 'por_shall_be_ciphered':True, 'por_rc_cc_ds': 'cc', 'por': 'por_required'} |
| |
| def ensure_smpp(self): |
| config = SMPPClientConfig(host='localhost', port=2775, username='test', password='test') |
| if self.smpp_client: |
| return |
| self.smpp_client = SMPPClientTransceiver(config, self.handleSmpp) |
| smpp = self.smpp_client.connectAndBind() |
| #self.smpp = ClientCreator(reactor, SMPPClientProtocol, config, self.handleSmpp) |
| #d = self.smpp.connectTCP(config.host, config.port) |
| #d = self.smpp.connectAndBind() |
| #d.addCallback(self.forwardToClient, self.smpp) |
| |
| def power_change(self, new_state: bool): |
| if new_state: |
| logger.info("POWER ON") |
| self.ensure_smpp() |
| else: |
| logger.info("POWER OFF") |
| |
| def reset(self): |
| logger.info("RESET") |
| |
| def rx_c_apdu(self, apdu: bytes): |
| pp(self.smpp_client.smpp) |
| logger.info("C-APDU: %s" % b2h(apdu)) |
| # translate to Secured OTA RFM |
| secured = self.ota_dialect.encode_cmd(self.ota_keyset, self.tar, self.spi, apdu=apdu) |
| # add user data header |
| tpdu = b'\x02\x70\x00' + secured |
| # send via SMPP |
| self.tx_sms_tpdu(tpdu) |
| #self.tx_r_apdu('9000') |
| |
| def tx_sms_tpdu(self, tpdu: bytes): |
| """Send a SMS TPDU via SMPP SubmitSM.""" |
| dcs = pdu_types.DataCoding(pdu_types.DataCodingScheme.DEFAULT, |
| pdu_types.DataCodingDefault.OCTET_UNSPECIFIED) |
| esm_class = pdu_types.EsmClass(pdu_types.EsmClassMode.DEFAULT, pdu_types.EsmClassType.DEFAULT, |
| gsmFeatures=[pdu_types.EsmClassGsmFeatures.UDHI_INDICATOR_SET]) |
| submit = SubmitSM(source_addr='12',destination_addr='23', data_coding=dcs, esm_class=esm_class, |
| protocol_id=0x7f, short_message=tpdu) |
| self.smpp_client.smpp.sendDataRequest(submit) |
| |
| def handleSmpp(self, smpp, pdu): |
| #logger.info("Received SMPP %s" % pdu) |
| data = pdu.params['short_message'] |
| #logger.info("Received SMS Data %s" % b2h(data)) |
| r, d = self.ota_dialect.decode_resp(self.ota_keyset, self.spi, data) |
| logger.info("Decoded SMPP %s" % r) |
| self.tx_r_apdu(r['last_response_data'] + r['last_status_word']) |
| |
| |
| if __name__ == '__main__': |
| import logging |
| logger = logging.getLogger(__name__) |
| import colorlog |
| log_format='%(log_color)s%(levelname)-8s%(reset)s %(name)s: %(message)s' |
| colorlog.basicConfig(level=logging.INFO, format = log_format) |
| logger = colorlog.getLogger() |
| |
| from twisted.internet import reactor |
| host = 'localhost' |
| port = 35963 |
| reactor.connectTCP(host, port, VpcdClientFactory(vcard_class=MyVcard)) |
| reactor.run() |