blob: 9a4e7285f187036375fea44ff36d8990acdb6b3c [file] [log] [blame]
Neels Hofmeyr3531a192017-03-28 14:30:28 +02001# osmo_gsm_tester: language snippets
2#
3# Copyright (C) 2016-2017 by sysmocom - s.f.m.c. GmbH
4#
5# Author: Neels Hofmeyr <neels@hofmeyr.de>
6#
7# This program is free software: you can redistribute it and/or modify
8# it under the terms of the GNU Affero General Public License as
9# published by the Free Software Foundation, either version 3 of the
10# License, or (at your option) any later version.
11#
12# This program is distributed in the hope that it will be useful,
13# but WITHOUT ANY WARRANTY; without even the implied warranty of
14# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
15# GNU Affero General Public License for more details.
16#
17# You should have received a copy of the GNU Affero General Public License
18# along with this program. If not, see <http://www.gnu.org/licenses/>.
19
20import os
21import sys
22import time
23import fcntl
24import hashlib
25import tempfile
26import shutil
27import atexit
28import threading
29import importlib.util
30import fcntl
31import tty
Neels Hofmeyracf0c932017-05-06 16:05:33 +020032import readline
Neels Hofmeyr3531a192017-03-28 14:30:28 +020033
34
Pau Espin Pedrole39c6f12017-05-08 16:21:38 +020035def prepend_library_path(path):
36 lp = os.getenv('LD_LIBRARY_PATH')
37 if not lp:
38 return path
39 return path + ':' + lp
40
Neels Hofmeyr3531a192017-03-28 14:30:28 +020041class listdict:
42 'a dict of lists { "a": [1, 2, 3], "b": [1, 2] }'
43 def __getattr__(ld, name):
44 if name == 'add':
45 return ld.__getattribute__(name)
46 return ld.__dict__.__getattribute__(name)
47
48 def add(ld, name, item):
49 l = ld.__dict__.get(name)
50 if not l:
51 l = []
52 ld.__dict__[name] = l
53 l.append(item)
54 return l
55
56 def add_dict(ld, d):
57 for k,v in d.items():
58 ld.add(k, v)
59
60 def __setitem__(ld, name, val):
61 return ld.__dict__.__setitem__(name, val)
62
63 def __getitem__(ld, name):
64 return ld.__dict__.__getitem__(name)
65
66 def __str__(ld):
67 return ld.__dict__.__str__()
68
69
70class DictProxy:
71 '''
72 allow accessing dict entries like object members
73 syntactical sugar, adapted from http://stackoverflow.com/a/31569634
74 so that e.g. templates can do ${bts.member} instead of ${bts['member']}
75 '''
76 def __init__(self, obj):
77 self.obj = obj
78
79 def __getitem__(self, key):
80 return dict2obj(self.obj[key])
81
82 def __getattr__(self, key):
Your Name44af3412017-04-13 03:11:59 +020083 'provide error information to know which template item was missing'
Neels Hofmeyr3531a192017-03-28 14:30:28 +020084 try:
85 return dict2obj(getattr(self.obj, key))
86 except AttributeError:
87 try:
88 return self[key]
89 except KeyError:
90 raise AttributeError(key)
91
Neels Hofmeyr3531a192017-03-28 14:30:28 +020092def dict2obj(value):
Your Name44af3412017-04-13 03:11:59 +020093 if is_list(value) or is_dict(value):
Neels Hofmeyr3531a192017-03-28 14:30:28 +020094 return DictProxy(value)
Neels Hofmeyr3531a192017-03-28 14:30:28 +020095 return value
96
97
98class FileLock:
99 def __init__(self, path, owner):
100 self.path = path
101 self.owner = owner
102 self.f = None
103
104 def __enter__(self):
105 if self.f is not None:
106 return
107 self.fd = os.open(self.path, os.O_CREAT | os.O_WRONLY | os.O_TRUNC)
108 fcntl.flock(self.fd, fcntl.LOCK_EX)
109 os.truncate(self.fd, 0)
110 os.write(self.fd, str(self.owner).encode('utf-8'))
111 os.fsync(self.fd)
112
113 def __exit__(self, *exc_info):
114 #fcntl.flock(self.fd, fcntl.LOCK_UN)
115 os.truncate(self.fd, 0)
116 os.fsync(self.fd)
117 os.close(self.fd)
118 self.fd = -1
119
120 def lock(self):
121 self.__enter__()
122
123 def unlock(self):
124 self.__exit__()
125
126
127class Dir():
128 LOCK_FILE = 'lock'
129
130 def __init__(self, path):
131 self.path = path
132 self.lock_path = os.path.join(self.path, Dir.LOCK_FILE)
133
134 def lock(self, origin_id):
135 '''
136 return lock context, usage:
137
138 with my_dir.lock(origin):
139 read_from(my_dir.child('foo.txt'))
140 write_to(my_dir.child('bar.txt'))
141 '''
142 self.mkdir()
143 return FileLock(self.lock_path, origin_id)
144
145 @staticmethod
146 def ensure_abs_dir_exists(*path_elements):
147 l = len(path_elements)
148 if l < 1:
149 raise RuntimeError('Cannot create empty path')
150 if l == 1:
151 path = path_elements[0]
152 else:
153 path = os.path.join(*path_elements)
154 if not os.path.isdir(path):
155 os.makedirs(path)
156
157 def child(self, *rel_path):
158 if not rel_path:
159 return self.path
160 return os.path.join(self.path, *rel_path)
161
162 def mk_parentdir(self, *rel_path):
163 child = self.child(*rel_path)
164 child_parent = os.path.dirname(child)
165 Dir.ensure_abs_dir_exists(child_parent)
166 return child
167
168 def mkdir(self, *rel_path):
169 child = self.child(*rel_path)
170 Dir.ensure_abs_dir_exists(child)
171 return child
172
173 def children(self):
174 return os.listdir(self.path)
175
176 def exists(self, *rel_path):
177 return os.path.exists(self.child(*rel_path))
178
179 def isdir(self, *rel_path):
180 return os.path.isdir(self.child(*rel_path))
181
182 def isfile(self, *rel_path):
183 return os.path.isfile(self.child(*rel_path))
184
185 def new_child(self, *rel_path):
186 attempt = 1
187 prefix, suffix = os.path.splitext(self.child(*rel_path))
188 rel_path_fmt = '%s%%s%s' % (prefix, suffix)
189 while True:
190 path = rel_path_fmt % (('_%d'%attempt) if attempt > 1 else '')
191 if not os.path.exists(path):
192 break
193 attempt += 1
194 continue
195 Dir.ensure_abs_dir_exists(os.path.dirname(path))
196 return path
197
198 def rel_path(self, path):
199 return os.path.relpath(path, self.path)
200
201 def touch(self, *rel_path):
202 touch_file(self.child(*rel_path))
203
204 def new_file(self, *rel_path):
205 path = self.new_child(*rel_path)
206 touch_file(path)
207 return path
208
209 def new_dir(self, *rel_path):
210 path = self.new_child(*rel_path)
211 Dir.ensure_abs_dir_exists(path)
212 return path
213
214 def __str__(self):
215 return self.path
216 def __repr__(self):
217 return self.path
218
219def touch_file(path):
220 with open(path, 'a') as f:
221 f.close()
222
223def is_dict(l):
224 return isinstance(l, dict)
225
226def is_list(l):
227 return isinstance(l, (list, tuple))
228
229
230def dict_add(a, *b, **c):
231 for bb in b:
232 a.update(bb)
233 a.update(c)
234 return a
235
236def _hash_recurse(acc, obj, ignore_keys):
237 if is_dict(obj):
238 for key, val in sorted(obj.items()):
239 if key in ignore_keys:
240 continue
241 _hash_recurse(acc, val, ignore_keys)
242 return
243
244 if is_list(obj):
245 for item in obj:
246 _hash_recurse(acc, item, ignore_keys)
247 return
248
249 acc.update(str(obj).encode('utf-8'))
250
251def hash_obj(obj, *ignore_keys):
252 acc = hashlib.sha1()
253 _hash_recurse(acc, obj, ignore_keys)
254 return acc.hexdigest()
255
256
257def md5(of_content):
258 if isinstance(of_content, str):
259 of_content = of_content.encode('utf-8')
260 return hashlib.md5(of_content).hexdigest()
261
262def md5_of_file(path):
263 with open(path, 'rb') as f:
264 return md5(f.read())
265
266_tempdir = None
267
268def get_tempdir(remove_on_exit=True):
269 global _tempdir
270 if _tempdir is not None:
271 return _tempdir
272 _tempdir = tempfile.mkdtemp()
273 if remove_on_exit:
274 atexit.register(lambda: shutil.rmtree(_tempdir))
275 return _tempdir
276
277
278if hasattr(importlib.util, 'module_from_spec'):
279 def run_python_file(module_name, path):
280 spec = importlib.util.spec_from_file_location(module_name, path)
281 spec.loader.exec_module( importlib.util.module_from_spec(spec) )
282else:
283 from importlib.machinery import SourceFileLoader
284 def run_python_file(module_name, path):
285 SourceFileLoader(module_name, path).load_module()
286
287def msisdn_inc(msisdn_str):
288 'add 1 and preserve leading zeros'
289 return ('%%0%dd' % len(msisdn_str)) % (int(msisdn_str) + 1)
290
Neels Hofmeyracf0c932017-05-06 16:05:33 +0200291class InputThread(threading.Thread):
292 def __init__(self, prompt):
293 super().__init__()
294 self.prompt = prompt
295 self.result = None
Neels Hofmeyr3531a192017-03-28 14:30:28 +0200296
Neels Hofmeyracf0c932017-05-06 16:05:33 +0200297 def run(self):
298 self.result = input(self.prompt)
299
300def input_polling(prompt, poll_func):
301 input_thread = InputThread(prompt)
302 input_thread.start()
303
304 while input_thread.is_alive():
305 poll_func()
306 time.sleep(1)
307
308 input_thread.join()
309 return input_thread.result
Neels Hofmeyr3531a192017-03-28 14:30:28 +0200310
311# vim: expandtab tabstop=4 shiftwidth=4