transceiver: get rid of the ctrl threads

There is no need to have n threads handling n ctrl sockets: every command
is answered immediately, so the sockets can be served from the existing
main osmo select loop.

Care must be taken to ensure that calls made from within the command
handler do not block, or at least do not block for too long. All current
calls satisfy this.

Change-Id: I642a34451e1825eafecf71a902df916ccee7944c
diff --git a/Transceiver52M/Transceiver.cpp b/Transceiver52M/Transceiver.cpp
index 36cfa7d..d9bda1d 100644
--- a/Transceiver52M/Transceiver.cpp
+++ b/Transceiver52M/Transceiver.cpp
@@ -45,6 +45,8 @@
 
 using namespace GSM;
 
+Transceiver *transceiver;
+
 #define USB_LATENCY_INTRVL		10,0
 
 /* Number of running values use in noise average */
@@ -151,20 +153,30 @@
     close(mClockSocket);
 
   for (size_t i = 0; i < mChans; i++) {
-    if (mControlServiceLoopThreads[i]) {
-      mControlServiceLoopThreads[i]->cancel();
-      mControlServiceLoopThreads[i]->join();
-      delete mControlServiceLoopThreads[i];
-    }
-
     mTxPriorityQueues[i].clear();
-    if (mCtrlSockets[i] >= 0)
-      close(mCtrlSockets[i]);
     if (mDataSockets[i] >= 0)
       close(mDataSockets[i]);
   }
 }
 
+/* osmo_fd callback for a control socket; the channel index travels in bfd->data */
+int Transceiver::ctrl_sock_cb(struct osmo_fd *bfd, unsigned int flags)
+{
+  int rc = 0;
+  int chan = static_cast<int>(reinterpret_cast<uintptr_t>(bfd->data));
+
+  if (flags & OSMO_FD_READ)
+    rc = transceiver->ctrl_sock_handle_rx(chan);
+
+  /* After a failed read skip the write: it would overwrite rc and hide the error */
+  if (rc >= 0 && (flags & OSMO_FD_WRITE))
+    rc = transceiver->ctrl_sock_write(chan);
+
+  if (rc < 0) /* single check, so a stop is requested exactly once */
+    osmo_signal_dispatch(SS_MAIN, S_MAIN_STOP_REQUIRED, NULL);
+  return rc;
+}
+
 /*
  * Initialize transceiver
  *
@@ -193,8 +205,7 @@
   mEdge = edge;
 
   mDataSockets.resize(mChans, -1);
-  mCtrlSockets.resize(mChans, -1);
-  mControlServiceLoopThreads.resize(mChans);
+  mCtrlSockets.resize(mChans);
   mTxPriorityQueueServiceLoopThreads.resize(mChans);
   mRxServiceLoopThreads.resize(mChans);
 
@@ -216,24 +227,34 @@
     return false;
 
   for (size_t i = 0; i < mChans; i++) {
+    int rv;
     c_srcport = mBasePort + 2 * i + 1;
     c_dstport = mBasePort + 2 * i + 101;
     d_srcport = mBasePort + 2 * i + 2;
     d_dstport = mBasePort + 2 * i + 102;
 
-    mCtrlSockets[i] = osmo_sock_init2(AF_UNSPEC, SOCK_DGRAM, IPPROTO_UDP,
+    rv = osmo_sock_init2_ofd(&mCtrlSockets[i].conn_bfd, AF_UNSPEC, SOCK_DGRAM, IPPROTO_UDP,
                                       mLocalAddr.c_str(), c_srcport,
                                       mRemoteAddr.c_str(), c_dstport,
 				      OSMO_SOCK_F_BIND | OSMO_SOCK_F_CONNECT);
-    if (mCtrlSockets[i] < 0)
+    if (rv < 0)
       return false;
 
+    mCtrlSockets[i].conn_bfd.cb = ctrl_sock_cb;
+    mCtrlSockets[i].conn_bfd.data = reinterpret_cast<void*>(i);
+
+
     mDataSockets[i] = osmo_sock_init2(AF_UNSPEC, SOCK_DGRAM, IPPROTO_UDP,
                                       mLocalAddr.c_str(), d_srcport,
                                       mRemoteAddr.c_str(), d_dstport,
 				      OSMO_SOCK_F_BIND | OSMO_SOCK_F_CONNECT);
     if (mDataSockets[i] < 0)
       return false;
+
+    if (i && filler == FILLER_DUMMY)
+      filler = FILLER_ZERO;
+
+    mStates[i].init(filler, mSPSTx, txFullScale, rtsc, rach_delay);
   }
 
   /* Randomize the central clock */
@@ -243,21 +264,6 @@
   mLastClockUpdateTime = startTime;
   mLatencyUpdateTime = startTime;
 
-  /* Start control threads */
-  for (size_t i = 0; i < mChans; i++) {
-    TrxChanThParams *params = (TrxChanThParams *)malloc(sizeof(struct TrxChanThParams));
-    params->trx = this;
-    params->num = i;
-    mControlServiceLoopThreads[i] = new Thread(stackSize);
-    mControlServiceLoopThreads[i]->start((void * (*)(void*))
-                                 ControlServiceLoopAdapter, (void*) params);
-
-    if (i && filler == FILLER_DUMMY)
-      filler = FILLER_ZERO;
-
-    mStates[i].init(filler, mSPSTx, txFullScale, rtsc, rach_delay);
-  }
-
   return true;
 }
 
@@ -719,8 +725,6 @@
 }
 
 
-#define MAX_PACKET_LENGTH 100
-
 /**
  * Matches a buffer with a command.
  * @param  buf    a buffer to look command in
@@ -750,27 +754,77 @@
   return true;
 }
 
-bool Transceiver::driveControl(size_t chan)
+void Transceiver::ctrl_sock_send(ctrl_msg& m, int chan)
 {
-  char buffer[MAX_PACKET_LENGTH + 1];
-  char response[MAX_PACKET_LENGTH + 1];
+  ctrl_sock_state& sock = mCtrlSockets[chan];
+
+  /* Only enqueue here; flag the fd so the write handler flushes the queue */
+  sock.txmsgqueue.push_back(m);
+  sock.conn_bfd.when |= OSMO_FD_WRITE;
+}
+
+/* Flush the queued control responses of channel chan to its socket.
+ * Returns 0 when done or when the write must be retried later, -EIO on failure. */
+int Transceiver::ctrl_sock_write(int chan)
+{
+  int rc = 0;
+  ctrl_sock_state& s = mCtrlSockets[chan];
+
+  if (s.conn_bfd.fd < 0)
+    return -EIO;
+
+  /* Stop polling for writability; re-armed below if the write must be retried */
+  s.conn_bfd.when &= ~OSMO_FD_WRITE;
+
+  while (!s.txmsgqueue.empty()) {
+    /* Reference, not copy: the entry stays queued until the write succeeds */
+    const ctrl_msg& m = s.txmsgqueue.front();
+
+    /* Responses go out as NUL-terminated strings, terminator included */
+    rc = write(s.conn_bfd.fd, m.data, strlen(m.data) + 1);
+    if (rc < 0 && (errno == EAGAIN || errno == EINTR)) {
+      /* Transient condition: keep the message and retry on the next wakeup */
+      s.conn_bfd.when |= OSMO_FD_WRITE;
+      break;
+    }
+    if (rc <= 0) {
+      LOGCHAN(chan, DTRXCTRL, NOTICE) << "mCtrlSockets write(" << s.conn_bfd.fd << ") failed: " << rc;
+      return -EIO;
+    }
+
+    s.txmsgqueue.pop_front();
+  }
+
+  return 0;
+}
+
+int Transceiver::ctrl_sock_handle_rx(int chan)
+{
+  ctrl_msg cmd_received;
+  ctrl_msg cmd_to_send;
+  char *buffer = cmd_received.data;
+  char *response = cmd_to_send.data;
   char *command, *params;
   int msgLen;
+  ctrl_sock_state& s = mCtrlSockets[chan];
 
   /* Attempt to read from control socket */
-  msgLen = read(mCtrlSockets[chan], buffer, MAX_PACKET_LENGTH);
+  msgLen = read(s.conn_bfd.fd, buffer, sizeof(cmd_received.data)-1);
+  if (msgLen < 0 && errno == EAGAIN)
+      return 0; /* Try again later */
   if (msgLen <= 0) {
-    LOGCHAN(chan, DTRXCTRL, NOTICE) << "mCtrlSockets read(" << mCtrlSockets[chan] << ") failed: " << msgLen;
-    return false;
+    LOGCHAN(chan, DTRXCTRL, NOTICE) << "mCtrlSockets read(" << s.conn_bfd.fd << ") failed: " << msgLen;
+    return -EIO;
   }
 
+
   /* Zero-terminate received string */
   buffer[msgLen] = '\0';
 
   /* Verify a command signature */
   if (strncmp(buffer, "CMD ", 4)) {
     LOGCHAN(chan, DTRXCTRL, NOTICE) << "bogus message on control interface";
-    return false;
+    return -EIO;
   }
 
   /* Set command pointer */
@@ -889,7 +943,7 @@
     if ((timeslot < 0) || (timeslot > 7)) {
       LOGCHAN(chan, DTRXCTRL, NOTICE) << "bogus message on control interface";
       sprintf(response,"RSP SETSLOT 1 %d %d",timeslot,corrCode);
-      return true;
+      return 0;
     }
     mStates[chan].chanType[timeslot] = (ChannelCombination) corrCode;
     setModulus(timeslot, chan);
@@ -921,12 +975,8 @@
   }
 
   LOGCHAN(chan, DTRXCTRL, INFO) << "response is '" << response << "'";
-  msgLen = write(mCtrlSockets[chan], response, strlen(response) + 1);
-  if (msgLen <= 0) {
-    LOGCHAN(chan, DTRXCTRL, NOTICE) << "mCtrlSockets write(" << mCtrlSockets[chan] << ") failed: " << msgLen;
-    return false;
-  }
-  return true;
+  transceiver->ctrl_sock_send(cmd_to_send, chan);
+  return 0;
 }
 
 bool Transceiver::driveTxPriorityQueue(size_t chan)
@@ -1179,28 +1229,6 @@
   return NULL;
 }
 
-void *ControlServiceLoopAdapter(TrxChanThParams *params)
-{
-  char thread_name[16];
-  Transceiver *trx = params->trx;
-  size_t num = params->num;
-
-  free(params);
-
-  snprintf(thread_name, 16, "CtrlService%zu", num);
-  set_selfthread_name(thread_name);
-
-  while (1) {
-    if (!trx->driveControl(num)) {
-      LOGCHAN(num, DTRXCTRL, FATAL) << "Something went wrong in thread " << thread_name << ", requesting stop";
-      osmo_signal_dispatch(SS_MAIN, S_MAIN_STOP_REQUIRED, NULL);
-      break;
-    }
-    pthread_testcancel();
-  }
-  return NULL;
-}
-
 void *TxUpperLoopAdapter(TrxChanThParams *params)
 {
   char thread_name[16];