blob: 622a18f9637da5f9d33ee1e9ce326f0355c921f7 [file] [log] [blame]
Neels Hofmeyr3531a192017-03-28 14:30:28 +02001# osmo_gsm_tester: DBUS client to talk to ofono
2#
3# Copyright (C) 2016-2017 by sysmocom - s.f.m.c. GmbH
4#
5# Author: Neels Hofmeyr <neels@hofmeyr.de>
6#
7# This program is free software: you can redistribute it and/or modify
8# it under the terms of the GNU Affero General Public License as
9# published by the Free Software Foundation, either version 3 of the
10# License, or (at your option) any later version.
11#
12# This program is distributed in the hope that it will be useful,
13# but WITHOUT ANY WARRANTY; without even the implied warranty of
14# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
15# GNU Affero General Public License for more details.
16#
17# You should have received a copy of the GNU Affero General Public License
18# along with this program. If not, see <http://www.gnu.org/licenses/>.
19
20from . import log
21
22from pydbus import SystemBus, Variant
23import time
24import pprint
25
26from gi.repository import GLib
27glib_main_loop = GLib.MainLoop()
28glib_main_ctx = glib_main_loop.get_context()
29bus = SystemBus()
30
31def poll():
32 global glib_main_ctx
33 while glib_main_ctx.pending():
34 glib_main_ctx.iteration()
35
36def get(path):
37 global bus
38 return bus.get('org.ofono', path)
39
40def list_modems():
41 root = get('/')
42 return sorted(root.GetModems())
43
44
45class Modem(log.Origin):
46 'convenience for ofono Modem interaction'
47 msisdn = None
48
49 def __init__(self, conf):
50 self.conf = conf
51 self.path = conf.get('path')
52 self.set_name(self.path)
53 self.set_log_category(log.C_BUS)
54 self._dbus_obj = None
55 self._interfaces_was = set()
56 poll()
57
58 def set_msisdn(self, msisdn):
59 self.msisdn = msisdn
60
61 def imsi(self):
62 return self.conf.get('imsi')
63
64 def ki(self):
65 return self.conf.get('ki')
66
67 def set_powered(self, on=True):
68 self.dbus_obj.SetProperty('Powered', Variant('b', on))
69
70 def dbus_obj(self):
71 if self._dbus_obj is not None:
72 return self._dbus_obj
73 self._dbus_obj = get(self.path)
74 self._dbus_obj.PropertyChanged.connect(self._on_property_change)
75 self._on_interfaces_change(self.properties().get('Interfaces'))
76
77 def properties(self):
78 return self.dbus_obj().GetProperties()
79
80 def _on_property_change(self, name, value):
81 if name == 'Interfaces':
82 self._on_interfaces_change(value)
83
84 def _on_interfaces_change(self, interfaces_now):
85 now = set(interfaces_now)
86 additions = now - self._interfaces_was
87 removals = self._interfaces_was - now
88 self._interfaces_was = now
89 for iface in removals:
90 with log.Origin('modem.disable(%s)' % iface):
91 try:
92 self._on_interface_disabled(iface)
93 except:
94 self.log_exn()
95 for iface in additions:
96 with log.Origin('modem.enable(%s)' % iface):
97 try:
98 self._on_interface_enabled(iface)
99 except:
100 self.log_exn()
101
102 def _on_interface_enabled(self, interface_name):
103 self.dbg('Interface enabled:', interface_name)
104 # todo: when the messages service comes up, connect a message reception signal
105
106 def _on_interface_disabled(self, interface_name):
107 self.dbg('Interface disabled:', interface_name)
108
109 def connect(self, nitb):
110 'set the modem up to connect to MCC+MNC from NITB config'
111 self.log('connect to', nitb)
112
113 def sms_send(self, msisdn):
114 self.log('send sms to MSISDN', msisdn)
115 return 'todo'
116
117# vim: expandtab tabstop=4 shiftwidth=4