# -*- coding: utf-8 -*-
2
3""" Osmocom GSMTAP python implementation.
GSMTAP is a packet format used for conveying a number of different
telecom-related protocol traces over UDP.
6"""
7
8#
9# Copyright (C) 2022 Harald Welte <laforge@gnumonks.org>
10#
11# This program is free software: you can redistribute it and/or modify
12# it under the terms of the GNU General Public License as published by
13# the Free Software Foundation, either version 2 of the License, or
14# (at your option) any later version.
15#
16# This program is distributed in the hope that it will be useful,
17# but WITHOUT ANY WARRANTY; without even the implied warranty of
18# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
19# GNU General Public License for more details.
20#
21# You should have received a copy of the GNU General Public License
22# along with this program. If not, see <http://www.gnu.org/licenses/>.
23#
24
25import socket
26from typing import List, Dict, Optional
27from construct import Optional as COptional
28from construct import *
29from pySim.construct import *
30
# The root definition of GSMTAP can be found at
# https://cgit.osmocom.org/cgit/libosmocore/tree/include/osmocom/core/gsmtap.h

# UDP port number used for GSMTAP.
GSMTAP_UDP_PORT = 4729

# GSMTAP_TYPE_*: message type, encoded as a single unsigned octet.
gsmtap_type_construct = Enum(
    Int8ub,
    gsm_um=0x01,
    gsm_abis=0x02,
    gsm_um_burst=0x03,
    sim=0x04,
    tetra_i1=0x05,
    tetra_i1_burst=0x06,
    wimax_burst=0x07,
    gprs_gb_llc=0x08,
    gprs_gb_sndcp=0x09,
    gmr1_um=0x0a,
    umts_rlc_mac=0x0b,
    umts_rrc=0x0c,
    lte_rrc=0x0d,
    lte_mac=0x0e,
    lte_mac_framed=0x0f,
    osmocore_log=0x10,
    qc_diag=0x11,
    lte_nas=0x12,
    e1_t1=0x13,
)
57
58
# TYPE_UM_BURST: burst sub-types, encoded as a single unsigned octet.
gsmtap_subtype_burst_construct = Enum(
    Int8ub,
    unknown=0x00,
    fcch=0x01,
    partial_sch=0x02,
    sch=0x03,
    cts_sch=0x04,
    compact_sch=0x05,
    normal=0x06,
    dummy=0x07,
    access=0x08,
    none=0x09,
)

# WiMAX burst sub-types, encoded as a single unsigned octet.
gsmtap_subtype_wimax_burst_construct = Enum(
    Int8ub,
    cdma_code=0x10,
    fch=0x11,
    ffb=0x12,
    pdu=0x13,
    hack=0x14,
    phy_attributes=0x15,
)
79
# GSMTAP_CHANNEL_*: channel sub-types, encoded as a single unsigned octet.
gsmtap_subtype_um_construct = Enum(
    Int8ub,
    unknown=0x00,
    bcch=0x01,
    ccch=0x02,
    rach=0x03,
    agch=0x04,
    pch=0x05,
    sdcch=0x06,
    sdcch4=0x07,
    sdcch8=0x08,
    facch_f=0x09,
    facch_h=0x0a,
    pacch=0x0b,
    cbch52=0x0c,
    pdtch=0x0d,
    ptcch=0x0e,
    cbch51=0x0f,
    voice_f=0x10,
    voice_h=0x11,
)
100
101
# GSMTAP_SIM_*: SIM sub-types, encoded as a single unsigned octet.
gsmtap_subtype_sim_construct = Enum(
    Int8ub,
    apdu=0x00,
    atr=0x01,
    pps_req=0x02,
    pps_rsp=0x03,
    tpdu_hdr=0x04,
    tpdu_cmd=0x05,
    tpdu_rsp=0x06,
    tpdu_sw=0x07,
)
112
# TETRA sub-types, encoded as a single unsigned octet.
gsmtap_subtype_tetra_construct = Enum(
    Int8ub,
    bsch=0x01,
    aach=0x02,
    sch_hu=0x03,
    sch_hd=0x04,
    sch_f=0x05,
    bnch=0x06,
    stch=0x07,
    tch_f=0x08,
    dmo_sch_s=0x09,
    dmo_sch_h=0x0a,
    dmo_sch_f=0x0b,
    dmo_stch=0x0c,
    dmo_tch=0x0d,
)
127
# GMR-1 sub-types, encoded as a single unsigned octet.
# Note the gaps in the numbering: the tch* values are not consecutive.
gsmtap_subtype_gmr1_construct = Enum(
    Int8ub,
    unknown=0x00,
    bcch=0x01,
    ccch=0x02,
    pch=0x03,
    agch=0x04,
    bach=0x05,
    rach=0x06,
    cbch=0x07,
    sdcch=0x08,
    tachh=0x09,
    gbch=0x0a,
    tch3=0x10,
    tch6=0x14,
    tch9=0x18,
)
143
# E1/T1 sub-types, encoded as a single unsigned octet.
gsmtap_subtype_e1t1_construct = Enum(
    Int8ub,
    lapd=0x01,
    fr=0x02,
    raw=0x03,
    trau16=0x04,
    trau8=0x05,
)
150
# 16-bit ARFCN field: PCS flag, uplink flag, then the 14-bit ARFCN itself.
gsmtap_arfcn_construct = BitStruct(
    'pcs'/Flag,
    'uplink'/Flag,
    'arfcn'/BitsInteger(14),
)

# GSMTAP header followed by the message payload.
# NOTE(review): 'hdr_len' is parsed but not used to locate the payload;
# 'body' is simply everything after the fixed fields below.
gsmtap_hdr_construct = Struct(
    'version'/Int8ub,
    'hdr_len'/Int8ub,
    'type'/gsmtap_type_construct,
    'timeslot'/Int8ub,
    'arfcn'/gsmtap_arfcn_construct,
    'signal_dbm'/Int8sb,
    'snr_db'/Int8sb,
    'frame_nr'/Int32ub,
    # The sub_type octet is part of the fixed header layout (see gsmtap.h).
    # Without an explicit default, Switch uses Pass for any type missing from
    # the table; Pass consumes no input, so every following field would be
    # read one octet too early.  Types without a dedicated enum therefore
    # get the raw octet value.
    'sub_type'/Switch(
        this.type,
        {
            'gsm_um': gsmtap_subtype_um_construct,
            'gsm_um_burst': gsmtap_subtype_burst_construct,
            'sim': gsmtap_subtype_sim_construct,
            'tetra_i1': gsmtap_subtype_tetra_construct,
            'tetra_i1_burst': gsmtap_subtype_tetra_construct,
            'wimax_burst': gsmtap_subtype_wimax_burst_construct,
            'gmr1_um': gsmtap_subtype_gmr1_construct,
            'e1_t1': gsmtap_subtype_e1t1_construct,
        },
        default=Int8ub,
    ),
    'antenna_nr'/Int8ub,
    'sub_slot'/Int8ub,
    'res'/Int8ub,
    'body'/GreedyBytes,
)
175
# Timestamp of a log message: seconds and microseconds, 32 bits each.
osmocore_log_ts_construct = Struct('sec'/Int32ub, 'usec'/Int32ub)

# Log level, encoded as a single unsigned octet.
osmocore_log_level_construct = Enum(Int8ub, debug=1, info=3, notice=5, error=7, fatal=8)

# Header of an osmocore log message.
gsmtap_osmocore_log_hdr_construct = Struct(
    'ts'/osmocore_log_ts_construct,
    'proc_name'/PaddedString(16, 'ascii'),
    'pid'/Int32ub,
    'level'/osmocore_log_level_construct,
    # Three unnamed filler octets after the level.  Padding (rather than an
    # anonymous Bytes(3)) skips them on parse exactly the same way, but can
    # also be built without the caller supplying a value for a nameless field.
    Padding(3),
    'subsys'/PaddedString(16, 'ascii'),
    'src_file'/Struct(
        'name'/PaddedString(32, 'ascii'),
        'line_nr'/Int32ub,
    ),
)
185
186
class GsmtapMessage:
    """A single GSMTAP message, kept in both its encoded and decoded form.

    Objects can decode an encoded message and encode a decoded one.
    """

    def __init__(self, encoded = None):
        """Store the encoded message (if any); nothing is decoded yet."""
        self.decoded = None
        self.encoded = encoded

    def decode(self):
        """Parse self.encoded using gsmtap_hdr_construct.

        The result is stored in self.decoded and also returned.
        """
        parsed = parse_construct(gsmtap_hdr_construct, self.encoded)
        self.decoded = parsed
        return parsed

    def encode(self, decoded):
        """Build the encoded form of *decoded* using gsmtap_hdr_construct.

        The result is stored in self.encoded and also returned.
        self.decoded is left untouched.
        """
        raw = gsmtap_hdr_construct.build(decoded)
        self.encoded = raw
        return raw
200
class GsmtapSource:
    """Receive GSMTAP messages on a UDP socket bound to a local address.

    The socket is created and bound by the constructor.  Call close(), or use
    the object as a context manager, to release it again.
    """

    def __init__(self, bind_ip: str = '127.0.0.1', bind_port: int = 4729):
        """Create the UDP socket and bind it to (bind_ip, bind_port).

        Raises:
            OSError: if the socket cannot be bound.
        """
        self.bind_ip = bind_ip
        self.bind_port = bind_port
        self.sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
        try:
            self.sock.bind((self.bind_ip, self.bind_port))
        except OSError:
            # Do not leak the descriptor when the address is unavailable.
            self.sock.close()
            raise

    def __enter__(self):
        return self

    def __exit__(self, exc_type, exc_value, traceback):
        self.close()

    def close(self) -> None:
        """Close the underlying socket.  Calling this more than once is harmless."""
        self.sock.close()

    def read_packet(self, bufsize: int = 65535) -> tuple:
        """Block until one datagram arrives, then decode it as GSMTAP.

        Args:
            bufsize: size of the receive buffer in bytes.  A datagram that
                does not fit into the buffer cannot be received intact, so
                the default is large enough for any UDP payload.

        Returns:
            A (decoded, addr) tuple: the decoded message as produced by
            GsmtapMessage.decode(), and the sender address as reported by
            the socket.

        Raises:
            ValueError: if the version field of the message is not 0x02.
        """
        data, addr = self.sock.recvfrom(bufsize)
        decoded = GsmtapMessage(data).decode()
        version = decoded['version']
        if version != 0x02:
            raise ValueError('Unknown GSMTAP version 0x%02x' % version)
        return decoded, addr