resource: Support waiting for reserved resources until available

Before this patch, almost everything was in place to support concurrent
osmo-gsm-tester instances sharing a common state dir. However, if a
resource reservation could not be satisfied because too many resources
were already in use, osmo-gsm-tester would fail and skip the test suite.

With this patch, osmo-gsm-tester (OGT) instead waits until some reserved
resources are released and then retries the reservation. A request that
could not be satisfied even with all resources free still fails
immediately.

Change-Id: I938602ee890712fda82fd3f812d8edb1bcd05e08
diff --git a/src/osmo_gsm_tester/core/resource.py b/src/osmo_gsm_tester/core/resource.py
index 61a73aa..621522b 100644
--- a/src/osmo_gsm_tester/core/resource.py
+++ b/src/osmo_gsm_tester/core/resource.py
@@ -26,6 +26,7 @@
 from . import config
 from . import util
 from . import schema
+from .event_loop import MainLoop
 
 from .util import is_dict, is_list
 
@@ -48,6 +49,7 @@
     _registered_exit_handler = False
 
     def __init__(self):
+        self.reserved_modified = False
         self.config_path = config.get_main_config_value(config.CFG_RESOURCES_CONF)
         self.state_dir = config.get_state_dir()
         super().__init__(log.C_CNF, conf=self.config_path, state=self.state_dir.path)
@@ -57,6 +59,21 @@
         self.all_resources = Resources(config.read(self.config_path, schema.get_resources_schema()) or {})
         self.all_resources.set_hashes()
 
+    def reserve_resources_fw_cb(self, event):
+        '''FileWatch callback used by reserve() to detect changes to
+        RESERVED_RESOURCES_FILE.
+
+        Invoked from the FileWatch observer thread with the FileWatch lock
+        held; reserve() reads and clears self.reserved_modified under that
+        same lock. Besides in-place modification, the file being created,
+        deleted or moved is also treated as a change, since any of these may
+        mean resources were released. Other event types (e.g. the file merely
+        being opened for reading, which some watchdog versions may report) are
+        ignored so that reading the file does not trigger another attempt.
+        '''
+        if event.event_type in ('modified', 'created', 'deleted', 'moved'):
+            self.reserved_modified = True
+
     def reserve(self, origin, want, modifiers):
         '''
         attempt to reserve the resources specified in the dict 'want' for
@@ -94,18 +101,44 @@
 
         origin_id = origin.origin_id()
 
-        with self.state_dir.lock(origin_id):
-            rrfile_path = self.state_dir.mk_parentdir(RESERVED_RESOURCES_FILE)
-            reserved = Resources(config.read(rrfile_path, if_missing_return={}))
-            to_be_reserved = self.all_resources.without(reserved).find(origin, want)
-
-            to_be_reserved.mark_reserved_by(origin_id)
-
-            reserved.add(to_be_reserved)
-            config.write(rrfile_path, reserved)
-
-            self.remember_to_free(to_be_reserved)
-            return ReservedResources(self, origin, to_be_reserved, modifiers)
+        # Make sure wanted resources can ever be reserved, even if all
+        # resources are unallocated. It will throw an exception if not
+        # possible:
+        self.all_resources.find(origin, want, None, False, True, 'Verifying')
+        self.reserved_modified = True # go through on first attempt
+        rrfile_path = self.state_dir.mk_parentdir(RESERVED_RESOURCES_FILE)
+        # Watch RESERVED_RESOURCES_FILE so that we only retry after it changed
+        # (e.g. because another instance freed some resources):
+        fw = util.FileWatch(origin, rrfile_path, self.reserve_resources_fw_cb)
+        fw.start()
+        try:
+            while True:
+                # Fetch and clear the flag set by the FileWatch callback
+                # (observer thread) while holding the FileWatch lock:
+                with fw.get_lock():
+                    modified = self.reserved_modified
+                    self.reserved_modified = False
+
+                if modified:
+                    # It should be possible at some point to reserve the wanted
+                    # resources (verified above), so if it's not possible right
+                    # now, wait for some to be released and try again:
+                    try:
+                        with self.state_dir.lock(origin_id):
+                            reserved = Resources(config.read(rrfile_path, if_missing_return={}))
+                            to_be_reserved = self.all_resources.without(reserved).find(origin, want)
+                            to_be_reserved.mark_reserved_by(origin_id)
+                            reserved.add(to_be_reserved)
+                            config.write(rrfile_path, reserved)
+                            self.remember_to_free(to_be_reserved)
+                            return ReservedResources(self, origin, to_be_reserved, modifiers)
+                    except NoResourceExn:
+                        origin.log('Unable to reserve resources, too many currently reserved. Waiting until some are available again')
+                MainLoop.sleep(1)
+        finally:
+            # Always stop the watcher thread, also when bailing out due to an
+            # unexpected exception (e.g. failure reading or writing the file):
+            fw.stop()
 
     def free(self, origin, to_be_freed):
         log.ctx(origin)
diff --git a/src/osmo_gsm_tester/core/util.py b/src/osmo_gsm_tester/core/util.py
index e035a72..691b489 100644
--- a/src/osmo_gsm_tester/core/util.py
+++ b/src/osmo_gsm_tester/core/util.py
@@ -28,6 +28,8 @@
 import threading
 import importlib.util
 import subprocess
+from watchdog.observers import Observer
+from watchdog.events import FileSystemEventHandler
 
 # This mirrors enum osmo_auth_algo in libosmocore/include/osmocom/crypt/auth.h
 # so that the index within the tuple matches the enum value.
@@ -302,6 +304,76 @@
     def __repr__(self):
         return self.path
 
+class FileWatch(FileSystemEventHandler):
+    '''Watch a single file for changes using a watchdog Observer.
+
+    The parent directory of watch_path is watched (non-recursively) and
+    on_any_event() filters out events not concerning watch_path. Matching
+    events are passed to event_func(event), which runs in the observer
+    thread while holding the lock returned by get_lock(); callers can take
+    the same lock to safely share state with the callback.
+    '''
+
+    def __init__(self, origin, watch_path, event_func):
+        FileSystemEventHandler.__init__(self)
+        self.origin = origin
+        self.watch_path = watch_path
+        self.event_func = event_func
+        self.observer = Observer()
+        self.watch = None
+        self.mutex = threading.Lock()
+
+    def get_lock(self):
+        '''Return the lock held while event_func is running.'''
+        return self.mutex
+
+    def start(self):
+        '''Start the observer thread watching the directory of watch_path.'''
+        # Watch the directory rather than the file itself, presumably so the
+        # file does not need to exist yet when the watch is scheduled:
+        watch_dir = os.path.abspath(os.path.dirname(self.watch_path))
+        self.origin.dbg('FileWatch: scheduling watch for directory %s' % watch_dir)
+        self.watch = self.observer.schedule(self, watch_dir, recursive=False)
+        self.observer.start()
+
+    def stop(self):
+        '''Unschedule the watch and stop the observer thread.
+
+        Safe to call several times (e.g. explicitly and again from __del__).
+        '''
+        if self.watch:
+            self.origin.dbg('FileWatch: unscheduling watch %r' % self.watch)
+            self.observer.unschedule(self.watch)
+            self.watch = None
+        if self.observer is not None and self.observer.is_alive():
+            self.observer.stop()
+            self.observer.join()
+
+    def __del__(self):
+        # __init__ may have raised before self.observer was set:
+        if getattr(self, 'observer', None) is None:
+            return
+        self.stop()
+        self.observer = None
+
+    # Override from FileSystemEventHandler
+    def on_any_event(self, event):
+        '''Forward events concerning watch_path to event_func, under the lock.'''
+        if event.is_directory:
+            return None
+        target = os.path.abspath(self.watch_path)
+        paths = [event.src_path]
+        # Move events also carry a destination path: match it as well, so that
+        # a file moved onto watch_path (e.g. an atomic replace) is reported:
+        dest_path = getattr(event, 'dest_path', None)
+        if dest_path:
+            paths.append(dest_path)
+        if all(os.path.abspath(path) != target for path in paths):
+            return None
+        self.origin.dbg('FileWatch: received event %r' % event)
+        with self.mutex:
+            self.event_func(event)
+
 def touch_file(path):
     with open(path, 'a') as f:
         f.close()