Move all obj/ references in suite.py to testenv.py
Change-Id: If4ab39be7a97d33e82c5a34e2a10dfec38613a4e
diff --git a/selftest/suite_test/test_suite/hello_world.py b/selftest/suite_test/test_suite/hello_world.py
index 073d07f..a69f95a 100644
--- a/selftest/suite_test/test_suite/hello_world.py
+++ b/selftest/suite_test/test_suite/hello_world.py
@@ -1,5 +1,5 @@
from osmo_gsm_tester.testenv import *
print('hello world')
-print('I am %r / %r' % (suite.name(), test.name()))
+print('I am %r / %r' % (suite.suite().name(), test.name()))
print('one\ntwo\nthree')
diff --git a/selftest/suite_test/test_suite/test_error.py b/selftest/suite_test/test_suite/test_error.py
index c0583ff..70d14a1 100755
--- a/selftest/suite_test/test_suite/test_error.py
+++ b/selftest/suite_test/test_suite/test_error.py
@@ -1,5 +1,5 @@
from osmo_gsm_tester.testenv import *
-print('I am %r / %r' % (suite.name(), test.name()))
+print('I am %r / %r' % (suite.suite().name(), test.name()))
assert False
diff --git a/selftest/suite_test/test_suite/test_fail.py b/selftest/suite_test/test_suite/test_fail.py
index cbaeded..ffb7218 100755
--- a/selftest/suite_test/test_suite/test_fail.py
+++ b/selftest/suite_test/test_suite/test_fail.py
@@ -1,6 +1,6 @@
#!/usr/bin/env python3
from osmo_gsm_tester.testenv import *
-print('I am %r / %r' % (suite.name(), test.name()))
+print('I am %r / %r' % (suite.suite().name(), test.name()))
test.set_fail('EpicFail', 'This failure is expected')
diff --git a/src/osmo_gsm_tester/core/test.py b/src/osmo_gsm_tester/core/test.py
index 93dbf6a..76c9ce9 100644
--- a/src/osmo_gsm_tester/core/test.py
+++ b/src/osmo_gsm_tester/core/test.py
@@ -21,9 +21,13 @@
import sys
import time
import traceback
-from .. import testenv
-from . import log, util, resource
+from . import log
+from . import util
+from . import resource
+from .event_loop import MainLoop
+
+from .. import testenv
class Test(log.Origin):
UNKNOWN = 'UNKNOWN' # matches junit 'error'
@@ -51,12 +55,13 @@
return self._run_dir
def run(self):
+ testenv_obj = None
try:
self.log_target = log.FileLogTarget(self.get_run_dir().new_child('log')).set_all_levels(log.L_DBG).style_change(trace=True)
log.large_separator(self.suite_run.trial.name(), self.suite_run.name(), self.name(), sublevel=3)
self.status = Test.UNKNOWN
self.start_timestamp = time.time()
- testenv.setup(self.suite_run, self)
+ testenv_obj = testenv.setup(self.suite_run, self)
with self.redirect_stdout():
util.run_python_file('%s.%s' % (self.suite_run.definition.name(), self.basename),
self.path)
@@ -83,6 +88,8 @@
self.err('TEST RUN ABORTED: %s' % type(e).__name__)
raise
finally:
+ if testenv_obj:
+ testenv_obj.stop()
if self.log_target:
self.log_target.remove()
diff --git a/src/osmo_gsm_tester/suite.py b/src/osmo_gsm_tester/suite.py
index 6952fd2..3416ff5 100644
--- a/src/osmo_gsm_tester/suite.py
+++ b/src/osmo_gsm_tester/suite.py
@@ -21,18 +21,12 @@
import sys
import time
import pprint
-from .core import config, log, util, process, schema, resource
+from .core import config
+from .core import log
+from .core import util
+from .core import schema
+from .core import resource
from .core import test
-from .core.event_loop import MainLoop
-from .obj import nitb_osmo, hlr_osmo, mgcpgw_osmo, mgw_osmo, msc_osmo, bsc_osmo, stp_osmo, ggsn_osmo, sgsn_osmo, esme, osmocon, ms_driver, iperf3
-from .obj import run_node
-from .obj import epc
-from .obj import enb
-from .obj import bts
-from .obj import ms
-
-class Timeout(Exception):
- pass
class SuiteDefinition(log.Origin):
'''A test suite reserves resources for a number of tests.
@@ -74,12 +68,9 @@
self.start_timestamp = None
self.duration = None
self.reserved_resources = None
- self.objects_to_clean_up = None
- self.test_import_modules_to_clean_up = []
self._resource_requirements = None
self._resource_modifiers = None
self._config = None
- self._processes = []
self._run_dir = None
self.trial = trial
self.definition = suite_definition
@@ -93,40 +84,6 @@
for test_basename in self.definition.test_basenames:
self.tests.append(test.Test(self, test_basename))
- def register_for_cleanup(self, *obj):
- assert all([hasattr(o, 'cleanup') for o in obj])
- self.objects_to_clean_up = self.objects_to_clean_up or []
- self.objects_to_clean_up.extend(obj)
-
- def objects_cleanup(self):
- while self.objects_to_clean_up:
- obj = self.objects_to_clean_up.pop()
- try:
- obj.cleanup()
- except Exception:
- log.log_exn()
-
- def test_import_modules_register_for_cleanup(self, mod):
- '''
- Tests are required to call this API for any module loaded from its own
- lib subdir, because they are loaded in the global namespace. Otherwise
- later tests importing modules with the same name will re-use an already
- loaded module.
- '''
- if mod not in self.test_import_modules_to_clean_up:
- self.dbg('registering module %r for cleanup' % mod)
- self.test_import_modules_to_clean_up.append(mod)
-
- def test_import_modules_cleanup(self):
- while self.test_import_modules_to_clean_up:
- mod = self.test_import_modules_to_clean_up.pop()
- try:
- self.dbg('Cleaning up module %r' % mod)
- del sys.modules[mod.__name__]
- del mod
- except Exception:
- log.log_exn()
-
def mark_start(self):
self.start_timestamp = time.time()
self.duration = 0
@@ -155,11 +112,6 @@
self._run_dir = util.Dir(self.trial.get_run_dir().new_dir(self.name()))
return self._run_dir
- def get_test_run_dir(self):
- if self.current_test:
- return self.current_test.get_run_dir()
- return self.get_run_dir()
-
def resource_requirements(self):
if self._resource_requirements is None:
self._resource_requirements = self.combined('resources')
@@ -175,19 +127,24 @@
self._config = self.combined('config', False)
return self._config
+ def resource_pool(self):
+ return self.resources_pool
+
def reserve_resources(self):
if self.reserved_resources:
raise RuntimeError('Attempt to reserve resources twice for a SuiteRun')
self.log('reserving resources in', self.resources_pool.state_dir, '...')
self.reserved_resources = self.resources_pool.reserve(self, self.resource_requirements(), self.resource_modifiers())
+ def get_reserved_resource(self, resource_class_str, specifics):
+ return self.reserved_resources.get(resource_class_str, specifics=specifics)
+
def run_tests(self, names=None):
suite_libdir = os.path.join(self.definition.suite_dir, 'lib')
try:
log.large_separator(self.trial.name(), self.name(), sublevel=2)
self.mark_start()
util.import_path_prepend(suite_libdir)
- MainLoop.register_poll_func(self.poll)
if not self.reserved_resources:
self.reserve_resources()
for t in self.tests:
@@ -196,9 +153,6 @@
continue
self.current_test = t
t.run()
- self.stop_processes()
- self.objects_cleanup()
- self.reserved_resources.put_all()
except Exception:
log.log_exn()
except BaseException as e:
@@ -206,14 +160,7 @@
self.err('SUITE RUN ABORTED: %s' % type(e).__name__)
raise
finally:
- # if sys.exit() called from signal handler (e.g. SIGINT), SystemExit
- # base exception is raised. Make sure to stop processes in this
- # finally section. Resources are automatically freed with 'atexit'.
- self.stop_processes()
- self.objects_cleanup()
self.free_resources()
- MainLoop.unregister_poll_func(self.poll)
- self.test_import_modules_cleanup()
util.import_path_remove(suite_libdir)
self.duration = time.time() - self.start_timestamp
@@ -245,203 +192,11 @@
errors += 1
return (passed, skipped, failed, errors)
- def remember_to_stop(self, process, respawn=False):
- '''Ask suite to monitor and manage lifecycle of the Process object. If a
- process managed by suite finishes before cleanup time, the current test
- will be marked as FAIL and end immediatelly. If respwan=True, then suite
- will respawn() the process instead.'''
- self._processes.insert(0, (process, respawn))
-
- def stop_processes(self):
- if len(self._processes) == 0:
- return
- strategy = process.ParallelTerminationStrategy()
- while self._processes:
- proc, _ = self._processes.pop()
- strategy.add_process(proc)
- strategy.terminate_all()
-
- def stop_process(self, process):
- 'Remove process from monitored list and stop it'
- for proc_respawn in self._processes:
- proc, respawn = proc_respawn
- if proc == process:
- self._processes.remove(proc_respawn)
- proc.terminate()
-
def free_resources(self):
if self.reserved_resources is None:
return
self.reserved_resources.free()
- def ip_address(self, specifics=None):
- return self.reserved_resources.get(resource.R_IP_ADDRESS, specifics=specifics)
-
- def nitb(self, ip_address=None):
- if ip_address is None:
- ip_address = self.ip_address()
- return nitb_osmo.OsmoNitb(self, ip_address)
-
- def hlr(self, ip_address=None):
- if ip_address is None:
- ip_address = self.ip_address()
- return hlr_osmo.OsmoHlr(self, ip_address)
-
- def ggsn(self, ip_address=None):
- if ip_address is None:
- ip_address = self.ip_address()
- return ggsn_osmo.OsmoGgsn(self, ip_address)
-
- def sgsn(self, hlr, ggsn, ip_address=None):
- if ip_address is None:
- ip_address = self.ip_address()
- return sgsn_osmo.OsmoSgsn(self, hlr, ggsn, ip_address)
-
- def mgcpgw(self, ip_address=None, bts_ip=None):
- if ip_address is None:
- ip_address = self.ip_address()
- return mgcpgw_osmo.OsmoMgcpgw(self, ip_address, bts_ip)
-
- def mgw(self, ip_address=None):
- if ip_address is None:
- ip_address = self.ip_address()
- return mgw_osmo.OsmoMgw(self, ip_address)
-
- def msc(self, hlr, mgcpgw, stp, ip_address=None):
- if ip_address is None:
- ip_address = self.ip_address()
- return msc_osmo.OsmoMsc(self, hlr, mgcpgw, stp, ip_address)
-
- def bsc(self, msc, mgw, stp, ip_address=None):
- if ip_address is None:
- ip_address = self.ip_address()
- return bsc_osmo.OsmoBsc(self, msc, mgw, stp, ip_address)
-
- def stp(self, ip_address=None):
- if ip_address is None:
- ip_address = self.ip_address()
- return stp_osmo.OsmoStp(self, ip_address)
-
- def ms_driver(self):
- ms = ms_driver.MsDriver(self)
- self.register_for_cleanup(ms)
- return ms
-
- def bts(self, specifics=None):
- bts_obj = bts.Bts.get_instance_by_type(self, self.reserved_resources.get(resource.R_BTS, specifics=specifics))
- bts_obj.set_lac(self.lac())
- bts_obj.set_rac(self.rac())
- bts_obj.set_cellid(self.cellid())
- bts_obj.set_bvci(self.bvci())
- self.register_for_cleanup(bts_obj)
- return bts_obj
-
- def modem(self, specifics=None):
- conf = self.reserved_resources.get(resource.R_MODEM, specifics=specifics)
- ms_obj = ms.MS.get_instance_by_type(self, conf)
- self.register_for_cleanup(ms_obj)
- return ms_obj
-
- def modems(self, count):
- l = []
- for i in range(count):
- l.append(self.modem())
- return l
-
- def all_resources(self, resource_func):
- """Returns all yielded resource."""
- l = []
- while True:
- try:
- l.append(resource_func())
- except resource.NoResourceExn:
- return l
-
- def esme(self):
- esme_obj = esme.Esme(self.msisdn())
- self.register_for_cleanup(esme_obj)
- return esme_obj
-
- def run_node(self, specifics=None):
- return run_node.RunNode.from_conf(self.reserved_resources.get(resource.R_RUN_NODE, specifics=specifics))
-
- def enb(self, specifics=None):
- enb_obj = enb.eNodeB.get_instance_by_type(self, self.reserved_resources.get(resource.R_ENB, specifics=specifics))
- self.register_for_cleanup(enb_obj)
- return enb_obj
-
- def epc(self, run_node=None):
- if run_node is None:
- run_node = self.run_node()
- epc_obj = epc.EPC.get_instance_by_type(self, run_node)
- self.register_for_cleanup(epc_obj)
- return epc_obj
-
- def osmocon(self, specifics=None):
- conf = self.reserved_resources.get(resource.R_OSMOCON, specifics=specifics)
- osmocon_obj = osmocon.Osmocon(self, conf=conf)
- self.register_for_cleanup(osmocon_obj)
- return osmocon_obj
-
- def iperf3srv(self, ip_address=None):
- if ip_address is None:
- ip_address = self.ip_address()
- iperf3srv_obj = iperf3.IPerf3Server(self, ip_address)
- return iperf3srv_obj
-
- def msisdn(self):
- msisdn = self.resources_pool.next_msisdn(self)
- self.log('using MSISDN', msisdn)
- return msisdn
-
- def lac(self):
- lac = self.resources_pool.next_lac(self)
- self.log('using LAC', lac)
- return lac
-
- def rac(self):
- rac = self.resources_pool.next_rac(self)
- self.log('using RAC', rac)
- return rac
-
- def cellid(self):
- cellid = self.resources_pool.next_cellid(self)
- self.log('using CellId', cellid)
- return cellid
-
- def bvci(self):
- bvci = self.resources_pool.next_bvci(self)
- self.log('using BVCI', bvci)
- return bvci
-
- def poll(self):
- for proc, respawn in self._processes:
- if proc.terminated():
- if respawn == True:
- proc.respawn()
- else:
- proc.log_stdout_tail()
- proc.log_stderr_tail()
- log.ctx(proc)
- raise log.Error('Process ended prematurely: %s' % proc.name())
-
- def prompt(self, *msgs, **msg_details):
- 'ask for user interaction. Do not use in tests that should run automatically!'
- if msg_details:
- msgs = list(msgs)
- msgs.append('{%s}' %
- (', '.join(['%s=%r' % (k,v)
- for k,v in sorted(msg_details.items())])))
- msg = ' '.join(msgs) or 'Hit Enter to continue'
- self.log('prompt:', msg)
- sys.__stdout__.write('\n\n--- PROMPT ---\n')
- sys.__stdout__.write(msg)
- sys.__stdout__.write('\n')
- sys.__stdout__.flush()
- entered = util.input_polling('> ', MainLoop.poll)
- self.log('prompt entered:', repr(entered))
- return entered
-
def resource_status_str(self):
return '\n'.join(('',
'SUITE RUN: %s' % self.origin_id(),
diff --git a/src/osmo_gsm_tester/testenv.py b/src/osmo_gsm_tester/testenv.py
index 8c4743a..aa94f3c 100644
--- a/src/osmo_gsm_tester/testenv.py
+++ b/src/osmo_gsm_tester/testenv.py
@@ -20,6 +20,22 @@
# These will be initialized before each test run.
# A test script can thus establish its context by doing:
# from osmo_gsm_tester.testenv import *
+
+import sys
+
+from .core import process
+from .core import log as log_module
+from .core import process as process_module
+from .core import resource
+from .core.event_loop import MainLoop
+
+from .obj import nitb_osmo, hlr_osmo, mgcpgw_osmo, mgw_osmo, msc_osmo, bsc_osmo, stp_osmo, ggsn_osmo, sgsn_osmo, esme, osmocon, ms_driver, iperf3
+from .obj import run_node
+from .obj import epc
+from .obj import enb
+from .obj import bts
+from .obj import ms
+
trial = None
suite = None
test = None
@@ -32,22 +48,284 @@
sleep = None
poll = None
prompt = None
-Timeout = None
Sms = None
process = None
+class Timeout(Exception):
+ pass
+
+class TestEnv(log_module.Origin):
+ def __init__(self, suite_run, test):
+ super().__init__(log_module.C_TST, test.name())
+ self.suite_run = suite_run
+ self._test = test
+ self.trial = suite_run.trial # backward compat with objects
+ self.resources_pool = suite_run.resource_pool() # backward compat with objects
+ self._processes = []
+ self.test_import_modules_to_clean_up = []
+ self.objects_to_clean_up = None
+ MainLoop.register_poll_func(self.poll)
+
+ def suite(self):
+ return self.suite_run
+
+ # backward compat with objects
+ def get_test_run_dir(self):
+ return self._test.get_run_dir()
+
+ # backward compat with objects
+ def config(self):
+ return self.suite_run.config()
+
+ def remember_to_stop(self, process, respawn=False):
+ '''Ask suite to monitor and manage lifecycle of the Process object. If a
+ process managed by suite finishes before cleanup time, the current test
+ will be marked as FAIL and end immediatelly. If respwan=True, then suite
+ will respawn() the process instead.'''
+ self._processes.insert(0, (process, respawn))
+
+ def stop_processes(self):
+ if len(self._processes) == 0:
+ return
+ strategy = process_module.ParallelTerminationStrategy()
+ while self._processes:
+ proc, _ = self._processes.pop()
+ strategy.add_process(proc)
+ strategy.terminate_all()
+
+ def stop_process(self, process):
+ 'Remove process from monitored list and stop it'
+ for proc_respawn in self._processes:
+ proc, respawn = proc_respawn
+ if proc == process:
+ self._processes.remove(proc_respawn)
+ proc.terminate()
+
+ def register_for_cleanup(self, *obj):
+ assert all([hasattr(o, 'cleanup') for o in obj])
+ self.objects_to_clean_up = self.objects_to_clean_up or []
+ self.objects_to_clean_up.extend(obj)
+
+ def objects_cleanup(self):
+ while self.objects_to_clean_up:
+ obj = self.objects_to_clean_up.pop()
+ try:
+ obj.cleanup()
+ except Exception:
+ log_module.log_exn()
+
+ def test_import_modules_register_for_cleanup(self, mod):
+ '''
+ Tests are required to call this API for any module loaded from its own
+ lib subdir, because they are loaded in the global namespace. Otherwise
+ later tests importing modules with the same name will re-use an already
+ loaded module.
+ '''
+ if mod not in self.test_import_modules_to_clean_up:
+ self.dbg('registering module %r for cleanup' % mod)
+ self.test_import_modules_to_clean_up.append(mod)
+
+ def test_import_modules_cleanup(self):
+ while self.test_import_modules_to_clean_up:
+ mod = self.test_import_modules_to_clean_up.pop()
+ try:
+ self.dbg('Cleaning up module %r' % mod)
+ del sys.modules[mod.__name__]
+ del mod
+ except Exception:
+ log_module.log_exn()
+
+ def poll(self):
+ for proc, respawn in self._processes:
+ if proc.terminated():
+ if respawn == True:
+ proc.respawn()
+ else:
+ proc.log_stdout_tail()
+ proc.log_stderr_tail()
+ log_module.ctx(proc)
+ raise log_module.Error('Process ended prematurely: %s' % proc.name())
+
+ def stop(self):
+ # if sys.exit() called from signal handler (e.g. SIGINT), SystemExit
+ # base exception is raised. Make sure to stop processes in this
+ # finally section. Resources are automatically freed with 'atexit'.
+ self.stop_processes()
+ self.objects_cleanup()
+ self.suite_run.reserved_resources.put_all()
+ MainLoop.unregister_poll_func(self.poll)
+ self.test_import_modules_cleanup()
+
+ def prompt(self, *msgs, **msg_details):
+ 'ask for user interaction. Do not use in tests that should run automatically!'
+ if msg_details:
+ msgs = list(msgs)
+ msgs.append('{%s}' %
+ (', '.join(['%s=%r' % (k,v)
+ for k,v in sorted(msg_details.items())])))
+ msg = ' '.join(msgs) or 'Hit Enter to continue'
+ self.log('prompt:', msg)
+ sys.__stdout__.write('\n\n--- PROMPT ---\n')
+ sys.__stdout__.write(msg)
+ sys.__stdout__.write('\n')
+ sys.__stdout__.flush()
+ entered = util.input_polling('> ', MainLoop.poll)
+ self.log('prompt entered:', repr(entered))
+ return entered
+
+ def get_reserved_resource(self, resource_class_str, specifics=None):
+ return self.suite_run.get_reserved_resource(resource_class_str, specifics)
+
+ def ip_address(self, specifics=None):
+ return self.get_reserved_resource(resource.R_IP_ADDRESS, specifics)
+
+ def nitb(self, ip_address=None):
+ if ip_address is None:
+ ip_address = self.ip_address()
+ return nitb_osmo.OsmoNitb(self, ip_address)
+
+ def hlr(self, ip_address=None):
+ if ip_address is None:
+ ip_address = self.ip_address()
+ return hlr_osmo.OsmoHlr(self, ip_address)
+
+ def ggsn(self, ip_address=None):
+ if ip_address is None:
+ ip_address = self.ip_address()
+ return ggsn_osmo.OsmoGgsn(self, ip_address)
+
+ def sgsn(self, hlr, ggsn, ip_address=None):
+ if ip_address is None:
+ ip_address = self.ip_address()
+ return sgsn_osmo.OsmoSgsn(self, hlr, ggsn, ip_address)
+
+ def mgcpgw(self, ip_address=None, bts_ip=None):
+ if ip_address is None:
+ ip_address = self.ip_address()
+ return mgcpgw_osmo.OsmoMgcpgw(self, ip_address, bts_ip)
+
+ def mgw(self, ip_address=None):
+ if ip_address is None:
+ ip_address = self.ip_address()
+ return mgw_osmo.OsmoMgw(self, ip_address)
+
+ def msc(self, hlr, mgcpgw, stp, ip_address=None):
+ if ip_address is None:
+ ip_address = self.ip_address()
+ return msc_osmo.OsmoMsc(self, hlr, mgcpgw, stp, ip_address)
+
+ def bsc(self, msc, mgw, stp, ip_address=None):
+ if ip_address is None:
+ ip_address = self.ip_address()
+ return bsc_osmo.OsmoBsc(self, msc, mgw, stp, ip_address)
+
+ def stp(self, ip_address=None):
+ if ip_address is None:
+ ip_address = self.ip_address()
+ return stp_osmo.OsmoStp(self, ip_address)
+
+ def ms_driver(self):
+ ms = ms_driver.MsDriver(self)
+ self.register_for_cleanup(ms)
+ return ms
+
+ def bts(self, specifics=None):
+ bts_obj = bts.Bts.get_instance_by_type(self, self.get_reserved_resource(resource.R_BTS, specifics=specifics))
+ bts_obj.set_lac(self.lac())
+ bts_obj.set_rac(self.rac())
+ bts_obj.set_cellid(self.cellid())
+ bts_obj.set_bvci(self.bvci())
+ self.register_for_cleanup(bts_obj)
+ return bts_obj
+
+ def modem(self, specifics=None):
+ conf = self.get_reserved_resource(resource.R_MODEM, specifics=specifics)
+ ms_obj = ms.MS.get_instance_by_type(self, conf)
+ self.register_for_cleanup(ms_obj)
+ return ms_obj
+
+ def modems(self, count):
+ l = []
+ for i in range(count):
+ l.append(self.modem())
+ return l
+
+ def all_resources(self, resource_func):
+ """Returns all yielded resource."""
+ l = []
+ while True:
+ try:
+ l.append(resource_func())
+ except resource.NoResourceExn:
+ return l
+
+ def esme(self):
+ esme_obj = esme.Esme(self.msisdn())
+ self.register_for_cleanup(esme_obj)
+ return esme_obj
+
+ def run_node(self, specifics=None):
+ return run_node.RunNode.from_conf(self.get_reserved_resource(resource.R_RUN_NODE, specifics=specifics))
+
+ def enb(self, specifics=None):
+ enb_obj = enb.eNodeB.get_instance_by_type(self, self.get_reserved_resource(resource.R_ENB, specifics=specifics))
+ self.register_for_cleanup(enb_obj)
+ return enb_obj
+
+ def epc(self, run_node=None):
+ if run_node is None:
+ run_node = self.run_node()
+ epc_obj = epc.EPC.get_instance_by_type(self, run_node)
+ self.register_for_cleanup(epc_obj)
+ return epc_obj
+
+ def osmocon(self, specifics=None):
+ conf = self.get_reserved_resource(resource.R_OSMOCON, specifics=specifics)
+ osmocon_obj = osmocon.Osmocon(self, conf=conf)
+ self.register_for_cleanup(osmocon_obj)
+ return osmocon_obj
+
+ def iperf3srv(self, ip_address=None):
+ if ip_address is None:
+ ip_address = self.ip_address()
+ iperf3srv_obj = iperf3.IPerf3Server(self, ip_address)
+ return iperf3srv_obj
+
+ def msisdn(self):
+ msisdn = self.suite_run.resource_pool().next_msisdn(self)
+ self.log('using MSISDN', msisdn)
+ return msisdn
+
+ def lac(self):
+ lac = self.suite_run.resource_pool().next_lac(self)
+ self.log('using LAC', lac)
+ return lac
+
+ def rac(self):
+ rac = self.suite_run.resource_pool().next_rac(self)
+ self.log('using RAC', rac)
+ return rac
+
+ def cellid(self):
+ cellid = self.suite_run.resource_pool().next_cellid(self)
+ self.log('using CellId', cellid)
+ return cellid
+
+ def bvci(self):
+ bvci = self.suite_run.resource_pool().next_bvci(self)
+ self.log('using BVCI', bvci)
+ return bvci
+
+
def setup(suite_run, _test):
- from .core import process as process_module
from .core.event_loop import MainLoop
from .obj.sms import Sms as Sms_class
- from . import suite as suite_module
- global trial, suite, test, resources, log, dbg, err, wait, wait_no_raise, sleep, poll, prompt, Timeout, Sms, process
+ global trial, suite, test, resources, log, dbg, err, wait, wait_no_raise, sleep, poll, prompt, Sms, process
trial = suite_run.trial
- suite = suite_run
test = _test
- resources = suite_run.reserved_resources
+ resources = suite_run.reserved_resources # TODO: remove this global, only used in selftest
log = test.log
dbg = test.dbg
err = test.err
@@ -55,9 +333,10 @@
wait_no_raise = lambda *args, **kwargs: MainLoop.wait_no_raise(suite_run, *args, **kwargs)
sleep = lambda *args, **kwargs: MainLoop.sleep(suite_run, *args, **kwargs)
poll = MainLoop.poll
- prompt = suite_run.prompt
- Timeout = suite_module.Timeout
Sms = Sms_class
process = process_module
+ suite = TestEnv(suite_run, _test) # stored in "suite" for backward compatibility
+ prompt = suite.prompt
+ return suite
# vim: expandtab tabstop=4 shiftwidth=4
diff --git a/src/osmo_gsm_tester/trial.py b/src/osmo_gsm_tester/trial.py
index fb94a59..08b7d8f 100644
--- a/src/osmo_gsm_tester/trial.py
+++ b/src/osmo_gsm_tester/trial.py
@@ -22,7 +22,10 @@
import shutil
import tarfile
-from .core import log, util, report
+from .core import log
+from .core import util
+from .core import report
+
from . import suite
FILE_MARK_TAKEN = 'taken'