#!/usr/bin/env python

# (C) 2013 by Jacob Erlbeck <jerlbeck@sysmocom.de>
# (C) 2014 by Holger Hans Peter Freyther
# based on vty_test_runner.py:
# (C) 2013 by Katerina Barone-Adesi <kat.obsc@gmail.com>
# (C) 2013 by Holger Hans Peter Freyther
# based on bsc_control.py.

# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.

# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.

# You should have received a copy of the GNU General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
22
23import os
24import time
25import unittest
26import socket
27import sys
28import struct
29
30import osmopy.obscvty as obscvty
31import osmopy.osmoutil as osmoutil
32
# When true, the CTRL helpers print what they send and receive
# (switched on by -v/--verbose when run as a script).
verbose = False

# Directory joined in front of the config file that follows '-c' in each
# launch command (replaced by -p/--pythonconfpath when run as a script).
confpath = '.'
35
class TestCtrlBase(unittest.TestCase):
    """Shared fixture for tests that drive an application over its CTRL port.

    setUp() launches the program described by ctrl_command() and opens a TCP
    connection to the port reported by ctrl_app(); tearDown() closes the
    connection and ends the process.  Subclasses must override
    ctrl_command() and ctrl_app().
    """

    def ctrl_command(self):
        """Return the argv list used to launch the application under test."""
        raise NotImplementedError("Needs to be implemented by a subclass")

    def ctrl_app(self):
        """Return a tuple describing the application.

        Only element 0 (the CTRL TCP port) is used by this class.
        """
        raise NotImplementedError("Needs to be implemented by a subclass")

    def setUp(self):
        """Launch the application and connect to its CTRL port.

        Raises OSError if the program cannot be started, and whatever
        connect() raises if the port cannot be reached (the launched process
        is ended first so that it does not outlive the test).
        """
        osmo_ctrl_cmd = self.ctrl_command()[:]
        # list.index() raises when '-c' is missing and returns a falsy 0 when
        # it comes first, so test for membership instead of the index value.
        if '-c' in osmo_ctrl_cmd:
            cfi = osmo_ctrl_cmd.index('-c') + 1
            osmo_ctrl_cmd[cfi] = os.path.join(confpath, osmo_ctrl_cmd[cfi])

        try:
            print("Launch: %s from %s" % (' '.join(osmo_ctrl_cmd), os.getcwd()))
            self.proc = osmoutil.popen_devnull(osmo_ctrl_cmd)
        except OSError:
            sys.stderr.write("Current directory: %s\n" % os.getcwd())
            # The launch paths are relative; -w is the option of this script
            # that changes the directory they are resolved against.
            sys.stderr.write("Consider setting -w\n")
            # Without a process there is nothing to connect to; report the
            # real cause instead of a later connection failure.
            raise
        # Presumably gives the application time to open its CTRL port.
        time.sleep(2)

        appport = self.ctrl_app()[0]
        try:
            self.connect("127.0.0.1", appport)
        except Exception:
            # unittest skips tearDown() when setUp() fails, so end the
            # process here.
            osmoutil.end_proc(self.proc)
            raise
        self.next_id = 1000

    def tearDown(self):
        """Close the CTRL connection and end the launched process."""
        try:
            self.disconnect()
        finally:
            # End the process even if closing the socket raised.
            osmoutil.end_proc(self.proc)

    def prefix_ipa_ctrl_header(self, data):
        """Return `data` preceded by the 4-byte header used on the wire.

        The header is a big-endian 16-bit length (len(data) + 1) followed by
        the bytes 0xee and 0.
        """
        return struct.pack(">HBB", len(data)+1, 0xee, 0) + data

    def remove_ipa_ctrl_header(self, data):
        """Split the first message off `data`.

        Returns a tuple (payload, rest): the payload of the first message
        without its 4-byte header, and the bytes following that message.
        A declared length exceeding the available data only produces a
        printed warning.

        Raises ValueError if `data` is shorter than the header or if the two
        protocol bytes are not 0xee and 0.
        """
        if len(data) < 4:
            raise ValueError("Answer too short!")
        (plen, ipa_proto, osmo_proto) = struct.unpack(">HBB", data[:4])
        if plen + 3 > len(data):
            print("Warning: Wrong payload length (expected %i, got %i)" % (plen, len(data) - 3))
        if ipa_proto != 0xee or osmo_proto != 0:
            raise ValueError("Wrong protocol in answer!")

        return data[4:plen+3], data[plen+3:]

    def disconnect(self):
        """Close the socket opened by connect(), if any."""
        if self.sock is not None:
            self.sock.close()

    def connect(self, host, port):
        """Open a blocking TCP connection, store it in self.sock, return it."""
        if verbose:
            print("Connecting to host %s:%i" % (host, port))

        sck = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
        try:
            sck.setblocking(1)
            sck.connect((host, port))
        except Exception:
            # Do not leak the descriptor when the connection fails.
            sck.close()
            raise
        self.sock = sck
        return sck

    def send(self, data):
        """Send `data` with the wire header prepended.

        Returns True once the whole message has been handed to the socket;
        socket errors propagate.
        """
        if verbose:
            print("Sending \"%s\"" % (data))
        data = self.prefix_ipa_ctrl_header(data)
        # send() may write only part of the buffer; sendall() retries until
        # everything is written or an error is raised.
        self.sock.sendall(data)
        return True

    def send_set(self, var, value, id):
        """Send a SET request for `var` with `value`, tagged with `id`."""
        setmsg = "SET %s %s %s" % (id, var, value)
        return self.send(setmsg)

    def send_get(self, var, id):
        """Send a GET request for `var`, tagged with `id`."""
        getmsg = "GET %s %s" % (id, var)
        return self.send(getmsg)

    def do_set(self, var, value):
        """Send a SET request and return the decoded reply carrying its id.

        Raises KeyError if the received messages hold no reply with that id.
        """
        msg_id = self.next_id
        self.next_id += 1
        self.send_set(var, value, msg_id)
        return self.recv_msgs()[msg_id]

    def do_get(self, var):
        """Send a GET request and return the decoded reply carrying its id.

        Raises KeyError if the received messages hold no reply with that id.
        """
        msg_id = self.next_id
        self.next_id += 1
        self.send_get(var, msg_id)
        return self.recv_msgs()[msg_id]

    def _recv_whole_frame(self, data):
        """Read from the socket until `data` starts with one whole message.

        A single recv() on a stream socket may return only part of a
        message.  Returns `data` extended by whatever had to be read; if the
        peer closes the connection first, the incomplete data is returned
        unchanged so that the caller reports it.
        """
        while len(data) < 4 or len(data) < struct.unpack(">H", data[:2])[0] + 3:
            more = self.sock.recv(4096)
            if len(more) == 0:
                break
            data += more
        return data

    def recv_msgs(self):
        """Receive and decode the pending messages.

        Returns a dict mapping each message id (int) to a dict with the keys
        'mtype' and 'id', plus 'error' for ERROR messages or 'var' and
        'value' otherwise ('value' is None when the message carries none).
        """
        responses = {}
        data = self.sock.recv(4096)
        while len(data) > 0:
            data = self._recv_whole_frame(data)
            (answer, data) = self.remove_ipa_ctrl_header(data)
            if verbose:
                print("Got message: %s" % (answer,))
            (mtype, msg_id, msg) = answer.split(None, 2)
            msg_id = int(msg_id)
            rsp = {'mtype': mtype, 'id': msg_id}
            if mtype == "ERROR":
                rsp['error'] = msg
            else:
                split = msg.split(None, 1)
                rsp['var'] = split[0]
                rsp['value'] = split[1] if len(split) > 1 else None
            responses[msg_id] = rsp

        if verbose:
            print("Decoded replies:  %s" % (responses,))

        return responses
147
148
class TestCtrlBSC(TestCtrlBase):
    """CTRL interface tests run against osmo-bsc."""

    def tearDown(self):
        """Run the base clean-up, then remove the file passed via -r."""
        try:
            TestCtrlBase.tearDown(self)
        finally:
            # The file may be absent (e.g. the program exited early); an
            # unconditional unlink would then hide the actual test result.
            if os.path.lexists("tmp_dummy_sock"):
                os.unlink("tmp_dummy_sock")

    def ctrl_command(self):
        """Return the argv list that launches osmo-bsc."""
        return ["./src/osmo-bsc/osmo-bsc", "-r", "tmp_dummy_sock", "-c",
                "doc/examples/osmo-bsc/osmo-bsc.cfg"]

    def ctrl_app(self):
        """Return (CTRL port, binary path, application name, short name)."""
        return (4249, "./src/osmo-bsc/osmo-bsc", "OsmoBSC", "bsc")

    def _check_get(self, var, value):
        """GET `var` and require a GET_REPLY for it carrying `value`."""
        r = self.do_get(var)
        self.assertEqual(r['mtype'], 'GET_REPLY')
        self.assertEqual(r['var'], var)
        self.assertEqual(r['value'], value)

    def _check_set(self, var, value, reply):
        """SET `var` to `value` and require a SET_REPLY carrying `reply`."""
        r = self.do_set(var, value)
        self.assertEqual(r['mtype'], 'SET_REPLY')
        self.assertEqual(r['var'], var)
        self.assertEqual(r['value'], reply)

    def _check_error(self, r, error=None):
        """Require `r` to be an ERROR, with text `error` when it is given."""
        self.assertEqual(r['mtype'], 'ERROR')
        if error is not None:
            self.assertEqual(r['error'], error)

    def testCtrlErrs(self):
        self._check_error(self.do_get('invalid'), 'Command not found')
        self._check_error(self.do_set('rf_locked', '999'),
                          'Value failed verification.')
        self._check_error(self.do_get('bts'),
                          'Error while parsing the index.')
        self._check_error(self.do_get('bts.999'),
                          'Error while resolving object')

    def testBtsLac(self):
        lac = 'bts.0.location-area-code'
        self._check_get(lac, '1')
        self._check_set(lac, '23', '23')
        self._check_get(lac, '23')
        self._check_error(self.do_set(lac, '-1'),
                          'Input not within the range')

    def testTrxPowerRed(self):
        power_red = 'bts.0.trx.0.max-power-reduction'
        self._check_get(power_red, '20')
        self._check_set(power_red, '22', '22')
        self._check_get(power_red, '22')
        self._check_error(self.do_set(power_red, '1'), 'Value must be even')

    def testRfLock(self):
        self._check_get('bts.0.rf_state', 'inoperational,unlocked,on')

        self._check_set('rf_locked', '1', '1')
        # The state is read back only after a pause.
        time.sleep(1.5)
        self._check_get('bts.0.rf_state', 'inoperational,locked,off')

        self._check_set('rf_locked', '0', '0')
        time.sleep(1.5)
        self._check_get('bts.0.rf_state', 'inoperational,unlocked,on')

    def testTimezone(self):
        timezone = 'bts.0.timezone'
        self._check_get(timezone, 'off')
        self._check_set(timezone, '-2,15,2', '-2,15,2')
        self._check_get(timezone, '-2,15,2')

        # Test invalid input: surplus fields are expected to be dropped
        self._check_set(timezone, '-2,15,2,5,6,7', '-2,15,2')

        self._check_error(self.do_set(timezone, '-2,15'))
        self._check_error(self.do_set(timezone, '-2'))
        # NOTE(review): the reply to this request has never been checked;
        # confirm whether an ERROR is expected here as well.
        self.do_set(timezone, '1')

        self._check_set(timezone, 'off', 'off')
        self._check_get(timezone, 'off')

    def testMcc(self):
        # The replies to the SET requests are not checked, only the value
        # read back afterwards.
        self.do_set('mcc', '23')
        self._check_get('mcc', '23')

        self.do_set('mcc', '023')
        self._check_get('mcc', '23')

    def testMnc(self):
        self.do_set('mnc', '9')
        self._check_get('mnc', '9')

        self.do_set('mnc', '09')
        self._check_get('mnc', '9')

    def testMccMncApply(self):
        # Test some invalid input
        self._check_error(self.do_set('mcc-mnc-apply', 'WRONG'))
        self._check_error(self.do_set('mcc-mnc-apply', '1,'))

        self._check_set('mcc-mnc-apply', '200,3', 'Tried to drop the BTS')

        # Set it again
        self._check_set('mcc-mnc-apply', '200,3', 'Nothing changed')

        # Change it
        self._check_set('mcc-mnc-apply', '200,4', 'Tried to drop the BTS')

        # Change it
        self._check_set('mcc-mnc-apply', '201,4', 'Tried to drop the BTS')

        # Verify
        self._check_get('mnc', '4')
        self._check_get('mcc', '201')

        # Change it
        self._check_set('mcc-mnc-apply', '202,03', 'Tried to drop the BTS')
        self._check_get('mnc', '3')
        self._check_get('mcc', '202')
371
class TestCtrlNITB(TestCtrlBase):
    """CTRL interface tests run against osmo-nitb."""

    def tearDown(self):
        """Run the base clean-up, then remove the database passed via -l."""
        try:
            TestCtrlBase.tearDown(self)
        finally:
            # The file may be absent (e.g. the program exited early); an
            # unconditional unlink would then hide the actual test result.
            if os.path.lexists("test_hlr.sqlite3"):
                os.unlink("test_hlr.sqlite3")

    def ctrl_command(self):
        """Return the argv list that launches osmo-nitb."""
        return ["./src/osmo-nitb/osmo-nitb", "-c",
                "doc/examples/osmo-nitb/nanobts/openbsc.cfg",
                "-l", "test_hlr.sqlite3"]

    def ctrl_app(self):
        """Return (CTRL port, binary path, application name, short name)."""
        return (4249, "./src/osmo-nitb/osmo-nitb", "OsmoBSC", "nitb")

    def testSubscriberAddRemove(self):
        # The first request uses a number not seen before, the second one
        # repeats it with a different second field; both must be accepted.
        for subscriber in ('2620345,445566', '2620345,445567'):
            r = self.do_set('subscriber-modify-v1', subscriber)
            self.assertEqual(r['mtype'], 'SET_REPLY')
            self.assertEqual(r['var'], 'subscriber-modify-v1')
            self.assertEqual(r['value'], 'OK')

        # TODO. verify that the entry has been created and modified? Invoke
        # the sqlite3 CLI or do it through the DB libraries?

        r = self.do_set('subscriber-delete-v1', '2620345')
        self.assertEqual(r['mtype'], 'SET_REPLY')
        self.assertEqual(r['value'], 'Removed')

        # Deleting the same subscriber a second time must fail.
        r = self.do_set('subscriber-delete-v1', '2620345')
        self.assertEqual(r['mtype'], 'ERROR')
        self.assertEqual(r['error'], 'Failed to find subscriber')

    def testSubscriberList(self):
        # TODO. Add command to mark a subscriber as active
        r = self.do_get('subscriber-list-active-v1')
        self.assertEqual(r['mtype'], 'GET_REPLY')
        self.assertEqual(r['var'], 'subscriber-list-active-v1')
        self.assertEqual(r['value'], None)
413
class TestCtrlNAT(TestCtrlBase):
    """CTRL interface tests run against osmo-bsc_nat."""

    def ctrl_command(self):
        """Return the argv list that launches osmo-bsc_nat."""
        return ["./src/osmo-bsc_nat/osmo-bsc_nat", "-c",
                "doc/examples/osmo-bsc_nat/osmo-bsc_nat.cfg"]

    def ctrl_app(self):
        """Return (CTRL port, binary path, application name, short name)."""
        return (4250, "./src/osmo-bsc_nat/osmo-bsc_nat", "OsmoNAT", "nat")

    def _check_reply(self, r, mtype, value):
        """Require `r` to be an `mtype` reply for 'net' carrying `value`."""
        self.assertEqual(r['mtype'], mtype)
        # Every reply in this test is expected to name the variable 'net'.
        self.assertEqual(r['var'], 'net')
        self.assertEqual(r['value'], value)

    def testAccessList(self):
        access_list = 'net.0.bsc_cfg.0.access-list-name'
        no_access_list = 'net.0.bsc_cfg.0.no-access-list-name'

        self._check_reply(self.do_get(access_list), 'GET_REPLY', None)
        self._check_reply(self.do_set(access_list, 'bla'), 'SET_REPLY', 'bla')
        self._check_reply(self.do_get(access_list), 'GET_REPLY', 'bla')

        # The request is sent twice and must succeed both times.
        self._check_reply(self.do_set(no_access_list, '1'), 'SET_REPLY', None)
        self._check_reply(self.do_set(no_access_list, '1'), 'SET_REPLY', None)
Holger Hans Peter Freythereab2a3f2014-03-04 17:16:58 +0100448
class TestCtrlSGSN(TestCtrlBase):
    """CTRL interface tests run against osmo-sgsn."""

    def ctrl_command(self):
        """Return the argv list that launches osmo-sgsn."""
        return ["./src/gprs/osmo-sgsn", "-c",
                "doc/examples/osmo-sgsn/osmo-sgsn.cfg"]

    def ctrl_app(self):
        """Return (CTRL port, binary path, application name, short name)."""
        return (4251, "./src/gprs/osmo-sgsn", "OsmoSGSN", "sgsn")

    def testListSubscribers(self):
        # TODO. Add command to mark a subscriber as active
        r = self.do_get('subscriber-list-active-v1')
        self.assertEqual(r['mtype'], 'GET_REPLY')
        self.assertEqual(r['var'], 'subscriber-list-active-v1')
        self.assertEqual(r['value'], None)
463
def add_bsc_test(suite, workdir):
    """Add the TestCtrlBSC tests to `suite` if osmo-bsc exists in `workdir`.

    Prints a notice and adds nothing when the binary is not a regular file.
    """
    binary = os.path.join(workdir, "src/osmo-bsc/osmo-bsc")
    if os.path.isfile(binary):
        loader = unittest.TestLoader()
        suite.addTest(loader.loadTestsFromTestCase(TestCtrlBSC))
        return
    print("Skipping the BSC test")
470
def add_nitb_test(suite, workdir):
    """Add the TestCtrlNITB tests to `suite`.

    `workdir` is accepted for symmetry with the other add_*_test helpers
    but is not consulted: these tests are added unconditionally.
    """
    loader = unittest.TestLoader()
    suite.addTest(loader.loadTestsFromTestCase(TestCtrlNITB))
474
def add_nat_test(suite, workdir):
    """Add the TestCtrlNAT tests to `suite` if osmo-bsc_nat exists in `workdir`.

    Prints a notice and adds nothing when the binary is not a regular file.
    """
    binary = os.path.join(workdir, "src/osmo-bsc_nat/osmo-bsc_nat")
    if os.path.isfile(binary):
        loader = unittest.TestLoader()
        suite.addTest(loader.loadTestsFromTestCase(TestCtrlNAT))
        return
    print("Skipping the NAT test")
481
def add_sgsn_test(suite, workdir):
    """Add the TestCtrlSGSN tests to `suite` if osmo-sgsn exists in `workdir`.

    Prints a notice and adds nothing when the binary is not a regular file.
    """
    binary = os.path.join(workdir, "src/gprs/osmo-sgsn")
    if os.path.isfile(binary):
        loader = unittest.TestLoader()
        suite.addTest(loader.loadTestsFromTestCase(TestCtrlSGSN))
        return
    print("Skipping the SGSN test")
488
if __name__ == '__main__':
    # Script entry point: parse the options, change into the working
    # directory, run every available CTRL test class and exit with the
    # number of errors plus failures as the status.
    import argparse

    workdir = '.'

    parser = argparse.ArgumentParser()
    parser.add_argument("-v", "--verbose", dest="verbose",
                        action="store_true", help="verbose mode")
    parser.add_argument("-p", "--pythonconfpath", dest="p",
                        help="searchpath for config")
    parser.add_argument("-w", "--workdir", dest="w",
                        help="Working directory")
    args = parser.parse_args()

    verbose_level = 1
    if args.verbose:
        verbose_level = 2
        # Module-level flag read by the TestCtrlBase helpers.
        verbose = True

    if args.w:
        workdir = args.w

    if args.p:
        confpath = args.p

    print("confpath %s, workdir %s" % (confpath, workdir))
    os.chdir(workdir)
    # The add_*_test helpers probe for binaries below workdir.  After the
    # chdir a relative workdir would be applied a second time and the probes
    # would fail, silently skipping tests; hand them the absolute path.
    workdir = os.getcwd()
    print("Running tests for specific control commands")
    suite = unittest.TestSuite()
    add_bsc_test(suite, workdir)
    add_nitb_test(suite, workdir)
    add_nat_test(suite, workdir)
    add_sgsn_test(suite, workdir)
    res = unittest.TextTestRunner(verbosity=verbose_level).run(suite)
    sys.exit(len(res.errors) + len(res.failures))