# osmo_gsm_tester: VTY connection
#
# Copyright (C) 2020 by sysmocom - s.f.m.c. GmbH
#
# Author: Neels Hofmeyr <neels@hofmeyr.de>
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as
# published by the Free Software Foundation, either version 3 of the
# License, or (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
19
import codecs
import re
import socket
import struct
import sys
import time

from ..core import log
from ..core.event_loop import MainLoop
28
class VtyInterfaceExn(Exception):
    '''Error reported by the VTY interface code in this module.'''
31
class OsmoVty(log.Origin):
    '''VTY connection to a program listening on host:port, over a TCP socket.

    Suggested usage:
        with OsmoVty(...) as vty:
            vty.cmds('enable', 'configure network', 'net')
            response = vty.cmd('foo 1 2 3')
            print('\\n'.join(response))

    Using 'with' ensures that the connection is closed again.
    There should not be nested 'with' statements on this object.
    '''

##############
# PROTECTED
##############

    def __init__(self, host, port, prompt=None):
        '''Remember the VTY address; no connection is opened here.

        host, port: address handed to socket.connect() by try_connect().
        prompt: application name found at the start of every VTY prompt. When
                None or empty, connect() derives it from the first data received.
                It is inserted verbatim into the prompt regex.
        '''
        super().__init__(log.C_BUS, 'Vty', host=host, port=port)
        self.host = host
        self.port = port
        # connected socket, or None while disconnected
        self.sck = None
        self.prompt = prompt
        # compiled prompt regex, set by connect()
        self.re_prompt = None
        # node name and prompt character of the most recent prompt ...
        self.this_node = None
        self.this_prompt_char = None
        # ... and of the prompt seen before that one
        self.last_node = None
        self.last_prompt_char = None

    def try_connect(self):
        '''Do a connection attempt, return True when successful, False otherwise.
        Errors are not raised, but logged to the debug log. Only exceptions that
        do not derive from Exception (e.g. KeyboardInterrupt) propagate.'''
        assert self.sck is None
        try:
            self.dbg('Connecting')
            sck = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
            try:
                sck.connect((self.host, self.port))
            except BaseException:
                # never leave a half-set-up socket behind, whatever the reason
                sck.close()
                raise
            # set self.sck only after the connect was successful
            self.sck = sck
            return True
        except Exception:
            # deliberately not a bare 'except': Ctrl-C and SystemExit must not
            # be turned into "connection failed, try again".
            self.dbg('Failed to connect', sys.exc_info()[0])
            return False

    def _command(self, command_str, timeout=10, strict=True):
        '''Send a command and return the response.

        command_str: text sent as-is (the caller appends the line terminator).
        timeout: seconds to wait for the next prompt before giving up.
        strict: when True, raise VtyInterfaceExn if the whole response is
                '% Unknown command.'; when False, only log it.

        Return the response as a list of lines without line break characters,
        with the echoed command removed from the front.
        Update this_node/this_prompt_char (and last_*) from the prompt received.
        Raise IOError when no prompt arrives within the timeout or when the
        peer closes the connection.
        '''
        # (copied from https://git.osmocom.org/python/osmo-python-tests/tree/osmopy/osmo_interact/vty.py)
        sck = self.sck
        self.dbg('Sending', command_str=command_str)
        # sendall(): a plain send() may transmit only part of the command
        sck.sendall(command_str.encode())

        deadline = time.monotonic() + timeout
        # Incremental decoding: a multi-byte UTF-8 character may be split
        # across two recv() chunks.
        decoder = codecs.getincrementaldecoder('utf-8')()
        received_lines = []
        last_line = ''

        # (not using MainLoop.wait() to accumulate received responses across
        # iterations)
        try:
            while True:
                # Bound every read, so that a silent peer cannot block forever.
                # Never pass 0 to settimeout(), that would mean non-blocking mode.
                sck.settimeout(max(deadline - time.monotonic(), 0.1))
                try:
                    new_bytes = sck.recv(4096)
                except socket.timeout:
                    raise IOError("Failed to read data (did the app crash?)") from None
                if not new_bytes:
                    # The peer closed the connection, no prompt will ever arrive.
                    raise IOError("Failed to read data (did the app crash?)")

                last_line = "%s%s" % (last_line, decoder.decode(new_bytes))

                if last_line:
                    # Separate the received response into lines.
                    # But note: the VTY logging currently separates with '\n\r', not '\r\n',
                    # see _vty_output() in libosmocore logging_vty.c.
                    # So we need to jump through hoops to not separate 'abc\n\rdef' as
                    # [ 'abc', '', 'def' ]; but also not to convert '\r\n\r\n' to '\r\n\n' ('\r{\r\n}\n')
                    # Simplest is to just drop all the '\r' and only care about the '\n'.
                    last_line = last_line.replace('\r', '')
                    lines = last_line.splitlines()
                    if last_line.endswith('\n'):
                        received_lines.extend(lines)
                        last_line = ""
                    else:
                        # if pkt buffer ends in the middle of a line, we need to keep
                        # last non-finished line:
                        received_lines.extend(lines[:-1])
                        # 'lines' is empty when only '\r' characters were received
                        last_line = lines[-1] if lines else ""

                match = self.re_prompt.match(last_line)
                if match:
                    break

                if time.monotonic() > deadline:
                    raise IOError("Failed to read data (did the app crash?)")
                MainLoop.sleep(.1)
        finally:
            # back to the blocking mode that connect() has set up
            sck.settimeout(None)

        self.last_node = self.this_node
        self.last_prompt_char = self.this_prompt_char
        self.this_node = match.group(1) or None
        self.this_prompt_char = match.group(2)

        # expecting to have received the command we sent as echo, remove it
        clean_command_str = command_str.strip()
        if clean_command_str.endswith('?'):
            clean_command_str = clean_command_str[:-1]
        if received_lines and received_lines[0] == clean_command_str:
            received_lines = received_lines[1:]
        if len(received_lines) > 1:
            self.dbg('Received\n|', '\n| '.join(received_lines), '\n')
        elif len(received_lines) == 1:
            self.dbg('Received', repr(received_lines[0]))

        if received_lines == ['% Unknown command.']:
            errmsg = 'VTY reports unknown command: %r' % command_str
            if strict:
                raise VtyInterfaceExn(errmsg)
            else:
                self.log('ignoring error:', errmsg)

        return received_lines

########################
# PUBLIC - INTERNAL API
########################

    def connect(self, timeout=30):
        '''Connect to the VTY self.host and self.port, retry for 'timeout' seconds.
        Read the initial prompt and, if no prompt name was passed to the
        constructor, derive it from that prompt.
        Raise VtyInterfaceExn if the prompt name cannot be determined. On any
        failure after the socket was opened, the socket is closed again.
        connect() and disconnect() are called implicitly when using the 'with' statement.
        See class OsmoVty's doc.
        '''
        MainLoop.wait(self.try_connect, timestep=3, timeout=timeout)
        try:
            self.sck.setblocking(1)

            # read first prompt
            # (copied from https://git.osmocom.org/python/osmo-python-tests/tree/osmopy/osmo_interact/vty.py)
            self.this_node = None
            self.this_prompt_char = '>' # slight cheat for initial prompt char
            self.last_node = None
            self.last_prompt_char = None

            data = self.sck.recv(4096)
            if not self.prompt:
                # only look at what follows the last line break ...
                b = data
                b = b[b.rfind(b'\n') + 1:]
                # ... and skip leading bytes outside of the 'A'..'z' range
                while b and (b[0] < ord('A') or b[0] > ord('z')):
                    b = b[1:]
                prompt_str = b.decode('utf-8')
                if '>' in prompt_str:
                    self.prompt = prompt_str[:prompt_str.find('>')]
                    self.dbg(prompt=self.prompt)
            if not self.prompt:
                raise VtyInterfaceExn('Could not find application name; needed to decode prompts.'
                                      ' Initial data was: %r' % data)
            # groups: (1) optional node name in parentheses, (2) prompt char, (3) rest of the line
            self.re_prompt = re.compile(r'^%s(?:\(([\w-]*)\))?([#>]) (.*)$' % self.prompt)
        except BaseException:
            # __exit__() is not run when __enter__() fails, so clean up here;
            # otherwise the socket leaks and the next try_connect() hits its assert.
            self.disconnect()
            raise

    def disconnect(self):
        '''Disconnect; do nothing if there is no open connection.
        connect() and disconnect() are called implicitly when using the 'with' statement.
        See class OsmoVty's doc.
        '''
        if self.sck is None:
            return
        self.dbg('Disconnecting')
        # forget the socket first, so that the state is consistent even if close() raises
        sck, self.sck = self.sck, None
        sck.close()

###################
# PUBLIC (test API included)
###################

    def cmd(self, command_str, timeout=10, strict=True):
        '''Send one VTY command and return its response.
        Return a list of strings, one string per line, without line break characters:
          [ 'first line', 'second line', 'third line' ]
        When strict==False, do not raise exceptions on '% Unknown command'.
        If the connection is not yet open, briefly connect for only this command and disconnect again. If it is open,
        use the open connection and leave it open.
        '''
        # allow calling for both already connected VTY as well as establishing
        # a connection just for this command.
        if self.sck is None:
            with self:
                return self.cmd(command_str, timeout, strict)

        # (copied from https://git.osmocom.org/python/osmo-python-tests/tree/osmopy/osmo_interact/vty.py)
        # An empty command sends just '\r'; commands ending in '?', '\r' or '\t' are sent unchanged.
        command_str = command_str or '\r'
        if command_str[-1] not in '?\r\t':
            command_str = command_str + '\r'

        received_lines = self._command(command_str, timeout, strict)

        # send escape to cancel the '?' command line
        if command_str[-1] == '?':
            self._command('\x03', timeout)

        return received_lines

    def cmds(self, *cmds, timeout=10, strict=True):
        '''Send a series of commands and return each command's response:
          cmds('foo', 'bar', 'baz') --> [ ['foo line 1','foo line 2'], ['bar line 1'], ['baz line 1']]
        When strict==False, do not raise exceptions on '% Unknown command'.
        If the connection is not yet open, briefly connect for only these commands and disconnect again. If it is
        open, use the open connection and leave it open.
        '''
        # allow calling for both already connected VTY as well as establishing
        # a connection just for this command.
        if self.sck is None:
            with self:
                return self.cmds(*cmds, timeout=timeout, strict=strict)

        return [self.cmd(cmd, timeout, strict) for cmd in cmds]

    def __enter__(self):
        '''Open the connection and return this object.'''
        self.connect()
        return self

    def __exit__(self, *exc_info):
        '''Close the connection; exceptions from the 'with' body are not suppressed.'''
        self.disconnect()
246
# vim: expandtab tabstop=4 shiftwidth=4