blob: 0922b400b6dc5aa81acb882ace0b64811debb984 [file] [log] [blame]
Harald Welte2241e722022-08-06 19:24:52 +02001#!/usr/bin/env python3
2#
# Program to emulate the entire communication path SMSC-MSC-BSC-BTS-ME
# that usually lies between an OTA backend and the SIM card. This makes
# it possible to play with SIM OTA technology without using a mobile
# network or even a mobile phone.
7#
# An external application must encode (and encrypt/sign) the OTA SMS
# messages and submit them via SMPP to this program, just like it would
# normally submit them to an SMSC (SMS Service Centre). The program then
# re-formats the SMPP-SUBMIT into an SMS-DELIVER TPDU and passes it via an
# ENVELOPE APDU to the SIM card that is locally inserted into a smart card
# reader.
13#
14# The path from SIM to external OTA application works the opposite way.
15
16import argparse
17import logging
18import colorlog
19from pprint import pprint as pp
20
21from twisted.protocols import basic
22from twisted.internet import defer, endpoints, protocol, reactor, task
23from twisted.cred.portal import IRealm
24from twisted.cred.checkers import InMemoryUsernamePasswordDatabaseDontUse
25from twisted.cred.portal import Portal
26from zope.interface import implementer
27
28from smpp.twisted.config import SMPPServerConfig
29from smpp.twisted.server import SMPPServerFactory, SMPPBindManager
30from smpp.twisted.protocol import SMPPSessionStates, DataHandlerResponse
31
32from smpp.pdu import pdu_types, operations, pdu_encoding
33
34from pySim.sms import SMS_DELIVER, AddressField
35
36from pySim.transport import LinkBase, ProactiveHandler, argparse_add_reader_args, init_reader
37from pySim.commands import SimCardCommands
38from pySim.cards import UsimCard
39from pySim.exceptions import *
40from pySim.cat import ProactiveCommand, SendShortMessage, SMS_TPDU, SMSPPDownload
41from pySim.cat import DeviceIdentities, Address
42from pySim.utils import b2h, h2b
43
# Logger shared by everything in this module
logger = logging.getLogger(__name__)

# MSISDNs to use when generating proactive SMS messages:
# SIM_MSISDN is used as the source address, ESME_MSISDN as the destination
SIM_MSISDN = '23'
ESME_MSISDN = '12'

# HACK: we need some kind of mapping table between system_id and card-reader
# or actually route based on MSISDNs.  Until then this holds the SMPP
# connection object of the most recently received PDU (None before that).
hackish_global_smpp = None
53
class Proact(ProactiveHandler):
    """Handler for proactive commands issued by the SIM card.

    The handler methods below are the call-backs which the pySim transport
    core calls whenever it receives a proactive command from the SIM.
    """

    def __init__(self, smpp_factory):
        """Remember the SMPP factory this handler belongs to.

        Args:
            smpp_factory: stored as ``self.smpp_factory``; not used by any
                method of this class yet.
        """
        self.smpp_factory = smpp_factory

    @staticmethod
    def _find_first_element_of_type(instlist, cls):
        """Return the first item of *instlist* that is an instance of *cls*.

        Returns None if *instlist* contains no such item.
        """
        return next((i for i in instlist if isinstance(i, cls)), None)

    def handle_SendShortMessage(self, data):
        """Card requests sending a SMS."""
        pp(data)
        # Relevant parts in data: Address, SMS_TPDU
        # (the helper is a staticmethod, so it must be reached through self;
        # a bare-name call raises NameError)
        addr_ie = self._find_first_element_of_type(data.children, Address)
        sms_tpdu_ie = self._find_first_element_of_type(data.children, SMS_TPDU)
        if sms_tpdu_ie is None:
            logger.warning('SendShortMessage without SMS_TPDU element; ignoring')
            return
        raw_tpdu = sms_tpdu_ie.decoded['tpdu']
        # SMS_SUBMIT is not among the names imported at file level.
        # NOTE(review): assumes pySim.sms provides SMS_SUBMIT -- TODO confirm
        from pySim.sms import SMS_SUBMIT
        submit = SMS_SUBMIT.fromBytes(raw_tpdu)
        # NOTE(review): addr_ie and submit are decoded but not used yet; the
        # whole command object is handed on, which send_sms_via_smpp() passes
        # to h2b().  Presumably the user data of 'submit' should be forwarded
        # instead -- confirm against the pySim.sms API.
        self.send_sms_via_smpp(data)

    def handle_OpenChannel(self, data):
        """Card requests opening a new channel via a UDP/TCP socket."""
        pp(data)

    def handle_CloseChannel(self, data):
        """Close a channel."""
        pp(data)

    def handleReceiveData(self, data):
        """Receive/read data from the socket."""
        pp(data)

    def handleSendData(self, data):
        """Send/write data to the socket."""
        pp(data)

    def getChannelStatus(self, data):
        """Only pretty-prints *data*; no channel status is evaluated yet."""
        pp(data)

    def send_sms_via_smpp(self, data):
        """Deliver *data* as a DELIVER-SM to the ESME.

        Args:
            data: passed through h2b() to form the short_message of the PDU.

        The PDU is sent on the module-level ``hackish_global_smpp``
        connection.  If that is still None, a warning is logged and nothing
        is sent.
        """
        # while in a normal network the phone/ME would *submit* a message to the SMSC,
        # we are actually emulating the SMSC itself, so we must *deliver* the message
        # to the ESME
        if hackish_global_smpp is None:
            logger.warning('No SMPP connection known yet; cannot deliver SMS to ESME')
            return
        dcs = pdu_types.DataCoding(pdu_types.DataCodingScheme.DEFAULT,
                                   pdu_types.DataCodingDefault.OCTET_UNSPECIFIED)
        esm_class = pdu_types.EsmClass(pdu_types.EsmClassMode.DEFAULT, pdu_types.EsmClassType.DEFAULT,
                                       gsmFeatures=[pdu_types.EsmClassGsmFeatures.UDHI_INDICATOR_SET])
        deliver = operations.DeliverSM(source_addr=SIM_MSISDN,
                                       destination_addr=ESME_MSISDN,
                                       esm_class=esm_class,
                                       protocol_id=0x7F,
                                       data_coding=dcs,
                                       short_message=h2b(data))
        hackish_global_smpp.sendDataRequest(deliver)
        # TODO: replace the global with a per-system_id lookup, along the lines of:
        # # obtain the connection/binding of system_id to be used for delivering MO-SMS to the ESME
        # connection = smpp_server.getBoundConnections[system_id].getNextBindingForDelivery()
        # connection.sendDataRequest(deliver)
114
115
116
def dcs_is_8bit(dcs):
    """Tell whether the SMPP data coding *dcs* denotes 8-bit (binary) data.

    True is returned for the two DEFAULT-scheme octet-unspecified codings and
    for a GSM_MESSAGE_CLASS coding whose 'msgCoding' is DATA_8BIT; everything
    else yields False.
    """
    default_scheme = pdu_types.DataCodingScheme.DEFAULT

    # plain "octet unspecified" coding
    octet_unspecified = pdu_types.DataCoding(
        default_scheme, pdu_types.DataCodingDefault.OCTET_UNSPECIFIED)
    if dcs == octet_unspecified:
        return True

    # the second ("common") octet-unspecified variant
    octet_unspecified_common = pdu_types.DataCoding(
        default_scheme, pdu_types.DataCodingDefault.OCTET_UNSPECIFIED_COMMON)
    if dcs == octet_unspecified_common:
        return True

    # GSM message class coding that explicitly selects 8-bit data
    if dcs.scheme != pdu_types.DataCodingScheme.GSM_MESSAGE_CLASS:
        return False
    if dcs.schemeData['msgCoding'] == pdu_types.DataCodingGsmMsgCoding.DATA_8BIT:
        return True
    return False
128
129
class MyServer:
    """SMPP server which hands received SUBMIT-SM messages to a SIM card.

    The constructor starts listening for SMPP clients; connect_to_card()
    attaches the card reader transport.  Each SUBMIT-SM is converted into a
    SMS-DELIVER TPDU, wrapped into an SMS-PP-Download ENVELOPE and sent to
    the card; the card's response data is sent back as a DELIVER-SM.
    """

    @implementer(IRealm)
    class SmppRealm:
        """Realm handed to the twisted.cred Portal used for SMPP authentication."""

        def requestAvatar(self, avatarId, mind, *interfaces):
            """Return an ('SMPP', avatarId, no-op logout callable) tuple."""
            return ('SMPP', avatarId, lambda: None)

    def __init__(self, tcp_port: int = 2775, bind_ip='::'):
        """Create the SMPP server factory and start listening.

        Args:
            tcp_port: TCP port to bind the SMPP socket to.
            bind_ip: address to bind to; passed as ``interface`` to a
                TCP6ServerEndpoint.

        A single system 'test' with password 'test' and at most two bindings
        is configured.
        """
        smpp_config = SMPPServerConfig(msgHandler=self._msgHandler,
                                       systems={'test': {'max_bindings': 2}})
        portal = Portal(self.SmppRealm())
        credential_checker = InMemoryUsernamePasswordDatabaseDontUse()
        credential_checker.addUser('test', 'test')
        portal.registerChecker(credential_checker)
        self.factory = SMPPServerFactory(smpp_config, auth_portal=portal)
        logger.info('Binding Virtual SMSC to TCP Port %u at %s', tcp_port, bind_ip)
        smppEndpoint = endpoints.TCP6ServerEndpoint(reactor, tcp_port, interface=bind_ip)
        # listen() reports a bind failure only through the returned Deferred,
        # so attach an errback instead of dropping it
        listening = smppEndpoint.listen(self.factory)
        listening.addErrback(self._listen_failed, tcp_port, bind_ip)
        # populated by connect_to_card()
        self.tp = self.scc = self.card = None

    @staticmethod
    def _listen_failed(failure, tcp_port, bind_ip):
        """Log that the SMPP listening socket could not be set up."""
        logger.error('Failed to bind Virtual SMSC to TCP Port %u at %s: %s',
                     tcp_port, bind_ip, failure.getErrorMessage())

    def connect_to_card(self, tp: LinkBase):
        """Attach the card reachable via *tp* and prepare it for use.

        Sets ``self.tp``, ``self.scc`` and ``self.card``, reads the AIDs,
        selects the USIM ADF and sends a terminal profile to the card.
        """
        self.tp = tp
        self.scc = SimCardCommands(self.tp)
        self.card = UsimCard(self.scc)
        # this should be part of UsimCard, but FairwavesSIM breaks with that :/
        self.scc.cla_byte = "00"
        self.scc.sel_ctrl = "0004"
        self.card.read_aids()
        self.card.select_adf_by_aid(adf='usim')
        # FIXME: create a more realistic profile than ffffff
        self.scc.terminal_profile('ffffff')

    def _msgHandler(self, system_id, smpp, pdu):
        """Message handler registered with the SMPP server configuration.

        SUBMIT-SM PDUs are passed to handle_submit_sm(); any other command
        is rejected with ESME_RINVCMDID.

        Returns:
            a pdu_types.CommandStatus value.
        """
        # HACK: we need some kind of mapping table between system_id and card-reader
        # or actually route based on MSISDNs
        global hackish_global_smpp
        hackish_global_smpp = smpp
        #pp(pdu)
        if pdu.id == pdu_types.CommandId.submit_sm:
            return self.handle_submit_sm(system_id, smpp, pdu)
        logger.warning('Rejecting non-SUBMIT commandID')
        return pdu_types.CommandStatus.ESME_RINVCMDID

    def handle_submit_sm(self, system_id, smpp, pdu):
        """Forward one SUBMIT-SM to the card and send the card's answer back.

        Args:
            system_id: not used by this method.
            smpp: connection on which the response DELIVER-SM is sent.
            pdu: the received SUBMIT-SM.

        Returns:
            ESME_RINVDCS if the data coding is not 8-bit or the protocol_id
            is not 0x7f; ESME_ROK if the card answered with status word
            '9000' or one starting with '6f', '62' or '63'; ESME_RSUBMITFAIL
            otherwise (including status word '9300').
        """
        # check for valid data coding scheme + PID
        if not dcs_is_8bit(pdu.params['data_coding']):
            logger.warning('Rejecting non-8bit DCS')
            return pdu_types.CommandStatus.ESME_RINVDCS
        if pdu.params['protocol_id'] != 0x7f:
            logger.warning('Rejecting non-SIM PID')
            # the invalid-DCS status is reused for an unsupported PID
            return pdu_types.CommandStatus.ESME_RINVDCS

        # 1) build a SMS-DELIVER (!) from the SMPP-SUBMIT
        tpdu = SMS_DELIVER.fromSmppSubmit(pdu)
        print(tpdu)
        # 2) wrap into the CAT ENVELOPE for SMS-PP-Download
        tpdu_ie = SMS_TPDU(decoded={'tpdu': b2h(tpdu.toBytes())})
        dev_ids = DeviceIdentities(decoded={'source_dev_id': 'network', 'dest_dev_id': 'uicc'})
        sms_dl = SMSPPDownload(children=[dev_ids, tpdu_ie])
        # 3) send to the card
        envelope_hex = b2h(sms_dl.to_tlv())
        print("ENVELOPE: %s" % envelope_hex)
        (data, sw) = self.scc.envelope(envelope_hex)
        print("SW %s: %s" % (sw, data))
        if sw == '9300':
            # TODO send back RP-ERROR message with TP-FCS == 'SIM Application Toolkit Busy'
            return pdu_types.CommandStatus.ESME_RSUBMITFAIL
        if sw == '9000' or sw[0:2] in ('6f', '62', '63'):
            # data something like 027100000e0ab000110000000000000001612f or
            # 027100001c12b000119660ebdb81be189b5e4389e9e7ab2bc0954f963ad869ed7c
            # which is the user-data portion of the SMS starting with the UDH (027100)
            # TODO: return the response back to the sender in an RP-ACK; PID/DCS like in CMD
            # source and destination addresses are swapped relative to the SUBMIT
            deliver = operations.DeliverSM(service_type=pdu.params['service_type'],
                                           source_addr_ton=pdu.params['dest_addr_ton'],
                                           source_addr_npi=pdu.params['dest_addr_npi'],
                                           source_addr=pdu.params['destination_addr'],
                                           dest_addr_ton=pdu.params['source_addr_ton'],
                                           dest_addr_npi=pdu.params['source_addr_npi'],
                                           destination_addr=pdu.params['source_addr'],
                                           esm_class=pdu.params['esm_class'],
                                           protocol_id=pdu.params['protocol_id'],
                                           priority_flag=pdu.params['priority_flag'],
                                           data_coding=pdu.params['data_coding'],
                                           short_message=h2b(data))
            smpp.sendDataRequest(deliver)
            return pdu_types.CommandStatus.ESME_ROK
        return pdu_types.CommandStatus.ESME_RSUBMITFAIL
219
220
# Command line interface: the reader options registered by
# argparse_add_reader_args() plus the options for the SMPP listening socket.
option_parser = argparse.ArgumentParser(
    formatter_class=argparse.ArgumentDefaultsHelpFormatter,
)
argparse_add_reader_args(option_parser)

smpp_group = option_parser.add_argument_group('SMPP Options')
smpp_group.add_argument(
    '--smpp-bind-port',
    type=int,
    default=2775,
    help='TCP Port to bind the SMPP socket to',
)
smpp_group.add_argument(
    '--smpp-bind-ip',
    default='::',
    help='IPv4/IPv6 address to bind the SMPP socket to',
)
228
# Script entry point: set up logging, open the card reader, start the SMPP
# server, attach the card and run the twisted reactor.
if __name__ == '__main__':
    # exit() is an interactive helper installed by the 'site' module;
    # sys.exit() is the reliable way to terminate a script
    import sys

    log_format = '%(log_color)s%(levelname)-8s%(reset)s %(name)s: %(message)s'
    colorlog.basicConfig(level=logging.INFO, format=log_format)
    # from here on the module-level 'logger' is the root logger
    logger = colorlog.getLogger()

    opts = option_parser.parse_args()

    #tp = init_reader(opts, proactive_handler = Proact())
    tp = init_reader(opts)
    if tp is None:
        # no reader transport could be set up
        sys.exit(1)
    tp.connect()

    ms = MyServer(opts.smpp_bind_port, opts.smpp_bind_ip)
    ms.connect_to_card(tp)
    reactor.run()
245