| #!/usr/bin/python3 |
| # -*- mode: python-mode; py-indent-tabs-mode: nil -*- |
| """ |
| /* |
| * Copyright (C) 2019 sysmocom s.f.m.c. GmbH |
| * |
| * All Rights Reserved |
| * |
| * This program is free software; you can redistribute it and/or modify |
| * it under the terms of the GNU General Public License as published by |
| * the Free Software Foundation; either version 3 of the License, or |
| * (at your option) any later version. |
| * |
| * This program is distributed in the hope that it will be useful, |
| * but WITHOUT ANY WARRANTY; without even the implied warranty of |
| * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the |
| * GNU General Public License for more details. |
| * |
| * You should have received a copy of the GNU General Public License along |
| * with this program; if not, write to the Free Software Foundation, Inc., |
| * 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA. |
| */ |
| """ |
| |
| __version__ = "0.0.2" # bump this on every non-trivial change |
| |
| from functools import partial |
| import configparser, argparse, time, os, asyncio, aiohttp |
| from osmopy.trap_helper import make_params, gen_hash, log_init, comm_proc |
| from osmopy.osmo_ipa import Ctrl |
| |
| |
| def log_bsc_time(l, rq, task, ts, bsc, msg, *args, **kwargs): |
| """ |
| Logging contextual wrapper. |
| FIXME: remove task parameter once we bump requirements to Python 3.7+ |
| """ |
| # FIXME: following function is deprecated and will be removed in Python 3.9 |
| # Use the asyncio.all_tasks() function instead when available (Python 3.7+). |
| num_tasks = len(task.all_tasks()) |
| num_req = len(rq) |
| delta = time.perf_counter() - ts |
| if delta < 1: |
| l('[%d/%d] BSC %s: ' + msg, num_req, num_tasks, bsc, *args, **kwargs) |
| else: |
| l('[%d/%d] BSC %s, %.2f sec: ' + msg, num_req, num_tasks, bsc, time.perf_counter() - ts, *args, **kwargs) |
| |
| def check_h_val(ctrl, h, v, t, exp): |
| """ |
| Check for header inconsistencies. |
| """ |
| if v != exp: |
| ctrl.log.error('Unexpected %s value %x (instead of %x) in |%s| header', t, v, exp, h.hex()) |
| |
| def get_ctrl_len(ctrl, header): |
| """ |
| Obtain expected message length. |
| """ |
| (dlen, p, e, _) = ctrl.del_header(header) |
| check_h_val(ctrl, header, p, "protocol", ctrl.PROTO['OSMO']) |
| check_h_val(ctrl, header, e, "extension", ctrl.EXT['CTRL']) |
| return dlen - 1 |
| |
| |
| class Proxy(Ctrl): |
| """ |
| Wrapper class to implement per-type message dispatch and keep BSC <-> http Task mapping. |
| N. B: keep async/await semantics out of it. |
| """ |
| def __init__(self, log): |
| super().__init__() |
| self.req = {} |
| self.log = log |
| self.conf = configparser.ConfigParser(interpolation = None) |
| self.conf.read(self.config_file) |
| self.timeout = self.conf['main'].getint('timeout', 30) |
| self.location = self.conf['main'].get('location') |
| self.ctrl_addr = self.conf['main'].get('addr_ctrl', 'localhost') |
| self.ctrl_port = self.conf['main'].getint('port_ctrl', 4250) |
| self.concurrency = self.conf['main'].getint('num_max_conn', 5) |
| # FIXME: use timeout parameter when available (aiohttp version 3.3) as follows |
| #self.http_client = aiohttp.ClientSession(connector = aiohttp.TCPConnector(limit = self.concurrency), timeout = self.timeout) |
| self.http_client = aiohttp.ClientSession(connector = aiohttp.TCPConnector(limit = self.concurrency)) |
| |
| def dispatch(self, w, data): |
| """ |
| Basic dispatcher: the expected entry point for CTRL messages. |
| """ |
| (cmd, _, v) = data.decode('utf-8').split(' ', 2) |
| method = getattr(self, cmd, lambda *_: self.log.info('CTRL %s is unhandled by dispatch: ignored.', cmd)) |
| method(w, v.split()) |
| |
| def ERROR(self, _, k): |
| """ |
| Handle CTRL ERROR messages. |
| """ |
| self.log_ignore('ERROR', k) |
| |
| def SET_REPLY(self, _, k): |
| """ |
| Handle CTRL SET_REPLY messages. |
| """ |
| self.log_ignore('SET_REPLY', k) |
| |
| def TRAP(self, w, k): |
| """ |
| Handle incoming TRAPs. |
| """ |
| p = k[0].split('.') |
| if p[-1] == 'location-state': |
| self.handle_locationstate(w, p[1], p[3], p[5], k[1]) |
| else: |
| self.log_ignore('TRAP', k[0]) |
| |
| def handle_locationstate(self, w, net, bsc, bts, data): |
| """ |
| Handle location-state TRAP: parse trap content, build HTTP request and setup async handlers. |
| """ |
| ts = time.perf_counter() |
| self.cleanup_task(bsc) |
| params = make_params(bsc, data) |
| params['h'] = gen_hash(params, self.conf['main'].get('secret_key')) |
| # FIXME: use asyncio.create_task() when available (Python 3.7+). |
| t = asyncio.ensure_future(self.http_client.post(self.location, data = params)) |
| log_bsc_time(self.log.info, self.req, t, ts, bsc, 'location-state@%s => %s', params['time_stamp'], data) |
| t.add_done_callback(partial(self.reply_callback, w, bsc, ts)) |
| self.req[bsc] = (t, ts) |
| log_bsc_time(self.log.info, self.req, t, ts, bsc, 'request added (net %s, BTS %s)', net, bts) |
| |
| def cleanup_task(self, bsc): |
| """ |
| It's ok to cancel() task which is done() |
| but if either of the checks above fires it means that Proxy() is in inconsistent state |
| which should never happen as long as we keep async/await semantics out of it. |
| """ |
| if bsc in self.req: |
| (task, ts) = self.req[bsc] |
| log_bsc = partial(log_bsc_time, self.log.error, self.req, task, ts, bsc) |
| if task.done(): |
| log_bsc('task is done but not removed') |
| if task.cancelled(): |
| log_bsc('task is cancelled without update') |
| task.cancel() |
| |
| def log_ignore(self, kind, m): |
| """ |
| Log ignored CTRL message. |
| """ |
| self.log.error('Ignoring CTRL %s: %s', kind, ' '.join(m) if type(m) is list else m) |
| |
| def reply_callback(self, w, bsc, ts, task): |
| """ |
| Process per-BSC response status and prepare async handler if necessary. |
| We don't have to delete cancel()ed task from self.req explicitly because it will be replaced by new one in handle_locationstate() |
| """ |
| log_bsc = partial(log_bsc_time, self.log.info, self.req, task, ts, bsc) |
| if task.cancelled(): |
| log_bsc('request cancelled') |
| else: |
| exp = task.exception() |
| if exp: |
| log_bsc('exception %s triggered', repr(exp)) |
| else: |
| resp = task.result() |
| if resp.status != 200: |
| log_bsc('unexpected HTTP response %d', resp.status) |
| else: |
| log_bsc('request completed') |
| # FIXME: use asyncio.create_task() when available (Python 3.7+). |
| asyncio.ensure_future(recv_response(self.log, w, bsc, resp.json())) |
| del self.req[bsc] |
| |
| |
| async def recv_response(log, w, bsc, resp): |
| """ |
| Process json response asynchronously. |
| """ |
| js = await resp |
| if js.get('error'): |
| log.info('BSC %s response error: %s', bsc, repr(js.get('error'))) |
| else: |
| comm_proc(js.get('commands'), bsc, w.write, log) |
| await w.drain() # Trigger Writer's flow control |
| |
| async def recon_reader(proxy, reader, num_bytes): |
| """ |
| Read requested amount of bytes, reconnect if necessary. |
| """ |
| try: |
| return await reader.readexactly(num_bytes) |
| except asyncio.IncompleteReadError: |
| proxy.log.info('Failed to read %d bytes reconnecting to %s:%d...', num_bytes, proxy.ctrl_addr, proxy.ctrl_port) |
| raise |
| |
| async def ctrl_client(proxy, rd, wr): |
| """ |
| Read CTRL stream and handle selected messages. |
| """ |
| while True: |
| header = await recon_reader(proxy, rd, 4) |
| data = await recon_reader(proxy, rd, get_ctrl_len(proxy, header)) |
| proxy.dispatch(wr, data) |
| |
| async def conn_client(proxy): |
| """ |
| (Re)establish connection with CTRL server and pass Reader/Writer to CTRL handler. |
| """ |
| while True: |
| try: |
| reader, writer = await asyncio.open_connection(proxy.ctrl_addr, proxy.ctrl_port) |
| proxy.log.info('Connected to %s:%d', proxy.ctrl_addr, proxy.ctrl_port) |
| await ctrl_client(proxy, reader, writer) |
| except OSError as e: |
| proxy.log.info('%s: %d seconds delayed retrying...', e, proxy.timeout) |
| await asyncio.sleep(proxy.timeout) |
| except asyncio.IncompleteReadError: |
| pass |
| proxy.log.info('Reconnecting...') |
| |
| |
| if __name__ == '__main__': |
| a = argparse.ArgumentParser(description = 'Proxy between given GCI service and Osmocom CTRL protocol.') |
| a.add_argument('-v', '--version', action = 'version', version = ("%(prog)s v" + __version__)) |
| a.add_argument('-d', '--debug', action = 'store_true', help = "Enable debug log") |
| a.add_argument('-c', '--config-file', required = True, help = "Path to mandatory config file (in INI format).") |
| |
| P = Proxy(log_init('TRAP2CGI', a.parse_args(namespace=Proxy).debug)) |
| |
| P.log.info('CGI proxy v%s starting with PID %d:', __version__, os.getpid()) |
| P.log.info('Destination %s (concurrency %d)', P.location, P.concurrency) |
| P.log.info('Connecting to TRAP source %s:%d...', P.ctrl_addr, P.ctrl_port) |
| |
| loop = asyncio.get_event_loop() |
| loop.run_until_complete(conn_client(P)) |
| # FIXME: use loop.run() function instead when available (Python 3.7+). |