implement modem and nitb bits for a real test
Change-Id: I1ca3bd874aed1265d83a46340e9b3e469075c7ee
diff --git a/selftest/real_suite/suites/sms/mo_mt_sms.py b/selftest/real_suite/suites/sms/mo_mt_sms.py
index 05be48c..b97d332 100755
--- a/selftest/real_suite/suites/sms/mo_mt_sms.py
+++ b/selftest/real_suite/suites/sms/mo_mt_sms.py
@@ -8,19 +8,18 @@
ms_mt = suite.modem()
print('start nitb and bts...')
-nitb.add_bts(bts)
+nitb.bts_add(bts)
nitb.start()
sleep(.1)
assert nitb.running()
bts.start()
-nitb.add_subscriber(ms_mo)
-nitb.add_subscriber(ms_mt)
+nitb.subscriber_add(ms_mo)
+nitb.subscriber_add(ms_mt)
ms_mo.connect(nitb)
ms_mt.connect(nitb)
wait(nitb.subscriber_attached, ms_mo, ms_mt)
sms = ms_mo.sms_send(ms_mt.msisdn)
-sleep(3)
-wait(nitb.sms_received, sms)
+wait(ms_mt.sms_received, sms)
diff --git a/selftest/sms_test.py b/selftest/sms_test.py
new file mode 100755
index 0000000..25fb551
--- /dev/null
+++ b/selftest/sms_test.py
@@ -0,0 +1,20 @@
+#!/usr/bin/env python3
+
+import _prep
+from osmo_gsm_tester import ofono_client
+
+print(ofono_client.Sms())
+print(ofono_client.Sms())
+print(ofono_client.Sms())
+sms = ofono_client.Sms('123', '456')
+print(str(sms))
+
+sms2 = ofono_client.Sms('123', '456')
+print(str(sms2))
+assert sms != sms2
+
+sms2.msg = str(sms.msg)
+print(str(sms2))
+assert sms == sms2
+
+# vim: expandtab tabstop=4 shiftwidth=4
diff --git a/src/osmo_gsm_tester/ofono_client.py b/src/osmo_gsm_tester/ofono_client.py
index dcca4e2..5f91781 100644
--- a/src/osmo_gsm_tester/ofono_client.py
+++ b/src/osmo_gsm_tester/ofono_client.py
@@ -17,7 +17,7 @@
# You should have received a copy of the GNU Affero General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
-from . import log
+from . import log, test
from pydbus import SystemBus, Variant
import time
@@ -28,6 +28,9 @@
glib_main_ctx = glib_main_loop.get_context()
bus = SystemBus()
+I_NETREG = 'org.ofono.NetworkRegistration'
+I_SMS = 'org.ofono.MessageManager'
+
def poll():
global glib_main_ctx
while glib_main_ctx.pending():
@@ -52,8 +55,8 @@
self.set_name(self.path)
self.set_log_category(log.C_BUS)
self._dbus_obj = None
- self._interfaces_was = set()
- poll()
+ self._interfaces = set()
+ test.poll()
def set_msisdn(self, msisdn):
self.msisdn = msisdn
@@ -65,7 +68,9 @@
return self.conf.get('ki')
def set_powered(self, on=True):
+ test.poll()
self.dbus_obj.SetProperty('Powered', Variant('b', on))
+ test.poll()
def dbus_obj(self):
if self._dbus_obj is not None:
@@ -83,9 +88,9 @@
def _on_interfaces_change(self, interfaces_now):
now = set(interfaces_now)
- additions = now - self._interfaces_was
- removals = self._interfaces_was - now
- self._interfaces_was = now
+ additions = now - self._interfaces
+ removals = self._interfaces - now
+ self._interfaces = now
for iface in removals:
with log.Origin('modem.disable(%s)' % iface):
try:
@@ -101,19 +106,62 @@
def _on_interface_enabled(self, interface_name):
self.dbg('Interface enabled:', interface_name)
- # todo: when the messages service comes up, connect a message reception signal
+ if interface_name == I_SMS:
+ self._dbus_obj[I_SMS].IncomingMessage.connect(self._on_incoming_message)
def _on_interface_disabled(self, interface_name):
self.dbg('Interface disabled:', interface_name)
+ def has_interface(self, name):
+ return name in self._interfaces
+
def connect(self, nitb):
'set the modem up to connect to MCC+MNC from NITB config'
self.log('connect to', nitb)
- self.err('Modem.connect() is still a fake and does not do anything')
+ self.set_powered()
+ if not self.has_interface(I_NETREG):
+ self.log('No %r interface, hoping that the modem connects by itself' % I_NETREG)
+ else:
+ self.log('Use of %r interface not implemented yet, hoping that the modem connects by itself' % I_NETREG)
- def sms_send(self, msisdn):
- self.log('send sms to MSISDN', msisdn)
- self.err('Modem.sms_send() is still a fake and does not do anything')
- return 'todo'
+ def sms_send(self, to_msisdn):
+ if hasattr(to_msisdn, 'msisdn'):
+ to_msisdn = to_msisdn.msisdn
+ self.log('sending sms to MSISDN', to_msisdn)
+ if not self.has_interface(I_SMS):
+ raise RuntimeError('Modem cannot send SMS, interface not active: %r' % I_SMS)
+ sms = Sms(self.msisdn(), to_msisdn)
+ mm = self.dbus_obj[I_SMS]
+ mm.SendMessage(to_msisdn, str(sms))
+ return sms
+
+ def _on_incoming_message(self, message, info):
+ self.log('Incoming SMS:', repr(message), **info)
+
+ def sms_received(self, sms):
+ pass
+
+class Sms:
+ _last_sms_idx = 0
+ msg = None
+
+ def __init__(self, from_msisdn=None, to_msisdn=None):
+ Sms._last_sms_idx += 1
+ msgs = ['message nr. %d' % Sms._last_sms_idx]
+ if from_msisdn or to_msisdn:
+ msgs.append(' sent')
+ if from_msisdn:
+ msgs.append(' from %s' % from_msisdn)
+ if to_msisdn:
+ msgs.append(' to %s' % to_msisdn)
+ self.msg = ''.join(msgs)
+
+ def __str__(self):
+ return self.msg
+
+ def __eq__(self, other):
+ if isinstance(other, Sms):
+ return self.msg == other.msg
+ return inself.msg == other
# vim: expandtab tabstop=4 shiftwidth=4
diff --git a/src/osmo_gsm_tester/osmo_nitb.py b/src/osmo_gsm_tester/osmo_nitb.py
index 027897d..8fce371 100644
--- a/src/osmo_gsm_tester/osmo_nitb.py
+++ b/src/osmo_gsm_tester/osmo_nitb.py
@@ -82,28 +82,28 @@
def addr(self):
return self.nitb_iface.get('addr')
- def add_bts(self, bts):
+ def bts_add(self, bts):
self.bts.append(bts)
bts.set_nitb(self)
- def add_subscriber(self, modem, msisdn=None):
+ def subscriber_add(self, modem, msisdn=None):
if msisdn is None:
msisdn = self.suite_run.resources_pool.next_msisdn(modem)
modem.set_msisdn(msisdn)
self.log('Add subscriber', msisdn=msisdn, imsi=modem.imsi())
with self:
- OsmoNitbCtrl(self).add_subscriber(modem.imsi(), msisdn, modem.ki())
+ OsmoNitbCtrl(self).subscriber_add(modem.imsi(), msisdn, modem.ki())
def subscriber_attached(self, *modems):
- return all([self.imsi_attached(m.imsi()) for m in modems])
+ return self.imsi_attached([m.imsi() for m in modems])
- def imsi_attached(self, imsi):
- self.err('OsmoNitb.imsi_attached() is still fake and does not do anything')
- return random.choice((True, False))
+ def imsi_attached(self, *imsis):
+ attached = self.imsi_list_attached()
+ return all([imsi in attached for imsi in imsis])
- def sms_received(self, sms):
- self.err('OsmoNitb.sms_received() is still fake and does not do anything')
- return random.choice((True, False))
+ def imsi_list_attached(self):
+ with self:
+ OsmoNitbCtrl(self).subscriber_list_active()
def running(self):
return not self.process.terminated()
@@ -123,7 +123,7 @@
def ctrl(self):
return osmo_ctrl.OsmoCtrl(self.nitb.addr(), OsmoNitbCtrl.PORT)
- def add_subscriber(self, imsi, msisdn, ki=None, algo=None):
+ def subscriber_add(self, imsi, msisdn, ki=None, algo=None):
created = False
if ki and not algo:
algo = 'comp128v1'
@@ -149,7 +149,8 @@
aslist_str = ""
with osmo_ctrl.OsmoCtrl(self.nitb.addr(), OsmoNitbCtrl.PORT) as ctrl:
self.ctrl.do_get(OsmoNitbCtrl.SUBSCR_LIST_ACTIVE_VAR)
- # this looks like it doesn't work for long data. It's legacy code from the old osmo-gsm-tester.
+ # This is legacy code from the old osmo-gsm-tester.
+ # looks like this doesn't work for long data.
data = self.ctrl.receive()
while (len(data) > 0):
(answer, data) = self.ctrl.remove_ipa_ctrl_header(data)