blob: b50e93c515f1b6e9f66414178e3db0df22efd3aa [file] [log] [blame]
#!/usr/bin/env python

# (C) 2013 by Jacob Erlbeck <jerlbeck@sysmocom.de>
# (C) 2014 by Holger Hans Peter Freyther
# based on vty_test_runner.py:
# (C) 2013 by Katerina Barone-Adesi <kat.obsc@gmail.com>
# (C) 2013 by Holger Hans Peter Freyther
# based on bsc_control.py.

# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.

# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.

# You should have received a copy of the GNU General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
22
23import os
24import time
25import unittest
26import socket
27import sys
28import struct
29
30import osmopy.obscvty as obscvty
31import osmopy.osmoutil as osmoutil
32
# Directory that is joined in front of the config file named after '-c' in
# each test's command line; replaced by the value of -p when that is given.
confpath = '.'
# When True the CTRL helpers print connection attempts and the messages
# they send and receive; switched on by -v.
verbose = False
35
class TestCtrlBase(unittest.TestCase):
    """Common fixture for tests that talk to a CTRL interface over TCP.

    setUp() launches the command returned by ctrl_command(), waits two
    seconds and connects to 127.0.0.1 on the port found in ctrl_app()[0].
    Subclasses implement ctrl_command() and ctrl_app() and use do_get() /
    do_set() in their tests.
    """

    def ctrl_command(self):
        """Return the argv list used to launch the program under test."""
        raise NotImplementedError("Needs to be implemented by a subclass")

    def ctrl_app(self):
        """Return a tuple describing the program; element 0 is the CTRL port."""
        raise NotImplementedError("Needs to be implemented by a subclass")

    def setUp(self):
        """Launch the program under test and connect to its CTRL port.

        Raises OSError if the program cannot be started, and whatever
        connect() raises if the port cannot be reached; in the latter case
        the launched process is ended first.
        """
        # Work on a copy so the list owned by the subclass is not modified.
        osmo_ctrl_cmd = self.ctrl_command()[:]
        # The argument after '-c' is the config file; look it up in confpath.
        if '-c' in osmo_ctrl_cmd:
            cfi = osmo_ctrl_cmd.index('-c') + 1
            osmo_ctrl_cmd[cfi] = os.path.join(confpath, osmo_ctrl_cmd[cfi])

        self.sock = None
        try:
            print("Launch: %s from %s" % (' '.join(osmo_ctrl_cmd), os.getcwd()))
            self.proc = osmoutil.popen_devnull(osmo_ctrl_cmd)
        except OSError:
            sys.stderr.write("Current directory: %s\n" % os.getcwd())
            # -w is the option of this script that selects the working
            # directory the relative binary paths are resolved against.
            sys.stderr.write("Consider setting -w\n")
            # Without a process there is nothing to test; report the real
            # cause instead of a later connection failure.
            raise
        # Give the program time to open its CTRL port.
        time.sleep(2)

        appport = self.ctrl_app()[0]
        try:
            self.connect("127.0.0.1", appport)
        except Exception:
            # unittest does not run tearDown() when setUp() fails, so the
            # process has to be ended here.
            osmoutil.end_proc(self.proc)
            raise
        self.next_id = 1000

    def tearDown(self):
        """Close the CTRL connection and end the launched process."""
        try:
            self.disconnect()
        finally:
            # End the process even if closing the socket failed.
            osmoutil.end_proc(self.proc)

    def prefix_ipa_ctrl_header(self, data):
        """Return data preceded by the 4-byte header used on the CTRL port.

        The header is a big-endian 16-bit length (len(data) + 1, i.e. the
        payload plus the last header byte) followed by the bytes 0xee and 0.
        """
        return struct.pack(">HBB", len(data)+1, 0xee, 0) + data

    def remove_ipa_ctrl_header(self, data):
        """Split the first frame off data.

        Returns a tuple (payload, rest) where rest is whatever follows the
        first frame. Prints a warning if data is shorter than the length
        field announces. Raises Exception if data is shorter than a header
        or the two protocol bytes are not 0xee and 0.
        """
        if len(data) < 4:
            raise Exception("Answer too short!")
        (plen, ipa_proto, osmo_proto) = struct.unpack(">HBB", data[:4])
        if plen + 3 > len(data):
            print("Warning: Wrong payload length (expected %i, got %i)" % (plen, len(data) - 3))
        if ipa_proto != 0xee or osmo_proto != 0:
            raise Exception("Wrong protocol in answer!")

        # plen counts the last header byte, so the frame ends at plen + 3.
        return data[4:plen+3], data[plen+3:]

    def disconnect(self):
        """Close the CTRL socket if one is open."""
        sock = getattr(self, 'sock', None)
        if sock is not None:
            sock.close()
            self.sock = None

    def connect(self, host, port):
        """Open a blocking TCP connection, store it in self.sock, return it."""
        if verbose:
            print("Connecting to host %s:%i" % (host, port))

        sck = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
        try:
            sck.setblocking(1)
            sck.connect((host, port))
        except Exception:
            # Do not leak the descriptor when the connection attempt fails.
            sck.close()
            raise
        self.sock = sck
        return sck

    def send(self, data):
        """Send data as one frame; returns True once all of it was sent."""
        if verbose:
            print("Sending \"%s\"" % (data,))
        data = self.prefix_ipa_ctrl_header(data)
        # sendall() keeps writing until the whole frame is out (or raises),
        # whereas send() may stop after a part of it.
        self.sock.sendall(data)
        return True

    def send_set(self, var, value, id):
        """Send "SET <id> <var> <value>"."""
        setmsg = "SET %s %s %s" % (id, var, value)
        return self.send(setmsg)

    def send_get(self, var, id):
        """Send "GET <id> <var>"."""
        getmsg = "GET %s %s" % (id, var)
        return self.send(getmsg)

    def do_set(self, var, value):
        """Send a SET with a fresh id and return the reply carrying that id.

        Raises KeyError if the messages read back do not include that id.
        """
        msg_id = self.next_id
        self.next_id += 1
        self.send_set(var, value, msg_id)
        return self.recv_msgs()[msg_id]

    def do_get(self, var):
        """Send a GET with a fresh id and return the reply carrying that id.

        Raises KeyError if the messages read back do not include that id.
        """
        msg_id = self.next_id
        self.next_id += 1
        self.send_get(var, msg_id)
        return self.recv_msgs()[msg_id]

    def _complete_frame(self, data):
        """Read from the socket until data starts with one whole frame."""
        while True:
            if len(data) >= 4:
                (plen,) = struct.unpack(">H", data[:2])
                if len(data) >= plen + 3:
                    break
            chunk = self.sock.recv(4096)
            if not chunk:
                # Connection closed; leave the short frame for
                # remove_ipa_ctrl_header() to report.
                break
            data += chunk
        return data

    def recv_msgs(self):
        """Read the pending messages and return them keyed by message id.

        Each entry is a dict with 'mtype' and 'id', plus 'error' for ERROR
        messages or 'var' and 'value' (None when absent) for all others.
        Returns an empty dict if the connection delivers no data.
        """
        responses = {}
        data = self.sock.recv(4096)
        while len(data) > 0:
            # TCP may deliver a frame in pieces; wait for the rest of it.
            data = self._complete_frame(data)
            (answer, data) = self.remove_ipa_ctrl_header(data)
            if verbose:
                print("Got message: %s" % (answer,))
            (mtype, msg_id, msg) = answer.split(None, 2)
            msg_id = int(msg_id)
            rsp = {'mtype': mtype, 'id': msg_id}
            if mtype == "ERROR":
                rsp['error'] = msg
            else:
                split = msg.split(None, 1)
                rsp['var'] = split[0]
                if len(split) > 1:
                    rsp['value'] = split[1]
                else:
                    rsp['value'] = None
            responses[msg_id] = rsp

        if verbose:
            print("Decoded replies:  %s" % (responses,))

        return responses
147
148
class TestCtrlBSC(TestCtrlBase):
    """CTRL interface tests run against osmo-bsc."""

    def tearDown(self):
        """Run the base tearDown, then remove the file passed via -r."""
        try:
            TestCtrlBase.tearDown(self)
        finally:
            # Clean up even if the base tearDown raised, and do not fail
            # when the file was never created.
            if os.path.exists("tmp_dummy_sock"):
                os.unlink("tmp_dummy_sock")

    def ctrl_command(self):
        """Return the command line used to start osmo-bsc."""
        return ["./src/osmo-bsc/osmo-bsc", "-r", "tmp_dummy_sock", "-c",
                "doc/examples/osmo-bsc/osmo-bsc.cfg"]

    def ctrl_app(self):
        """Return the CTRL port followed by binary path and names."""
        return (4249, "./src/osmo-bsc/osmo-bsc", "OsmoBSC", "bsc")

    def testCtrlErrs(self):
        # Unknown variable
        r = self.do_get('invalid')
        self.assertEqual(r['mtype'], 'ERROR')
        self.assertEqual(r['error'], 'Command not found')

        # Value rejected by verification
        r = self.do_set('rf_locked', '999')
        self.assertEqual(r['mtype'], 'ERROR')
        self.assertEqual(r['error'], 'Value failed verification.')

        # Node without an index
        r = self.do_get('bts')
        self.assertEqual(r['mtype'], 'ERROR')
        self.assertEqual(r['error'], 'Error while parsing the index.')

        # Index that does not resolve to an object
        r = self.do_get('bts.999')
        self.assertEqual(r['mtype'], 'ERROR')
        self.assertEqual(r['error'], 'Error while resolving object')

    def testTrxPowerRed(self):
        r = self.do_get('bts.0.trx.0.max-power-reduction')
        self.assertEqual(r['mtype'], 'GET_REPLY')
        self.assertEqual(r['var'], 'bts.0.trx.0.max-power-reduction')
        self.assertEqual(r['value'], '20')

        r = self.do_set('bts.0.trx.0.max-power-reduction', '22')
        self.assertEqual(r['mtype'], 'SET_REPLY')
        self.assertEqual(r['var'], 'bts.0.trx.0.max-power-reduction')
        self.assertEqual(r['value'], '22')

        r = self.do_get('bts.0.trx.0.max-power-reduction')
        self.assertEqual(r['mtype'], 'GET_REPLY')
        self.assertEqual(r['var'], 'bts.0.trx.0.max-power-reduction')
        self.assertEqual(r['value'], '22')

        # Odd values are refused
        r = self.do_set('bts.0.trx.0.max-power-reduction', '1')
        self.assertEqual(r['mtype'], 'ERROR')
        self.assertEqual(r['error'], 'Value must be even')

    def testRfLock(self):
        r = self.do_get('bts.0.rf_state')
        self.assertEqual(r['mtype'], 'GET_REPLY')
        self.assertEqual(r['var'], 'bts.0.rf_state')
        self.assertEqual(r['value'], 'inoperational,unlocked,on')

        r = self.do_set('rf_locked', '1')
        self.assertEqual(r['mtype'], 'SET_REPLY')
        self.assertEqual(r['var'], 'rf_locked')
        self.assertEqual(r['value'], '1')

        # Wait before reading the state back
        time.sleep(1.5)

        r = self.do_get('bts.0.rf_state')
        self.assertEqual(r['mtype'], 'GET_REPLY')
        self.assertEqual(r['var'], 'bts.0.rf_state')
        self.assertEqual(r['value'], 'inoperational,locked,off')

        r = self.do_set('rf_locked', '0')
        self.assertEqual(r['mtype'], 'SET_REPLY')
        self.assertEqual(r['var'], 'rf_locked')
        self.assertEqual(r['value'], '0')

        time.sleep(1.5)

        r = self.do_get('bts.0.rf_state')
        self.assertEqual(r['mtype'], 'GET_REPLY')
        self.assertEqual(r['var'], 'bts.0.rf_state')
        self.assertEqual(r['value'], 'inoperational,unlocked,on')

    def testTimezone(self):
        r = self.do_get('bts.0.timezone')
        self.assertEqual(r['mtype'], 'GET_REPLY')
        self.assertEqual(r['var'], 'bts.0.timezone')
        self.assertEqual(r['value'], 'off')

        r = self.do_set('bts.0.timezone', '-2,15,2')
        self.assertEqual(r['mtype'], 'SET_REPLY')
        self.assertEqual(r['var'], 'bts.0.timezone')
        self.assertEqual(r['value'], '-2,15,2')

        r = self.do_get('bts.0.timezone')
        self.assertEqual(r['mtype'], 'GET_REPLY')
        self.assertEqual(r['var'], 'bts.0.timezone')
        self.assertEqual(r['value'], '-2,15,2')

        # Test invalid input: the reply carries only the first three fields
        r = self.do_set('bts.0.timezone', '-2,15,2,5,6,7')
        self.assertEqual(r['mtype'], 'SET_REPLY')
        self.assertEqual(r['var'], 'bts.0.timezone')
        self.assertEqual(r['value'], '-2,15,2')

        r = self.do_set('bts.0.timezone', '-2,15')
        self.assertEqual(r['mtype'], 'ERROR')
        r = self.do_set('bts.0.timezone', '-2')
        self.assertEqual(r['mtype'], 'ERROR')
        # NOTE(review): the reply to this request is not checked;
        # presumably it is an ERROR like the one for '-2' - confirm and
        # add an assertion.
        r = self.do_set('bts.0.timezone', '1')

        r = self.do_set('bts.0.timezone', 'off')
        self.assertEqual(r['mtype'], 'SET_REPLY')
        self.assertEqual(r['var'], 'bts.0.timezone')
        self.assertEqual(r['value'], 'off')

        r = self.do_get('bts.0.timezone')
        self.assertEqual(r['mtype'], 'GET_REPLY')
        self.assertEqual(r['var'], 'bts.0.timezone')
        self.assertEqual(r['value'], 'off')

    def testMcc(self):
        r = self.do_set('mcc', '23')
        r = self.do_get('mcc')
        self.assertEqual(r['mtype'], 'GET_REPLY')
        self.assertEqual(r['var'], 'mcc')
        self.assertEqual(r['value'], '23')

        # A leading zero is not reported back
        r = self.do_set('mcc', '023')
        r = self.do_get('mcc')
        self.assertEqual(r['mtype'], 'GET_REPLY')
        self.assertEqual(r['var'], 'mcc')
        self.assertEqual(r['value'], '23')

    def testMnc(self):
        r = self.do_set('mnc', '9')
        r = self.do_get('mnc')
        self.assertEqual(r['mtype'], 'GET_REPLY')
        self.assertEqual(r['var'], 'mnc')
        self.assertEqual(r['value'], '9')

        # A leading zero is not reported back
        r = self.do_set('mnc', '09')
        r = self.do_get('mnc')
        self.assertEqual(r['mtype'], 'GET_REPLY')
        self.assertEqual(r['var'], 'mnc')
        self.assertEqual(r['value'], '9')

    def testMccMncApply(self):
        # Test some invalid input
        r = self.do_set('mcc-mnc-apply', 'WRONG')
        self.assertEqual(r['mtype'], 'ERROR')

        r = self.do_set('mcc-mnc-apply', '1,')
        self.assertEqual(r['mtype'], 'ERROR')

        r = self.do_set('mcc-mnc-apply', '200,3')
        self.assertEqual(r['mtype'], 'SET_REPLY')
        self.assertEqual(r['var'], 'mcc-mnc-apply')
        self.assertEqual(r['value'], 'Tried to drop the BTS')

        # Set it again
        r = self.do_set('mcc-mnc-apply', '200,3')
        self.assertEqual(r['mtype'], 'SET_REPLY')
        self.assertEqual(r['var'], 'mcc-mnc-apply')
        self.assertEqual(r['value'], 'Nothing changed')

        # Change it
        r = self.do_set('mcc-mnc-apply', '200,4')
        self.assertEqual(r['mtype'], 'SET_REPLY')
        self.assertEqual(r['var'], 'mcc-mnc-apply')
        self.assertEqual(r['value'], 'Tried to drop the BTS')

        # Change it
        r = self.do_set('mcc-mnc-apply', '201,4')
        self.assertEqual(r['mtype'], 'SET_REPLY')
        self.assertEqual(r['var'], 'mcc-mnc-apply')
        self.assertEqual(r['value'], 'Tried to drop the BTS')

        # Verify
        r = self.do_get('mnc')
        self.assertEqual(r['mtype'], 'GET_REPLY')
        self.assertEqual(r['var'], 'mnc')
        self.assertEqual(r['value'], '4')

        r = self.do_get('mcc')
        self.assertEqual(r['mtype'], 'GET_REPLY')
        self.assertEqual(r['var'], 'mcc')
        self.assertEqual(r['value'], '201')

        # Change it
        r = self.do_set('mcc-mnc-apply', '202,03')
        self.assertEqual(r['mtype'], 'SET_REPLY')
        self.assertEqual(r['var'], 'mcc-mnc-apply')
        self.assertEqual(r['value'], 'Tried to drop the BTS')

        r = self.do_get('mnc')
        self.assertEqual(r['mtype'], 'GET_REPLY')
        self.assertEqual(r['var'], 'mnc')
        self.assertEqual(r['value'], '3')

        r = self.do_get('mcc')
        self.assertEqual(r['mtype'], 'GET_REPLY')
        self.assertEqual(r['var'], 'mcc')
        self.assertEqual(r['value'], '202')
351
class TestCtrlNITB(TestCtrlBase):
    """CTRL interface tests run against osmo-nitb."""

    def tearDown(self):
        """Run the base tearDown, then remove the database given via -l."""
        try:
            TestCtrlBase.tearDown(self)
        finally:
            # Clean up even if the base tearDown raised, and do not fail
            # when the file was never created.
            if os.path.exists("test_hlr.sqlite3"):
                os.unlink("test_hlr.sqlite3")

    def ctrl_command(self):
        """Return the command line used to start osmo-nitb."""
        return ["./src/osmo-nitb/osmo-nitb", "-c",
                "doc/examples/osmo-nitb/nanobts/openbsc.cfg", "-l", "test_hlr.sqlite3"]

    def ctrl_app(self):
        """Return the CTRL port followed by binary path and names."""
        return (4249, "./src/osmo-nitb/osmo-nitb", "OsmoBSC", "nitb")

    def testSubscriberAddRemove(self):
        r = self.do_set('subscriber-modify-v1', '2620345,445566')
        self.assertEqual(r['mtype'], 'SET_REPLY')
        self.assertEqual(r['var'], 'subscriber-modify-v1')
        self.assertEqual(r['value'], 'OK')

        # Same first field, different second field
        r = self.do_set('subscriber-modify-v1', '2620345,445567')
        self.assertEqual(r['mtype'], 'SET_REPLY')
        self.assertEqual(r['var'], 'subscriber-modify-v1')
        self.assertEqual(r['value'], 'OK')

        # TODO. verify that the entry has been created and modified? Invoke
        # the sqlite3 CLI or do it through the DB libraries?

        r = self.do_set('subscriber-delete-v1', '2620345')
        self.assertEqual(r['mtype'], 'SET_REPLY')
        self.assertEqual(r['value'], 'Removed')

        # Deleting a second time has to fail
        r = self.do_set('subscriber-delete-v1', '2620345')
        self.assertEqual(r['mtype'], 'ERROR')
        self.assertEqual(r['error'], 'Failed to find subscriber')

    def testSubscriberList(self):
        # TODO. Add command to mark a subscriber as active
        r = self.do_get('subscriber-list-active-v1')
        self.assertEqual(r['mtype'], 'GET_REPLY')
        self.assertEqual(r['var'], 'subscriber-list-active-v1')
        self.assertEqual(r['value'], None)
393
class TestCtrlNAT(TestCtrlBase):
    """CTRL interface tests run against osmo-bsc_nat."""

    def ctrl_command(self):
        """Return the command line used to start osmo-bsc_nat."""
        return ["./src/osmo-bsc_nat/osmo-bsc_nat", "-c",
                "doc/examples/osmo-bsc_nat/osmo-bsc_nat.cfg"]

    def ctrl_app(self):
        """Return the CTRL port followed by binary path and names."""
        return (4250, "./src/osmo-bsc_nat/osmo-bsc_nat", "OsmoNAT", "nat")

    def testAccessList(self):
        # Nothing is set initially
        r = self.do_get('net.0.bsc_cfg.0.access-list-name')
        self.assertEqual(r['mtype'], 'GET_REPLY')
        self.assertEqual(r['var'], 'net')
        self.assertEqual(r['value'], None)

        r = self.do_set('net.0.bsc_cfg.0.access-list-name', 'bla')
        self.assertEqual(r['mtype'], 'SET_REPLY')
        self.assertEqual(r['var'], 'net')
        self.assertEqual(r['value'], 'bla')

        r = self.do_get('net.0.bsc_cfg.0.access-list-name')
        self.assertEqual(r['mtype'], 'GET_REPLY')
        self.assertEqual(r['var'], 'net')
        self.assertEqual(r['value'], 'bla')

        r = self.do_set('net.0.bsc_cfg.0.no-access-list-name', '1')
        self.assertEqual(r['mtype'], 'SET_REPLY')
        self.assertEqual(r['var'], 'net')
        self.assertEqual(r['value'], None)

        # The same request a second time gives the same reply
        r = self.do_set('net.0.bsc_cfg.0.no-access-list-name', '1')
        self.assertEqual(r['mtype'], 'SET_REPLY')
        self.assertEqual(r['var'], 'net')
        self.assertEqual(r['value'], None)
Holger Hans Peter Freythereab2a3f2014-03-04 17:16:58 +0100428
class TestCtrlSGSN(TestCtrlBase):
    """CTRL interface tests run against osmo-sgsn."""

    def ctrl_command(self):
        """Return the command line used to start osmo-sgsn."""
        return ["./src/gprs/osmo-sgsn", "-c",
                "doc/examples/osmo-sgsn/osmo-sgsn.cfg"]

    def ctrl_app(self):
        """Return the CTRL port followed by binary path and names."""
        return (4251, "./src/gprs/osmo-sgsn", "OsmoSGSN", "sgsn")

    def testListSubscribers(self):
        # TODO. Add command to mark a subscriber as active
        r = self.do_get('subscriber-list-active-v1')
        self.assertEqual(r['mtype'], 'GET_REPLY')
        self.assertEqual(r['var'], 'subscriber-list-active-v1')
        self.assertEqual(r['value'], None)
443
def add_bsc_test(suite, workdir):
    """Add the TestCtrlBSC tests to suite if the osmo-bsc binary exists.

    The binary is looked up below workdir; when it is missing a notice is
    printed and suite is left untouched.
    """
    binary = os.path.join(workdir, "src/osmo-bsc/osmo-bsc")
    if os.path.isfile(binary):
        loader = unittest.TestLoader()
        suite.addTest(loader.loadTestsFromTestCase(TestCtrlBSC))
    else:
        print("Skipping the BSC test")
450
def add_nitb_test(suite, workdir):
    """Add the TestCtrlNITB tests to suite.

    workdir is not used here: the tests are added without checking that
    the binary exists.
    """
    loader = unittest.TestLoader()
    suite.addTest(loader.loadTestsFromTestCase(TestCtrlNITB))
454
def add_nat_test(suite, workdir):
    """Add the TestCtrlNAT tests to suite if the osmo-bsc_nat binary exists.

    The binary is looked up below workdir; when it is missing a notice is
    printed and suite is left untouched.
    """
    binary = os.path.join(workdir, "src/osmo-bsc_nat/osmo-bsc_nat")
    if os.path.isfile(binary):
        loader = unittest.TestLoader()
        suite.addTest(loader.loadTestsFromTestCase(TestCtrlNAT))
    else:
        print("Skipping the NAT test")
461
def add_sgsn_test(suite, workdir):
    """Add the TestCtrlSGSN tests to suite if the osmo-sgsn binary exists.

    The binary is looked up below workdir; when it is missing a notice is
    printed and suite is left untouched.
    """
    binary = os.path.join(workdir, "src/gprs/osmo-sgsn")
    if os.path.isfile(binary):
        loader = unittest.TestLoader()
        suite.addTest(loader.loadTestsFromTestCase(TestCtrlSGSN))
    else:
        print("Skipping the SGSN test")
468
# Script entry point: parse the options, change into the working directory,
# collect the suites whose binaries are present and run them. The exit
# status is the number of errors plus failures (capped at 255).
if __name__ == '__main__':
    import argparse
    import sys

    workdir = '.'

    parser = argparse.ArgumentParser()
    parser.add_argument("-v", "--verbose", dest="verbose",
                        action="store_true", help="verbose mode")
    parser.add_argument("-p", "--pythonconfpath", dest="p",
                        help="searchpath for config")
    parser.add_argument("-w", "--workdir", dest="w",
                        help="Working directory")
    args = parser.parse_args()

    verbose_level = 1
    if args.verbose:
        verbose_level = 2
        verbose = True

    if args.w:
        workdir = args.w

    if args.p:
        confpath = args.p

    print("confpath %s, workdir %s" % (confpath, workdir))
    # The add_*_test helpers join workdir with the binary path after the
    # chdir below; a relative workdir would then be applied twice and the
    # suites silently skipped, so make it absolute first.
    workdir = os.path.abspath(workdir)
    os.chdir(workdir)
    print("Running tests for specific control commands")
    suite = unittest.TestSuite()
    add_bsc_test(suite, workdir)
    add_nitb_test(suite, workdir)
    add_nat_test(suite, workdir)
    add_sgsn_test(suite, workdir)
    res = unittest.TextTestRunner(verbosity=verbose_level).run(suite)
    # Exit statuses wrap modulo 256; cap the count so that a run with 256
    # problems is not reported as success.
    sys.exit(min(len(res.errors) + len(res.failures), 255))