blob: e132e21cd724e8184eb62f5813a313da85f68fe5 [file] [log] [blame]
Neels Hofmeyr3531a192017-03-28 14:30:28 +02001# osmo_gsm_tester: language snippets
2#
3# Copyright (C) 2016-2017 by sysmocom - s.f.m.c. GmbH
4#
5# Author: Neels Hofmeyr <neels@hofmeyr.de>
6#
7# This program is free software: you can redistribute it and/or modify
8# it under the terms of the GNU Affero General Public License as
9# published by the Free Software Foundation, either version 3 of the
10# License, or (at your option) any later version.
11#
12# This program is distributed in the hope that it will be useful,
13# but WITHOUT ANY WARRANTY; without even the implied warranty of
14# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
15# GNU Affero General Public License for more details.
16#
17# You should have received a copy of the GNU Affero General Public License
18# along with this program. If not, see <http://www.gnu.org/licenses/>.
19
20import os
21import sys
22import time
23import fcntl
24import hashlib
25import tempfile
26import shutil
27import atexit
28import threading
29import importlib.util
30import fcntl
31import tty
32import termios
33
34
35class listdict:
36 'a dict of lists { "a": [1, 2, 3], "b": [1, 2] }'
37 def __getattr__(ld, name):
38 if name == 'add':
39 return ld.__getattribute__(name)
40 return ld.__dict__.__getattribute__(name)
41
42 def add(ld, name, item):
43 l = ld.__dict__.get(name)
44 if not l:
45 l = []
46 ld.__dict__[name] = l
47 l.append(item)
48 return l
49
50 def add_dict(ld, d):
51 for k,v in d.items():
52 ld.add(k, v)
53
54 def __setitem__(ld, name, val):
55 return ld.__dict__.__setitem__(name, val)
56
57 def __getitem__(ld, name):
58 return ld.__dict__.__getitem__(name)
59
60 def __str__(ld):
61 return ld.__dict__.__str__()
62
63
64class DictProxy:
65 '''
66 allow accessing dict entries like object members
67 syntactical sugar, adapted from http://stackoverflow.com/a/31569634
68 so that e.g. templates can do ${bts.member} instead of ${bts['member']}
69 '''
70 def __init__(self, obj):
71 self.obj = obj
72
73 def __getitem__(self, key):
74 return dict2obj(self.obj[key])
75
76 def __getattr__(self, key):
Your Name44af3412017-04-13 03:11:59 +020077 'provide error information to know which template item was missing'
Neels Hofmeyr3531a192017-03-28 14:30:28 +020078 try:
79 return dict2obj(getattr(self.obj, key))
80 except AttributeError:
81 try:
82 return self[key]
83 except KeyError:
84 raise AttributeError(key)
85
Neels Hofmeyr3531a192017-03-28 14:30:28 +020086def dict2obj(value):
Your Name44af3412017-04-13 03:11:59 +020087 if is_list(value) or is_dict(value):
Neels Hofmeyr3531a192017-03-28 14:30:28 +020088 return DictProxy(value)
Neels Hofmeyr3531a192017-03-28 14:30:28 +020089 return value
90
91
92class FileLock:
93 def __init__(self, path, owner):
94 self.path = path
95 self.owner = owner
96 self.f = None
97
98 def __enter__(self):
99 if self.f is not None:
100 return
101 self.fd = os.open(self.path, os.O_CREAT | os.O_WRONLY | os.O_TRUNC)
102 fcntl.flock(self.fd, fcntl.LOCK_EX)
103 os.truncate(self.fd, 0)
104 os.write(self.fd, str(self.owner).encode('utf-8'))
105 os.fsync(self.fd)
106
107 def __exit__(self, *exc_info):
108 #fcntl.flock(self.fd, fcntl.LOCK_UN)
109 os.truncate(self.fd, 0)
110 os.fsync(self.fd)
111 os.close(self.fd)
112 self.fd = -1
113
114 def lock(self):
115 self.__enter__()
116
117 def unlock(self):
118 self.__exit__()
119
120
121class Dir():
122 LOCK_FILE = 'lock'
123
124 def __init__(self, path):
125 self.path = path
126 self.lock_path = os.path.join(self.path, Dir.LOCK_FILE)
127
128 def lock(self, origin_id):
129 '''
130 return lock context, usage:
131
132 with my_dir.lock(origin):
133 read_from(my_dir.child('foo.txt'))
134 write_to(my_dir.child('bar.txt'))
135 '''
136 self.mkdir()
137 return FileLock(self.lock_path, origin_id)
138
139 @staticmethod
140 def ensure_abs_dir_exists(*path_elements):
141 l = len(path_elements)
142 if l < 1:
143 raise RuntimeError('Cannot create empty path')
144 if l == 1:
145 path = path_elements[0]
146 else:
147 path = os.path.join(*path_elements)
148 if not os.path.isdir(path):
149 os.makedirs(path)
150
151 def child(self, *rel_path):
152 if not rel_path:
153 return self.path
154 return os.path.join(self.path, *rel_path)
155
156 def mk_parentdir(self, *rel_path):
157 child = self.child(*rel_path)
158 child_parent = os.path.dirname(child)
159 Dir.ensure_abs_dir_exists(child_parent)
160 return child
161
162 def mkdir(self, *rel_path):
163 child = self.child(*rel_path)
164 Dir.ensure_abs_dir_exists(child)
165 return child
166
167 def children(self):
168 return os.listdir(self.path)
169
170 def exists(self, *rel_path):
171 return os.path.exists(self.child(*rel_path))
172
173 def isdir(self, *rel_path):
174 return os.path.isdir(self.child(*rel_path))
175
176 def isfile(self, *rel_path):
177 return os.path.isfile(self.child(*rel_path))
178
179 def new_child(self, *rel_path):
180 attempt = 1
181 prefix, suffix = os.path.splitext(self.child(*rel_path))
182 rel_path_fmt = '%s%%s%s' % (prefix, suffix)
183 while True:
184 path = rel_path_fmt % (('_%d'%attempt) if attempt > 1 else '')
185 if not os.path.exists(path):
186 break
187 attempt += 1
188 continue
189 Dir.ensure_abs_dir_exists(os.path.dirname(path))
190 return path
191
192 def rel_path(self, path):
193 return os.path.relpath(path, self.path)
194
195 def touch(self, *rel_path):
196 touch_file(self.child(*rel_path))
197
198 def new_file(self, *rel_path):
199 path = self.new_child(*rel_path)
200 touch_file(path)
201 return path
202
203 def new_dir(self, *rel_path):
204 path = self.new_child(*rel_path)
205 Dir.ensure_abs_dir_exists(path)
206 return path
207
208 def __str__(self):
209 return self.path
210 def __repr__(self):
211 return self.path
212
213def touch_file(path):
214 with open(path, 'a') as f:
215 f.close()
216
217def is_dict(l):
218 return isinstance(l, dict)
219
220def is_list(l):
221 return isinstance(l, (list, tuple))
222
223
224def dict_add(a, *b, **c):
225 for bb in b:
226 a.update(bb)
227 a.update(c)
228 return a
229
230def _hash_recurse(acc, obj, ignore_keys):
231 if is_dict(obj):
232 for key, val in sorted(obj.items()):
233 if key in ignore_keys:
234 continue
235 _hash_recurse(acc, val, ignore_keys)
236 return
237
238 if is_list(obj):
239 for item in obj:
240 _hash_recurse(acc, item, ignore_keys)
241 return
242
243 acc.update(str(obj).encode('utf-8'))
244
245def hash_obj(obj, *ignore_keys):
246 acc = hashlib.sha1()
247 _hash_recurse(acc, obj, ignore_keys)
248 return acc.hexdigest()
249
250
251def md5(of_content):
252 if isinstance(of_content, str):
253 of_content = of_content.encode('utf-8')
254 return hashlib.md5(of_content).hexdigest()
255
256def md5_of_file(path):
257 with open(path, 'rb') as f:
258 return md5(f.read())
259
260_tempdir = None
261
262def get_tempdir(remove_on_exit=True):
263 global _tempdir
264 if _tempdir is not None:
265 return _tempdir
266 _tempdir = tempfile.mkdtemp()
267 if remove_on_exit:
268 atexit.register(lambda: shutil.rmtree(_tempdir))
269 return _tempdir
270
271
272if hasattr(importlib.util, 'module_from_spec'):
273 def run_python_file(module_name, path):
274 spec = importlib.util.spec_from_file_location(module_name, path)
275 spec.loader.exec_module( importlib.util.module_from_spec(spec) )
276else:
277 from importlib.machinery import SourceFileLoader
278 def run_python_file(module_name, path):
279 SourceFileLoader(module_name, path).load_module()
280
281def msisdn_inc(msisdn_str):
282 'add 1 and preserve leading zeros'
283 return ('%%0%dd' % len(msisdn_str)) % (int(msisdn_str) + 1)
284
285class polling_stdin:
286 def __init__(self, stream):
287 self.stream = stream
288 self.fd = self.stream.fileno()
289 def __enter__(self):
290 self.original_stty = termios.tcgetattr(self.stream)
291 tty.setcbreak(self.stream)
292 self.orig_fl = fcntl.fcntl(self.fd, fcntl.F_GETFL)
293 fcntl.fcntl(self.fd, fcntl.F_SETFL, self.orig_fl | os.O_NONBLOCK)
294 def __exit__(self, *args):
295 fcntl.fcntl(self.fd, fcntl.F_SETFL, self.orig_fl)
296 termios.tcsetattr(self.stream, termios.TCSANOW, self.original_stty)
297
298def input_polling(poll_func, stream=None):
299 if stream is None:
300 stream = sys.stdin
301 unbuffered_stdin = os.fdopen(stream.fileno(), 'rb', buffering=0)
302 try:
303 with polling_stdin(unbuffered_stdin):
304 acc = []
305 while True:
306 poll_func()
307 got = unbuffered_stdin.read(1)
308 if got and len(got):
309 try:
310 # this is hacky: can't deal with multibyte sequences
311 got_str = got.decode('utf-8')
312 except:
313 got_str = '?'
314 acc.append(got_str)
315 sys.__stdout__.write(got_str)
316 sys.__stdout__.flush()
317 if '\n' in got_str:
318 return ''.join(acc)
319 time.sleep(.1)
320 finally:
321 unbuffered_stdin.close()
322
323# vim: expandtab tabstop=4 shiftwidth=4