blob: dfdec403cfe9683bdd76ca57c5a56324aba95005 [file] [log] [blame]
#!/usr/bin/env python

# (C) 2013 by Jacob Erlbeck <jerlbeck@sysmocom.de>
# based on vty_test_runner.py:
# (C) 2013 by Katerina Barone-Adesi <kat.obsc@gmail.com>
# (C) 2013 by Holger Hans Peter Freyther
# based on bsc_control.py.

# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.

# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.

# You should have received a copy of the GNU General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
21
22import os
23import time
24import unittest
25import socket
26import sys
27import struct
28
29import osmopy.obscvty as obscvty
30import osmopy.osmoutil as osmoutil
31
# Directory that is joined in front of the config file named after "-c" on
# the application's command line; replaced by the -p command line option.
confpath = "."

# When true, the CTRL helper methods print what they connect to, send and
# receive; switched on by the -v command line option.
verbose = False
34
class TestCtrlBase(unittest.TestCase):
    """Fixture that launches an application and talks to its CTRL port.

    Subclasses supply the command line (``ctrl_command``) and a description
    of the application (``ctrl_app``).  ``setUp`` starts the process and
    opens a TCP connection to 127.0.0.1 on the port from ``ctrl_app()[0]``;
    ``tearDown`` closes the connection and stops the process.

    Messages are plain text ("GET <id> <var>", "SET <id> <var> <value>")
    wrapped in a 4 byte header: a big-endian 16 bit length, the protocol
    byte 0xee and an extension byte 0.
    """

    def ctrl_command(self):
        """Return the argv list used to launch the application."""
        raise Exception("Needs to be implemented by a subclass")

    def ctrl_app(self):
        """Return a tuple describing the application; item 0 is the port."""
        raise Exception("Needs to be implemented by a subclass")

    def setUp(self):
        """Launch the application and connect to its CTRL port."""
        # Known state first, so that cleanup is safe even if launching or
        # connecting fails part-way through.
        self.proc = None
        self.sock = None
        self.next_id = 1000

        # Work on a copy: the subclass may hand out a shared list.
        osmo_ctrl_cmd = self.ctrl_command()[:]
        if '-c' in osmo_ctrl_cmd:
            # The argument following "-c" is the config file; look it up
            # relative to the configured search path.
            cfi = osmo_ctrl_cmd.index('-c') + 1
            osmo_ctrl_cmd[cfi] = os.path.join(confpath, osmo_ctrl_cmd[cfi])

        try:
            print("Launch: %s from %s" % (' '.join(osmo_ctrl_cmd), os.getcwd()))
            self.proc = osmoutil.popen_devnull(osmo_ctrl_cmd)
        except OSError:
            sys.stderr.write("Current directory: %s\n" % os.getcwd())
            # The working directory option of this script is -w.
            sys.stderr.write("Consider setting -w\n")
        # Give the application time to start listening before connecting.
        time.sleep(2)

        appport = self.ctrl_app()[0]
        try:
            self.connect("127.0.0.1", appport)
        except Exception:
            # unittest does not run tearDown() when setUp() raises, so the
            # launched process has to be stopped here.
            self._stop_app()
            raise

    def tearDown(self):
        """Close the CTRL connection and stop the application."""
        self.disconnect()
        self._stop_app()

    def _stop_app(self):
        """Stop the launched application, if one was started."""
        if self.proc is not None:
            osmoutil.end_proc(self.proc)
            self.proc = None

    def prefix_ipa_ctrl_header(self, data):
        """Return *data* with the 4 byte header prepended.

        The length field counts the extension byte plus the payload, hence
        ``len(data) + 1``.
        """
        return struct.pack(">HBB", len(data)+1, 0xee, 0) + data

    def remove_ipa_ctrl_header(self, data):
        """Split the first framed message off *data*.

        Returns a ``(payload, remaining_data)`` tuple.  Raises an exception
        if *data* is shorter than a header or the protocol bytes are not
        0xee / 0.  A length field that exceeds the available data only
        produces a warning; the (truncated) payload is still returned.
        """
        if len(data) < 4:
            raise Exception("Answer too short!")
        (plen, ipa_proto, osmo_proto) = struct.unpack(">HBB", data[:4])
        if plen + 3 > len(data):
            print("Warning: Wrong payload length (expected %i, got %i)"
                  % (plen, len(data) - 3))
        if ipa_proto != 0xee or osmo_proto != 0:
            raise Exception("Wrong protocol in answer!")

        # plen covers the extension byte (offset 3) and the payload, so the
        # message ends at offset plen + 3.
        return data[4:plen+3], data[plen+3:]

    def disconnect(self):
        """Close the CTRL socket if one is open."""
        if self.sock is not None:
            self.sock.close()
            self.sock = None

    def connect(self, host, port):
        """Open a blocking TCP connection and remember it in ``self.sock``."""
        if verbose:
            print("Connecting to host %s:%i" % (host, port))

        sck = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
        sck.setblocking(1)
        sck.connect((host, port))
        self.sock = sck
        return sck

    def send(self, data):
        """Frame and send *data*; return True if it was sent completely."""
        if verbose:
            print("Sending \"%s\"" % (data,))
        data = self.prefix_ipa_ctrl_header(data)
        return self.sock.send(data) == len(data)

    def send_set(self, var, value, id):
        """Send a SET request for *var* with message id *id*."""
        setmsg = "SET %s %s %s" % (id, var, value)
        return self.send(setmsg)

    def send_get(self, var, id):
        """Send a GET request for *var* with message id *id*."""
        getmsg = "GET %s %s" % (id, var)
        return self.send(getmsg)

    def do_set(self, var, value):
        """Set *var* to *value* and return the decoded reply."""
        msg_id = self.next_id
        self.next_id += 1
        self.send_set(var, value, msg_id)
        return self.recv_msgs()[msg_id]

    def do_get(self, var):
        """Query *var* and return the decoded reply."""
        msg_id = self.next_id
        self.next_id += 1
        self.send_get(var, msg_id)
        return self.recv_msgs()[msg_id]

    @staticmethod
    def _decode_answer(answer):
        """Decode one "<type> <id> <rest>" message into a dict.

        The dict has 'mtype' and 'id' (int) plus either 'error' (for ERROR
        messages) or 'var' and 'value'.
        """
        (mtype, msg_id, msg) = answer.split(None, 2)
        rsp = {'mtype': mtype, 'id': int(msg_id)}
        if mtype == "ERROR":
            rsp['error'] = msg
        else:
            # Split off the variable name only: the value is the remainder
            # and may itself contain whitespace.  Trailing whitespace is
            # dropped, as it always was for single-word values.
            (rsp['var'], rsp['value']) = msg.rstrip().split(None, 1)
        return rsp

    def recv_msgs(self):
        """Read once from the socket and decode every message received.

        Returns a dict mapping message id to the decoded reply.
        """
        responses = {}
        data = self.sock.recv(4096)
        while len(data) > 0:
            (answer, data) = self.remove_ipa_ctrl_header(data)
            if verbose:
                print("Got message: %s" % (answer,))
            rsp = self._decode_answer(answer)
            responses[rsp['id']] = rsp

        if verbose:
            print("Decoded replies:  %s" % (responses,))

        return responses
142
143
class TestCtrlBSC(TestCtrlBase):
    """CTRL interface tests run against a freshly launched osmo-bsc."""

    # File name passed to osmo-bsc with "-r"; removed again in tearDown().
    DUMMY_SOCK = "tmp_dummy_sock"

    def tearDown(self):
        """Stop osmo-bsc, then remove the socket file it was given."""
        try:
            TestCtrlBase.tearDown(self)
        finally:
            # The file is presumably created by osmo-bsc itself, so it may
            # be missing if the process never came up; do not let that hide
            # the real failure.
            if os.path.exists(self.DUMMY_SOCK):
                os.unlink(self.DUMMY_SOCK)

    def ctrl_command(self):
        """Return the osmo-bsc command line (config path follows "-c")."""
        return ["./src/osmo-bsc/osmo-bsc", "-r", self.DUMMY_SOCK, "-c",
                "doc/examples/osmo-bsc/osmo-bsc.cfg"]

    def ctrl_app(self):
        """Return the application tuple; item 0 is the CTRL port."""
        return (4249, "./src/osmo-bsc/osmo-bsc", "OsmoBSC", "bsc")

    def _assert_error(self, reply, text):
        """Check that *reply* is an ERROR message carrying *text*."""
        self.assertEqual(reply['mtype'], 'ERROR')
        self.assertEqual(reply['error'], text)

    def _assert_reply(self, reply, mtype, var, value):
        """Check type, variable name and value of a GET/SET reply."""
        self.assertEqual(reply['mtype'], mtype)
        self.assertEqual(reply['var'], var)
        self.assertEqual(reply['value'], value)

    def testCtrlErrs(self):
        """Invalid requests are answered with the matching ERROR text."""
        self._assert_error(self.do_get('invalid'), 'Command not found')
        self._assert_error(self.do_set('rf_locked', '999'),
                           'Value failed verification.')
        self._assert_error(self.do_get('bts'),
                           'Error while parsing the index.')
        self._assert_error(self.do_get('bts.999'),
                           'Error while resolving object')

    def testRfLock(self):
        """Setting rf_locked is reflected in bts.0.rf_state."""
        self._assert_reply(self.do_get('bts.0.rf_state'), 'GET_REPLY',
                           'bts.0.rf_state', 'inoperational,unlocked,on')

        self._assert_reply(self.do_set('rf_locked', '1'), 'SET_REPLY',
                           'rf_locked', '1')

        # Wait before reading the state back after the change.
        time.sleep(1.5)

        self._assert_reply(self.do_get('bts.0.rf_state'), 'GET_REPLY',
                           'bts.0.rf_state', 'inoperational,locked,off')

        self._assert_reply(self.do_set('rf_locked', '0'), 'SET_REPLY',
                           'rf_locked', '0')

        time.sleep(1.5)

        self._assert_reply(self.do_get('bts.0.rf_state'), 'GET_REPLY',
                           'bts.0.rf_state', 'inoperational,unlocked,on')
203
def add_bsc_test(suite, workdir):
    """Add the osmo-bsc CTRL tests to *suite* if the binary has been built.

    The binary is looked up as ``src/osmo-bsc/osmo-bsc`` below *workdir*;
    when it is not a file a notice is printed and *suite* is left untouched.
    """
    bsc_binary = os.path.join(workdir, "src/osmo-bsc/osmo-bsc")
    if os.path.isfile(bsc_binary):
        loader = unittest.TestLoader()
        suite.addTest(loader.loadTestsFromTestCase(TestCtrlBSC))
    else:
        print("Skipping the BSC test")
210
if __name__ == '__main__':
    # Command line entry point: parse the options, change into the working
    # directory and run the CTRL tests whose binaries are available there.
    import argparse
    import sys

    workdir = '.'

    parser = argparse.ArgumentParser()
    parser.add_argument("-v", "--verbose", dest="verbose",
                        action="store_true", help="verbose mode")
    parser.add_argument("-p", "--pythonconfpath", dest="p",
                        help="searchpath for config")
    parser.add_argument("-w", "--workdir", dest="w",
                        help="Working directory")
    args = parser.parse_args()

    verbose_level = 1
    if args.verbose:
        verbose_level = 2
        verbose = True

    if args.w:
        workdir = args.w

    if args.p:
        confpath = args.p

    print("confpath %s, workdir %s" % (confpath, workdir))
    # Make the directory absolute before entering it: add_bsc_test() joins
    # workdir with the binary path again, which would otherwise apply a
    # relative workdir twice and silently skip the test.
    workdir = os.path.abspath(workdir)
    os.chdir(workdir)
    print("Running tests for specific control commands")
    suite = unittest.TestSuite()
    add_bsc_test(suite, workdir)
    res = unittest.TextTestRunner(verbosity=verbose_level).run(suite)
    # Exit status is the number of errors plus failures (0 on success).
    sys.exit(len(res.errors) + len(res.failures))