blob: 16e79cf35863b09029c599a8b5a87dd3b984c22f [file] [log] [blame]
Pau Espin Pedrolc9817a52017-12-13 19:52:27 +01001# osmo_gsm_tester: base classes to share code among BTS subclasses.
2#
3# Copyright (C) 2016-2017 by sysmocom - s.f.m.c. GmbH
4#
5# Author: Pau Espin Pedrol <pespin@sysmocom.de>
6#
7# This program is free software: you can redistribute it and/or modify
8# it under the terms of the GNU General Public License as
9# published by the Free Software Foundation, either version 3 of the
10# License, or (at your option) any later version.
11#
12# This program is distributed in the hope that it will be useful,
13# but WITHOUT ANY WARRANTY; without even the implied warranty of
14# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
15# GNU General Public License for more details.
16#
17# You should have received a copy of the GNU General Public License
18# along with this program. If not, see <http://www.gnu.org/licenses/>.
19
20import os
21import pprint
22import tempfile
23from abc import ABCMeta, abstractmethod
24from . import log, config, util, template, process, event_loop, pcu_osmo
25
26class OsmoBts(log.Origin, metaclass=ABCMeta):
27 suite_run = None
28 proc_bts = None
29 bsc = None
30 sgsn = None
31 lac = None
32 rac = None
33 cellid = None
34 bvci = None
35 _pcu = None
36
37##############
38# PROTECTED
39##############
40 def __init__(self, suite_run, conf, name):
41 super().__init__(log.C_RUN, name)
42 self.suite_run = suite_run
43 self.conf = conf
44 if len(self.pcu_socket_path().encode()) > 107:
45 raise log.Error('Path for pcu socket is longer than max allowed len for unix socket path (107):', self.pcu_socket_path())
46
47########################
48# PUBLIC - INTERNAL API
49########################
50 @abstractmethod
51 def conf_for_bsc(self):
52 'Used by bsc objects to get path to socket.'
53 pass
54
55 @abstractmethod
56 def pcu_socket_path(self):
57 'Used by pcu objects to get path to socket.'
58 pass
59
60 @abstractmethod
61 def create_pcu(self):
62 'Used by base class. Subclass can create different pcu implementations.'
63 pass
64
65 def remote_addr(self):
66 return self.conf.get('addr')
67
68 def ready_for_pcu(self):
69 if not self.proc_bts or not self.proc_bts.is_running:
70 return False
71 return 'BTS is up' in (self.proc_bts.get_stderr() or '')
72
73 def cleanup(self):
74 'Nothing to do by default. Subclass can override if required.'
75 pass
76
77###################
78# PUBLIC (test API included)
79###################
80 @abstractmethod
81 def start(self):
82 'Starts BTS proccess and sets self.proc_bts with an object of Process interface'
83 pass
84
85 def pcu(self):
86 if self._pcu is None:
87 self._pcu = self.create_pcu(self.suite_run, self, self.conf)
88 return self._pcu
89
90 def set_bsc(self, bsc):
91 self.bsc = bsc
92
93 def set_sgsn(self, sgsn):
94 self.sgsn = sgsn
95
96 def set_lac(self, lac):
97 self.lac = lac
98
99 def set_rac(self, rac):
100 self.rac = rac
101
102 def set_cellid(self, cellid):
103 self.cellid = cellid
104
105 def set_bvci(self, bvci):
106 self.bvci = bvci
107
108
109class OsmoBtsMainUnit(OsmoBts, metaclass=ABCMeta):
110##############
111# PROTECTED
112##############
113 pcu_sk_tmp_dir = None
114
115 def __init__(self, suite_run, conf, name):
116 super().__init__(suite_run, conf, name)
117
118########################
119# PUBLIC - INTERNAL API
120########################
121 @abstractmethod
122 def conf_for_bsc(self):
123 pass
124
125 def cleanup(self):
126 if self.pcu_sk_tmp_dir:
127 try:
128 os.remove(self.pcu_socket_path())
129 except OSError:
130 pass
131 os.rmdir(self.pcu_sk_tmp_dir)
132
133 def create_pcu(self):
134 return pcu_osmo.OsmoPcu(self.suite_run, self, self.conf)
135
136 def pcu_socket_path(self):
137 if self.pcu_sk_tmp_dir is None:
138 self.pcu_sk_tmp_dir = tempfile.mkdtemp('', 'ogtpcusk')
139 return os.path.join(self.pcu_sk_tmp_dir, 'pcu_bts')
140
141###################
142# PUBLIC (test API included)
143###################
144 @abstractmethod
145 def start(self):
146 pass