Harald Welte | b8e8bdf | 2022-08-07 19:42:02 +0200 | [diff] [blame] | 1 | #!/usr/bin/env python3 |
| 2 | # |
| 3 | # This program receives APDUs via the VPCD protocol of Frank Morgner's |
| 4 | # virtualsmartcard, encrypts them with OTA (over the air) keys and |
| 5 | # forwards them via SMPP to a SMSC (SMS service centre). |
| 6 | # |
| 7 | # In other words, you can use it as a poor man's OTA server, to enable |
| 8 | # you to use unmodified application software with PC/SC support to talk |
| 9 | # securely via OTA with a remote SMS card. |
| 10 | # |
| 11 | # This is very much a work in progress at this point. |
| 12 | |
| 13 | ####################################################################### |
| 14 | # twisted VPCD Library |
| 15 | ####################################################################### |
| 16 | |
| 17 | import logging |
| 18 | import struct |
| 19 | import abc |
| 20 | from typing import Union, Optional |
| 21 | from construct import Struct, Int8ub, Int16ub, If, Enum, Bytes, this, len_, Rebuild |
| 22 | from twisted.internet.protocol import Protocol, ReconnectingClientFactory |
| 23 | from pySim.utils import b2h, h2b |
| 24 | |
# Module-level logger used by both the VPCD library classes and the
# application code further down in this file.
logger = logging.getLogger(__name__)
| 26 | |
class VirtualCard(abc.ABC):
    """Abstract base class for a virtual smart card.

    A concrete card implements power_change(), reset() and rx_c_apdu().
    Responses travel back towards the reader through tx_r_apdu(), which hands
    them to the protocol object stored in ``self.protocol``.  That attribute
    is assigned by VpcdProtocolBase when it is constructed with this card.
    """

    def __init__(self, atr: Union[str, bytes]):
        """Store the ATR of the card.

        Args:
            atr: the ATR either as bytes, or as a hex string which is
                 converted using h2b().
        """
        if isinstance(atr, str):
            atr = h2b(atr)
        self.atr = atr
        # Set by the protocol that takes ownership of this card.  Defined here
        # so the attribute always exists, even before a protocol is attached.
        self.protocol = None

    @abc.abstractmethod
    def power_change(self, new_state: bool):
        """Power the card on or off."""

    @abc.abstractmethod
    def reset(self):
        """Reset the card."""

    @abc.abstractmethod
    def rx_c_apdu(self, apdu: bytes):
        """Receive a C-APDU from the reader/application."""

    def tx_r_apdu(self, apdu: Union[str, bytes]):
        """Transmit a R-APDU towards the reader/application.

        Args:
            apdu: the R-APDU either as bytes, or as a hex string which is
                  converted using h2b().

        Raises:
            RuntimeError: if no protocol has been attached to this card yet.
        """
        # getattr() so that a subclass which forgot to call super().__init__()
        # still gets the explicit error below rather than an AttributeError.
        protocol = getattr(self, 'protocol', None)
        if protocol is None:
            raise RuntimeError('cannot transmit R-APDU: no protocol attached to this card')
        if isinstance(apdu, str):
            apdu = h2b(apdu)
        logger.info("R-APDU: %s", b2h(apdu))
        protocol.send_data(apdu)
| 54 | |
class VpcdProtocolBase(Protocol):
    """Twisted protocol speaking the VPCD wire format on behalf of a VirtualCard.

    As encoded by ``construct`` below, every message consists of a 16 bit
    big-endian length followed by that many bytes.  A length of exactly 1
    carries a single control byte (off/on/reset/atr); a length greater than 1
    carries a data payload.  Received messages are dispatched to the
    ``on_rx_*`` methods, which in turn drive the attached VirtualCard.
    """
    # Prefixed couldn't be used as the this.length wouldn't be available in this case.
    # When building, the length is derived from the payload: the number of data
    # bytes if there is data, otherwise 1 for a control byte (which always
    # encodes as a single Int8ub regardless of the length of its symbolic
    # name), otherwise 0.
    construct = Struct('length'/Rebuild(Int16ub,
                                        lambda ctx: len(ctx.get('data')) if ctx.get('data')
                                        else (1 if ctx.get('ctrl') else 0)),
                       'data'/If(this.length > 1, Bytes(this.length)),
                       'ctrl'/If(this.length == 1, Enum(Int8ub, off=0, on=1, reset=2, atr=4)))

    def __init__(self, vcard: VirtualCard):
        """Create a protocol instance and attach it to *vcard*.

        Args:
            vcard: the virtual card that handles power, reset and APDU events.
                   Its ``protocol`` attribute is set to this instance.
        """
        # Imported locally so this class does not depend on a module-level
        # reactor import that only happens further down in the file.
        from twisted.internet import reactor
        self.recvBuffer = b''
        self.connectionCorrupted = False
        self.pduReadTimer = None
        self.pduReadTimerSecs = 10
        self.callLater = reactor.callLater
        self.on = False
        self.vcard = vcard
        self.vcard.protocol = self

    def dataReceived(self, data: bytes):
        """entry point where twisted tells us data was received."""
        self.recvBuffer += data
        while True:
            if self.connectionCorrupted:
                return
            msg = self.readMessage()
            if msg is None:
                break
            self.endPDURead()
            self.rawMessageReceived(msg)

        # whatever is left over is the beginning of a not-yet-complete message
        if self.recvBuffer:
            self.incompletePDURead()

    def incompletePDURead(self):
        """We have an incomplete PDU in recvBuffer, schedule pduReadTimer"""
        if self.pduReadTimer and self.pduReadTimer.active():
            return
        self.pduReadTimer = self.callLater(self.pduReadTimerSecs, self.onPDUReadTimeout)

    def endPDURead(self):
        """We completed reading a PDU, cancel the pduReadTimer."""
        if self.pduReadTimer and self.pduReadTimer.active():
            self.pduReadTimer.cancel()

    def readMessage(self) -> Optional[bytes]:
        """read an entire [raw] message.

        Returns:
            the complete message including its two length bytes, or None if
            recvBuffer does not yet hold a complete message.
        """
        pduLen = self._getMessageLength()
        if pduLen is None:
            return None
        return self._getMessage(pduLen)

    def _getMessageLength(self) -> Optional[int]:
        """Return the payload length announced in recvBuffer, or None if fewer
        than the two length bytes have been received so far."""
        if len(self.recvBuffer) < 2:
            return None
        return struct.unpack('!H', self.recvBuffer[:2])[0]

    def _getMessage(self, pduLen: int) -> Optional[bytes]:
        """Remove and return one message (length bytes + pduLen payload bytes)
        from recvBuffer, or return None if it is not yet complete."""
        total = pduLen + 2
        if len(self.recvBuffer) < total:
            return None

        message = self.recvBuffer[:total]
        self.recvBuffer = self.recvBuffer[total:]
        return message

    def onPDUReadTimeout(self):
        """Called when an incomplete PDU was not completed within pduReadTimerSecs."""
        # NOTE(review): only logs at the moment; neither the buffer nor the
        # connection are touched, despite what the message says.
        logger.error('PDU read timed out. Buffer is now considered corrupt')

    def rawMessageReceived(self, message: bytes):
        """Called once a complete binary vpcd message has been received."""
        try:
            pdu = VpcdProtocolBase.construct.parse(message)
        except Exception as e:
            # parse boundary: a malformed message must not take down the reactor
            logger.exception(e)
            logger.critical('Received corrupt PDU %s', b2h(message))
        else:
            self.PDUReceived(pdu)

    def PDUReceived(self, pdu):
        """Dispatch a parsed PDU to on_rx_data() or the matching on_rx_<ctrl>().

        PDUs without data whose control value has no handler (zero-length
        message, or a control byte outside the known set) are logged and
        ignored instead of raising.
        """
        logger.debug("Rx PDU: %s", pdu)
        if pdu['data']:
            return self.on_rx_data(pdu)

        ctrl = pdu['ctrl']
        # ctrl is only a string for control bytes known to the Enum; it is
        # None for a zero-length message and not a string for unknown values.
        handler = getattr(self, 'on_rx_' + ctrl, None) if isinstance(ctrl, str) else None
        if handler is None:
            logger.warning('Ignoring PDU with unsupported control value %r', ctrl)
            return None
        return handler(pdu)

    def on_rx_atr(self, pdu):
        """The peer asks for the ATR: send the ATR of the virtual card."""
        self.send_data(self.vcard.atr)

    def on_rx_on(self, pdu):
        """Power-on request; forwarded to the card only on an actual state change."""
        if self.on:
            return
        self.on = True
        self.vcard.power_change(self.on)

    def on_rx_reset(self, pdu):
        """Reset request; forwarded to the card."""
        self.vcard.reset()

    def on_rx_off(self, pdu):
        """Power-off request; forwarded to the card only on an actual state change."""
        if not self.on:
            return
        self.on = False
        self.vcard.power_change(self.on)

    def on_rx_data(self, pdu):
        """Data payload received; handed to the card as C-APDU."""
        self.vcard.rx_c_apdu(pdu['data'])

    def send_pdu(self, pdu):
        """Encode *pdu* (dict with 'length', 'ctrl' and 'data') and write it to the transport."""
        logger.debug("Sending PDU: %s", pdu)
        encoded = VpcdProtocolBase.construct.build(pdu)
        self.transport.write(encoded)

    def send_data(self, data: Union[str, bytes]):
        """Send a data message; a str is treated as hex and converted via h2b()."""
        if isinstance(data, str):
            data = h2b(data)
        return self.send_pdu({'length': 0, 'ctrl': '', 'data': data})

    def send_ctrl(self, ctrl: str):
        """Send a control message; *ctrl* is one of 'off', 'on', 'reset', 'atr'."""
        return self.send_pdu({'length': 0, 'ctrl': ctrl, 'data': ''})
| 177 | |
| 178 | |
class VpcdProtocolClient(VpcdProtocolBase):
    """Client flavour of the VPCD protocol; adds nothing to VpcdProtocolBase."""
| 181 | |
| 182 | |
class VpcdClientFactory(ReconnectingClientFactory):
    """Reconnecting factory creating one VpcdProtocolClient per connection.

    Each connection gets a freshly instantiated virtual card of the class
    passed to the constructor.
    """

    def __init__(self, vcard_class: "type[VirtualCard]"):
        """
        Args:
            vcard_class: VirtualCard subclass (the class itself, not an
                         instance); it is called without arguments for every
                         new connection.
        """
        self.vcard_class = vcard_class

    def startedConnecting(self, connector):
        """Log that a connection attempt has started."""
        logger.debug('Started to connect')

    def buildProtocol(self, addr):
        """Build the protocol, with a new virtual card, for an established connection."""
        logger.info('Connection established to %s', addr)
        # connection succeeded: start over with the initial reconnect delay
        self.resetDelay()
        proto = VpcdProtocolClient(vcard=self.vcard_class())
        # same back-reference Twisted's default Factory.buildProtocol() sets up
        proto.factory = self
        return proto

    def clientConnectionLost(self, connector, reason):
        """Log the loss, then let the base class schedule the reconnect."""
        logger.warning('Connection lost (reason: %s)', reason)
        super().clientConnectionLost(connector, reason)

    def clientConnectionFailed(self, connector, reason):
        """Log the failure, then let the base class schedule the retry."""
        logger.warning('Connection failed (reason: %s)', reason)
        super().clientConnectionFailed(connector, reason)
| 202 | |
| 203 | ####################################################################### |
| 204 | # Application |
| 205 | ####################################################################### |
| 206 | |
| 207 | from pprint import pprint as pp |
| 208 | |
| 209 | from twisted.internet.protocol import Protocol, ReconnectingClientFactory, ClientCreator |
| 210 | from twisted.internet import reactor |
| 211 | |
| 212 | from smpp.twisted.client import SMPPClientTransceiver, SMPPClientService |
| 213 | from smpp.twisted.protocol import SMPPClientProtocol |
| 214 | from smpp.twisted.config import SMPPClientConfig |
| 215 | from smpp.pdu.operations import SubmitSM, DeliverSM |
| 216 | from smpp.pdu import pdu_types |
| 217 | |
| 218 | from pySim.ota import OtaKeyset, OtaDialectSms |
| 219 | from pySim.utils import b2h, h2b |
| 220 | |
| 221 | |
class MyVcard(VirtualCard):
    """Virtual card forwarding every C-APDU as secured OTA SMS via SMPP.

    C-APDUs received from the reader side are wrapped by the OTA dialect using
    the configured keyset, TAR and SPI, and submitted as SMS through a SMPP
    transceiver.  SMS received via SMPP are decoded and the contained response
    is returned to the reader side as R-APDU.
    """

    def __init__(self, **kwargs):
        super().__init__(atr='3B9F96801FC78031A073BE21136743200718000001A5', **kwargs)
        # created lazily by ensure_smpp() on the first power-on
        self.smpp_client = None
        # KIC1 + KID1 of 8988211000000467285
        # (not used by the keyset below; kept for reference)
        KIC1 = h2b('D0FDA31990D8D64178601317191669B4')
        KID1 = h2b('D24EB461799C5E035C77451FD9404463')
        KIC3 = h2b('C21DD66ACAC13CB3BC8B331B24AFB57B')
        KID3 = h2b('12110C78E678C25408233076AA033615')
        self.ota_keyset = OtaKeyset(algo_crypt='triple_des_cbc2', kic_idx=3, kic=KIC3,
                                    algo_auth='triple_des_cbc2', kid_idx=3, kid=KID3)
        self.ota_dialect = OtaDialectSms()
        self.tar = h2b('B00011')
        self.spi = {'counter':'no_counter', 'ciphering':True, 'rc_cc_ds': 'cc', 'por_in_submit':False,
                    'por_shall_be_ciphered':True, 'por_rc_cc_ds': 'cc', 'por': 'por_required'}

    def ensure_smpp(self):
        """Create the SMPP transceiver and start connect+bind, unless already done."""
        if self.smpp_client:
            return
        config = SMPPClientConfig(host='localhost', port=2775, username='test', password='test')
        self.smpp_client = SMPPClientTransceiver(config, self.handleSmpp)
        # NOTE(review): assumes connectAndBind() returns a Deferred - confirm
        # against the smpp.twisted version in use.
        bound = self.smpp_client.connectAndBind()
        bound.addErrback(self._smpp_bind_failed)

    def _smpp_bind_failed(self, failure):
        """Log a failed SMPP connect/bind and forget the client so that the
        next power-on attempts a new connection."""
        logger.error("SMPP connect/bind failed: %s", failure)
        self.smpp_client = None

    def power_change(self, new_state: bool):
        """Log the power change; powering on also makes sure SMPP is set up."""
        if new_state:
            logger.info("POWER ON")
            self.ensure_smpp()
        else:
            logger.info("POWER OFF")

    def reset(self):
        """Log the reset; no further action is taken."""
        logger.info("RESET")

    def rx_c_apdu(self, apdu: bytes):
        """Secure the C-APDU using OTA and send it as SMS via SMPP.

        If no bound SMPP session exists (yet), status word 6F00 is returned to
        the reader side instead of failing with an exception.
        """
        logger.info("C-APDU: %s", b2h(apdu))
        session = getattr(self.smpp_client, 'smpp', None)
        if session is None:
            logger.error("No bound SMPP session; cannot forward C-APDU")
            self.tx_r_apdu('6F00')
            return
        logger.debug("SMPP session: %r", session)
        # translate to Secured OTA RFM
        secured = self.ota_dialect.encode_cmd(self.ota_keyset, self.tar, self.spi, apdu=apdu)
        # add user data header
        # (presumably UDHL=2, IEI=0x70, IEDL=0 - verify against the SMS OTA specs)
        tpdu = b'\x02\x70\x00' + secured
        # send via SMPP
        self.tx_sms_tpdu(tpdu)

    def tx_sms_tpdu(self, tpdu: bytes):
        """Send a SMS TPDU via SMPP SubmitSM."""
        dcs = pdu_types.DataCoding(pdu_types.DataCodingScheme.DEFAULT,
                                   pdu_types.DataCodingDefault.OCTET_UNSPECIFIED)
        # the short message starts with a user data header, hence the UDHI flag
        esm_class = pdu_types.EsmClass(pdu_types.EsmClassMode.DEFAULT, pdu_types.EsmClassType.DEFAULT,
                                       gsmFeatures=[pdu_types.EsmClassGsmFeatures.UDHI_INDICATOR_SET])
        submit = SubmitSM(source_addr='12', destination_addr='23', data_coding=dcs, esm_class=esm_class,
                          protocol_id=0x7f, short_message=tpdu)
        self.smpp_client.smpp.sendDataRequest(submit)

    def handleSmpp(self, smpp, pdu):
        """Message handler of the SMPP client: decode the OTA response contained
        in the received short message and return it as R-APDU."""
        logger.debug("Received SMPP %s", pdu)
        data = pdu.params['short_message']
        r, _ = self.ota_dialect.decode_resp(self.ota_keyset, self.spi, data)
        logger.info("Decoded SMPP %s", r)
        self.tx_r_apdu(r['last_response_data'] + r['last_status_word'])
| 287 | |
| 288 | |
if __name__ == '__main__':
    # Script entry point: set up coloured logging, then connect the virtual
    # card to the vpcd TCP port and run the twisted reactor until stopped.
    import colorlog
    from twisted.internet import reactor

    log_format = '%(log_color)s%(levelname)-8s%(reset)s %(name)s: %(message)s'
    colorlog.basicConfig(level=logging.INFO, format=log_format)
    # rebinds the module-level logger to the logger returned by colorlog
    logger = colorlog.getLogger()

    host = 'localhost'
    port = 35963
    reactor.connectTCP(host, port, VpcdClientFactory(vcard_class=MyVcard))
    reactor.run()