blob: 499e9ff7441d230c8df6a1b14721bb40b0a2a56f [file] [log] [blame]
Harald Welte21caf322022-07-16 14:06:46 +02001# coding=utf-8
2
3# (C) 2022 by Harald Welte <laforge@osmocom.org>
4#
5# This program is free software: you can redistribute it and/or modify
6# it under the terms of the GNU General Public License as published by
7# the Free Software Foundation, either version 2 of the License, or
8# (at your option) any later version.
9#
10# This program is distributed in the hope that it will be useful,
11# but WITHOUT ANY WARRANTY; without even the implied warranty of
12# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
13# GNU General Public License for more details.
14#
15# You should have received a copy of the GNU General Public License
16# along with this program. If not, see <http://www.gnu.org/licenses/>.
17
18
19import sys
20import logging
21from pprint import pprint as pp
22from typing import Tuple
23import pyshark
24
25from pySim.utils import h2b, b2h
26from pySim.apdu import Tpdu
27from . import ApduSource, PacketType, CardReset
28
29logger = logging.getLogger(__name__)
30
31class _PysharkRspro(ApduSource):
32 """APDU Source [provider] base class for reading RSPRO (osmo-remsim) via tshark."""
33
34 def __init__(self, pyshark_inst):
35 self.pyshark = pyshark_inst
36 self.bank_id = None
37 self.bank_slot = None
38 self.cmd_tpdu = None
39 super().__init__()
40
41 @staticmethod
42 def get_bank_slot(bank_slot) -> Tuple[int, int]:
43 """Convert a 'bankSlot_element' field into a tuple of bank_id, slot_nr"""
44 bank_id = bank_slot.get_field('bankId')
45 slot_nr = bank_slot.get_field('slotNr')
46 return int(bank_id), int(slot_nr)
47
48 @staticmethod
49 def get_client_slot(client_slot) -> Tuple[int, int]:
50 """Convert a 'clientSlot_element' field into a tuple of client_id, slot_nr"""
51 client_id = client_slot.get_field('clientId')
52 slot_nr = client_slot.get_field('slotNr')
53 return int(client_id), int(slot_nr)
54
55 @staticmethod
56 def get_pstatus(pstatus) -> Tuple[int, int, int]:
57 """Convert a 'slotPhysStatus_element' field into a tuple of vcc, reset, clk"""
58 vccPresent = int(pstatus.get_field('vccPresent'))
59 resetActive = int(pstatus.get_field('resetActive'))
60 clkActive = int(pstatus.get_field('clkActive'))
61 return vccPresent, resetActive, clkActive
62
63 def read_packet(self) -> PacketType:
64 p = self.pyshark.next()
65 return self._parse_packet(p)
66
67 def _set_or_verify_bank_slot(self, bsl: Tuple[int, int]):
68 """Keep track of the bank:slot to make sure we don't mix traces of multiple cards"""
69 if not self.bank_id:
70 self.bank_id = bsl[0]
71 self.bank_slot = bsl[1]
72 else:
73 if self.bank_id != bsl[0] or self.bank_slot != bsl[1]:
74 raise ValueError('Received data for unexpected B(%u:%u)' % (bsl[0], bsl[1]))
75
76 def _parse_packet(self, p) -> PacketType:
77 rspro_layer = p['rspro']
78 #print("Layer: %s" % rspro_layer)
79 rspro_element = rspro_layer.get_field('RsproPDU_element')
80 #print("Element: %s" % rspro_element)
81 msg_type = rspro_element.get_field('msg')
82 rspro_msg = rspro_element.get_field('msg_tree')
83 if msg_type == '12': # tpduModemToCard
84 modem2card = rspro_msg.get_field('tpduModemToCard_element')
85 #print(modem2card)
86 client_slot = modem2card.get_field('fromClientSlot_element')
87 csl = self.get_client_slot(client_slot)
88 bank_slot = modem2card.get_field('toBankSlot_element')
89 bsl = self.get_bank_slot(bank_slot)
90 self._set_or_verify_bank_slot(bsl)
91 data = modem2card.get_field('data').replace(':','')
92 logger.debug("C(%u:%u) -> B(%u:%u): %s" % (csl[0], csl[1], bsl[0], bsl[1], data))
93 # store the CMD portion until the RSP portion arrives later
94 self.cmd_tpdu = h2b(data)
95 elif msg_type == '13': # tpduCardToModem
96 card2modem = rspro_msg.get_field('tpduCardToModem_element')
97 #print(card2modem)
98 client_slot = card2modem.get_field('toClientSlot_element')
99 csl = self.get_client_slot(client_slot)
100 bank_slot = card2modem.get_field('fromBankSlot_element')
101 bsl = self.get_bank_slot(bank_slot)
102 self._set_or_verify_bank_slot(bsl)
103 data = card2modem.get_field('data').replace(':','')
104 logger.debug("C(%u:%u) <- B(%u:%u): %s" % (csl[0], csl[1], bsl[0], bsl[1], data))
105 rsp_tpdu = h2b(data)
106 if self.cmd_tpdu:
107 # combine this R-TPDU with the C-TPDU we saw earlier
108 r = Tpdu(self.cmd_tpdu, rsp_tpdu)
109 self.cmd_tpdu = False
110 return r
111 elif msg_type == '14': # clientSlotStatus
112 cl_slotstatus = rspro_msg.get_field('clientSlotStatusInd_element')
113 #print(cl_slotstatus)
114 client_slot = cl_slotstatus.get_field('fromClientSlot_element')
115 bank_slot = cl_slotstatus.get_field('toBankSlot_element')
116 slot_pstatus = cl_slotstatus.get_field('slotPhysStatus_element')
117 vccPresent, resetActive, clkActive = self.get_pstatus(slot_pstatus)
118 if vccPresent and clkActive and not resetActive:
119 logger.debug("RESET")
120 return CardReset()
121 else:
122 print("Unhandled msg type %s: %s" % (msg_type, rspro_msg))
123
124
125class PysharkRsproPcap(_PysharkRspro):
126 """APDU Source [provider] class for reading RSPRO (osmo-remsim) from a PCAP
127 file via pyshark, which in turn uses tshark (part of wireshark).
128
129 In order to use this, you need a wireshark patched with RSPRO support,
130 such as can be found at https://gitea.osmocom.org/osmocom/wireshark/src/branch/laforge/rspro
131
132 A STANDARD UPSTREAM WIRESHARK *DOES NOT WORK*.
133 """
134 def __init__(self, pcap_filename):
135 """
136 Args:
137 pcap_filename: File name of the pcap file to be opened
138 """
139 pyshark_inst = pyshark.FileCapture(pcap_filename, display_filter='rspro', use_json=True, keep_packets=False)
140 super().__init__(pyshark_inst)
141
142class PysharkRsproLive(_PysharkRspro):
143 """APDU Source [provider] class for reading RSPRO (osmo-remsim) from a live capture
144 via pyshark, which in turn uses tshark (part of wireshark).
145
146 In order to use this, you need a wireshark patched with RSPRO support,
147 such as can be found at https://gitea.osmocom.org/osmocom/wireshark/src/branch/laforge/rspro
148
149 A STANDARD UPSTREAM WIRESHARK *DOES NOT WORK*.
150 """
151 def __init__(self, interface, bpf_filter='tcp port 9999 or tcp port 9998'):
152 """
153 Args:
154 interface: Network interface name to capture packets on (like "eth0")
155 bfp_filter: libpcap capture filter to use
156 """
157 pyshark_inst = pyshark.LiveCapture(interface=interface, display_filter='rspro', bpf_filter=bpf_filter,
158 use_json=True)
159 super().__init__(pyshark_inst)