blob: 71cbf45a4671038a87d59fb270b7b37113ba2599 [file] [log] [blame]
Maxc3b94f92016-11-14 18:46:40 +01001#!/usr/bin/python3
2# -*- mode: python-mode; py-indent-tabs-mode: nil -*-
3"""
4/*
5 * Copyright (C) 2016 sysmocom s.f.m.c. GmbH
6 *
7 * All Rights Reserved
8 *
9 * This program is free software; you can redistribute it and/or modify
10 * it under the terms of the GNU General Public License as published by
11 * the Free Software Foundation; either version 3 of the License, or
12 * (at your option) any later version.
13 *
14 * This program is distributed in the hope that it will be useful,
15 * but WITHOUT ANY WARRANTY; without even the implied warranty of
16 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
17 * GNU General Public License for more details.
18 *
19 * You should have received a copy of the GNU General Public License along
20 * with this program; if not, write to the Free Software Foundation, Inc.,
21 * 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.
22 */
23"""
24
25import struct, random, sys
26
class IPA(object):
    """
    Stateless IPA protocol multiplexer: add/remove/parse (extended) header.

    The class keeps no per-instance state; every method only reads the
    class-level lookup tables below and works on the bytes passed in.
    """
    version = "0.0.5"
    TCP_PORT_OML = 3002
    TCP_PORT_RSL = 3003
    # OpenBSC extensions: OSMO, MGCP_OLD
    PROTO = dict(RSL=0x00, CCM=0xFE, SCCP=0xFD, OML=0xFF, OSMO=0xEE, MGCP_OLD=0xFC)
    # ...OML Router Control, GSUP GPRS extension, Osmocom Authn Protocol
    EXT = dict(CTRL=0, MGCP=1, LAC=2, SMSC=3, ORC=4, GSUP=5, OAP=6)
    # OpenBSC extension: SCCP_OLD
    MSGT = dict(PING=0x00, PONG=0x01, ID_GET=0x04, ID_RESP=0x05, ID_ACK=0x06, SCCP_OLD=0xFF)
    _IDTAG = dict(SERNR=0, UNITNAME=1, LOCATION=2, TYPE=3, EQUIPVERS=4, SWVERSION=5, IPADDR=6, MACADDR=7, UNIT=8)
    CTRL_GET = 'GET'
    CTRL_SET = 'SET'
    CTRL_REP = 'REPLY'
    CTRL_ERR = 'ERR'
    CTRL_TRAP = 'TRAP'

    def _l(self, d, p):
        """
        Reverse dictionary lookup: return key for a given value

        Returns 'UNKNOWN' when p is None or when no entry of d has the
        value p, so that naming an unexpected byte never raises.
        """
        if p is None:
            return 'UNKNOWN'
        # First match in dictionary order wins.
        for name, value in d.items():
            if value == p:
                return name
        return 'UNKNOWN'

    def _tag(self, t, v):
        """
        Create TAG as TLV data

        The 16-bit big-endian length covers the tag byte plus the value v
        (bytes); it is followed by the tag byte t and then v itself.
        """
        return struct.pack(">HB", len(v) + 1, t) + v

    def proto(self, p):
        """
        Lookup protocol name
        """
        return self._l(self.PROTO, p)

    def ext(self, p):
        """
        Lookup protocol extension name
        """
        return self._l(self.EXT, p)

    def msgt(self, p):
        """
        Lookup message type name
        """
        return self._l(self.MSGT, p)

    def idtag(self, p):
        """
        Lookup ID tag name
        """
        return self._l(self._IDTAG, p)

    def ext_name(self, proto, exten):
        """
        Return proper extension byte name depending on the protocol used

        For CCM the extension byte is a message type, for OSMO it is a
        protocol extension; any other protocol yields None.
        """
        if self.PROTO['CCM'] == proto:
            return self.msgt(exten)
        if self.PROTO['OSMO'] == proto:
            return self.ext(exten)
        return None

    def add_header(self, data, proto, ext=None):
        """
        Add IPA header (with extension if necessary), data must be represented as bytes

        Returns the header (16-bit big-endian length, protocol byte and,
        when ext is given, the extension byte) followed by data.
        """
        # NOTE(review): without an extension the length field is still
        # len(data) + 1 although only len(data) bytes follow the protocol
        # byte, so split_combined() (which uses length + 3) takes one byte
        # too many for such messages. Confirm against the IPA framing
        # before changing what is sent on the wire.
        if ext is None:
            return struct.pack(">HB", len(data) + 1, proto) + data
        return struct.pack(">HBB", len(data) + 1, proto, ext) + data

    def del_header(self, data):
        """
        Strip IPA protocol header correctly removing extension if present
        Returns data length, IPA protocol, extension (or None if not defined for a given protocol) and the data without header

        All four values are None when data is too short to hold the
        complete header (including the extension byte for CCM and OSMO).
        The returned data is everything after the header; it is not cut
        to the length announced in the header.
        """
        incomplete = (None, None, None, None)
        if len(data) < 3:
            return incomplete
        (dlen, proto) = struct.unpack('>HB', data[:3])
        if proto in (self.PROTO['OSMO'], self.PROTO['CCM']):  # there's extension which we have to unpack
            if len(data) < 4:
                return incomplete
            return struct.unpack('>HBB', data[:4]) + (data[4:], )  # length, protocol, extension, data
        return dlen, proto, None, data[3:]  # length, protocol, _, data

    def split_combined(self, data):
        """
        Split the data which contains multiple concatenated IPA messages into tuple (first, rest) where rest contains remaining messages, first is the single IPA message

        When data does not even hold a complete header it is returned
        unchanged as first, with an empty rest.
        """
        (length, _, _, _) = self.del_header(data)
        if length is None:
            # data[:0] keeps the type of the input (bytes, bytearray, ...).
            return data, data[:0]
        end = length + 3  # 2 length bytes + 1 protocol byte precede the payload
        return data[:end], data[end:]

    def tag_serial(self, data):
        """
        Make TAG for serial number
        """
        return self._tag(self._IDTAG['SERNR'], data)

    def tag_name(self, data):
        """
        Make TAG for unit name
        """
        return self._tag(self._IDTAG['UNITNAME'], data)

    def tag_loc(self, data):
        """
        Make TAG for location
        """
        return self._tag(self._IDTAG['LOCATION'], data)

    def tag_type(self, data):
        """
        Make TAG for unit type
        """
        return self._tag(self._IDTAG['TYPE'], data)

    def tag_equip(self, data):
        """
        Make TAG for equipment version
        """
        return self._tag(self._IDTAG['EQUIPVERS'], data)

    def tag_sw(self, data):
        """
        Make TAG for software version
        """
        return self._tag(self._IDTAG['SWVERSION'], data)

    def tag_ip(self, data):
        """
        Make TAG for IP address
        """
        return self._tag(self._IDTAG['IPADDR'], data)

    def tag_mac(self, data):
        """
        Make TAG for MAC address
        """
        return self._tag(self._IDTAG['MACADDR'], data)

    def tag_unit(self, data):
        """
        Make TAG for unit ID
        """
        return self._tag(self._IDTAG['UNIT'], data)

    def identity(self, unit=b'', mac=b'', location=b'', utype=b'', equip=b'', sw=b'', name=b'', serial=b''):
        """
        Make IPA IDENTITY tag list, by default returns empty concatenated bytes of tag list

        The tags are emitted in the fixed order unit, MAC, location, type,
        equipment version, software version, name, serial; an empty value
        still produces its (value-less) tag.
        """
        tags = (
            self.tag_unit(unit),
            self.tag_mac(mac),
            self.tag_loc(location),
            self.tag_type(utype),
            self.tag_equip(equip),
            self.tag_sw(sw),
            self.tag_name(name),
            self.tag_serial(serial),
        )
        return b''.join(tags)

    def ping(self):
        """
        Make PING message
        """
        return self.add_header(b'', self.PROTO['CCM'], self.MSGT['PING'])

    def pong(self):
        """
        Make PONG message
        """
        return self.add_header(b'', self.PROTO['CCM'], self.MSGT['PONG'])

    def id_ack(self):
        """
        Make ID_ACK CCM message
        """
        return self.add_header(b'', self.PROTO['CCM'], self.MSGT['ID_ACK'])

    def id_get(self):
        """
        Make ID_GET CCM message
        """
        return self.add_header(self.identity(), self.PROTO['CCM'], self.MSGT['ID_GET'])

    def id_resp(self, data):
        """
        Make ID_RESP CCM message
        """
        return self.add_header(data, self.PROTO['CCM'], self.MSGT['ID_RESP'])
211
class Ctrl(IPA):
    """
    Osmocom CTRL protocol implemented on top of IPA multiplexer
    """
    def __init__(self):
        random.seed()

    def add_header(self, data, proto=None, ext=None):
        """
        Add CTRL header

        Called with data only (a str), the text is UTF-8 encoded and
        wrapped as an OSMO/CTRL message. When proto is given the call is
        forwarded unchanged to the base class (data must then be bytes),
        which keeps the inherited message builders such as ping() usable
        on a Ctrl instance.
        """
        if proto is None:
            return super().add_header(data.encode('utf-8'), IPA.PROTO['OSMO'], IPA.EXT['CTRL'])
        return super().add_header(data, proto, ext)

    def rem_header(self, data):
        """
        Remove CTRL header, check for appropriate protocol and extension

        Returns the data following the header, or None when the message
        is not an OSMO message carrying the CTRL extension.
        """
        (_, proto, ext, d) = super().del_header(data)
        if self.PROTO['OSMO'] != proto or self.EXT['CTRL'] != ext:
            return None
        return d

    def parse(self, data, op=None):
        """
        Parse Ctrl string returning (var, value) pair
        var could be None in case of ERROR message
        value could be None in case of GET message

        data is the CTRL text (str) without IPA header. op, when given,
        is the operation id a GET_REPLY/SET_REPLY is expected to carry;
        it may be the integer returned by cmd() or its string form. On a
        TRAP with non-zero id, or a reply with an unexpected id, var is
        None and value holds a description of the problem.
        Raises ValueError when data has too few space-separated fields.
        """
        (s, i, v) = data.split(' ', 2)
        if s == self.CTRL_ERR:
            return None, v
        if s == self.CTRL_GET:
            return v, None
        (s, i, var, val) = data.split(' ', 3)
        if s == self.CTRL_TRAP and i != '0':
            return None, '%s with non-zero id %s' % (s, i)
        # i is text taken from the message while cmd() hands out integer
        # ids, so compare on the string form.
        if op is not None and i != str(op):
            replies = (self.CTRL_GET + '_' + self.CTRL_REP, self.CTRL_SET + '_' + self.CTRL_REP)
            if s in replies:
                return None, '%s with unexpected id %s' % (s, i)
        return var, val

    def trap(self, var, val):
        """
        Make TRAP message with given (var, val) pair
        """
        return self.add_header("%s 0 %s %s" % (self.CTRL_TRAP, var, val))

    def cmd(self, var, val=None):
        """
        Make SET/GET command message: returns (r, m) tuple where r is random operation id and m is assembled message

        A SET is built when val is not None, a GET otherwise.
        """
        r = random.randint(1, sys.maxsize)
        if val is not None:
            return r, self.add_header("%s %s %s %s" % (self.CTRL_SET, r, var, val))
        return r, self.add_header("%s %s %s" % (self.CTRL_GET, r, var))

    def verify(self, reply, r, var, val=None):
        """
        Verify reply to SET/GET command: returns (b, v) tuple where b is True/False verification result and v is the variable value

        reply is the CTRL text (str) without IPA header, r the operation
        id returned by cmd() (None skips the id check). The reply must
        name var and, when val is given, carry a value equal to the text
        form of val (the same form cmd() puts on the wire).
        """
        (k, v) = self.parse(reply, r)
        if k != var or (val is not None and v != str(val)):
            return False, v
        return True, v
276
if __name__ == '__main__':
    # Running the module directly only reports the library version.
    banner = "IPA multiplexer v%s loaded." % IPA.version
    print(banner)