blob: 3a4d630e8c661f0c9ec10160055be1644e5c3100 [file] [log] [blame]
Neels Hofmeyr726b58d2017-10-15 03:01:09 +02001#!/usr/bin/env python3
2#
3# (C) 2017 by sysmocom s.f.m.c. GmbH <info@sysmocom.de>
4# All rights reserved.
5#
6# Author: Neels Hofmeyr <nhofmeyr@sysmocom.de>
7#
8# This program is free software: you can redistribute it and/or modify
9# it under the terms of the GNU General Public License as published by
10# the Free Software Foundation, either version 3 of the License, or
11# (at your option) any later version.
12#
13# This program is distributed in the hope that it will be useful,
14# but WITHOUT ANY WARRANTY; without even the implied warranty of
15# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
16# GNU General Public License for more details.
17#
18# You should have received a copy of the GNU General Public License
19# along with this program. If not, see <http://www.gnu.org/licenses/>.
20
21'''
22Run VTY test transcripts against a given application.
23
24A VTY transcript contains VTY commands and their expected results.
25It looks like:
26
27"
28OsmoHLR> enable
29
30OsmoHLR# subscriber show imsi 123456789023000
31% No subscriber for imsi = '123456789023000'
32OsmoHLR# subscriber show msisdn 12345
33% No subscriber for msisdn = '12345'
34
35OsmoHLR# subscriber create imsi 123456789023000
36% Created subscriber 123456789023000
37 ID: 1
38 IMSI: 123456789023000
39 MSISDN: none
40 No auth data
41"
42
43The application to be tested is described by
44- a binary to run,
45- command line arguments to pass to the binary,
46- the VTY telnet port,
47- the application name as printed in the VTY prompt.
48
49This module can either be run directly to run or update a given VTY transcript,
50or it can be imported as a module to run more complex setups.
51'''
52
53import re
54
55from osmopy.osmo_verify_transcript_common import *
56
class InteractVty(Interact):
    '''Interact with an application's telnet VTY.

    Sends VTY commands over the socket and collects the lines printed in
    response, keeping track of the VTY node and prompt character ('>' or '#')
    shown in the prompt before and after each command.
    '''

    class VtyStep(Interact.StepBase):
        '''A transcript step: one VTY command plus the prompt it is entered at.'''

        # Node name as captured from the prompt, without the surrounding
        # parentheses, e.g. 'config-net'; None when the prompt names no node.
        expect_node = None
        expect_prompt_char = None # '>' or '#'

        def __init__(self, prompt):
            '''prompt: the application name as printed in the VTY prompt.'''
            super().__init__()
            self.prompt = prompt

        def verify_interact_state(self, interact_instance):
            '''Check that the live VTY prompt is the one this step expects.

            Does nothing when interact_instance.update is set. Otherwise raise
            an Exception if the current node or prompt character of
            interact_instance differs from the expected one.
            '''
            if interact_instance.update:
                return
            if interact_instance.this_node != self.expect_node:
                raise Exception('Mismatch: expected VTY node %r in the prompt, got %r'
                                % (self.expect_node, interact_instance.this_node))
            if interact_instance.this_prompt_char != self.expect_prompt_char:
                raise Exception('Mismatch: expected VTY prompt character %r, got %r'
                                % (self.expect_prompt_char, interact_instance.this_prompt_char))

        @staticmethod
        def is_next_step(line, interact_instance):
            '''Parse a transcript line that starts with a VTY prompt.

            Return a new VtyStep carrying the node, prompt character and
            command found in line, or None if line does not match the prompt
            pattern of interact_instance.
            '''
            m = interact_instance.re_prompt.match(line)
            if not m:
                return None
            next_step = InteractVty.VtyStep(interact_instance.prompt)
            next_step.expect_node = m.group(1)
            next_step.expect_prompt_char = m.group(2)
            next_step.command = m.group(3)
            return next_step

        def command_str(self, interact_instance=None):
            '''Return the prompt followed by this step's command, as one line.

            Without interact_instance, the node and prompt character expected
            by the transcript are used; with it, the ones that
            interact_instance recorded as last_node / last_prompt_char.
            '''
            if interact_instance is None:
                node = self.expect_node
                prompt_char = self.expect_prompt_char
            else:
                node = interact_instance.last_node
                prompt_char = interact_instance.last_prompt_char
            # The node is stored bare; the prompt shows it in parentheses.
            node = '(%s)' % node if node else ''
            return '%s%s%s %s' % (self.prompt, node, prompt_char, self.command)

    def __init__(self, prompt, port, host, verbose, update):
        '''prompt is the application name as printed in the VTY prompt; the
        remaining arguments are passed on to the Interact base class.'''
        self.prompt = prompt
        # Groups: 1 = node name (optional), 2 = prompt char, 3 = rest of line.
        # The prompt name is literal text, so escape it; str() mirrors the
        # '%s' formatting used before, so non-string prompts still yield the
        # same pattern.
        self.re_prompt = re.compile(r'^%s(?:\(([\w-]*)\))?([#>]) (.*)$'
                                    % re.escape(str(self.prompt)))
        super().__init__(InteractVty.VtyStep, port, host, verbose, update)

    def connect(self):
        '''Reset the prompt tracking state, connect, and drop the greeting.'''
        self.this_node = None
        self.this_prompt_char = '>' # slight cheat for initial prompt char
        self.last_node = None
        self.last_prompt_char = None

        super().connect()
        # receive the first welcome message and discard
        # (a single read: whatever arrives in it is dropped)
        self.socket.recv(4096)

    def _command(self, command_str, timeout=10):
        '''Send command_str as given and read until the next VTY prompt.

        On success, last_node / last_prompt_char take over the previous
        this_node / this_prompt_char, which are then updated from the prompt
        that ended the response.

        Return the list of lines received before that prompt. If the first
        line equals the sent command (stripped, and without a trailing '?'),
        it is taken to be the echo and removed.

        Raise IOError if no prompt has shown up after timeout seconds; the
        elapsed time is only checked after a read has returned.
        '''
        self.socket.send(command_str.encode())

        waited_since = time.time()
        received = ''

        while True:
            received += self.socket.recv(4096).decode('utf-8')

            # Split everything received so far instead of carrying over only
            # the text of the last line: that way a line break that happens to
            # sit at the end of a read is not lost, and the following data is
            # not glued onto an already finished line.
            lines = received.splitlines()

            # The prompt is the unterminated tail of the response; if the data
            # ends in a newline, there is no prompt candidate yet.
            if lines and not received.endswith('\n'):
                last_line = lines[-1]
            else:
                last_line = ''

            match = self.re_prompt.match(last_line)
            if match:
                break

            if time.time() - waited_since > timeout:
                raise IOError("Failed to read data (did the app crash?)")
            time.sleep(.1)

        self.last_node = self.this_node
        self.last_prompt_char = self.this_prompt_char
        self.this_node = match.group(1) or None
        self.this_prompt_char = match.group(2)

        # everything before the prompt line is the response proper
        received_lines = lines[:-1]

        # expecting to have received the command we sent as echo, remove it
        clean_command_str = command_str.strip()
        if clean_command_str.endswith('?'):
            clean_command_str = clean_command_str[:-1]
        if received_lines and received_lines[0] == clean_command_str:
            received_lines = received_lines[1:]
        return received_lines

    def command(self, command_str, timeout=10):
        '''Run one VTY command and return the lines it printed.

        An empty command_str sends just a carriage return. A carriage return
        is appended unless command_str already ends in one, in a tab, or in
        '?'. After a command ending in '?', a Ctrl-C is sent and its response
        discarded (presumably to clear the text still on the VTY input line).

        timeout applies to each wait for a prompt; see _command().
        '''
        command_str = command_str or '\r'
        if command_str[-1] not in '?\r\t':
            command_str = command_str + '\r'

        received_lines = self._command(command_str, timeout)

        if command_str[-1] == '?':
            self._command('\x03', timeout)

        return received_lines
164
if __name__ == '__main__':
    # Command line entry point: extend the shared option parser with the VTY
    # prompt name, then hand a VTY interaction object to the common main().
    arg_parser = common_parser()
    arg_parser.add_argument(
        '-n', '--prompt-name',
        dest='prompt',
        help="Name used in application's telnet VTY prompt.",
    )
    opts = arg_parser.parse_args()

    # port, host, verbose, update, command_str and transcript_files are not
    # added here; presumably common_parser() defines them.
    vty_interact = InteractVty(
        opts.prompt,
        opts.port,
        opts.host,
        opts.verbose,
        opts.update,
    )

    main(
        command_str=opts.command_str,
        transcript_files=opts.transcript_files,
        interact=vty_interact,
        verbose=opts.verbose,
    )
177
178# vim: tabstop=4 shiftwidth=4 expandtab nocin ai