blob: e9a1f3040fbfb90e2517a24c77013153ce24e3fb [file] [log] [blame]
# osmo_gsm_tester: language snippets
#
# Copyright (C) 2016-2017 by sysmocom - s.f.m.c. GmbH
#
# Author: Neels Hofmeyr <neels@hofmeyr.de>
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as
# published by the Free Software Foundation, either version 3 of the
# License, or (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
19
20import os
21import sys
22import time
23import fcntl
24import hashlib
25import tempfile
26import shutil
27import atexit
28import threading
29import importlib.util
Pau Espin Pedrol13143bc2017-05-08 17:07:28 +020030import subprocess
Neels Hofmeyr3531a192017-03-28 14:30:28 +020031
# Names of the authentication algorithms, listed in the same order as
# enum osmo_auth_algo in libosmocore/include/osmocom/crypt/auth.h, so that
# the position of a name within the tuple matches the enum value.
OSMO_AUTH_ALGO_NONE = 'none'
ENUM_OSMO_AUTH_ALGO = (
    OSMO_AUTH_ALGO_NONE,
    'comp128v1',
    'comp128v2',
    'comp128v3',
    'xor',
    'milenage',
)

def osmo_auth_algo_by_name(algo_str):
    '''
    Return the numeric enum osmo_auth_algo value for the algorithm name
    algo_str (matched case insensitively).
    Raise ValueError if the name is not listed in ENUM_OSMO_AUTH_ALGO.
    '''
    wanted = algo_str.lower()
    return ENUM_OSMO_AUTH_ALGO.index(wanted)
40
def prepend_library_path(path):
    '''
    Return a colon separated library search path that has path in front of
    the current LD_LIBRARY_PATH environment value. If LD_LIBRARY_PATH is
    unset or empty, return just path. The environment is not modified.
    '''
    current = os.getenv('LD_LIBRARY_PATH')
    return path + ':' + current if current else path
46
def change_elf_rpath(binary, paths, run_dir):
    '''
    Change RPATH field in ELF executable binary to paths, by running
    'patchelf --set-rpath <paths> <binary>' as a Process (run_dir is handed
    to that Process).
    This feature can be used to tell the loader to load the trial libraries,
    as LD_LIBRARY_PATH is disabled for paths with modified capabilities.
    Raise RuntimeError if patchelf finishes with a nonzero result.
    '''
    from .process import Process
    patchelf_cmd = ['patchelf', '--set-rpath', paths, binary]
    patchelf = Process('patchelf', run_dir, patchelf_cmd)
    patchelf.launch()
    patchelf.wait()
    exit_code = patchelf.result
    if exit_code == 0:
        return
    raise RuntimeError('patchelf finished with err code %d' % exit_code)
59
def ip_to_iface(ip):
    '''
    Return the name of the local network interface that has the address ip
    configured, or None if no interface matches or the lookup is not possible.

    Every entry of /sys/class/net is queried with 'ip addr show dev <iface>';
    the result is the last word of the first output line that contains
    'inet' and ' <ip>/'.
    '''
    try:
        needle = ' ' + ip + '/'
        ifaces = os.listdir('/sys/class/net')
    except (OSError, TypeError):
        # no sysfs to enumerate, or ip is not a string: nothing can match
        return None
    for iface in ifaces:
        try:
            # The context manager closes the pipe and reaps the child, also
            # when returning early on a match.
            with subprocess.Popen(['ip', 'addr', 'show', 'dev', iface],
                                  stdout=subprocess.PIPE,
                                  universal_newlines=True) as proc:
                output, _ = proc.communicate()
        except (OSError, ValueError):
            # 'ip' cannot be run or its output cannot be decoded; this is a
            # best-effort lookup, so report "not found"
            return None
        for line in output.splitlines():
            if 'inet' in line and needle in line:
                return line.split()[-1]
    return None
70
def dst_ip_get_local_bind(ip):
    '''
    Retrieve default IP addr to bind to in order to route traffic to dst addr.

    Runs 'ip route get <ip>' and returns the word that follows 'src' on the
    first line of its output. Returns None if there is no such word, if the
    command prints nothing, or if it cannot be run.
    '''
    try:
        # The context manager closes the pipe and reaps the child process.
        with subprocess.Popen(['ip', 'route', 'get', ip],
                              stdout=subprocess.PIPE,
                              universal_newlines=True) as proc:
            output, _ = proc.communicate()
    except (OSError, ValueError, TypeError):
        # 'ip' cannot be run, its output cannot be decoded, or ip is not a
        # string; this is a best-effort lookup, so report "unknown"
        return None
    lines = output.splitlines()
    if not lines:
        return None
    words = lines[0].split()
    # Pair every word with its successor; a trailing 'src' without an address
    # behind it therefore never matches.
    for word, following in zip(words, words[1:]):
        if word == 'src':
            return following
    return None
85
def setcap_net_raw(binary, run_dir):
    '''
    Run 'sudo osmo-gsm-tester_setcap_net_raw.sh <binary>' as a Process
    (run_dir is handed to that Process) and raise RuntimeError if it
    finishes with a nonzero result.

    This functionality requires specific setup on the host running
    osmo-gsm-tester. See osmo-gsm-tester manual for more information.
    '''
    from .process import Process
    script = 'osmo-gsm-tester_setcap_net_raw.sh'
    setcap = Process(script, run_dir, ['sudo', script, binary])
    setcap.launch()
    setcap.wait()
    exit_code = setcap.result
    if exit_code == 0:
        return
    raise RuntimeError('%s finished with err code %d' % (script, exit_code))
98
def setcap_net_admin(binary, run_dir):
    '''
    Run 'sudo osmo-gsm-tester_setcap_net_admin.sh <binary>' as a Process
    (run_dir is handed to that Process) and raise RuntimeError if it
    finishes with a nonzero result.

    This functionality requires specific setup on the host running
    osmo-gsm-tester. See osmo-gsm-tester manual for more information.
    '''
    from .process import Process
    script = 'osmo-gsm-tester_setcap_net_admin.sh'
    setcap = Process(script, run_dir, ['sudo', script, binary])
    setcap.launch()
    setcap.wait()
    exit_code = setcap.result
    if exit_code == 0:
        return
    raise RuntimeError('%s finished with err code %d' % (script, exit_code))
Pau Espin Pedrol33c154b2017-10-25 16:16:25 +0200111
def import_path_prepend(pathname):
    '''
    Put the real path of pathname (as resolved by os.path.realpath) at the
    front of sys.path, unless sys.path already contains it.
    '''
    real_dir = os.path.realpath(pathname)
    if real_dir in sys.path:
        return
    sys.path.insert(0, real_dir)
116
def import_path_remove(pathname):
    '''
    Remove the first sys.path entry equal to the real path of pathname (as
    resolved by os.path.realpath). Do nothing if there is no such entry.
    '''
    real_dir = os.path.realpath(pathname)
    try:
        sys.path.remove(real_dir)
    except ValueError:
        # not in sys.path, nothing to remove
        pass
121
class listdict(dict):
    'a dict of lists { "a": [1, 2, 3], "b": [1, 2] }'

    def add(self, name, item):
        '''
        Append item to the list stored under name and return that list.
        A missing or empty (falsy) entry is replaced by a new list first.
        '''
        items = self.get(name)
        if items:
            items.append(item)
            return items
        items = [item]
        self[name] = items
        return items

    def add_dict(self, d):
        'add() every value of d to the list stored under the same key'
        for key, value in d.items():
            self.add(key, value)
Neels Hofmeyr3531a192017-03-28 14:30:28 +0200136
class DictProxy:
    '''
    allow accessing dict entries like object members
    syntactical sugar, adapted from http://stackoverflow.com/a/31569634
    so that e.g. templates can do ${bts.member} instead of ${bts['member']}

    Dicts, lists and tuples found inside the wrapped object are wrapped in a
    DictProxy again on access; any other value is returned as it is.
    '''
    def __init__(self, obj):
        self.obj = obj

    def __getitem__(self, key):
        'Return item key of the wrapped object, proxied if it is a container.'
        return dict2obj(self.obj[key])

    def __getattr__(self, key):
        '''
        Resolve key first as an attribute of the wrapped object, then as one
        of its items. Raise AttributeError naming key if both fail, to
        provide error information to know which template item was missing.
        '''
        # __getattr__ only runs after normal attribute lookup failed. If that
        # happens for 'obj' itself, __init__ has not run (e.g. while the
        # instance is being copied); bail out instead of recursing forever
        # through self.obj below.
        if key == 'obj':
            raise AttributeError(key)
        try:
            return dict2obj(getattr(self.obj, key))
        except AttributeError:
            pass
        try:
            return self[key]
        except (KeyError, IndexError, TypeError):
            # TypeError: a wrapped list or tuple cannot be indexed by a name.
            # Report all of these as a missing attribute so that hasattr()
            # and getattr() with a default work on every proxy.
            raise AttributeError(key)

def dict2obj(value):
    'Wrap a dict, list or tuple in a DictProxy; return anything else unchanged.'
    if isinstance(value, (list, tuple, dict)):
        return DictProxy(value)
    return value
163
164
class FileLock:
    '''
    Exclusive advisory lock on the file at path, taken with flock().

    While the lock is held, the file contains str(owner); the file is emptied
    again when the lock is released. Use as a context manager or through
    lock() / unlock().
    '''
    def __init__(self, path, owner):
        self.path = path
        self.owner = owner
        # file descriptor of the lock file while the lock is held, else -1
        self.fd = -1

    def __enter__(self):
        'Take the lock (blocking) and write the owner into the lock file.'
        if self.fd >= 0:
            # Already held by this instance. Calling flock() on a second
            # descriptor of the same file would block on our own lock.
            return self
        fd = os.open(self.path, os.O_CREAT | os.O_WRONLY)
        try:
            fcntl.flock(fd, fcntl.LOCK_EX)
            os.ftruncate(fd, 0)
            os.write(fd, str(self.owner).encode('utf-8'))
            os.fsync(fd)
        except BaseException:
            # do not leak the descriptor (and the lock attached to it)
            os.close(fd)
            raise
        self.fd = fd
        return self

    def __exit__(self, *exc_info):
        'Empty the lock file and release the lock; no-op if not locked.'
        if self.fd < 0:
            return
        try:
            os.ftruncate(self.fd, 0)
            os.fsync(self.fd)
        finally:
            # No explicit LOCK_UN: the lock goes away with the descriptor.
            os.close(self.fd)
            self.fd = -1

    def lock(self):
        'Same as entering the context manager.'
        self.__enter__()

    def unlock(self):
        'Same as leaving the context manager.'
        self.__exit__()
192
193
class Dir():
    '''
    Convenience wrapper around a directory path: compose paths below it,
    create directories and files below it, and lock it by means of a lock
    file placed inside it.
    '''
    LOCK_FILE = 'lock'

    def __init__(self, path):
        self.path = path
        self.lock_path = os.path.join(self.path, Dir.LOCK_FILE)

    def lock(self, origin_id):
        '''
        return lock context, usage:

          with my_dir.lock(origin):
              read_from(my_dir.child('foo.txt'))
              write_to(my_dir.child('bar.txt'))
        '''
        self.mkdir()
        return FileLock(self.lock_path, origin_id)

    @staticmethod
    def ensure_abs_dir_exists(*path_elements):
        '''
        Create the directory named by the joined path_elements, including
        missing parents. An already existing directory is left alone.
        Raise RuntimeError if no path element is given.
        '''
        if not path_elements:
            raise RuntimeError('Cannot create empty path')
        # exist_ok instead of "check, then create": another process may create
        # the same directory in between, e.g. when several processes call
        # lock() on a directory that does not exist yet.
        os.makedirs(os.path.join(*path_elements), exist_ok=True)

    def child(self, *rel_path):
        'Return the path of rel_path below this dir; the dir itself if empty.'
        if not rel_path:
            return self.path
        return os.path.join(self.path, *rel_path)

    def mk_parentdir(self, *rel_path):
        'Create the directory that contains child rel_path; return the child path.'
        child = self.child(*rel_path)
        Dir.ensure_abs_dir_exists(os.path.dirname(child))
        return child

    def mkdir(self, *rel_path):
        'Create child rel_path (or this dir itself) as directory; return its path.'
        child = self.child(*rel_path)
        Dir.ensure_abs_dir_exists(child)
        return child

    def children(self):
        'Return the names of the entries in this dir, as from os.listdir().'
        return os.listdir(self.path)

    def exists(self, *rel_path):
        return os.path.exists(self.child(*rel_path))

    def isdir(self, *rel_path):
        return os.path.isdir(self.child(*rel_path))

    def isfile(self, *rel_path):
        return os.path.isfile(self.child(*rel_path))

    def new_child(self, *rel_path):
        '''
        Return a path for rel_path below this dir that does not exist yet.
        If the plain path is taken, '_2', '_3', ... is inserted in front of
        the file name extension until an unused path is found. The parent
        directory is created; the returned path itself is not.
        '''
        prefix, suffix = os.path.splitext(self.child(*rel_path))
        attempt = 1
        path = prefix + suffix
        while os.path.exists(path):
            attempt += 1
            # prefix and suffix are passed as arguments, never used as format
            # string, so a '%' in the path is harmless
            path = '%s_%d%s' % (prefix, attempt, suffix)
        Dir.ensure_abs_dir_exists(os.path.dirname(path))
        return path

    def rel_path(self, path):
        'Return path expressed relative to this dir.'
        return os.path.relpath(path, self.path)

    def touch(self, *rel_path):
        touch_file(self.child(*rel_path))

    def new_file(self, *rel_path):
        'Pick a path as in new_child(), pass it to touch_file() and return it.'
        path = self.new_child(*rel_path)
        touch_file(path)
        return path

    def new_dir(self, *rel_path):
        'Pick a path as in new_child(), create it as directory and return it.'
        path = self.new_child(*rel_path)
        Dir.ensure_abs_dir_exists(path)
        return path

    def __str__(self):
        return self.path
    def __repr__(self):
        return self.path
285
def touch_file(path):
    'Create the file at path if it does not exist; existing content is kept.'
    # append mode creates a missing file and never truncates an existing one;
    # leaving the with block closes it again
    with open(path, 'a'):
        pass
289
def is_dict(l):
    'Return True if l is a dict (or an instance of a dict subclass).'
    return isinstance(l, (dict,))
292
def is_list(l):
    'Return True if l is a list or a tuple (or an instance of a subclass).'
    return isinstance(l, list) or isinstance(l, tuple)
295
296
def dict_add(a, *b, **c):
    '''
    Update dict a in place with each of the positional dicts b, in the order
    given, and finally with the keyword arguments c. Return a.
    '''
    for extra in b + (c,):
        a.update(extra)
    return a
302
def _hash_recurse(acc, obj, ignore_keys):
    '''
    Feed obj into the hash object acc. Dicts are walked in sorted item order,
    skipping the keys listed in ignore_keys; lists and tuples are walked in
    their own order; anything else is fed as its str() encoded in UTF-8.
    '''
    if isinstance(obj, dict):
        # NOTE(review): only the values are fed into the digest, the dict keys
        # themselves are not.
        children = [val for key, val in sorted(obj.items())
                    if key not in ignore_keys]
    elif isinstance(obj, (list, tuple)):
        children = obj
    else:
        acc.update(str(obj).encode('utf-8'))
        return

    for child in children:
        _hash_recurse(acc, child, ignore_keys)

def hash_obj(obj, *ignore_keys):
    '''
    Return the SHA-1 hex digest over the (nested) content of obj, leaving out
    the dict entries whose key is one of ignore_keys, at any nesting level.
    '''
    digest = hashlib.sha1()
    _hash_recurse(digest, obj, ignore_keys)
    return digest.hexdigest()
322
323
def md5(of_content):
    '''
    Return the MD5 hex digest of of_content. A str is encoded as UTF-8 first;
    anything else is passed to hashlib.md5() as it is.
    '''
    data = of_content.encode('utf-8') if isinstance(of_content, str) else of_content
    digest = hashlib.md5(data)
    return digest.hexdigest()
328
def md5_of_file(path):
    '''
    Return the MD5 hex digest of the content of the file at path.
    The file is read in binary mode, in chunks, so that a large file does not
    need to fit into memory at once.
    '''
    digest = hashlib.md5()
    with open(path, 'rb') as f:
        # read() returns b'' at end of file, which ends the iteration
        for chunk in iter(lambda: f.read(65536), b''):
            digest.update(chunk)
    return digest.hexdigest()
332
# path of the temporary directory handed out by get_tempdir(); None until the
# first call
_tempdir = None

def get_tempdir(remove_on_exit=True):
    '''
    Return the path of a temporary directory shared by all callers. It is
    created on the first call and the same path is returned from then on.
    remove_on_exit is only evaluated by the call that creates the directory:
    if true, removal of the directory tree is registered with atexit.
    '''
    global _tempdir
    if _tempdir is None:
        _tempdir = tempfile.mkdtemp()
        if remove_on_exit:
            def _remove_tempdir():
                shutil.rmtree(_tempdir)
            atexit.register(_remove_tempdir)
    return _tempdir
343
344
# Pick the implementation of run_python_file() according to what this
# Python's importlib offers.
if not hasattr(importlib.util, 'module_from_spec'):
    from importlib.machinery import SourceFileLoader
    def run_python_file(module_name, path):
        'Execute the python source file at path as a module named module_name.'
        SourceFileLoader(module_name, path).load_module()
else:
    def run_python_file(module_name, path):
        'Execute the python source file at path as a module named module_name.'
        spec = importlib.util.spec_from_file_location(module_name, path)
        module = importlib.util.module_from_spec(spec)
        spec.loader.exec_module(module)
353
def msisdn_inc(msisdn_str):
    'add 1 and preserve leading zeros'
    # pad back with zeros to the width of the input; a result that needs more
    # digits than the input simply gets longer
    width = len(msisdn_str)
    return str(int(msisdn_str) + 1).zfill(width)
357
class InputThread(threading.Thread):
    '''
    Thread that calls input() with the given prompt once and stores what was
    entered in self.result (None until then).
    '''
    def __init__(self, prompt):
        threading.Thread.__init__(self)
        self.prompt = prompt
        self.result = None

    def run(self):
        entered = input(self.prompt)
        self.result = entered
366
def input_polling(prompt, poll_func):
    '''
    Ask for a line of input with prompt, and keep calling poll_func() about
    once per second while waiting for the answer. Return the entered line.

    The input is read by an InputThread so that the calling thread remains
    free to run poll_func.
    '''
    input_thread = InputThread(prompt)
    input_thread.start()

    while input_thread.is_alive():
        poll_func()
        # Wait for the input thread instead of sleeping unconditionally: this
        # returns as soon as the input has arrived, not up to a second later.
        input_thread.join(timeout=1)

    return input_thread.result
Neels Hofmeyr3531a192017-03-28 14:30:28 +0200377
def str2bool(val):
    '''
    Convert a boolean word to bool, ignoring case:
    'false', 'no', 'off' give False; 'true', 'yes', 'on' give True.
    None and any other empty/falsy value give False. A value that already is
    a bool is returned as it is.
    Raise ValueError for anything else.
    '''
    if isinstance(val, bool):
        # e.g. a value that a config parser has converted already
        return val
    if not val:
        return False
    word = str(val).upper()
    if word in ('FALSE', 'NO', 'OFF'):
        return False
    if word in ('TRUE', 'YES', 'ON'):
        return True
    # one-element tuple, so that a val that is itself a tuple gets formatted
    raise ValueError('Invalid BOOL field: %r' % (val,))
386
def list_validate_same_elem_type(li):
    '''
    Return the type shared by all elements of the list li.
    Return None if the list is empty.
    Raise ValueError if an element has another type than the first element;
    types are compared exactly, a subclass counts as a different type.
    '''
    if not len(li):
        return None
    expected = type(li[0])
    for found in map(type, li):
        if found != expected:
            raise ValueError('List contains elements of different types: %r vs %r' % (expected, found))
    return expected
400
def empty_instance_type(t):
    '''
    Return a new empty instance for the container type t: {} for dict,
    [] for list, () for tuple. Raise ValueError for any other t.
    '''
    for supported in (dict, list, tuple):
        if t == supported:
            return supported()
    raise ValueError('type %r not supported!' % t)
409
def encryption2osmovty(val):
    '''
    Convert an encryption name of the form 'a5_<x>' to the form 'a5 <x>',
    i.e. replace the underscore after 'a5' by a space.
    val is asserted to start with 'a5_'.
    '''
    prefix, cipher = val[:3], val[3:]
    assert prefix == 'a5_'
    return 'a5 ' + cipher
413
Neels Hofmeyr3531a192017-03-28 14:30:28 +0200414# vim: expandtab tabstop=4 shiftwidth=4