| #!/usr/bin/env python |
| |
| # (C) 2013 by Jacob Erlbeck <jerlbeck@sysmocom.de> |
| # (C) 2014 by Holger Hans Peter Freyther |
| # based on vty_test_runner.py: |
| # (C) 2013 by Katerina Barone-Adesi <kat.obsc@gmail.com> |
| # (C) 2013 by Holger Hans Peter Freyther |
| # based on bsc_control.py. |
| |
| # This program is free software: you can redistribute it and/or modify |
| # it under the terms of the GNU General Public License as published by |
| # the Free Software Foundation, either version 3 of the License, or |
| # (at your option) any later version. |
| |
| # This program is distributed in the hope that it will be useful, |
| # but WITHOUT ANY WARRANTY; without even the implied warranty of |
| # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the |
| # GNU General Public License for more details. |
| |
| # You should have received a copy of the GNU General Public License |
| # along with this program. If not, see <http://www.gnu.org/licenses/>. |
| |
| import os |
| import time |
| import unittest |
| import socket |
| import sys |
| import struct |
| |
| import osmopy.obscvty as obscvty |
| import osmopy.osmoutil as osmoutil |
| |
# Directory prefix joined onto the config file argument that follows '-c'
# in each launched command line; overridden by -p/--pythonconfpath.
confpath = '.'
# When True the CTRL helpers print the messages they send and receive;
# switched on by -v/--verbose.
verbose = False
| |
class TestCtrlBase(unittest.TestCase):
    """Common fixture for tests that talk to a CTRL interface over TCP.

    setUp() launches the command returned by ctrl_command(), waits for it
    to come up and connects to the port returned by ctrl_app().  Tests then
    use do_get() / do_set() to exchange CTRL messages, which are framed
    with a four byte IPA header (see prefix_ipa_ctrl_header()).
    """

    def ctrl_command(self):
        """Return the argv list used to launch the application under test."""
        raise Exception("Needs to be implemented by a subclass")

    def ctrl_app(self):
        """Return a tuple describing the application.

        Only element 0, the TCP port of the CTRL interface, is used by
        this class.
        """
        raise Exception("Needs to be implemented by a subclass")

    def setUp(self):
        """Launch the application and connect to its CTRL port.

        Raises OSError if the application cannot be started and whatever
        connect() raises if the CTRL port cannot be reached.
        """
        osmo_ctrl_cmd = self.ctrl_command()[:]
        # list.index() raises instead of returning a falsy value, so test
        # for membership before looking up the config file argument.
        if '-c' in osmo_ctrl_cmd:
            cfi = osmo_ctrl_cmd.index('-c') + 1
            osmo_ctrl_cmd[cfi] = os.path.join(confpath, osmo_ctrl_cmd[cfi])

        self.proc = None
        self.sock = None
        try:
            print("Launch: %s from %s" % (' '.join(osmo_ctrl_cmd), os.getcwd()))
            self.proc = osmoutil.popen_devnull(osmo_ctrl_cmd)
        except OSError:
            sys.stderr.write("Current directory: %s\n" % os.getcwd())
            sys.stderr.write("Consider setting -w\n")
            # Without a running application every test would only fail
            # later with a less helpful connection error.
            raise
        # presumably gives the application time to open its CTRL port
        # -- TODO confirm whether a shorter/polling wait would do
        time.sleep(2)

        appport = self.ctrl_app()[0]
        try:
            self.connect("127.0.0.1", appport)
        except Exception:
            # tearDown() is not run when setUp() fails, so stop the
            # process here instead of leaking it.
            osmoutil.end_proc(self.proc)
            raise
        self.next_id = 1000

    def tearDown(self):
        """Close the CTRL connection and stop the application."""
        try:
            self.disconnect()
        finally:
            osmoutil.end_proc(self.proc)

    def prefix_ipa_ctrl_header(self, data):
        """Return data prefixed with the IPA header used for CTRL messages.

        The header is a big-endian 16 bit length (payload plus the one
        sub-protocol byte), the protocol byte 0xee and the sub-protocol
        byte 0.
        """
        return struct.pack(">HBB", len(data) + 1, 0xee, 0) + data

    def remove_ipa_ctrl_header(self, data):
        """Split the first IPA framed message off data.

        Returns a (payload, remaining_data) tuple.  Raises Exception if
        data is shorter than a header or carries a different protocol; a
        length field exceeding the available data only prints a warning.
        """
        if len(data) < 4:
            raise Exception("Answer too short!")
        (plen, ipa_proto, osmo_proto) = struct.unpack(">HBB", data[:4])
        if plen + 3 > len(data):
            print("Warning: Wrong payload length (expected %i, got %i)"
                  % (plen, len(data) - 3))
        if ipa_proto != 0xee or osmo_proto != 0:
            raise Exception("Wrong protocol in answer!")

        return data[4:plen + 3], data[plen + 3:]

    def disconnect(self):
        """Close the CTRL socket if one is open."""
        sock = getattr(self, 'sock', None)
        if sock is not None:
            sock.close()
            self.sock = None

    def connect(self, host, port):
        """Open a blocking TCP connection to host:port.

        The socket is stored in self.sock and returned.
        """
        if verbose:
            print("Connecting to host %s:%i" % (host, port))

        sck = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
        sck.setblocking(1)
        try:
            sck.connect((host, port))
        except Exception:
            sck.close()
            raise
        self.sock = sck
        return sck

    def send(self, data):
        """Frame data with the IPA header and send all of it.

        Returns True; socket errors propagate as exceptions.
        """
        if verbose:
            print("Sending \"%s\"" % data)
        data = self.prefix_ipa_ctrl_header(data)
        # send() may transmit only part of the buffer; sendall() retries
        # until everything has been written.
        self.sock.sendall(data)
        return True

    def send_set(self, var, value, id):
        """Send a "SET <id> <var> <value>" request."""
        setmsg = "SET %s %s %s" % (id, var, value)
        return self.send(setmsg)

    def send_get(self, var, id):
        """Send a "GET <id> <var>" request."""
        getmsg = "GET %s %s" % (id, var)
        return self.send(getmsg)

    def do_set(self, var, value):
        """Set var to value and return the decoded reply for that request.

        Raises KeyError if no message with the request id was received.
        """
        msg_id = self.next_id
        self.next_id += 1
        self.send_set(var, value, msg_id)
        return self.recv_msgs()[msg_id]

    def do_get(self, var):
        """Query var and return the decoded reply for that request.

        Raises KeyError if no message with the request id was received.
        """
        msg_id = self.next_id
        self.next_id += 1
        self.send_get(var, msg_id)
        return self.recv_msgs()[msg_id]

    def _fill_ipa_msg(self, data):
        """Read from the socket until data starts with a complete message.

        Stops early if the peer closes the connection; the caller then
        sees the truncated data.
        """
        while (len(data) < 4 or
               len(data) < struct.unpack(">H", data[:2])[0] + 3):
            chunk = self.sock.recv(4096)
            if not chunk:
                break
            data += chunk
        return data

    def recv_msgs(self):
        """Receive and decode the pending CTRL messages.

        Returns a dict mapping the integer message id to a dict with the
        keys 'mtype' and 'id' plus either 'error' (for ERROR messages) or
        'var' and 'value' ('value' is None when the message has none).
        """
        responses = {}
        data = self.sock.recv(4096)
        while len(data) > 0:
            # A single recv() may end in the middle of a message.
            data = self._fill_ipa_msg(data)
            (answer, data) = self.remove_ipa_ctrl_header(data)
            if verbose:
                print("Got message: %s" % answer)
            (mtype, msg_id, msg) = answer.split(None, 2)
            msg_id = int(msg_id)
            rsp = {'mtype': mtype, 'id': msg_id}
            if mtype == "ERROR":
                rsp['error'] = msg
            else:
                split = msg.split(None, 1)
                rsp['var'] = split[0]
                if len(split) > 1:
                    rsp['value'] = split[1]
                else:
                    rsp['value'] = None
            responses[msg_id] = rsp

        if verbose:
            print("Decoded replies:  %s" % (responses,))

        return responses
| |
| |
class TestCtrlBSC(TestCtrlBase):
    """CTRL interface tests run against osmo-bsc with its example config."""

    def tearDown(self):
        """Stop the BSC and remove the socket file passed via -r."""
        try:
            TestCtrlBase.tearDown(self)
        finally:
            # Clean up even if stopping the process failed, and do not
            # raise a second error if the file was never created.
            if os.path.exists("tmp_dummy_sock"):
                os.unlink("tmp_dummy_sock")

    def ctrl_command(self):
        """Return the osmo-bsc command line."""
        return ["./src/osmo-bsc/osmo-bsc", "-r", "tmp_dummy_sock", "-c",
                "doc/examples/osmo-bsc/osmo-bsc.cfg"]

    def ctrl_app(self):
        """Return the application description; element 0 is the CTRL port."""
        return (4249, "./src/osmo-bsc/osmo-bsc", "OsmoBSC", "bsc")

    def testCtrlErrs(self):
        # Unknown variable
        r = self.do_get('invalid')
        self.assertEqual(r['mtype'], 'ERROR')
        self.assertEqual(r['error'], 'Command not found')

        # Value rejected by the verification step
        r = self.do_set('rf_locked', '999')
        self.assertEqual(r['mtype'], 'ERROR')
        self.assertEqual(r['error'], 'Value failed verification.')

        # Missing index
        r = self.do_get('bts')
        self.assertEqual(r['mtype'], 'ERROR')
        self.assertEqual(r['error'], 'Error while parsing the index.')

        # Index that does not resolve to an object
        r = self.do_get('bts.999')
        self.assertEqual(r['mtype'], 'ERROR')
        self.assertEqual(r['error'], 'Error while resolving object')

    def testBtsLac(self):
        r = self.do_get('bts.0.location-area-code')
        self.assertEqual(r['mtype'], 'GET_REPLY')
        self.assertEqual(r['var'], 'bts.0.location-area-code')
        self.assertEqual(r['value'], '1')

        r = self.do_set('bts.0.location-area-code', '23')
        self.assertEqual(r['mtype'], 'SET_REPLY')
        self.assertEqual(r['var'], 'bts.0.location-area-code')
        self.assertEqual(r['value'], '23')

        r = self.do_get('bts.0.location-area-code')
        self.assertEqual(r['mtype'], 'GET_REPLY')
        self.assertEqual(r['var'], 'bts.0.location-area-code')
        self.assertEqual(r['value'], '23')

        r = self.do_set('bts.0.location-area-code', '-1')
        self.assertEqual(r['mtype'], 'ERROR')
        self.assertEqual(r['error'], 'Input not within the range')

    def testBtsCi(self):
        r = self.do_get('bts.0.cell-identity')
        self.assertEqual(r['mtype'], 'GET_REPLY')
        self.assertEqual(r['var'], 'bts.0.cell-identity')
        self.assertEqual(r['value'], '0')

        r = self.do_set('bts.0.cell-identity', '23')
        self.assertEqual(r['mtype'], 'SET_REPLY')
        self.assertEqual(r['var'], 'bts.0.cell-identity')
        self.assertEqual(r['value'], '23')

        r = self.do_get('bts.0.cell-identity')
        self.assertEqual(r['mtype'], 'GET_REPLY')
        self.assertEqual(r['var'], 'bts.0.cell-identity')
        self.assertEqual(r['value'], '23')

        r = self.do_set('bts.0.cell-identity', '-1')
        self.assertEqual(r['mtype'], 'ERROR')
        self.assertEqual(r['error'], 'Input not within the range')

    def testBtsGenerateSystemInformation(self):
        r = self.do_get('bts.0.send-new-system-informations')
        self.assertEqual(r['mtype'], 'ERROR')
        self.assertEqual(r['error'], 'Write only attribute')

        # No RSL links so it will fail
        r = self.do_set('bts.0.send-new-system-informations', '1')
        self.assertEqual(r['mtype'], 'ERROR')
        self.assertEqual(r['error'], 'Failed to generate SI')

    def testBtsChannelLoad(self):
        r = self.do_set('bts.0.channel-load', '1')
        self.assertEqual(r['mtype'], 'ERROR')
        self.assertEqual(r['error'], 'Read only attribute')

        # No RSL link so everything is 0
        r = self.do_get('bts.0.channel-load')
        self.assertEqual(r['mtype'], 'GET_REPLY')
        self.assertEqual(r['value'], 'CCCH+SDCCH4,0,0 TCH/F,0,0 TCH/H,0,0 SDCCH8,0,0 TCH/F_PDCH,0,0 CCCH+SDCCH4+CBCH,0,0 SDCCH8+CBCH,0,0')

    def testBtsOmlConnectionState(self):
        """Check OML state. It will not be connected"""
        r = self.do_set('bts.0.oml-connection-state', '1')
        self.assertEqual(r['mtype'], 'ERROR')
        self.assertEqual(r['error'], 'Read only attribute')

        # No OML link so the state is reported as disconnected
        r = self.do_get('bts.0.oml-connection-state')
        self.assertEqual(r['mtype'], 'GET_REPLY')
        self.assertEqual(r['value'], 'disconnected')

    def testTrxPowerRed(self):
        r = self.do_get('bts.0.trx.0.max-power-reduction')
        self.assertEqual(r['mtype'], 'GET_REPLY')
        self.assertEqual(r['var'], 'bts.0.trx.0.max-power-reduction')
        self.assertEqual(r['value'], '20')

        r = self.do_set('bts.0.trx.0.max-power-reduction', '22')
        self.assertEqual(r['mtype'], 'SET_REPLY')
        self.assertEqual(r['var'], 'bts.0.trx.0.max-power-reduction')
        self.assertEqual(r['value'], '22')

        r = self.do_get('bts.0.trx.0.max-power-reduction')
        self.assertEqual(r['mtype'], 'GET_REPLY')
        self.assertEqual(r['var'], 'bts.0.trx.0.max-power-reduction')
        self.assertEqual(r['value'], '22')

        # Odd values are rejected
        r = self.do_set('bts.0.trx.0.max-power-reduction', '1')
        self.assertEqual(r['mtype'], 'ERROR')
        self.assertEqual(r['error'], 'Value must be even')

    def testTrxArfcn(self):
        r = self.do_get('bts.0.trx.0.arfcn')
        self.assertEqual(r['mtype'], 'GET_REPLY')
        self.assertEqual(r['var'], 'bts.0.trx.0.arfcn')
        self.assertEqual(r['value'], '871')

        r = self.do_set('bts.0.trx.0.arfcn', '873')
        self.assertEqual(r['mtype'], 'SET_REPLY')
        self.assertEqual(r['var'], 'bts.0.trx.0.arfcn')
        self.assertEqual(r['value'], '873')

        r = self.do_get('bts.0.trx.0.arfcn')
        self.assertEqual(r['mtype'], 'GET_REPLY')
        self.assertEqual(r['var'], 'bts.0.trx.0.arfcn')
        self.assertEqual(r['value'], '873')

        r = self.do_set('bts.0.trx.0.arfcn', '2000')
        self.assertEqual(r['mtype'], 'ERROR')
        self.assertEqual(r['error'], 'Input not within the range')

    def testRfLock(self):
        r = self.do_get('bts.0.rf_state')
        self.assertEqual(r['mtype'], 'GET_REPLY')
        self.assertEqual(r['var'], 'bts.0.rf_state')
        self.assertEqual(r['value'], 'inoperational,unlocked,on')

        r = self.do_set('rf_locked', '1')
        self.assertEqual(r['mtype'], 'SET_REPLY')
        self.assertEqual(r['var'], 'rf_locked')
        self.assertEqual(r['value'], '1')

        # presumably the lock is applied asynchronously -- TODO confirm
        time.sleep(1.5)

        r = self.do_get('bts.0.rf_state')
        self.assertEqual(r['mtype'], 'GET_REPLY')
        self.assertEqual(r['var'], 'bts.0.rf_state')
        self.assertEqual(r['value'], 'inoperational,locked,off')

        r = self.do_get('rf_locked')
        self.assertEqual(r['mtype'], 'GET_REPLY')
        self.assertEqual(r['var'], 'rf_locked')
        self.assertEqual(r['value'], 'state=off,policy=off')

        r = self.do_set('rf_locked', '0')
        self.assertEqual(r['mtype'], 'SET_REPLY')
        self.assertEqual(r['var'], 'rf_locked')
        self.assertEqual(r['value'], '0')

        time.sleep(1.5)

        r = self.do_get('bts.0.rf_state')
        self.assertEqual(r['mtype'], 'GET_REPLY')
        self.assertEqual(r['var'], 'bts.0.rf_state')
        self.assertEqual(r['value'], 'inoperational,unlocked,on')

        r = self.do_get('rf_locked')
        self.assertEqual(r['mtype'], 'GET_REPLY')
        self.assertEqual(r['var'], 'rf_locked')
        self.assertEqual(r['value'], 'state=off,policy=on')

    def testTimezone(self):
        r = self.do_get('bts.0.timezone')
        self.assertEqual(r['mtype'], 'GET_REPLY')
        self.assertEqual(r['var'], 'bts.0.timezone')
        self.assertEqual(r['value'], 'off')

        r = self.do_set('bts.0.timezone', '-2,15,2')
        self.assertEqual(r['mtype'], 'SET_REPLY')
        self.assertEqual(r['var'], 'bts.0.timezone')
        self.assertEqual(r['value'], '-2,15,2')

        r = self.do_get('bts.0.timezone')
        self.assertEqual(r['mtype'], 'GET_REPLY')
        self.assertEqual(r['var'], 'bts.0.timezone')
        self.assertEqual(r['value'], '-2,15,2')

        # Test invalid input
        # Surplus fields are ignored and only the first three are applied
        r = self.do_set('bts.0.timezone', '-2,15,2,5,6,7')
        self.assertEqual(r['mtype'], 'SET_REPLY')
        self.assertEqual(r['var'], 'bts.0.timezone')
        self.assertEqual(r['value'], '-2,15,2')

        r = self.do_set('bts.0.timezone', '-2,15')
        self.assertEqual(r['mtype'], 'ERROR')
        r = self.do_set('bts.0.timezone', '-2')
        self.assertEqual(r['mtype'], 'ERROR')
        # NOTE(review): this request had no assertion; a single field is
        # expected to be rejected like '-2' above -- confirm against the BSC
        r = self.do_set('bts.0.timezone', '1')
        self.assertEqual(r['mtype'], 'ERROR')

        r = self.do_set('bts.0.timezone', 'off')
        self.assertEqual(r['mtype'], 'SET_REPLY')
        self.assertEqual(r['var'], 'bts.0.timezone')
        self.assertEqual(r['value'], 'off')

        r = self.do_get('bts.0.timezone')
        self.assertEqual(r['mtype'], 'GET_REPLY')
        self.assertEqual(r['var'], 'bts.0.timezone')
        self.assertEqual(r['value'], 'off')

    def testMcc(self):
        r = self.do_set('mcc', '23')
        r = self.do_get('mcc')
        self.assertEqual(r['mtype'], 'GET_REPLY')
        self.assertEqual(r['var'], 'mcc')
        self.assertEqual(r['value'], '23')

        # A leading zero is not preserved in the reported value
        r = self.do_set('mcc', '023')
        r = self.do_get('mcc')
        self.assertEqual(r['mtype'], 'GET_REPLY')
        self.assertEqual(r['var'], 'mcc')
        self.assertEqual(r['value'], '23')

    def testMnc(self):
        r = self.do_set('mnc', '9')
        r = self.do_get('mnc')
        self.assertEqual(r['mtype'], 'GET_REPLY')
        self.assertEqual(r['var'], 'mnc')
        self.assertEqual(r['value'], '9')

        # A leading zero is not preserved in the reported value
        r = self.do_set('mnc', '09')
        r = self.do_get('mnc')
        self.assertEqual(r['mtype'], 'GET_REPLY')
        self.assertEqual(r['var'], 'mnc')
        self.assertEqual(r['value'], '9')

    def testMccMncApply(self):
        # Test some invalid input
        r = self.do_set('mcc-mnc-apply', 'WRONG')
        self.assertEqual(r['mtype'], 'ERROR')

        r = self.do_set('mcc-mnc-apply', '1,')
        self.assertEqual(r['mtype'], 'ERROR')

        r = self.do_set('mcc-mnc-apply', '200,3')
        self.assertEqual(r['mtype'], 'SET_REPLY')
        self.assertEqual(r['var'], 'mcc-mnc-apply')
        self.assertEqual(r['value'], 'Tried to drop the BTS')

        # Set it again
        r = self.do_set('mcc-mnc-apply', '200,3')
        self.assertEqual(r['mtype'], 'SET_REPLY')
        self.assertEqual(r['var'], 'mcc-mnc-apply')
        self.assertEqual(r['value'], 'Nothing changed')

        # Change it
        r = self.do_set('mcc-mnc-apply', '200,4')
        self.assertEqual(r['mtype'], 'SET_REPLY')
        self.assertEqual(r['var'], 'mcc-mnc-apply')
        self.assertEqual(r['value'], 'Tried to drop the BTS')

        # Change it
        r = self.do_set('mcc-mnc-apply', '201,4')
        self.assertEqual(r['mtype'], 'SET_REPLY')
        self.assertEqual(r['var'], 'mcc-mnc-apply')
        self.assertEqual(r['value'], 'Tried to drop the BTS')

        # Verify
        r = self.do_get('mnc')
        self.assertEqual(r['mtype'], 'GET_REPLY')
        self.assertEqual(r['var'], 'mnc')
        self.assertEqual(r['value'], '4')

        r = self.do_get('mcc')
        self.assertEqual(r['mtype'], 'GET_REPLY')
        self.assertEqual(r['var'], 'mcc')
        self.assertEqual(r['value'], '201')

        # Change it
        r = self.do_set('mcc-mnc-apply', '202,03')
        self.assertEqual(r['mtype'], 'SET_REPLY')
        self.assertEqual(r['var'], 'mcc-mnc-apply')
        self.assertEqual(r['value'], 'Tried to drop the BTS')

        r = self.do_get('mnc')
        self.assertEqual(r['mtype'], 'GET_REPLY')
        self.assertEqual(r['var'], 'mnc')
        self.assertEqual(r['value'], '3')

        r = self.do_get('mcc')
        self.assertEqual(r['mtype'], 'GET_REPLY')
        self.assertEqual(r['var'], 'mcc')
        self.assertEqual(r['value'], '202')
| |
class TestCtrlNITB(TestCtrlBase):
    """CTRL interface tests run against osmo-nitb with its example config."""

    def tearDown(self):
        """Stop the NITB and remove the database file passed via -l."""
        try:
            TestCtrlBase.tearDown(self)
        finally:
            # Clean up even if stopping the process failed, and do not
            # raise a second error if the file was never created.
            if os.path.exists("test_hlr.sqlite3"):
                os.unlink("test_hlr.sqlite3")

    def ctrl_command(self):
        """Return the osmo-nitb command line."""
        return ["./src/osmo-nitb/osmo-nitb", "-c",
                "doc/examples/osmo-nitb/nanobts/openbsc.cfg", "-l", "test_hlr.sqlite3"]

    def ctrl_app(self):
        """Return the application description; element 0 is the CTRL port."""
        return (4249, "./src/osmo-nitb/osmo-nitb", "OsmoBSC", "nitb")

    def testNumberOfBTS(self):
        r = self.do_get('number-of-bts')
        self.assertEqual(r['mtype'], 'GET_REPLY')
        self.assertEqual(r['var'], 'number-of-bts')
        self.assertEqual(r['value'], '1')

    def testSubscriberAddWithKi(self):
        """Test that we can set the algorithm to none, xor, comp128v1"""

        r = self.do_set('subscriber-modify-v1', '2620345,445566')
        self.assertEqual(r['mtype'], 'SET_REPLY')
        self.assertEqual(r['var'], 'subscriber-modify-v1')
        self.assertEqual(r['value'], 'OK')

        r = self.do_set('subscriber-modify-v1', '2620345,445566,none')
        self.assertEqual(r['mtype'], 'SET_REPLY')
        self.assertEqual(r['var'], 'subscriber-modify-v1')
        self.assertEqual(r['value'], 'OK')

        # xor without a further field is rejected
        r = self.do_set('subscriber-modify-v1', '2620345,445566,xor')
        self.assertEqual(r['mtype'], 'ERROR')
        self.assertEqual(r['error'], 'Value failed verification.')

        r = self.do_set('subscriber-modify-v1', '2620345,445566,comp128v1,00112233445566778899AABBCCDDEEFF')
        self.assertEqual(r['mtype'], 'SET_REPLY')
        self.assertEqual(r['var'], 'subscriber-modify-v1')
        self.assertEqual(r['value'], 'OK')

        r = self.do_set('subscriber-modify-v1', '2620345,445566,none')
        self.assertEqual(r['mtype'], 'SET_REPLY')
        self.assertEqual(r['var'], 'subscriber-modify-v1')
        self.assertEqual(r['value'], 'OK')

    def testSubscriberAddRemove(self):
        r = self.do_set('subscriber-modify-v1', '2620345,445566')
        self.assertEqual(r['mtype'], 'SET_REPLY')
        self.assertEqual(r['var'], 'subscriber-modify-v1')
        self.assertEqual(r['value'], 'OK')

        r = self.do_set('subscriber-modify-v1', '2620345,445567')
        self.assertEqual(r['mtype'], 'SET_REPLY')
        self.assertEqual(r['var'], 'subscriber-modify-v1')
        self.assertEqual(r['value'], 'OK')

        # TODO. verify that the entry has been created and modified? Invoke
        # the sqlite3 CLI or do it through the DB libraries?

        r = self.do_set('subscriber-delete-v1', '2620345')
        self.assertEqual(r['mtype'], 'SET_REPLY')
        self.assertEqual(r['value'], 'Removed')

        # Deleting the same subscriber a second time fails
        r = self.do_set('subscriber-delete-v1', '2620345')
        self.assertEqual(r['mtype'], 'ERROR')
        self.assertEqual(r['error'], 'Failed to find subscriber')

    def testSubscriberList(self):
        # TODO. Add command to mark a subscriber as active
        r = self.do_get('subscriber-list-active-v1')
        self.assertEqual(r['mtype'], 'GET_REPLY')
        self.assertEqual(r['var'], 'subscriber-list-active-v1')
        self.assertIsNone(r['value'])

    def testApplyConfiguration(self):
        r = self.do_get('bts.0.apply-configuration')
        self.assertEqual(r['mtype'], 'ERROR')
        self.assertEqual(r['error'], 'Write only attribute')

        r = self.do_set('bts.0.apply-configuration', '1')
        self.assertEqual(r['mtype'], 'SET_REPLY')
        self.assertEqual(r['value'], 'Tried to drop the BTS')

    def testGprsMode(self):
        r = self.do_get('bts.0.gprs-mode')
        self.assertEqual(r['mtype'], 'GET_REPLY')
        self.assertEqual(r['var'], 'bts.0.gprs-mode')
        self.assertEqual(r['value'], 'none')

        r = self.do_set('bts.0.gprs-mode', 'bla')
        self.assertEqual(r['mtype'], 'ERROR')
        self.assertEqual(r['error'], 'Mode is not known')

        r = self.do_set('bts.0.gprs-mode', 'egprs')
        self.assertEqual(r['mtype'], 'SET_REPLY')
        self.assertEqual(r['value'], 'egprs')

        r = self.do_get('bts.0.gprs-mode')
        self.assertEqual(r['mtype'], 'GET_REPLY')
        self.assertEqual(r['var'], 'bts.0.gprs-mode')
        self.assertEqual(r['value'], 'egprs')
| |
class TestCtrlNAT(TestCtrlBase):
    """CTRL interface tests run against osmo-bsc_nat with its example config."""

    def ctrl_command(self):
        """Return the osmo-bsc_nat command line."""
        return ["./src/osmo-bsc_nat/osmo-bsc_nat", "-c",
                "doc/examples/osmo-bsc_nat/osmo-bsc_nat.cfg"]

    def ctrl_app(self):
        """Return the application description; element 0 is the CTRL port."""
        return (4250, "./src/osmo-bsc_nat/osmo-bsc_nat", "OsmoNAT", "nat")

    def testAccessList(self):
        # The replies asserted below carry 'net' as the variable name
        # rather than the full path that was requested.
        r = self.do_get('net.0.bsc_cfg.0.access-list-name')
        self.assertEqual(r['mtype'], 'GET_REPLY')
        self.assertEqual(r['var'], 'net')
        self.assertIsNone(r['value'])

        r = self.do_set('net.0.bsc_cfg.0.access-list-name', 'bla')
        self.assertEqual(r['mtype'], 'SET_REPLY')
        self.assertEqual(r['var'], 'net')
        self.assertEqual(r['value'], 'bla')

        r = self.do_get('net.0.bsc_cfg.0.access-list-name')
        self.assertEqual(r['mtype'], 'GET_REPLY')
        self.assertEqual(r['var'], 'net')
        self.assertEqual(r['value'], 'bla')

        # Clear the name again
        r = self.do_set('net.0.bsc_cfg.0.no-access-list-name', '1')
        self.assertEqual(r['mtype'], 'SET_REPLY')
        self.assertEqual(r['var'], 'net')
        self.assertIsNone(r['value'])

        r = self.do_get('net.0.bsc_cfg.0.access-list-name')
        self.assertEqual(r['mtype'], 'GET_REPLY')
        self.assertEqual(r['var'], 'net')
        self.assertIsNone(r['value'])

    def testAccessListManagement(self):
        r = self.do_set("net.0.add.allow.access-list.404", "abc")
        self.assertEqual(r['mtype'], 'ERROR')

        r = self.do_set("net.0.add.allow.access-list.bla", "^234$")
        self.assertEqual(r['mtype'], 'SET_REPLY')
        self.assertEqual(r['var'], 'net.0.add.allow.access-list.bla')
        self.assertEqual(r['value'], 'IMSI allow added to access list')

        # TODO.. find a way to actually see if this rule has been
        # added. e.g. by implementing a get for the list.
| |
class TestCtrlSGSN(TestCtrlBase):
    """CTRL interface tests run against osmo-sgsn with its example config."""

    def ctrl_command(self):
        """Return the osmo-sgsn command line."""
        return ["./src/gprs/osmo-sgsn", "-c",
                "doc/examples/osmo-sgsn/osmo-sgsn.cfg"]

    def ctrl_app(self):
        """Return the application description; element 0 is the CTRL port."""
        return (4251, "./src/gprs/osmo-sgsn", "OsmoSGSN", "sgsn")

    def testListSubscribers(self):
        # TODO. Add command to mark a subscriber as active
        r = self.do_get('subscriber-list-active-v1')
        self.assertEqual(r['mtype'], 'GET_REPLY')
        self.assertEqual(r['var'], 'subscriber-list-active-v1')
        self.assertIsNone(r['value'])
| |
def add_bsc_test(suite, workdir):
    """Add the TestCtrlBSC cases to suite if osmo-bsc exists below workdir.

    Prints a notice and leaves suite untouched when the binary is missing.
    """
    binary = os.path.join(workdir, "src/osmo-bsc/osmo-bsc")
    if os.path.isfile(binary):
        loader = unittest.TestLoader()
        suite.addTest(loader.loadTestsFromTestCase(TestCtrlBSC))
    else:
        print("Skipping the BSC test")
| |
def add_nitb_test(suite, workdir):
    """Add the TestCtrlNITB cases to suite; workdir is accepted but not used."""
    loader = unittest.TestLoader()
    suite.addTest(loader.loadTestsFromTestCase(TestCtrlNITB))
| |
def add_nat_test(suite, workdir):
    """Add the TestCtrlNAT cases to suite if osmo-bsc_nat exists below workdir.

    Prints a notice and leaves suite untouched when the binary is missing.
    """
    binary = os.path.join(workdir, "src/osmo-bsc_nat/osmo-bsc_nat")
    if os.path.isfile(binary):
        loader = unittest.TestLoader()
        suite.addTest(loader.loadTestsFromTestCase(TestCtrlNAT))
    else:
        print("Skipping the NAT test")
| |
def add_sgsn_test(suite, workdir):
    """Add the TestCtrlSGSN cases to suite if osmo-sgsn exists below workdir.

    Prints a notice and leaves suite untouched when the binary is missing.
    """
    binary = os.path.join(workdir, "src/gprs/osmo-sgsn")
    if os.path.isfile(binary):
        loader = unittest.TestLoader()
        suite.addTest(loader.loadTestsFromTestCase(TestCtrlSGSN))
    else:
        print("Skipping the SGSN test")
| |
# Script entry point: parse the options, change into the working directory,
# collect the test cases whose binaries are available and run them.  The
# exit status is the number of errors plus failures (0 on success).
if __name__ == '__main__':
    import argparse

    workdir = '.'

    parser = argparse.ArgumentParser()
    parser.add_argument("-v", "--verbose", dest="verbose",
                        action="store_true", help="verbose mode")
    parser.add_argument("-p", "--pythonconfpath", dest="p",
                        help="searchpath for config")
    parser.add_argument("-w", "--workdir", dest="w",
                        help="Working directory")
    args = parser.parse_args()

    verbose_level = 1
    if args.verbose:
        verbose_level = 2
        # Module-level flag read by the TestCtrlBase helpers.
        verbose = True

    if args.w:
        # The add_*_test() helpers join workdir onto the binary paths
        # after we have already changed into it, so a relative path
        # would be applied twice; make it absolute first.
        workdir = os.path.abspath(args.w)

    if args.p:
        confpath = args.p

    print("confpath %s, workdir %s" % (confpath, workdir))
    os.chdir(workdir)
    print("Running tests for specific control commands")
    suite = unittest.TestSuite()
    add_bsc_test(suite, workdir)
    add_nitb_test(suite, workdir)
    add_nat_test(suite, workdir)
    add_sgsn_test(suite, workdir)
    res = unittest.TextTestRunner(verbosity=verbose_level).run(suite)
    # Exit statuses wrap modulo 256; cap the count so that a large number
    # of problems can never be reported as success.
    sys.exit(min(len(res.errors) + len(res.failures), 255))