blob: 60d90a7c185d0fad0b5a82e814cc906eea77ad5e [file] [log] [blame]
Jacob Erlbeck5741e1f2013-09-16 11:20:28 +02001#!/usr/bin/env python
2
3# (C) 2013 by Jacob Erlbeck <jerlbeck@sysmocom.de>
Holger Hans Peter Freythereab2a3f2014-03-04 17:16:58 +01004# (C) 2014 by Holger Hans Peter Freyther
Jacob Erlbeck5741e1f2013-09-16 11:20:28 +02005# based on vty_test_runner.py:
6# (C) 2013 by Katerina Barone-Adesi <kat.obsc@gmail.com>
7# (C) 2013 by Holger Hans Peter Freyther
8# based on bsc_control.py.
9
10# This program is free software: you can redistribute it and/or modify
11# it under the terms of the GNU General Public License as published by
12# the Free Software Foundation, either version 3 of the License, or
13# (at your option) any later version.
14
15# This program is distributed in the hope that it will be useful,
16# but WITHOUT ANY WARRANTY; without even the implied warranty of
17# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
18# GNU General Public License for more details.
19
20# You should have received a copy of the GNU General Public License
21# along with this program. If not, see <http://www.gnu.org/licenses/>.
22
23import os
24import time
25import unittest
26import socket
27import sys
28import struct
29
30import osmopy.obscvty as obscvty
31import osmopy.osmoutil as osmoutil
32
33confpath = '.'
34verbose = False
35
36class TestCtrlBase(unittest.TestCase):
37
38 def ctrl_command(self):
39 raise Exception("Needs to be implemented by a subclass")
40
41 def ctrl_app(self):
42 raise Exception("Needs to be implemented by a subclass")
43
44 def setUp(self):
45 osmo_ctrl_cmd = self.ctrl_command()[:]
46 config_index = osmo_ctrl_cmd.index('-c')
47 if config_index:
48 cfi = config_index + 1
49 osmo_ctrl_cmd[cfi] = os.path.join(confpath, osmo_ctrl_cmd[cfi])
50
51 try:
52 print "Launch: %s from %s" % (' '.join(osmo_ctrl_cmd), os.getcwd())
53 self.proc = osmoutil.popen_devnull(osmo_ctrl_cmd)
54 except OSError:
55 print >> sys.stderr, "Current directory: %s" % os.getcwd()
56 print >> sys.stderr, "Consider setting -b"
57 time.sleep(2)
58
59 appstring = self.ctrl_app()[2]
60 appport = self.ctrl_app()[0]
61 self.connect("127.0.0.1", appport)
62 self.next_id = 1000
63
64 def tearDown(self):
65 self.disconnect()
66 osmoutil.end_proc(self.proc)
67
68 def prefix_ipa_ctrl_header(self, data):
69 return struct.pack(">HBB", len(data)+1, 0xee, 0) + data
70
71 def remove_ipa_ctrl_header(self, data):
72 if (len(data) < 4):
73 raise BaseException("Answer too short!")
74 (plen, ipa_proto, osmo_proto) = struct.unpack(">HBB", data[:4])
75 if (plen + 3 > len(data)):
76 print "Warning: Wrong payload length (expected %i, got %i)" % (plen, len(data) - 3)
77 if (ipa_proto != 0xee or osmo_proto != 0):
78 raise BaseException("Wrong protocol in answer!")
79
80 return data[4:plen+3], data[plen+3:]
81
82 def disconnect(self):
83 if not (self.sock is None):
84 self.sock.close()
85
86 def connect(self, host, port):
87 if verbose:
88 print "Connecting to host %s:%i" % (host, port)
89
90 sck = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
91 sck.setblocking(1)
92 sck.connect((host, port))
93 self.sock = sck
94 return sck
95
96 def send(self, data):
97 if verbose:
98 print "Sending \"%s\"" %(data)
99 data = self.prefix_ipa_ctrl_header(data)
100 return self.sock.send(data) == len(data)
101
102 def send_set(self, var, value, id):
103 setmsg = "SET %s %s %s" %(id, var, value)
104 return self.send(setmsg)
105
106 def send_get(self, var, id):
107 getmsg = "GET %s %s" %(id, var)
108 return self.send(getmsg)
109
110 def do_set(self, var, value):
111 id = self.next_id
112 self.next_id += 1
113 self.send_set(var, value, id)
114 return self.recv_msgs()[id]
115
116 def do_get(self, var):
117 id = self.next_id
118 self.next_id += 1
119 self.send_get(var, id)
120 return self.recv_msgs()[id]
121
122 def recv_msgs(self):
123 responses = {}
124 data = self.sock.recv(4096)
125 while (len(data)>0):
126 (answer, data) = self.remove_ipa_ctrl_header(data)
127 if verbose:
128 print "Got message:", answer
129 (mtype, id, msg) = answer.split(None, 2)
130 id = int(id)
131 rsp = {'mtype': mtype, 'id': id}
132 if mtype == "ERROR":
133 rsp['error'] = msg
134 else:
Holger Hans Peter Freyther842137a2014-03-04 15:38:00 +0100135 split = msg.split(None, 1)
136 rsp['var'] = split[0]
137 if len(split) > 1:
138 rsp['value'] = split[1]
139 else:
140 rsp['value'] = None
Jacob Erlbeck5741e1f2013-09-16 11:20:28 +0200141 responses[id] = rsp
142
143 if verbose:
144 print "Decoded replies: ", responses
145
146 return responses
147
148
149class TestCtrlBSC(TestCtrlBase):
150
151 def tearDown(self):
152 TestCtrlBase.tearDown(self)
153 os.unlink("tmp_dummy_sock")
154
155 def ctrl_command(self):
156 return ["./src/osmo-bsc/osmo-bsc", "-r", "tmp_dummy_sock", "-c",
157 "doc/examples/osmo-bsc/osmo-bsc.cfg"]
158
159 def ctrl_app(self):
160 return (4249, "./src/osmo-bsc/osmo-bsc", "OsmoBSC", "bsc")
161
162 def testCtrlErrs(self):
163 r = self.do_get('invalid')
164 self.assertEquals(r['mtype'], 'ERROR')
165 self.assertEquals(r['error'], 'Command not found')
166
Jacob Erlbeck21f6d152013-09-16 11:20:29 +0200167 r = self.do_set('rf_locked', '999')
168 self.assertEquals(r['mtype'], 'ERROR')
169 self.assertEquals(r['error'], 'Value failed verification.')
170
Jacob Erlbeck5741e1f2013-09-16 11:20:28 +0200171 r = self.do_get('bts')
172 self.assertEquals(r['mtype'], 'ERROR')
173 self.assertEquals(r['error'], 'Error while parsing the index.')
174
175 r = self.do_get('bts.999')
176 self.assertEquals(r['mtype'], 'ERROR')
177 self.assertEquals(r['error'], 'Error while resolving object')
178
Holger Hans Peter Freyther2de46892014-03-23 11:17:27 +0100179 def testTrxPowerRed(self):
180 r = self.do_get('bts.0.trx.0.max-power-reduction')
181 self.assertEquals(r['mtype'], 'GET_REPLY')
182 self.assertEquals(r['var'], 'bts.0.trx.0.max-power-reduction')
183 self.assertEquals(r['value'], '20')
184
185 r = self.do_set('bts.0.trx.0.max-power-reduction', '22')
186 self.assertEquals(r['mtype'], 'SET_REPLY')
187 self.assertEquals(r['var'], 'bts.0.trx.0.max-power-reduction')
188 self.assertEquals(r['value'], '22')
189
190 r = self.do_get('bts.0.trx.0.max-power-reduction')
191 self.assertEquals(r['mtype'], 'GET_REPLY')
192 self.assertEquals(r['var'], 'bts.0.trx.0.max-power-reduction')
193 self.assertEquals(r['value'], '22')
194
195 r = self.do_set('bts.0.trx.0.max-power-reduction', '1')
196 self.assertEquals(r['mtype'], 'ERROR')
197 self.assertEquals(r['error'], 'Value must be even')
198
Jacob Erlbeck5741e1f2013-09-16 11:20:28 +0200199 def testRfLock(self):
200 r = self.do_get('bts.0.rf_state')
201 self.assertEquals(r['mtype'], 'GET_REPLY')
202 self.assertEquals(r['var'], 'bts.0.rf_state')
203 self.assertEquals(r['value'], 'inoperational,unlocked,on')
204
205 r = self.do_set('rf_locked', '1')
206 self.assertEquals(r['mtype'], 'SET_REPLY')
207 self.assertEquals(r['var'], 'rf_locked')
208 self.assertEquals(r['value'], '1')
209
210 time.sleep(1.5)
211
212 r = self.do_get('bts.0.rf_state')
213 self.assertEquals(r['mtype'], 'GET_REPLY')
214 self.assertEquals(r['var'], 'bts.0.rf_state')
215 self.assertEquals(r['value'], 'inoperational,locked,off')
216
217 r = self.do_set('rf_locked', '0')
218 self.assertEquals(r['mtype'], 'SET_REPLY')
219 self.assertEquals(r['var'], 'rf_locked')
220 self.assertEquals(r['value'], '0')
221
222 time.sleep(1.5)
223
224 r = self.do_get('bts.0.rf_state')
225 self.assertEquals(r['mtype'], 'GET_REPLY')
226 self.assertEquals(r['var'], 'bts.0.rf_state')
227 self.assertEquals(r['value'], 'inoperational,unlocked,on')
228
Jacob Erlbeck64e143a2013-10-01 13:26:42 +0200229 def testTimezone(self):
230 r = self.do_get('bts.0.timezone')
231 self.assertEquals(r['mtype'], 'GET_REPLY')
232 self.assertEquals(r['var'], 'bts.0.timezone')
233 self.assertEquals(r['value'], 'off')
234
235 r = self.do_set('bts.0.timezone', '-2,15,2')
236 self.assertEquals(r['mtype'], 'SET_REPLY')
237 self.assertEquals(r['var'], 'bts.0.timezone')
238 self.assertEquals(r['value'], '-2,15,2')
239
240 r = self.do_get('bts.0.timezone')
241 self.assertEquals(r['mtype'], 'GET_REPLY')
242 self.assertEquals(r['var'], 'bts.0.timezone')
243 self.assertEquals(r['value'], '-2,15,2')
244
245 # Test invalid input
246 r = self.do_set('bts.0.timezone', '-2,15,2,5,6,7')
247 self.assertEquals(r['mtype'], 'SET_REPLY')
248 self.assertEquals(r['var'], 'bts.0.timezone')
249 self.assertEquals(r['value'], '-2,15,2')
250
251 r = self.do_set('bts.0.timezone', '-2,15')
252 self.assertEquals(r['mtype'], 'ERROR')
253 r = self.do_set('bts.0.timezone', '-2')
254 self.assertEquals(r['mtype'], 'ERROR')
255 r = self.do_set('bts.0.timezone', '1')
256
257 r = self.do_set('bts.0.timezone', 'off')
258 self.assertEquals(r['mtype'], 'SET_REPLY')
259 self.assertEquals(r['var'], 'bts.0.timezone')
260 self.assertEquals(r['value'], 'off')
261
262 r = self.do_get('bts.0.timezone')
263 self.assertEquals(r['mtype'], 'GET_REPLY')
264 self.assertEquals(r['var'], 'bts.0.timezone')
265 self.assertEquals(r['value'], 'off')
266
Holger Hans Peter Freythereab2a3f2014-03-04 17:16:58 +0100267 def testMccMncApply(self):
268 # Test some invalid input
269 r = self.do_set('mcc-mnc-apply', 'WRONG')
270 self.assertEquals(r['mtype'], 'ERROR')
271
272 r = self.do_set('mcc-mnc-apply', '1,')
273 self.assertEquals(r['mtype'], 'ERROR')
274
275 r = self.do_set('mcc-mnc-apply', '200,3')
276 self.assertEquals(r['mtype'], 'SET_REPLY')
277 self.assertEquals(r['var'], 'mcc-mnc-apply')
278 self.assertEquals(r['value'], 'Tried to drop the BTS')
279
280 # Set it again
281 r = self.do_set('mcc-mnc-apply', '200,3')
282 self.assertEquals(r['mtype'], 'SET_REPLY')
283 self.assertEquals(r['var'], 'mcc-mnc-apply')
284 self.assertEquals(r['value'], 'Nothing changed')
285
286 # Change it
287 r = self.do_set('mcc-mnc-apply', '200,4')
288 self.assertEquals(r['mtype'], 'SET_REPLY')
289 self.assertEquals(r['var'], 'mcc-mnc-apply')
290 self.assertEquals(r['value'], 'Tried to drop the BTS')
291
292 # Change it
293 r = self.do_set('mcc-mnc-apply', '201,4')
294 self.assertEquals(r['mtype'], 'SET_REPLY')
295 self.assertEquals(r['var'], 'mcc-mnc-apply')
296 self.assertEquals(r['value'], 'Tried to drop the BTS')
297
298 # Verify
299 r = self.do_get('mnc')
300 self.assertEquals(r['mtype'], 'GET_REPLY')
301 self.assertEquals(r['var'], 'mnc')
302 self.assertEquals(r['value'], '4')
303
304 r = self.do_get('mcc')
305 self.assertEquals(r['mtype'], 'GET_REPLY')
306 self.assertEquals(r['var'], 'mcc')
307 self.assertEquals(r['value'], '201')
308
Holger Hans Peter Freytherfba03162014-03-23 12:06:36 +0100309class TestCtrlNITB(TestCtrlBase):
310
311 def tearDown(self):
312 TestCtrlBase.tearDown(self)
313 os.unlink("test_hlr.sqlite3")
314
315 def ctrl_command(self):
316 return ["./src/osmo-nitb/osmo-nitb", "-c",
317 "doc/examples/osmo-nitb/nanobts/openbsc.cfg", "-l", "test_hlr.sqlite3"]
318
319 def ctrl_app(self):
320 return (4249, "./src/osmo-nitb/osmo-nitb", "OsmoBSC", "nitb")
321
Holger Hans Peter Freytherc652a5d2014-03-23 14:01:08 +0100322 def testSubscriberAddRemove(self):
Holger Hans Peter Freytherfba03162014-03-23 12:06:36 +0100323 r = self.do_set('subscriber-modify-v1', '2620345,445566')
324 self.assertEquals(r['mtype'], 'SET_REPLY')
325 self.assertEquals(r['var'], 'subscriber-modify-v1')
326 self.assertEquals(r['value'], 'OK')
327
328 r = self.do_set('subscriber-modify-v1', '2620345,445567')
329 self.assertEquals(r['mtype'], 'SET_REPLY')
330 self.assertEquals(r['var'], 'subscriber-modify-v1')
331 self.assertEquals(r['value'], 'OK')
332
333 # TODO. verify that the entry has been created and modified? Invoke
334 # the sqlite3 CLI or do it through the DB libraries?
335
Holger Hans Peter Freytherc652a5d2014-03-23 14:01:08 +0100336 r = self.do_set('subscriber-delete-v1', '2620345')
337 self.assertEquals(r['mtype'], 'SET_REPLY')
338 self.assertEquals(r['value'], 'Removed')
339
340 r = self.do_set('subscriber-delete-v1', '2620345')
341 self.assertEquals(r['mtype'], 'ERROR')
342 self.assertEquals(r['error'], 'Failed to find subscriber')
343
Holger Hans Peter Freytherd41b7b72014-03-23 16:22:55 +0100344 def testSubscriberList(self):
345 # TODO. Add command to mark a subscriber as active
346 r = self.do_get('subscriber-list-active-v1')
347 self.assertEquals(r['mtype'], 'GET_REPLY')
348 self.assertEquals(r['var'], 'subscriber-list-active-v1')
349 self.assertEquals(r['value'], None)
350
Holger Hans Peter Freyther842137a2014-03-04 15:38:00 +0100351class TestCtrlNAT(TestCtrlBase):
352
353 def ctrl_command(self):
354 return ["./src/osmo-bsc_nat/osmo-bsc_nat", "-c",
355 "doc/examples/osmo-bsc_nat/osmo-bsc_nat.cfg"]
356
357 def ctrl_app(self):
358 return (4250, "./src/osmo-bsc_nat/osmo-bsc_nat", "OsmoNAT", "nat")
359
360 def testAccessList(self):
361 r = self.do_get('net.0.bsc_cfg.0.access-list-name')
362 self.assertEquals(r['mtype'], 'GET_REPLY')
363 self.assertEquals(r['var'], 'net')
364 self.assertEquals(r['value'], None)
365
366 r = self.do_set('net.0.bsc_cfg.0.access-list-name', 'bla')
367 self.assertEquals(r['mtype'], 'SET_REPLY')
368 self.assertEquals(r['var'], 'net')
369 self.assertEquals(r['value'], 'bla')
370
371 r = self.do_get('net.0.bsc_cfg.0.access-list-name')
372 self.assertEquals(r['mtype'], 'GET_REPLY')
373 self.assertEquals(r['var'], 'net')
374 self.assertEquals(r['value'], 'bla')
375
376 r = self.do_set('net.0.bsc_cfg.0.no-access-list-name', '1')
377 self.assertEquals(r['mtype'], 'SET_REPLY')
378 self.assertEquals(r['var'], 'net')
379 self.assertEquals(r['value'], None)
380
381 r = self.do_set('net.0.bsc_cfg.0.no-access-list-name', '1')
382 self.assertEquals(r['mtype'], 'SET_REPLY')
383 self.assertEquals(r['var'], 'net')
384 self.assertEquals(r['value'], None)
Holger Hans Peter Freythereab2a3f2014-03-04 17:16:58 +0100385
Jacob Erlbeck5741e1f2013-09-16 11:20:28 +0200386def add_bsc_test(suite, workdir):
387 if not os.path.isfile(os.path.join(workdir, "src/osmo-bsc/osmo-bsc")):
388 print("Skipping the BSC test")
389 return
390 test = unittest.TestLoader().loadTestsFromTestCase(TestCtrlBSC)
391 suite.addTest(test)
392
Holger Hans Peter Freytherfba03162014-03-23 12:06:36 +0100393def add_nitb_test(suite, workdir):
394 test = unittest.TestLoader().loadTestsFromTestCase(TestCtrlNITB)
395 suite.addTest(test)
396
Holger Hans Peter Freyther842137a2014-03-04 15:38:00 +0100397def add_nat_test(suite, workdir):
398 if not os.path.isfile(os.path.join(workdir, "src/osmo-bsc_nat/osmo-bsc_nat")):
399 print("Skipping the NAT test")
400 return
401 test = unittest.TestLoader().loadTestsFromTestCase(TestCtrlNAT)
402 suite.addTest(test)
403
Jacob Erlbeck5741e1f2013-09-16 11:20:28 +0200404if __name__ == '__main__':
405 import argparse
406 import sys
407
408 workdir = '.'
409
410 parser = argparse.ArgumentParser()
411 parser.add_argument("-v", "--verbose", dest="verbose",
412 action="store_true", help="verbose mode")
413 parser.add_argument("-p", "--pythonconfpath", dest="p",
414 help="searchpath for config")
415 parser.add_argument("-w", "--workdir", dest="w",
416 help="Working directory")
417 args = parser.parse_args()
418
419 verbose_level = 1
420 if args.verbose:
421 verbose_level = 2
422 verbose = True
423
424 if args.w:
425 workdir = args.w
426
427 if args.p:
428 confpath = args.p
429
430 print "confpath %s, workdir %s" % (confpath, workdir)
431 os.chdir(workdir)
432 print "Running tests for specific control commands"
433 suite = unittest.TestSuite()
434 add_bsc_test(suite, workdir)
Holger Hans Peter Freytherfba03162014-03-23 12:06:36 +0100435 add_nitb_test(suite, workdir)
Holger Hans Peter Freyther842137a2014-03-04 15:38:00 +0100436 add_nat_test(suite, workdir)
Jacob Erlbeck5741e1f2013-09-16 11:20:28 +0200437 res = unittest.TextTestRunner(verbosity=verbose_level).run(suite)
438 sys.exit(len(res.errors) + len(res.failures))