virtual: Separate starting virtphy/mobile from the test
Move the starting code out of the Update Location "test". In the mid
term we can have a SMS test run in addition to waiting the Update
Location tests.
A mass-test testcase will have a life-cycle of:
* Creation
* Configure (number of subscribers, probably all subs)
* Pre-Start trigger (same as configure so it can be omitted)
* Post-Start (all processes run)
* Query if the test has completed
The next step is an actual implementation to send SMS.
Change-Id: Ie15f5123775d11dd44243b2741d047ed93f318f9
diff --git a/src/osmo_gsm_tester/ms_driver.py b/src/osmo_gsm_tester/ms_driver.py
index 3cfcad6..329662a 100644
--- a/src/osmo_gsm_tester/ms_driver.py
+++ b/src/osmo_gsm_tester/ms_driver.py
@@ -20,7 +20,7 @@
from osmo_ms_driver.cdf import cdfs
from osmo_ms_driver.event_server import EventServer
from osmo_ms_driver.simple_loop import SimpleLoop
-from osmo_ms_driver.location_update_test import MassUpdateLocationTest
+from osmo_ms_driver.location_update_test import MassUpdateLocationTest, MobileTestStarter
from osmo_ms_driver.starter import BinaryOptions
import os.path
@@ -92,14 +92,18 @@
self._ev_server = EventServer("ev_server", event_server_path)
self._ev_server.listen(self._loop)
+ self._results = {}
options = self.build_binary_options()
- self._test_case = MassUpdateLocationTest("mass", options, self._cdf,
- self._ev_server,
- util.Dir(self.event_server_sk_tmp_dir),
- suite_run=self._suite_run)
+ self._starter = MobileTestStarter("mass", options, self._cdf,
+ self._ev_server,
+ util.Dir(self.event_server_sk_tmp_dir),
+ self._results, suite_run=self._suite_run)
+ self._test_case = MassUpdateLocationTest("mass", self._ev_server, self._results)
for sub in self._subscribers:
- self._test_case.subscriber_add(sub)
+ self._starter.subscriber_add(sub)
+
+ self._test_case.configure(len(self._subscribers))
self._configured = True
def run_test(self):
@@ -110,7 +114,8 @@
"""
if not self._configured:
self.configure()
- self._test_case.run_test(self._loop, self._test_duration)
+ deadline = self._starter.start_all(self._loop, self._test_duration)
+ self._test_case.wait_for_test(self._loop, deadline)
def print_stats(self):
"""
diff --git a/src/osmo_ms_driver/__main__.py b/src/osmo_ms_driver/__main__.py
index d753897..642002f 100644
--- a/src/osmo_ms_driver/__main__.py
+++ b/src/osmo_ms_driver/__main__.py
@@ -18,7 +18,7 @@
# Local modules
from .event_server import EventServer
from .simple_loop import SimpleLoop
-from .location_update_test import MassUpdateLocationTest
+from .location_update_test import MassUpdateLocationTest, MobileTestStarter
from .cdf import cdfs
from .starter import BinaryOptions
from .test_support import imsi_ki_gen
@@ -86,7 +86,9 @@
# Just a single test for now.
options = BinaryOptions("virtphy", "mobile", os.environ)
- test = MassUpdateLocationTest("lu_test", options, cdf, ev_server, tmp_dir)
+ result = {}
+ starter = MobileTestStarter("lu_test", options, cdf, ev_server, tmp_dir, result)
+ test = MassUpdateLocationTest("lu_test", ev_server, result)
# Add subscribers to the test.
imsi_gen = imsi_ki_gen()
@@ -97,12 +99,14 @@
'ki': ki,
'auth_algo': 'comp128v1',
}
- test.subscriber_add(ms_osmo_mobile.MSOsmoMobile("ms_%d" % i, conf))
+ starter.subscriber_add(ms_osmo_mobile.MSOsmoMobile("ms_%d" % i, conf))
+ test.configure(args.num_ms)
- atexit.register(test.stop_all)
+ atexit.register(starter.stop_all)
# Run until everything has been launched
- test.run_test(loop, timedelta(seconds=args.test_duration))
+ deadline = starter.start_all(loop, timedelta(seconds=args.test_duration))
+ test.wait_for_test(loop, deadline)
# Print stats
test.print_stats()
diff --git a/src/osmo_ms_driver/location_update_test.py b/src/osmo_ms_driver/location_update_test.py
index 29abf73..efb161e 100644
--- a/src/osmo_ms_driver/location_update_test.py
+++ b/src/osmo_ms_driver/location_update_test.py
@@ -24,6 +24,7 @@
from datetime import timedelta
import collections
+import json
import time
# Key used for the result dictionary
@@ -58,6 +59,85 @@
"min_latency", "max_latency"])
class MassUpdateLocationTest(log.Origin):
+ def __init__(self, name, event_server, results):
+ super().__init__(log.C_RUN, name)
+ self._event_server = event_server
+ self._event_server.register(self.handle_msg)
+ self._results = results
+
+ def configure(self, num_subscribers):
+ self._num_subscribers = num_subscribers
+ self._outstanding = num_subscribers
+
+ def handle_msg(self, _data, addr, time):
+ data = json.loads(_data.decode())
+
+ if data['type'] == 'event':
+ if data['data']['lu_done'] == 1:
+ ms = self._results[data['ms']]
+ if not has_lu_time(ms):
+ self._outstanding = self._outstanding - 1
+ set_lu_time(ms, time)
+ self.log("MS performed LU ", ms=ms, at=time, lu_delay=lu_delay(ms))
+
+ def all_completed(self):
+ return self._outstanding == 0
+
+ def wait_for_test(self, loop, deadline):
+ """Waits up to the absolute deadline for the test to complete."""
+ while not self.all_completed():
+ now_time = time.clock_gettime(time.CLOCK_MONOTONIC)
+ sleep_time = deadline - now_time
+ if sleep_time < 0:
+ break
+ loop.schedule_timeout(sleep_time)
+ loop.select()
+
+ def find_min_max(self, results):
+ min_value = max_value = None
+ for result in results:
+ if min_value is None or lu_delay(result) < min_value:
+ min_value = lu_delay(result)
+ if max_value is None or lu_delay(result) > max_value:
+ max_value = lu_delay(result)
+ return min_value, max_value
+
+ def get_result_values(self):
+ """
+ Returns the raw result values of the test run in any order.
+ """
+ return self._results.values()
+
+ def get_stats(self):
+ """
+ Returns a statistical summary of the test.
+ """
+ attempted = self._num_subscribers
+ completed = attempted - self._outstanding
+ min_latency, max_latency = self.find_min_max(filter(lambda x: has_lu_time(x), self._results.values()))
+ return LUStats(attempted, completed, min_latency, max_latency)
+
+ def print_stats(self):
+ stats = self.get_stats()
+ all_completed = stats.num_attempted == stats.num_completed
+
+ self.log("Tests done", all_completed=all_completed,
+ min=stats.min_latency, max=stats.max_latency)
+
+ def lus_less_than(self, acceptable_delay):
+ """
+ Returns LUs that completed within the acceptable delay.
+ """
+ res = []
+ for result in self._results.values():
+ if not has_lu_time(result):
+ continue
+ if timedelta(seconds=lu_delay(result)) >= acceptable_delay:
+ continue
+ res.append(result)
+ return res
+
+class MobileTestStarter(log.Origin):
"""
A test to launch a configurable amount of MS and make them
execute a Location Updating Procedure.
@@ -70,22 +150,23 @@
TEMPLATE_CFG = "osmo-mobile.cfg"
def __init__(self, name, options, cdf_function,
- event_server, tmp_dir, suite_run=None):
+ event_server, tmp_dir, results, suite_run=None):
super().__init__(log.C_RUN, name)
self._binary_options = options
self._cdf = cdf_function
self._suite_run = suite_run
self._tmp_dir = tmp_dir
+ self._event_server = event_server
+ self._results = results
self._unstarted = []
self._mobiles = []
self._phys = []
- self._results = {}
- self._event_server = event_server
- self._event_server.register(self.handle_msg)
self._started = []
self._subscribers = []
+ self._event_server.register(self.handle_msg)
+
def subscriber_add(self, subscriber):
"""
Adds a subscriber to the list of subscribers.
@@ -179,28 +260,31 @@
return current_time + step_size, sleep_time
- def run_test(self, loop, test_duration):
+ def start_all(self, loop, test_duration):
+ """
+ Starts all processes according to the schedule set by the CDF.
+ """
self.prepare(loop)
- to_complete_time = self._start_time + test_duration.total_seconds()
+ self._to_complete_time = self._start_time + test_duration.total_seconds()
tick_time = self._start_time
- while not self.all_completed():
+ while len(self._unstarted) > 0:
tick_time, sleep_time = self.step_once(loop, tick_time)
now_time = time.clock_gettime(time.CLOCK_MONOTONIC)
if sleep_time is None:
- sleep_time = to_complete_time - now_time
+ sleep_time = self._to_complete_time - now_time
if sleep_time < 0:
break
loop.schedule_timeout(sleep_time)
loop.select()
+ return self._to_complete_time
def stop_all(self):
for launcher in self._started:
launcher.terminate()
def handle_msg(self, _data, addr, time):
- import json
data = json.loads(_data.decode())
if data['type'] == 'register':
@@ -208,61 +292,3 @@
ms.set_start_time(time)
launch_delay = ms.start_time() - ms.launch_time()
self.log("MS start registered ", ms=ms, at=time, delay=launch_delay)
- elif data['type'] == 'event':
- if data['data']['lu_done'] == 1:
- ms = self._results[data['ms']]
- if not has_lu_time(ms):
- self._outstanding = self._outstanding - 1
- set_lu_time(ms, time)
- self.log("MS performed LU ", ms=ms, at=time, lu_delay=lu_delay(ms))
- else:
- print(time, data)
- raise Exception("Unknown event type..:" + _data.decode())
-
-
- def all_completed(self):
- return self._outstanding == 0
-
- def find_min_max(self, results):
- min_value = max_value = None
- for result in results:
- if min_value is None or lu_delay(result) < min_value:
- min_value = lu_delay(result)
- if max_value is None or lu_delay(result) > max_value:
- max_value = lu_delay(result)
- return min_value, max_value
-
- def get_result_values(self):
- """
- Returns the raw result values of the test run in any order.
- """
- return self._results.values()
-
- def get_stats(self):
- """
- Returns a statistical summary of the test.
- """
- attempted = len(self._subscribers)
- completed = attempted - self._outstanding
- min_latency, max_latency = self.find_min_max(filter(lambda x: has_lu_time(x), self._results.values()))
- return LUStats(attempted, completed, min_latency, max_latency)
-
- def print_stats(self):
- stats = self.get_stats()
- all_completed = stats.num_attempted == stats.num_completed
-
- self.log("Tests done", all_completed=all_completed,
- min=stats.min_latency, max=stats.max_latency)
-
- def lus_less_than(self, acceptable_delay):
- """
- Returns LUs that completed within the acceptable delay.
- """
- res = []
- for result in self._results.values():
- if not has_lu_time(result):
- continue
- if timedelta(seconds=lu_delay(result)) >= acceptable_delay:
- continue
- res.append(result)
- return res