apps/trx: separate CTRLInterface implementation
diff --git a/apps/trx/ctrl_if.py b/apps/trx/ctrl_if.py
index 58960c5..0129d7f 100644
--- a/apps/trx/ctrl_if.py
+++ b/apps/trx/ctrl_if.py
@@ -2,7 +2,7 @@
# -*- coding: utf-8 -*-
# GR-GSM based transceiver
-# Transceiver UDP interface
+# CTRL interface implementation
#
# (C) 2016-2017 by Vadim Yanitskiy <axilirator@gmail.com>
#
@@ -22,25 +22,18 @@
# with this program; if not, write to the Free Software Foundation, Inc.,
# 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.
-import socket
-import select
-
from udp_link import UDPLink
class CTRLInterface(UDPLink):
- def __init__(self, remote_addr, remote_port, bind_port, radio_if):
- print("[i] Init TRX CTRL interface")
- UDPLink.__init__(self, remote_addr, remote_port, bind_port)
- self.tb = radio_if
-
- def shutdown(self):
- print("[i] Shutdown TRX CTRL interface")
- UDPLink.shutdown(self)
-
def handle_rx(self, data):
if self.verify_req(data):
request = self.prepare_req(data)
- self.parse_cmd(request)
+ rc = self.parse_cmd(request)
+ # parse_cmd() yields a bare status code or a (code, params) tuple
+ if isinstance(rc, tuple):
+ self.send_response(request, rc[0], rc[1])
+ else:
+ self.send_response(request, rc)
else:
print("[!] Wrong data on CTRL interface")
@@ -72,61 +65,18 @@
return True
- def parse_cmd(self, request):
- response_code = "0"
-
- # Power control
- if self.verify_cmd(request, "POWERON", 0):
- print("[i] Recv POWERON cmd")
- if not self.tb.trx_started:
- if self.tb.check_available():
- print("[i] Starting transceiver...")
- self.tb.trx_started = True
- self.tb.start()
- else:
- print("[!] Transceiver isn't ready to start")
- response_code = "-1"
- else:
- print("[!] Transceiver already started!")
- response_code = "-1"
- elif self.verify_cmd(request, "POWEROFF", 0):
- print("[i] Recv POWEROFF cmd")
- print("[i] Stopping transceiver...")
- self.tb.trx_started = False
- # TODO: flush all buffers between blocks
- self.tb.stop()
- elif self.verify_cmd(request, "SETRXGAIN", 1):
- print("[i] Recv SETRXGAIN cmd")
- # TODO: check gain value
- gain = int(request[1])
- self.tb.set_gain(gain)
-
- # Tuning Control
- elif self.verify_cmd(request, "RXTUNE", 1):
- print("[i] Recv RXTUNE cmd")
- # TODO: check freq range
- freq = int(request[1]) * 1000
- self.tb.set_fc(freq)
- elif self.verify_cmd(request, "TXTUNE", 1):
- print("[i] Recv TXTUNE cmd")
- # TODO: is not implemented yet
-
- # Misc
- elif self.verify_cmd(request, "ECHO", 0):
- print("[i] Recv ECHO cmd")
-
- # Wrong / unknown command
- else:
- print("[!] Wrong request on CTRL interface")
- response_code = "-1"
-
- # Anyway, we need to respond
- self.send_response(request, response_code)
-
- def send_response(self, request, response_code):
+ def send_response(self, request, response_code, params=None):
# Include status code, for example ["TXTUNE", "0", "941600"]
- request.insert(1, response_code)
+ request.insert(1, str(response_code))  # code may be given as an int or a str
+
+ # Optionally append command specific parameters (stringified for join below)
+ if params is not None:
+ request += [str(param) for param in params]
+
# Add the response signature, and join back to string
response = "RSP " + " ".join(request) + "\0"
# Now we have something like "RSP TXTUNE 0 941600"
self.send(response)
+
+ def parse_cmd(self, request):  # hook: return a status code or a (code, params) tuple
+ raise NotImplementedError("CTRLInterface subclasses must implement parse_cmd()")