blob: 65e4003cf0ae188cf0ea2b2a1cd1189ee82785e0 [file] [log] [blame]
Maxc3b94f92016-11-14 18:46:40 +01001#!/usr/bin/python3
2# -*- mode: python-mode; py-indent-tabs-mode: nil -*-
3"""
4/*
5 * Copyright (C) 2016 sysmocom s.f.m.c. GmbH
6 *
7 * All Rights Reserved
8 *
9 * This program is free software; you can redistribute it and/or modify
10 * it under the terms of the GNU General Public License as published by
11 * the Free Software Foundation; either version 3 of the License, or
12 * (at your option) any later version.
13 *
14 * This program is distributed in the hope that it will be useful,
15 * but WITHOUT ANY WARRANTY; without even the implied warranty of
16 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
17 * GNU General Public License for more details.
Maxc3b94f92016-11-14 18:46:40 +010018 */
19"""
20
21import struct, random, sys
22
class IPA(object):
    """
    Stateless IPA protocol multiplexer: add/remove/parse (extended) header

    An IPA message is a 3-byte header (16-bit big-endian length, 8-bit
    protocol) followed by `length` bytes. For the CCM and OSMO protocols the
    first of those bytes is an extension / message type byte.
    All methods work on bytes and keep no per-connection state.
    """
    version = "0.0.5"
    TCP_PORT_OML = 3002
    TCP_PORT_RSL = 3003
    # OpenBSC extensions: OSMO, MGCP_OLD
    PROTO = dict(RSL=0x00, CCM=0xFE, SCCP=0xFD, OML=0xFF, OSMO=0xEE, MGCP_OLD=0xFC)
    # ...OML Router Control, GSUP GPRS extension, Osmocom Authn Protocol
    EXT = dict(CTRL=0, MGCP=1, LAC=2, SMSC=3, ORC=4, GSUP=5, OAP=6)
    # OpenBSC extension: SCCP_OLD
    MSGT = dict(PING=0x00, PONG=0x01, ID_GET=0x04, ID_RESP=0x05, ID_ACK=0x06, SCCP_OLD=0xFF)
    _IDTAG = dict(SERNR=0, UNITNAME=1, LOCATION=2, TYPE=3, EQUIPVERS=4, SWVERSION=5, IPADDR=6, MACADDR=7, UNIT=8)
    CTRL_GET = 'GET'
    CTRL_SET = 'SET'
    CTRL_REP = 'REPLY'
    # The CTRL protocol spells the error verb out in full ("ERROR <id> <reason>");
    # the previous value 'ERR' never matched, so errors were parsed as replies.
    CTRL_ERR = 'ERROR'
    CTRL_TRAP = 'TRAP'

    # Size of the fixed IPA header: 2 bytes length + 1 byte protocol
    _HDR_LEN = 3

    def _l(self, d, p):
        """
        Reverse dictionary lookup: return key for a given value

        Returns 'UNKNOWN' when the value is None or not present in the
        dictionary, so that names of unexpected bytes received from the wire
        can still be printed instead of raising ValueError.
        """
        if p is None:
            return 'UNKNOWN'
        for name, value in d.items():
            if value == p:
                return name
        return 'UNKNOWN'

    def _tag(self, t, v):
        """
        Create TAG as TLV data: 16-bit length (value + tag byte), tag, value
        """
        return struct.pack(">HB", len(v) + 1, t) + v

    def proto(self, p):
        """
        Lookup protocol name
        """
        return self._l(self.PROTO, p)

    def ext(self, p):
        """
        Lookup protocol extension name
        """
        return self._l(self.EXT, p)

    def msgt(self, p):
        """
        Lookup message type name
        """
        return self._l(self.MSGT, p)

    def idtag(self, p):
        """
        Lookup ID tag name
        """
        return self._l(self._IDTAG, p)

    def ext_name(self, proto, exten):
        """
        Return proper extension byte name depending on the protocol used

        CCM carries a message type in the extension byte, OSMO carries a
        protocol extension; any other protocol has no extension (None).
        """
        if self.PROTO['CCM'] == proto:
            return self.msgt(exten)
        if self.PROTO['OSMO'] == proto:
            return self.ext(exten)
        return None

    def add_header(self, data, proto, ext=None):
        """
        Add IPA header (with extension if necessary), data must be represented as bytes

        The length field counts every byte following the 3-byte header: just
        the payload when there is no extension, payload plus one extension
        byte otherwise. This matches what split_combined() expects.
        """
        if ext is None:
            return struct.pack(">HB", len(data), proto) + data
        return struct.pack(">HBB", len(data) + 1, proto, ext) + data

    def del_header(self, data):
        """
        Strip IPA protocol header correctly removing extension if present
        Returns data length, IPA protocol, extension (or None if not defined for a given protocol) and the data without header

        If data is too short to hold the 3-byte header all four values are
        None. If the protocol requires an extension byte which is missing,
        the extension is reported as None and the remaining data is empty.
        """
        if len(data) < self._HDR_LEN:
            return None, None, None, None
        (dlen, proto) = struct.unpack('>HB', data[:self._HDR_LEN])
        if proto in (self.PROTO['OSMO'], self.PROTO['CCM']):  # there's extension which we have to unpack
            if len(data) < self._HDR_LEN + 1:
                return dlen, proto, None, data[self._HDR_LEN:]
            return struct.unpack('>HBB', data[:4]) + (data[4:], )  # length, protocol, extension, data
        return dlen, proto, None, data[self._HDR_LEN:]  # length, protocol, _, data

    def split_combined(self, data):
        """
        Split the data which contains multiple concatenated IPA messages into tuple (first, rest) where rest contains remaining messages, first is the single IPA message

        When data does not even contain a complete header, everything is
        returned as `first` with an empty `rest`, so a caller looping over
        `rest` always makes progress.
        """
        (length, _, _, _) = self.del_header(data)
        if length is None:
            return data, data[:0]
        end = length + self._HDR_LEN
        return data[:end], data[end:]

    def tag_serial(self, data):
        """
        Make TAG for serial number
        """
        return self._tag(self._IDTAG['SERNR'], data)

    def tag_name(self, data):
        """
        Make TAG for unit name
        """
        return self._tag(self._IDTAG['UNITNAME'], data)

    def tag_loc(self, data):
        """
        Make TAG for location
        """
        return self._tag(self._IDTAG['LOCATION'], data)

    def tag_type(self, data):
        """
        Make TAG for unit type
        """
        return self._tag(self._IDTAG['TYPE'], data)

    def tag_equip(self, data):
        """
        Make TAG for equipment version
        """
        return self._tag(self._IDTAG['EQUIPVERS'], data)

    def tag_sw(self, data):
        """
        Make TAG for software version
        """
        return self._tag(self._IDTAG['SWVERSION'], data)

    def tag_ip(self, data):
        """
        Make TAG for IP address
        """
        return self._tag(self._IDTAG['IPADDR'], data)

    def tag_mac(self, data):
        """
        Make TAG for MAC address
        """
        return self._tag(self._IDTAG['MACADDR'], data)

    def tag_unit(self, data):
        """
        Make TAG for unit ID
        """
        return self._tag(self._IDTAG['UNIT'], data)

    def identity(self, unit=b'', mac=b'', location=b'', utype=b'', equip=b'', sw=b'', name=b'', serial=b''):
        """
        Make IPA IDENTITY tag list, by default returns concatenated bytes of a tag list with empty values
        """
        tags = (
            self.tag_unit(unit),
            self.tag_mac(mac),
            self.tag_loc(location),
            self.tag_type(utype),
            self.tag_equip(equip),
            self.tag_sw(sw),
            self.tag_name(name),
            self.tag_serial(serial),
        )
        return b''.join(tags)

    def ping(self):
        """
        Make PING message
        """
        return self.add_header(b'', self.PROTO['CCM'], self.MSGT['PING'])

    def pong(self):
        """
        Make PONG message
        """
        return self.add_header(b'', self.PROTO['CCM'], self.MSGT['PONG'])

    def id_ack(self):
        """
        Make ID_ACK CCM message
        """
        return self.add_header(b'', self.PROTO['CCM'], self.MSGT['ID_ACK'])

    def id_get(self):
        """
        Make ID_GET CCM message requesting every known identity tag
        """
        return self.add_header(self.identity(), self.PROTO['CCM'], self.MSGT['ID_GET'])

    def id_resp(self, data):
        """
        Make ID_RESP CCM message, data is the already encoded tag list (bytes)
        """
        return self.add_header(data, self.PROTO['CCM'], self.MSGT['ID_RESP'])
207
class Ctrl(IPA):
    """
    Osmocom CTRL protocol implemented on top of IPA multiplexer

    CTRL messages are text lines of the form "<TYPE> <id> <var> [<val>]"
    carried in IPA messages with protocol OSMO and extension CTRL.
    """
    def __init__(self):
        super(Ctrl, self).__init__()
        # operation ids produced by cmd() come from the random module
        random.seed()

    def add_header(self, data):
        """
        Add CTRL header: data is a str which is UTF-8 encoded before the IPA header is prepended
        """
        return super(Ctrl, self).add_header(data.encode('utf-8'), IPA.PROTO['OSMO'], IPA.EXT['CTRL'])

    def rem_header(self, data):
        """
        Remove CTRL header, check for appropriate protocol and extension
        Returns the payload bytes or None if data is not a CTRL message
        """
        (_, proto, ext, d) = super(Ctrl, self).del_header(data)
        if self.PROTO['OSMO'] != proto or self.EXT['CTRL'] != ext:
            return None
        return d

    def parse(self, data, op=None):
        """
        Parse Ctrl string returning (var, value) pair
        var could be None in case of ERROR message
        value could be None in case of GET message

        op is the operation id the reply is expected to carry (int or str,
        e.g. the id returned by cmd()); when given, a GET_REPLY/SET_REPLY
        with a different id yields (None, <description>).
        Raises ValueError if data has fewer space-separated fields than the
        message type requires.
        """
        (s, i, v) = data.split(' ', 2)
        if s == self.CTRL_ERR:
            return None, v
        if s == self.CTRL_GET:
            return v, None
        (s, i, var, val) = data.split(' ', 3)
        if s == self.CTRL_TRAP and i != '0':
            return None, '%s with non-zero id %s' % (s, i)
        # i is always a str token while cmd() hands out int ids: compare as text
        if op is not None and i != str(op):
            replies = (self.CTRL_GET + '_' + self.CTRL_REP, self.CTRL_SET + '_' + self.CTRL_REP)
            if s in replies:
                return None, '%s with unexpected id %s' % (s, i)
        return var, val

    def trap(self, var, val):
        """
        Make TRAP message with given (var, val) pair
        """
        return self.add_header("%s 0 %s %s" % (self.CTRL_TRAP, var, val))

    def cmd(self, var, val=None):
        """
        Make SET/GET command message: returns (r, m) tuple where r is random operation id and m is assembled message
        SET is produced when val is given, GET otherwise
        """
        r = random.randint(1, sys.maxsize)
        if val is not None:
            return r, self.add_header("%s %s %s %s" % (self.CTRL_SET, r, var, val))
        return r, self.add_header("%s %s %s" % (self.CTRL_GET, r, var))

    def verify(self, reply, r, var, val=None):
        """
        Verify reply to SET/GET command: returns (b, v) tuple where b is True/False verification result and v is the variable value

        reply is the CTRL string with the IPA header already removed, r is
        the operation id returned by cmd() (None skips the id check), var is
        the expected variable name and val, if given, the expected value.
        """
        (k, v) = self.parse(reply, r)
        if k != var or (val is not None and v != val):
            return False, v
        return True, v
272
if __name__ == '__main__':
    # Executed directly rather than imported: just announce the library version
    banner = "IPA multiplexer v%s loaded." % IPA.version
    print(banner)