Harald Welte | 21caf32 | 2022-07-16 14:06:46 +0200 | [diff] [blame] | 1 | # -*- coding: utf-8 -*- |
| 2 | |
""" Osmocom GSMTAP python implementation.
GSMTAP is a packet format used for conveying a number of different
telecom-related protocol traces over UDP.
"""
| 7 | |
| 8 | # |
| 9 | # Copyright (C) 2022 Harald Welte <laforge@gnumonks.org> |
| 10 | # |
| 11 | # This program is free software: you can redistribute it and/or modify |
| 12 | # it under the terms of the GNU General Public License as published by |
| 13 | # the Free Software Foundation, either version 2 of the License, or |
| 14 | # (at your option) any later version. |
| 15 | # |
| 16 | # This program is distributed in the hope that it will be useful, |
| 17 | # but WITHOUT ANY WARRANTY; without even the implied warranty of |
| 18 | # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the |
| 19 | # GNU General Public License for more details. |
| 20 | # |
| 21 | # You should have received a copy of the GNU General Public License |
| 22 | # along with this program. If not, see <http://www.gnu.org/licenses/>. |
| 23 | # |
| 24 | |
| 25 | import socket |
| 26 | from typing import List, Dict, Optional |
| 27 | from construct import Optional as COptional |
| 28 | from construct import * |
| 29 | from pySim.construct import * |
| 30 | |
# The root definition of GSMTAP can be found at
# https://cgit.osmocom.org/cgit/libosmocore/tree/include/osmocom/core/gsmtap.h

# UDP port number used for GSMTAP
GSMTAP_UDP_PORT = 4729

# GSMTAP_TYPE_*
# One-byte enumeration stored in the 'type' field of the GSMTAP header.
gsmtap_type_construct = Enum(
    Int8ub,
    gsm_um=0x01,
    gsm_abis=0x02,
    gsm_um_burst=0x03,
    sim=0x04,
    tetra_i1=0x05,
    tetra_i1_burst=0x06,
    wimax_burst=0x07,
    gprs_gb_llc=0x08,
    gprs_gb_sndcp=0x09,
    gmr1_um=0x0a,
    umts_rlc_mac=0x0b,
    umts_rrc=0x0c,
    lte_rrc=0x0d,
    lte_mac=0x0e,
    lte_mac_framed=0x0f,
    osmocore_log=0x10,
    qc_diag=0x11,
    lte_nas=0x12,
    e1_t1=0x13,
)
| 57 | |
| 58 | |
# TYPE_UM_BURST
# One-byte sub-type enumeration selected when the header type is 'gsm_um_burst'.
gsmtap_subtype_burst_construct = Enum(
    Int8ub,
    unknown=0x00,
    fcch=0x01,
    partial_sch=0x02,
    sch=0x03,
    cts_sch=0x04,
    compact_sch=0x05,
    normal=0x06,
    dummy=0x07,
    access=0x08,
    none=0x09,
)
| 71 | |
# One-byte sub-type enumeration selected when the header type is 'wimax_burst'.
gsmtap_subtype_wimax_burst_construct = Enum(
    Int8ub,
    cdma_code=0x10,
    fch=0x11,
    ffb=0x12,
    pdu=0x13,
    hack=0x14,
    phy_attributes=0x15,
)
| 79 | |
# GSMTAP_CHANNEL_*
# One-byte sub-type enumeration selected when the header type is 'gsm_um'.
gsmtap_subtype_um_construct = Enum(
    Int8ub,
    unknown=0x00,
    bcch=0x01,
    ccch=0x02,
    rach=0x03,
    agch=0x04,
    pch=0x05,
    sdcch=0x06,
    sdcch4=0x07,
    sdcch8=0x08,
    facch_f=0x09,
    facch_h=0x0a,
    pacch=0x0b,
    cbch52=0x0c,
    pdtch=0x0d,
    ptcch=0x0e,
    cbch51=0x0f,
    voice_f=0x10,
    voice_h=0x11,
)
| 100 | |
| 101 | |
# GSMTAP_SIM_*
# One-byte sub-type enumeration selected when the header type is 'sim'.
gsmtap_subtype_sim_construct = Enum(
    Int8ub,
    apdu=0x00,
    atr=0x01,
    pps_req=0x02,
    pps_rsp=0x03,
    tpdu_hdr=0x04,
    tpdu_cmd=0x05,
    tpdu_rsp=0x06,
    tpdu_sw=0x07,
)
| 112 | |
# One-byte sub-type enumeration shared by the 'tetra_i1' and
# 'tetra_i1_burst' header types.
gsmtap_subtype_tetra_construct = Enum(
    Int8ub,
    bsch=0x01,
    aach=0x02,
    sch_hu=0x03,
    sch_hd=0x04,
    sch_f=0x05,
    bnch=0x06,
    stch=0x07,
    tch_f=0x08,
    dmo_sch_s=0x09,
    dmo_sch_h=0x0a,
    dmo_sch_f=0x0b,
    dmo_stch=0x0c,
    dmo_tch=0x0d,
)
| 127 | |
# One-byte sub-type enumeration selected when the header type is 'gmr1_um'.
# Note that the tch3/tch6/tch9 values are not contiguous with the rest.
gsmtap_subtype_gmr1_construct = Enum(
    Int8ub,
    unknown=0x00,
    bcch=0x01,
    ccch=0x02,
    pch=0x03,
    agch=0x04,
    bach=0x05,
    rach=0x06,
    cbch=0x07,
    sdcch=0x08,
    tachh=0x09,
    gbch=0x0a,
    tch3=0x10,
    tch6=0x14,
    tch9=0x18,
)
| 143 | |
# One-byte sub-type enumeration selected when the header type is 'e1_t1'.
gsmtap_subtype_e1t1_construct = Enum(
    Int8ub,
    lapd=0x01,
    fr=0x02,
    raw=0x03,
    trau16=0x04,
    trau8=0x05,
)
| 150 | |
# 16-bit ARFCN field of the header: two one-bit flags followed by a
# 14-bit channel number.
gsmtap_arfcn_construct = BitStruct(
    'pcs'/Flag,
    'uplink'/Flag,
    'arfcn'/BitsInteger(14),
)
| 152 | |
# GSMTAP header followed by the message payload.
#
# The 'sub_type' byte is part of the fixed header layout for every message
# type (see gsmtap.h); only its interpretation depends on 'type'.  A Switch
# without an explicit default falls back to Pass, which consumes no input,
# so for any type missing from the mapping the sub_type byte would be
# skipped and 'antenna_nr', 'sub_slot', 'res' and 'body' would all be read
# one byte too early (and built headers would come out one byte short).
# The default below therefore always consumes/emits that byte as a plain
# integer; Default(..., 0) keeps building tolerant of sub_type=None, which
# is what parsing used to yield for those types.
gsmtap_hdr_construct = Struct(
    'version'/Int8ub,
    'hdr_len'/Int8ub,
    'type'/gsmtap_type_construct,
    'timeslot'/Int8ub,
    'arfcn'/gsmtap_arfcn_construct,
    'signal_dbm'/Int8sb,
    'snr_db'/Int8sb,
    'frame_nr'/Int32ub,
    'sub_type'/Switch(this.type, {
        'gsm_um': gsmtap_subtype_um_construct,
        'gsm_um_burst': gsmtap_subtype_burst_construct,
        'sim': gsmtap_subtype_sim_construct,
        'tetra_i1': gsmtap_subtype_tetra_construct,
        'tetra_i1_burst': gsmtap_subtype_tetra_construct,
        'wimax_burst': gsmtap_subtype_wimax_burst_construct,
        'gmr1_um': gsmtap_subtype_gmr1_construct,
        'e1_t1': gsmtap_subtype_e1t1_construct,
    }, default=Default(Int8ub, 0)),
    'antenna_nr'/Int8ub,
    'sub_slot'/Int8ub,
    'res'/Int8ub,
    # everything after the fixed header fields is kept as raw bytes
    'body'/GreedyBytes,
)
| 175 | |
# Timestamp of a log line: two unsigned 32-bit big-endian integers.
osmocore_log_ts_construct = Struct(
    'sec'/Int32ub,
    'usec'/Int32ub,
)

# One-byte log level enumeration.
osmocore_log_level_construct = Enum(Int8ub, debug=1, info=3, notice=5, error=7, fatal=8)

# Header carried in the body of GSMTAP messages of type 'osmocore_log'.
gsmtap_osmocore_log_hdr_construct = Struct(
    'ts'/osmocore_log_ts_construct,
    'proc_name'/PaddedString(16, 'ascii'),
    'pid'/Int32ub,
    'level'/osmocore_log_level_construct,
    # Three filler bytes after the level.  Padding() is used instead of an
    # anonymous Bytes(3): both skip the same three bytes when parsing, but
    # an unnamed Bytes() member cannot be built because Struct has no key
    # to fetch its value from, whereas Padding() builds as zero bytes.
    Padding(3),
    'subsys'/PaddedString(16, 'ascii'),
    'src_file'/Struct(
        'name'/PaddedString(32, 'ascii'),
        'line_nr'/Int32ub,
    ),
)
| 185 | |
| 186 | |
class GsmtapMessage:
    """A single GSMTAP message, held in encoded and/or decoded form.

    decode() parses the stored encoded data, encode() builds encoded data
    from a decoded representation; both use gsmtap_hdr_construct.
    """
    def __init__(self, encoded = None):
        # nothing is parsed here; 'decoded' stays None until decode() is called
        self.encoded, self.decoded = encoded, None

    def decode(self):
        """Parse self.encoded, remember the result in self.decoded and return it."""
        parsed = parse_construct(gsmtap_hdr_construct, self.encoded)
        self.decoded = parsed
        return parsed

    def encode(self, decoded):
        """Build the encoded form of `decoded`, remember it in self.encoded and return it.

        self.decoded is left untouched by this method.
        """
        raw = gsmtap_hdr_construct.build(decoded)
        self.encoded = raw
        return raw
| 200 | |
class GsmtapSource:
    """Receive GSMTAP messages from a bound UDP socket.

    The socket is created and bound in the constructor.  Instances can be
    used as context managers so that the socket is closed again on exit.
    """

    # Receive buffer size for one datagram.  recvfrom() silently discards
    # whatever does not fit into the buffer, so use the largest value a UDP
    # length field can express rather than an arbitrary small limit.
    MAX_DATAGRAM_SIZE = 65535

    def __init__(self, bind_ip:str='127.0.0.1', bind_port:int=4729):
        """Create a UDP/IPv4 socket and bind it to (bind_ip, bind_port).

        Raises:
            OSError: if the socket cannot be bound; the socket is closed
                before the exception propagates.
        """
        self.bind_ip = bind_ip
        self.bind_port = bind_port
        self.sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
        try:
            self.sock.bind((self.bind_ip, self.bind_port))
        except OSError:
            # do not leak the file descriptor if binding fails
            self.sock.close()
            raise

    def close(self) -> None:
        """Close the underlying socket."""
        self.sock.close()

    def __enter__(self):
        return self

    def __exit__(self, exc_type, exc_value, traceback) -> None:
        self.close()

    def read_packet(self) -> tuple:
        """Block until one datagram arrives, decode it and return it.

        Returns:
            A tuple (decoded, addr): the decoded message as produced by
            GsmtapMessage.decode() and the sender address as reported by
            recvfrom().

        Raises:
            ValueError: if the decoded 'version' field is not 0x02.
        """
        data, addr = self.sock.recvfrom(self.MAX_DATAGRAM_SIZE)
        gsmtap_msg = GsmtapMessage(data)
        gsmtap_msg.decode()
        version = gsmtap_msg.decoded['version']
        if version != 0x02:
            raise ValueError('Unknown GSMTAP version 0x%02x' % version)
        return gsmtap_msg.decoded, addr