blob: 1c5ec2a89dfebadb3b52bfd92a7ae26c05e7a9e0 [file] [log] [blame]
Neels Hofmeyrf95ce042017-09-25 23:22:02 +02001#!/usr/bin/env python
2
3# (C) 2013 by Jacob Erlbeck <jerlbeck@sysmocom.de>
4# (C) 2014 by Holger Hans Peter Freyther
5# based on vty_test_runner.py:
6# (C) 2013 by Katerina Barone-Adesi <kat.obsc@gmail.com>
7# (C) 2013 by Holger Hans Peter Freyther
8# based on bsc_control.py.
9
10# This program is free software: you can redistribute it and/or modify
11# it under the terms of the GNU General Public License as published by
12# the Free Software Foundation, either version 3 of the License, or
13# (at your option) any later version.
14
15# This program is distributed in the hope that it will be useful,
16# but WITHOUT ANY WARRANTY; without even the implied warranty of
17# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
18# GNU General Public License for more details.
19
20# You should have received a copy of the GNU General Public License
21# along with this program. If not, see <http://www.gnu.org/licenses/>.
22
23import os
24import time
25import unittest
26import socket
27import sys
28import struct
29import subprocess
30
31import osmopy.osmoutil as osmoutil
32
33# add $top_srcdir/contrib to find ipa.py
34sys.path.append(os.path.join(sys.path[0], '..', 'contrib'))
35
36from ipa import Ctrl, IPA
37
38# to be able to find $top_srcdir/doc/...
39confpath = os.path.join(sys.path[0], '..')
40verbose = False
41
42class TestCtrlBase(unittest.TestCase):
43
44 def ctrl_command(self):
45 raise Exception("Needs to be implemented by a subclass")
46
47 def ctrl_app(self):
48 raise Exception("Needs to be implemented by a subclass")
49
50 def setUp(self):
51 osmo_ctrl_cmd = self.ctrl_command()[:]
52 config_index = osmo_ctrl_cmd.index('-c')
53 if config_index:
54 cfi = config_index + 1
55 osmo_ctrl_cmd[cfi] = os.path.join(confpath, osmo_ctrl_cmd[cfi])
56
57 try:
58 self.proc = osmoutil.popen_devnull(osmo_ctrl_cmd)
59 except OSError:
60 print >> sys.stderr, "Current directory: %s" % os.getcwd()
61 print >> sys.stderr, "Consider setting -b"
62 time.sleep(2)
63
64 appstring = self.ctrl_app()[2]
65 appport = self.ctrl_app()[0]
66 self.connect("127.0.0.1", appport)
67 self.next_id = 1000
68
69 def tearDown(self):
70 self.disconnect()
71 osmoutil.end_proc(self.proc)
72
73 def disconnect(self):
74 if not (self.sock is None):
75 self.sock.close()
76
77 def connect(self, host, port):
78 if verbose:
79 print "Connecting to host %s:%i" % (host, port)
80
81 retries = 30
82 while True:
83 try:
84 sck = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
85 sck.setblocking(1)
86 sck.connect((host, port))
87 except IOError:
88 retries -= 1
89 if retries <= 0:
90 raise
91 time.sleep(.1)
92 continue
93 break
94 self.sock = sck
95 return sck
96
97 def send(self, data):
98 if verbose:
99 print "Sending \"%s\"" %(data)
100 data = Ctrl().add_header(data)
101 return self.sock.send(data) == len(data)
102
103 def send_set(self, var, value, id):
104 setmsg = "SET %s %s %s" %(id, var, value)
105 return self.send(setmsg)
106
107 def send_get(self, var, id):
108 getmsg = "GET %s %s" %(id, var)
109 return self.send(getmsg)
110
111 def do_set(self, var, value):
112 id = self.next_id
113 self.next_id += 1
114 self.send_set(var, value, id)
115 return self.recv_msgs()[id]
116
117 def do_get(self, var):
118 id = self.next_id
119 self.next_id += 1
120 self.send_get(var, id)
121 return self.recv_msgs()[id]
122
123 def assert_reply(self, r, mtype, var, val):
124 expect = dict(mtype=mtype, var=var, value=val)
125 result_matches = all([r.get(k) == expect.get(k) for k in expect.keys()])
126 if not result_matches:
127 print('\nError details:\nGot reply: %r\nExpected reply: %r\n' % (r, expect))
128 self.assertTrue(result_matches)
129
130 def assert_set(self, var, val, result_val):
131 r = self.do_set(var, val)
132 self.assert_reply(r, 'SET_REPLY', var, result_val)
133
134 def assert_get(self, var, result_val):
135 r = self.do_get(var)
136 self.assert_reply(r, 'GET_REPLY', var, result_val)
137
138 def recv_msgs(self):
139 responses = {}
140 data = self.sock.recv(4096)
141 while (len(data)>0):
142 (head, data) = IPA().split_combined(data)
143 answer = Ctrl().rem_header(head)
144 if verbose:
145 print "Got message:", answer
146 (mtype, id, msg) = answer.split(None, 2)
147 id = int(id)
148 rsp = {'mtype': mtype, 'id': id}
149 if mtype == "ERROR":
150 rsp['error'] = msg
151 else:
152 split = msg.split(None, 1)
153 rsp['var'] = split[0]
154 if len(split) > 1:
155 rsp['value'] = split[1]
156 else:
157 rsp['value'] = None
158 responses[id] = rsp
159
160 if verbose:
161 print "Decoded replies: ", responses
162
163 return responses
164
165
Neels Hofmeyrf88c9142017-09-25 23:23:57 +0200166class TestCtrlHLR(TestCtrlBase):
167
168 HLR_DB = 'hlr_ctrl_test.db'
169 HLR_SQL = '%s/sql/hlr.sql' % confpath
170 HLR_TEST_SQL = '%s/tests/test_subscriber.sql' % confpath
171
172 def setUp(self):
173 print('\n')
174 print(os.getcwd())
175 assert subprocess.call('sqlite3 %s < %s' % (self.HLR_DB, self.HLR_SQL), shell=True) == 0
176 assert subprocess.call('sqlite3 %s < %s' % (self.HLR_DB, self.HLR_TEST_SQL), shell=True) == 0
177 super(TestCtrlHLR, self).setUp()
178
179 def tearDown(self):
180 super(TestCtrlHLR, self).tearDown()
Neels Hofmeyrd4bb51b2017-10-10 21:59:02 +0200181 os.unlink(self.HLR_DB)
Neels Hofmeyrf88c9142017-09-25 23:23:57 +0200182
183 def ctrl_command(self):
184 return ["./src/osmo-hlr", "-c", "doc/examples/osmo-hlr.cfg", '-l', 'hlr_ctrl_test.db']
185
186 def ctrl_app(self):
187 return (4259, "./src/osmo-hlr", "OsmoHLR", "hlr")
188
189 def testCtrlErrs(self):
190 r = self.do_get('invalid')
191 self.assertEquals(r['mtype'], 'ERROR')
192 self.assertEquals(r['error'], 'Command not found')
193
194 def testEnableDisablePs(self):
195 self.assert_set('enable-ps', '901990000000001', 'OK')
196 self.assert_set('status-ps', '901990000000001', '1')
197 self.assert_set('enable-ps', '901990000000001', 'OK')
198 self.assert_set('status-ps', '901990000000001', '1')
199 self.assert_set('disable-ps', '901990000000001', 'OK')
200 self.assert_set('status-ps', '901990000000001', '0')
201 self.assert_set('disable-ps', '901990000000001', 'OK')
202 self.assert_set('status-ps', '901990000000001', '0')
203 self.assert_set('enable-ps', '901990000000001', 'OK')
204 self.assert_set('status-ps', '901990000000001', '1')
205
Neels Hofmeyrf95ce042017-09-25 23:22:02 +0200206if __name__ == '__main__':
207 import argparse
208 import sys
209
210 workdir = '.'
211
212 parser = argparse.ArgumentParser()
213 parser.add_argument("-v", "--verbose", dest="verbose",
214 action="store_true", help="verbose mode")
215 parser.add_argument("-p", "--pythonconfpath", dest="p",
216 help="searchpath for config")
217 parser.add_argument("-w", "--workdir", dest="w",
218 help="Working directory")
219 args = parser.parse_args()
220
221 verbose_level = 1
222 if args.verbose:
223 verbose_level = 2
224 verbose = True
225
226 if args.w:
227 workdir = args.w
228
229 if args.p:
230 confpath = args.p
231
232 print "confpath %s, workdir %s" % (confpath, workdir)
233 os.chdir(workdir)
234 print "Running tests for specific control commands"
235 suite = unittest.TestSuite()
Neels Hofmeyrf88c9142017-09-25 23:23:57 +0200236 test = unittest.TestLoader().loadTestsFromTestCase(TestCtrlHLR)
237 suite.addTest(test)
Neels Hofmeyrf95ce042017-09-25 23:22:02 +0200238 res = unittest.TextTestRunner(verbosity=verbose_level).run(suite)
239 sys.exit(len(res.errors) + len(res.failures))
240
241# vim: tabstop=4 shiftwidth=4 expandtab