blob: afeed6d66788292269d6c17947cbc675110a964e [file] [log] [blame]
Harald Welteb8e8bdf2022-08-07 19:42:02 +02001#!/usr/bin/env python3
2#
# This program receives APDUs via the VPCD protocol of Frank Morgner's
# virtualsmartcard, encrypts them with OTA (over the air) keys and
# forwards them via SMPP to an SMSC (SMS service centre).
6#
# In other words, you can use it as a poor man's OTA server, to enable
# you to use unmodified application software with PC/SC support to talk
# securely via OTA with a remote SIM card.
10#
11# This is very much a work in progress at this point.
12
13#######################################################################
14# twisted VPCD Library
15#######################################################################
16
17import logging
18import struct
19import abc
20from typing import Union, Optional
21from construct import Struct, Int8ub, Int16ub, If, Enum, Bytes, this, len_, Rebuild
22from twisted.internet.protocol import Protocol, ReconnectingClientFactory
23from pySim.utils import b2h, h2b
24
25logger = logging.getLogger(__name__)
26
class VirtualCard(abc.ABC):
    """Abstract base class for a virtual smart card.

    Subclasses implement the power, reset and C-APDU hooks; responses are
    sent back through tx_r_apdu(), which forwards them to the protocol
    object stored in ``self.protocol``.
    """

    def __init__(self, atr: Union[str, bytes]):
        """Store the ATR of the card.

        Args:
            atr: the ATR, either as bytes or as a hex string (converted
                with h2b()).
        """
        if isinstance(atr, str):
            atr = h2b(atr)
        self.atr = atr
        # Filled in by the protocol object that drives this card; stays None
        # until the card has been attached to one.
        self.protocol = None

    @abc.abstractmethod
    def power_change(self, new_state: bool):
        """Power the card on or off."""

    @abc.abstractmethod
    def reset(self):
        """Reset the card."""

    @abc.abstractmethod
    def rx_c_apdu(self, apdu: bytes):
        """Receive a C-APDU from the reader/application."""

    def tx_r_apdu(self, apdu: Union[str, bytes]):
        """Send an R-APDU back towards the reader/application.

        Args:
            apdu: the response, either as bytes or as a hex string.

        Raises:
            RuntimeError: if the card is not attached to a protocol yet.
        """
        if isinstance(apdu, str):
            apdu = h2b(apdu)
        # getattr() also covers subclasses that never ran our __init__.
        protocol = getattr(self, 'protocol', None)
        if protocol is None:
            raise RuntimeError('cannot send R-APDU: card is not attached to a protocol')
        logger.info("R-APDU: %s", b2h(apdu))
        protocol.send_data(apdu)
54
class VpcdProtocolBase(Protocol):
    """Twisted protocol speaking the vpcd message format for one virtual card.

    Every message is a 16-bit big-endian length followed by that many bytes.
    A one-byte payload is parsed as a control code (off/on/reset/atr); a
    longer payload is handed to the virtual card as data.
    """
    # Prefixed couldn't be used as the this.length wouldn't be available in this case.
    # When building, the length is recomputed from the payload: the number of
    # data bytes for a data PDU, 1 for a control PDU (a control code is a
    # single byte on the wire, however long its symbolic name is) and 0 when
    # neither is present.
    construct = Struct('length'/Rebuild(Int16ub,
                                        lambda ctx: len(ctx.data) if ctx.data
                                        else (0 if ctx.ctrl in (None, '') else 1)),
                       'data'/If(this.length > 1, Bytes(this.length)),
                       'ctrl'/If(this.length == 1, Enum(Int8ub, off=0, on=1, reset=2, atr=4)))

    def __init__(self, vcard: VirtualCard):
        """Bind this protocol instance to *vcard* (and vice versa)."""
        # Imported locally: the module-level reactor import only appears
        # further down the file, in the application section.
        from twisted.internet import reactor
        self.recvBuffer = b''
        self.connectionCorrupted = False
        self.pduReadTimer = None
        self.pduReadTimerSecs = 10
        self.callLater = reactor.callLater
        self.on = False
        self.vcard = vcard
        # Back-reference used by VirtualCard.tx_r_apdu() to send responses.
        self.vcard.protocol = self

    def dataReceived(self, data: bytes):
        """entry point where twisted tells us data was received."""
        self.recvBuffer += data
        while True:
            if self.connectionCorrupted:
                return
            msg = self.readMessage()
            if msg is None:
                break
            self.endPDURead()
            self.rawMessageReceived(msg)

        # Left-over bytes belong to a PDU that has not fully arrived yet.
        if self.recvBuffer:
            self.incompletePDURead()

    def incompletePDURead(self):
        """We have an incomplete PDU in readBuffer, schedule pduReadTimer"""
        if self.pduReadTimer and self.pduReadTimer.active():
            return
        self.pduReadTimer = self.callLater(self.pduReadTimerSecs, self.onPDUReadTimeout)

    def endPDURead(self):
        """We completed reading a PDU, cancel the pduReadTimer."""
        if self.pduReadTimer and self.pduReadTimer.active():
            self.pduReadTimer.cancel()

    def readMessage(self) -> Optional[bytes]:
        """read an entire [raw] message.

        Returns the message including its two-byte length prefix, or None if
        the receive buffer does not hold a complete message yet.
        """
        pduLen = self._getMessageLength()
        if pduLen is None:
            return None
        return self._getMessage(pduLen)

    def _getMessageLength(self) -> Optional[int]:
        """Return the payload length announced by the buffered header, if any."""
        if len(self.recvBuffer) < 2:
            return None
        return struct.unpack_from('!H', self.recvBuffer)[0]

    def _getMessage(self, pduLen: int) -> Optional[bytes]:
        """Pop one message (header + *pduLen* payload bytes) off the buffer."""
        total = pduLen + 2
        if len(self.recvBuffer) < total:
            return None

        message = self.recvBuffer[:total]
        self.recvBuffer = self.recvBuffer[total:]
        return message

    def onPDUReadTimeout(self):
        """Timer callback for a PDU that did not complete in time.

        Only logs; the buffer is neither discarded nor flagged as corrupt.
        """
        logger.error('PDU read timed out. Buffer is now considered corrupt')

    def rawMessageReceived(self, message: bytes):
        """Called once a complete binary vpcd message has been received."""
        try:
            pdu = VpcdProtocolBase.construct.parse(message)
        except Exception as e:
            logger.exception(e)
            logger.critical('Received corrupt PDU %s', b2h(message))
        else:
            self.PDUReceived(pdu)

    def PDUReceived(self, pdu):
        """Dispatch a parsed PDU to on_rx_data() or the matching on_rx_<ctrl>().

        Zero-length PDUs and control values without a handler are logged and
        ignored instead of raising out of dataReceived().
        """
        logger.debug("Rx PDU: %s", pdu)
        if pdu['data']:
            return self.on_rx_data(pdu)
        ctrl = pdu['ctrl']
        handler = None
        if ctrl is not None:
            handler = getattr(self, 'on_rx_%s' % ctrl, None)
        if handler is None:
            logger.warning('Ignoring PDU with unsupported control value: %s', ctrl)
            return None
        return handler(pdu)

    def on_rx_atr(self, pdu):
        """Answer an ATR request with the ATR of the virtual card."""
        self.send_data(self.vcard.atr)

    def on_rx_on(self, pdu):
        """Power the card on, unless it already is on."""
        if self.on:
            return
        self.on = True
        self.vcard.power_change(self.on)

    def on_rx_reset(self, pdu):
        """Forward a reset request to the virtual card."""
        self.vcard.reset()

    def on_rx_off(self, pdu):
        """Power the card off, unless it already is off."""
        if not self.on:
            return
        self.on = False
        self.vcard.power_change(self.on)

    def on_rx_data(self, pdu):
        """Hand the data payload of *pdu* to the virtual card as C-APDU."""
        self.vcard.rx_c_apdu(pdu['data'])

    def send_pdu(self, pdu):
        """Encode *pdu* (dict with 'length', 'data', 'ctrl') and write it out."""
        logger.debug("Sending PDU: %s", pdu)
        encoded = VpcdProtocolBase.construct.build(pdu)
        self.transport.write(encoded)

    def send_data(self, data: Union[str, bytes]):
        """Send a data PDU; *data* may be bytes or a hex string."""
        if isinstance(data, str):
            data = h2b(data)
        return self.send_pdu({'length': 0, 'ctrl': '', 'data': data})

    def send_ctrl(self, ctrl: str):
        """Send a control PDU; *ctrl* is one of 'off', 'on', 'reset', 'atr'."""
        return self.send_pdu({'length': 0, 'ctrl': ctrl, 'data': ''})
177
178
class VpcdProtocolClient(VpcdProtocolBase):
    """Client-side vpcd protocol; adds nothing to VpcdProtocolBase."""
181
182
class VpcdClientFactory(ReconnectingClientFactory):
    """Reconnecting client factory creating one virtual card per connection."""

    def __init__(self, vcard_class: VirtualCard):
        # Called without arguments in buildProtocol() to create the card.
        self.vcard_class = vcard_class

    def startedConnecting(self, connector):
        """Log that a connection attempt has started."""
        logger.debug('Started to connect')

    def buildProtocol(self, addr):
        """Create a fresh card and the protocol instance driving it."""
        message = 'Connection established to %s' % addr
        logger.info(message)
        self.resetDelay()
        card = self.vcard_class()
        return VpcdProtocolClient(vcard=card)

    def clientConnectionLost(self, connector, reason):
        """Log the loss, then let the base class handle it."""
        message = 'Connection lost (reason: %s)' % reason
        logger.warning(message)
        super().clientConnectionLost(connector, reason)

    def clientConnectionFailed(self, connector, reason):
        """Log the failure, then let the base class handle it."""
        message = 'Connection failed (reason: %s)' % reason
        logger.warning(message)
        super().clientConnectionFailed(connector, reason)
202
203#######################################################################
204# Application
205#######################################################################
206
207from pprint import pprint as pp
208
209from twisted.internet.protocol import Protocol, ReconnectingClientFactory, ClientCreator
210from twisted.internet import reactor
211
212from smpp.twisted.client import SMPPClientTransceiver, SMPPClientService
213from smpp.twisted.protocol import SMPPClientProtocol
214from smpp.twisted.config import SMPPClientConfig
215from smpp.pdu.operations import SubmitSM, DeliverSM
216from smpp.pdu import pdu_types
217
218from pySim.ota import OtaKeyset, OtaDialectSms
219from pySim.utils import b2h, h2b
220
221
class MyVcard(VirtualCard):
    """Virtual card that forwards each C-APDU as a secured OTA SMS via SMPP.

    C-APDUs are wrapped with the configured OTA keyset/TAR/SPI and submitted
    through an SMPP transceiver; decoded responses arriving via SMPP are
    returned as R-APDUs.
    """

    def __init__(self, **kwargs):
        super().__init__(atr='3B9F96801FC78031A073BE21136743200718000001A5', **kwargs)
        self.smpp_client = None
        # NOTE(review): OTA keys (and the SMPP credentials in ensure_smpp) are
        # hard-coded here; move them to configuration before using real cards.
        # KIC1 + KID1 of 8988211000000467285 (currently unused):
        #   KIC1 = D0FDA31990D8D64178601317191669B4
        #   KID1 = D24EB461799C5E035C77451FD9404463
        KIC3 = h2b('C21DD66ACAC13CB3BC8B331B24AFB57B')
        KID3 = h2b('12110C78E678C25408233076AA033615')
        self.ota_keyset = OtaKeyset(algo_crypt='triple_des_cbc2', kic_idx=3, kic=KIC3,
                                    algo_auth='triple_des_cbc2', kid_idx=3, kid=KID3)
        self.ota_dialect = OtaDialectSms()
        self.tar = h2b('B00011')
        self.spi = {'counter':'no_counter', 'ciphering':True, 'rc_cc_ds': 'cc', 'por_in_submit':False,
                    'por_shall_be_ciphered':True, 'por_rc_cc_ds': 'cc', 'por': 'por_required'}

    def ensure_smpp(self):
        """Create the SMPP transceiver and start connecting, unless one exists."""
        if self.smpp_client:
            return
        config = SMPPClientConfig(host='localhost', port=2775, username='test', password='test')
        self.smpp_client = SMPPClientTransceiver(config, self.handleSmpp)
        # Without an errback a failed connect/bind would go unnoticed and,
        # because smpp_client stays set, would never be retried.
        self.smpp_client.connectAndBind().addErrback(self._smpp_bind_failed)

    def _smpp_bind_failed(self, failure):
        """Log a failed SMPP connect/bind and allow the next power-on to retry."""
        logger.error("SMPP connect/bind failed: %s", failure.getErrorMessage())
        self.smpp_client = None

    def power_change(self, new_state: bool):
        """Log the power transition; powering on also brings up SMPP."""
        if new_state:
            logger.info("POWER ON")
            self.ensure_smpp()
        else:
            logger.info("POWER OFF")

    def reset(self):
        """Log the reset request; no further action is taken."""
        logger.info("RESET")

    def rx_c_apdu(self, apdu: bytes):
        """Secure *apdu* for OTA transport and submit it as SMS via SMPP.

        If no SMPP connection is available, status word 6F00 is returned to
        the application instead of forwarding the command.
        """
        logger.info("C-APDU: %s", b2h(apdu))
        smpp = getattr(self.smpp_client, 'smpp', None)
        if smpp is None:
            # Happens when a C-APDU arrives before power-on or before the
            # SMPP connection is up. 6F00 = "no precise diagnosis".
            logger.error("No SMPP connection available, cannot forward C-APDU")
            self.tx_r_apdu('6F00')
            return
        logger.debug("SMPP protocol instance: %r", smpp)
        # translate to Secured OTA RFM
        secured = self.ota_dialect.encode_cmd(self.ota_keyset, self.tar, self.spi, apdu=apdu)
        # add user data header
        tpdu = b'\x02\x70\x00' + secured
        # send via SMPP
        self.tx_sms_tpdu(tpdu)

    def tx_sms_tpdu(self, tpdu: bytes):
        """Send a SMS TPDU via SMPP SubmitSM.

        The caller must make sure the SMPP client is connected.
        """
        dcs = pdu_types.DataCoding(pdu_types.DataCodingScheme.DEFAULT,
                                   pdu_types.DataCodingDefault.OCTET_UNSPECIFIED)
        esm_class = pdu_types.EsmClass(pdu_types.EsmClassMode.DEFAULT, pdu_types.EsmClassType.DEFAULT,
                                       gsmFeatures=[pdu_types.EsmClassGsmFeatures.UDHI_INDICATOR_SET])
        submit = SubmitSM(source_addr='12', destination_addr='23', data_coding=dcs, esm_class=esm_class,
                          protocol_id=0x7f, short_message=tpdu)
        self.smpp_client.smpp.sendDataRequest(submit)

    def handleSmpp(self, smpp, pdu):
        """SMPP message handler: decode the OTA response and return it as R-APDU."""
        data = pdu.params['short_message']
        r, _ = self.ota_dialect.decode_resp(self.ota_keyset, self.spi, data)
        logger.info("Decoded SMPP %s", r)
        self.tx_r_apdu(r['last_response_data'] + r['last_status_word'])
287
288
if __name__ == '__main__':
    # Script entry point: configure coloured logging, then connect the
    # virtual card to vpcd and run the reactor.
    import logging
    import colorlog
    from twisted.internet import reactor

    log_format = '%(log_color)s%(levelname)-8s%(reset)s %(name)s: %(message)s'
    colorlog.basicConfig(level=logging.INFO, format=log_format)
    # Rebind the module-level logger to the one returned by colorlog.
    logger = colorlog.getLogger()

    host = 'localhost'
    port = 35963
    factory = VpcdClientFactory(vcard_class=MyVcard)
    reactor.connectTCP(host, port, factory)
    reactor.run()