event_loop: Use glib as mainloop impl and move modem to use event_loop
Several benefits:
- We can add APIs to poll on fds in the future (e.g. for the smpp socket)
instead of using busy polling.
- During wait(), we now block in the glib mainloop instead of sleeping for
0.1 secs, during which no events were handled.
- We remove glib mainloop specific bits from modem.py
Change-Id: I8c3bc44bbe443703077110cdc67207e9cbb43767
diff --git a/src/osmo_gsm_tester/event_loop.py b/src/osmo_gsm_tester/event_loop.py
index ebe6afb..068eca9 100644
--- a/src/osmo_gsm_tester/event_loop.py
+++ b/src/osmo_gsm_tester/event_loop.py
@@ -18,48 +18,129 @@
# along with this program. If not, see <http://www.gnu.org/licenses/>.
import time
+from gi.repository import GLib, GObject
+
from . import log
-poll_funcs = []
+class DeferredHandling:
+    '''FIFO of callbacks whose invocation is postponed until handle_queue().'''
+    # Class-level list, so it is shared by every instance of this class.
+    defer_queue = []
+
+    def handle_queue(self):
+        '''Run and discard the queued handlers, oldest first, until none remain.
+
+        Handlers queued while this is running are executed in the same call.'''
+        while DeferredHandling.defer_queue:
+            entry = self.defer_queue.pop(0)
+            callback, cb_args, cb_kwargs = entry
+            callback(*cb_args, **cb_kwargs)
+
+    def defer(self, handler, *args, **kwargs):
+        '''Enqueue handler(*args, **kwargs) for the next handle_queue() run.'''
+        entry = (handler, args, kwargs)
+        self.defer_queue.append(entry)
+
+class WaitRequest:
+    '''State of one pending wait: a condition callable plus its deadline.'''
+    # Result flags; each condition_check() call sets at most one of them.
+    timeout_ack = False
+    condition_ack = False
+
+    def __init__(self, condition, condition_args, condition_kwargs, timeout, timestep):
+        # timestep is accepted for the caller's convenience but not stored.
+        self.condition = condition
+        self.condition_args = condition_args
+        self.condition_kwargs = condition_kwargs
+        self.timeout = timeout
+        self.timeout_started = time.time()
+
+    def condition_check(self):
+        '''Evaluate the condition once and record success or expiry.
+
+        The elapsed time is sampled before the condition is called; a true
+        condition takes precedence over an expired timeout.'''
+        elapsed = time.time() - self.timeout_started
+        if self.condition(*self.condition_args, **self.condition_kwargs):
+            self.condition_ack = True
+            return
+        if elapsed > self.timeout:
+            self.timeout_ack = True
+
+class EventLoop:
+    '''GLib-mainloop-backed event loop offering periodic poll functions,
+    deferred handlers and blocking wait()/sleep() helpers.'''
+    # Class-level defaults; __init__() replaces each of them per instance.
+    poll_funcs = []
+    gloop = None
+    gctx = None
+    deferred_handling = None
+
+    def __init__(self):
+        # Give every instance its own list so registrations are never shared
+        # through the mutable class attribute.
+        self.poll_funcs = []
+        self.gloop = GLib.MainLoop()
+        self.gctx = self.gloop.get_context()
+        self.deferred_handling = DeferredHandling()
+
+    def _trigger_cb_func(self, user_data):
+        '''Timeout callback: queue user_data (a callable) as a deferred handler
+        instead of running it from within the mainloop iteration.'''
+        self.defer(user_data)
+        return True #to retrigger the timeout
+
+    def defer(self, handler, *args, **kwargs):
+        '''Queue handler(*args, **kwargs); poll() runs it after its mainloop
+        iteration.'''
+        self.deferred_handling.defer(handler, *args, **kwargs)
+
+    def register_poll_func(self, func, timestep=1):
+        '''Have func deferred once every timestep seconds.'''
+        # The timeout interval is given in whole milliseconds.
+        interval_ms = int(round(timestep * 1000))
+        source_id = GObject.timeout_add(interval_ms, self._trigger_cb_func, func)
+        self.poll_funcs.append((func, source_id))
+
+    def unregister_poll_func(self, func):
+        '''Drop the first registration of func and its timeout source.
+
+        Does nothing if func was never registered.'''
+        for pair in self.poll_funcs:
+            registered, source_id = pair
+            if registered == func:
+                GObject.source_remove(source_id)
+                self.poll_funcs.remove(pair)
+                return
+
+    def poll(self, may_block=False):
+        '''Run one mainloop context iteration (blocking if may_block), then
+        all deferred handlers.'''
+        self.gctx.iteration(may_block)
+        self.deferred_handling.handle_queue()
+
+    def wait_no_raise(self, log_obj, condition, condition_args, condition_kwargs, timeout, timestep):
+        '''Block in the mainloop until condition(*condition_args,
+        **condition_kwargs) is true or timeout seconds have elapsed.
+
+        The condition is re-evaluated every timestep seconds (at least 0.1).
+        Returns True if the condition was met, False on timeout.
+        Raises log.Error if timeout is unset, zero or negative.'''
+        if not timeout or timeout < 0:
+            # NOTE(review): rebinding self presumably lets log.Error attribute
+            # the error to log_obj -- confirm against the log module.
+            self = log_obj
+            raise log.Error('wait() *must* time out at some point.', timeout=timeout)
+        if timestep < 0.1:
+            timestep = 0.1
+
+        wait_req = WaitRequest(condition, condition_args, condition_kwargs, timeout, timestep)
+        interval_ms = int(round(timestep * 1000))
+        wait_id = GObject.timeout_add(interval_ms, self._trigger_cb_func, wait_req.condition_check)
+        try:
+            while True:
+                self.poll(may_block=True)
+                if wait_req.condition_ack or wait_req.timeout_ack:
+                    return wait_req.condition_ack
+        finally:
+            # Always remove the periodic check, also when a deferred handler
+            # raised inside poll(); otherwise the source would keep firing.
+            GObject.source_remove(wait_id)
+
+    def wait(self, log_obj, condition, *condition_args, timeout=300, timestep=1, **condition_kwargs):
+        '''Like wait_no_raise(), but raise log.Error('Wait timeout') when the
+        condition is not met in time.'''
+        if not self.wait_no_raise(log_obj, condition, condition_args, condition_kwargs, timeout, timestep):
+            log.ctx(log_obj)
+            raise log.Error('Wait timeout')
+
+    def sleep(self, log_obj, seconds):
+        '''Keep the event loop running for roughly the given number of seconds.'''
+        assert seconds > 0.
+        # A never-true condition turns the wait into a plain timed delay.
+        self.wait_no_raise(log_obj, lambda: False, [], {}, timeout=seconds, timestep=seconds)
+
+
+evloop = EventLoop()
def register_poll_func(func):
- global poll_funcs
- poll_funcs.append(func)
+    '''Register func as a periodic poll function on the module-level evloop.'''
+    # No 'global' statement needed: evloop is only read, never rebound.
+    evloop.register_poll_func(func)
def unregister_poll_func(func):
- global poll_funcs
- poll_funcs.remove(func)
+    '''Unregister func from the module-level evloop.'''
+    # No 'global' statement needed: evloop is only read, never rebound.
+    evloop.unregister_poll_func(func)
def poll():
- global poll_funcs
- for func in poll_funcs:
- func()
+    '''Run one non-blocking iteration of the module-level evloop.'''
+    # No 'global' statement needed: evloop is only read, never rebound.
+    evloop.poll()
def wait_no_raise(log_obj, condition, condition_args, condition_kwargs, timeout, timestep):
- if not timeout or timeout < 0:
- self = log_obj
- raise log.Error('wait() *must* time out at some point.', timeout=timeout)
- if timestep < 0.1:
- timestep = 0.1
-
- started = time.time()
- while True:
- poll()
- if condition(*condition_args, **condition_kwargs):
- return True
- waited = time.time() - started
- if waited > timeout:
- return False
- time.sleep(timestep)
+    '''Wait on the module-level evloop for condition to become true.
+
+    Returns True if the condition was met and False on timeout, exactly as
+    the implementation being replaced did.'''
+    # The result must be handed back: without 'return' callers always got None.
+    return evloop.wait_no_raise(log_obj, condition, condition_args, condition_kwargs, timeout, timestep)
def wait(log_obj, condition, *condition_args, timeout=300, timestep=1, **condition_kwargs):
- if not wait_no_raise(log_obj, condition, condition_args, condition_kwargs, timeout, timestep):
- log.ctx(log_obj)
- raise log.Error('Wait timeout')
+    '''Forward to wait() of the module-level evloop with identical arguments.'''
+    # No 'global' statement needed: evloop is only read, never rebound.
+    evloop.wait(log_obj, condition, *condition_args,
+                timeout=timeout, timestep=timestep, **condition_kwargs)
def sleep(log_obj, seconds):
- assert seconds > 0.
- wait_no_raise(log_obj, lambda: False, [], {}, timeout=seconds, timestep=min(seconds, 1))
+    '''Forward to sleep() of the module-level evloop.'''
+    # No 'global' statement needed: evloop is only read, never rebound.
+    evloop.sleep(log_obj, seconds)
+
+def defer(handler, *args, **kwargs):
+    '''Queue handler(*args, **kwargs) on the module-level evloop.'''
+    # No 'global' statement needed: evloop is only read, never rebound.
+    evloop.defer(handler, *args, **kwargs)
# vim: expandtab tabstop=4 shiftwidth=4
diff --git a/src/osmo_gsm_tester/modem.py b/src/osmo_gsm_tester/modem.py
index f50a291..8d41935 100644
--- a/src/osmo_gsm_tester/modem.py
+++ b/src/osmo_gsm_tester/modem.py
@@ -30,8 +30,6 @@
Gio = get_introspection_module('Gio')
from gi.repository import GLib
-glib_main_loop = GLib.MainLoop()
-glib_main_ctx = glib_main_loop.get_context()
bus = SystemBus()
I_MODEM = 'org.ofono.Modem'
@@ -49,24 +47,14 @@
NETREG_MAX_REGISTER_ATTEMPTS = 3
-class DeferredHandling:
- defer_queue = []
+class DeferredDBus:
def __init__(self, dbus_iface, handler):
self.handler = handler
self.subscription_id = dbus_iface.connect(self.receive_signal)
def receive_signal(self, *args, **kwargs):
- DeferredHandling.defer_queue.append((self.handler, args, kwargs))
-
- @staticmethod
- def handle_queue():
- while DeferredHandling.defer_queue:
- handler, args, kwargs = DeferredHandling.defer_queue.pop(0)
- handler(*args, **kwargs)
-
-def defer(handler, *args, **kwargs):
- DeferredHandling.defer_queue.append((handler, args, kwargs))
+ event_loop.defer(self.handler, *args, **kwargs)
def dbus_connect(dbus_iface, handler):
'''This function shall be used instead of directly connecting DBus signals.
@@ -75,15 +63,7 @@
so that a signal handler is invoked only after the DBus polling is through
by enlisting signals that should be handled in the
DeferredHandling.defer_queue.'''
- return DeferredHandling(dbus_iface, handler).subscription_id
-
-def poll_glib():
- global glib_main_ctx
- while glib_main_ctx.pending():
- glib_main_ctx.iteration()
- DeferredHandling.handle_queue()
-
-event_loop.register_poll_func(poll_glib)
+ return DeferredDBus(dbus_iface, handler).subscription_id
def systembus_get(path):
global bus
@@ -493,8 +473,8 @@
# waiting for that. Make it async and try to register when the scan is
# finished.
register_func = self.scan_cb_register_automatic if mcc_mnc is None else self.scan_cb_register
- result_handler = lambda obj, result, user_data: defer(register_func, result, user_data)
- error_handler = lambda obj, e, user_data: defer(self.scan_cb_error_handler, e, mcc_mnc)
+ result_handler = lambda obj, result, user_data: event_loop.defer(register_func, result, user_data)
+ error_handler = lambda obj, e, user_data: event_loop.defer(self.scan_cb_error_handler, e, mcc_mnc)
dbus_async_call(netreg, netreg.Scan, timeout=30, cancellable=self.cancellable,
result_handler=result_handler, error_handler=error_handler,
user_data=mcc_mnc)
@@ -559,7 +539,7 @@
self.cancellable.cancel()
# Cancel op is applied as a signal coming from glib mainloop, so we
# need to run it and wait for the callbacks to handle cancellations.
- poll_glib()
+ event_loop.poll()
# once it has been triggered, create a new one for next operation:
self.cancellable = Gio.Cancellable.new()