Transceiver52M: Implement POWEROFF command

Add stop and restart capability through the POWEROFF and POWERON
commands. Calling stop causes receive streaming to cease and I/O
threads to shut down, leaving only the control handling thread running.
Upon receiving a POWERON command, I/O threads and device streaming are
restarted.

Proper shutdown of the transceiver is now initiated by the destructor,
which calls the stop command internally to wind down and deallocate
threads.

Signed-off-by: Tom Tsou <tom@tsou.cc>
diff --git a/Transceiver52M/Transceiver.cpp b/Transceiver52M/Transceiver.cpp
index f8b34b3..4885654 100644
--- a/Transceiver52M/Transceiver.cpp
+++ b/Transceiver52M/Transceiver.cpp
@@ -84,42 +84,46 @@
 }
 
 Transceiver::Transceiver(int wBasePort,
-			 const char *TRXAddress,
+			 const char *wTRXAddress,
 			 size_t wSPS, size_t wChans,
 			 GSM::Time wTransmitLatency,
 			 RadioInterface *wRadioInterface)
-  : mBasePort(wBasePort), mAddr(TRXAddress),
-    mTransmitLatency(wTransmitLatency), mClockSocket(NULL),
-    mRadioInterface(wRadioInterface), mSPSTx(wSPS), mSPSRx(1), mChans(wChans),
-    mOn(false), mTxFreq(0.0), mRxFreq(0.0), mMaxExpectedDelay(0)
+  : mBasePort(wBasePort), mAddr(wTRXAddress),
+    mClockSocket(wBasePort, wTRXAddress, mBasePort + 100),
+    mTransmitLatency(wTransmitLatency), mRadioInterface(wRadioInterface),
+    mSPSTx(wSPS), mSPSRx(1), mChans(wChans), mOn(false),
+    mTxFreq(0.0), mRxFreq(0.0), mMaxExpectedDelay(0)
 {
-  GSM::Time startTime(random() % gHyperframe,0);
-
-  mRxLowerLoopThread = new Thread(32768);
-  mTxLowerLoopThread = new Thread(32768);
-
-  mTransmitDeadlineClock = startTime;
-  mLastClockUpdateTime = startTime;
-  mLatencyUpdateTime = startTime;
-  mRadioInterface->getClock()->set(startTime);
-
   txFullScale = mRadioInterface->fullScaleInputValue();
   rxFullScale = mRadioInterface->fullScaleOutputValue();
 }
 
 Transceiver::~Transceiver()
 {
+  stop();
+
   sigProcLibDestroy();
 
-  delete mClockSocket;
-
   for (size_t i = 0; i < mChans; i++) {
+    mControlServiceLoopThreads[i]->cancel();
+    mControlServiceLoopThreads[i]->join();
+    delete mControlServiceLoopThreads[i];
+
     mTxPriorityQueues[i].clear();
     delete mCtrlSockets[i];
     delete mDataSockets[i];
   }
 }
 
+/*
+ * Initialize transceiver
+ *
+ * Start or restart the control loop. Any further control is handled through the
+ * socket API. Randomize the central radio clock and set the downlink burst
+ * counters. Note that the clock will not update until the radio starts, but we
+ * are still expected to report clock indications through control channel
+ * activity.
+ */
 bool Transceiver::init(bool filler)
 {
   int d_srcport, d_dstport, c_srcport, c_dstport;
@@ -150,8 +154,7 @@
   if (filler)
     mStates[0].mRetrans = true;
 
-  mClockSocket = new UDPSocket(mBasePort, mAddr.c_str(), mBasePort + 100);
-
+  /* Setup sockets */
   for (size_t i = 0; i < mChans; i++) {
     c_srcport = mBasePort + 2 * i + 1;
     c_dstport = mBasePort + 2 * i + 101;
@@ -162,10 +165,19 @@
     mDataSockets[i] = new UDPSocket(d_srcport, mAddr.c_str(), d_dstport);
   }
 
+  /* Randomize the central clock */
+  GSM::Time startTime(random() % gHyperframe, 0);
+  mRadioInterface->getClock()->set(startTime);
+  mTransmitDeadlineClock = startTime;
+  mLastClockUpdateTime = startTime;
+  mLatencyUpdateTime = startTime;
+
+  /* Start control threads */
   for (size_t i = 0; i < mChans; i++) {
+    TransceiverChannel *chan = new TransceiverChannel(this, i);
     mControlServiceLoopThreads[i] = new Thread(32768);
-    mTxPriorityQueueServiceLoopThreads[i] = new Thread(32768);
-    mRxServiceLoopThreads[i] = new Thread(32768);
+    mControlServiceLoopThreads[i]->start((void * (*)(void*))
+                                 ControlServiceLoopAdapter, (void*) chan);
 
     for (size_t n = 0; n < 8; n++) {
       burst = modulateBurst(gDummyBurst, 8 + (n % 4 == 0), mSPSTx);
@@ -178,6 +190,106 @@
   return true;
 }
 
+/*
+ * Start the transceiver
+ *
+ * Submit command(s) to the radio device to commence streaming samples and
+ * launch threads to handle sample I/O. Re-synchronize the transmit burst
+ * counters to the central radio clock here as well.
+ */
+bool Transceiver::start()
+{
+  ScopedLock lock(mLock);
+
+  if (mOn) {
+    LOG(ERR) << "Transceiver already running";
+    return false;
+  }
+
+  LOG(NOTICE) << "Starting the transceiver";
+
+  GSM::Time time = mRadioInterface->getClock()->get();
+  mTransmitDeadlineClock = time;
+  mLastClockUpdateTime = time;
+  mLatencyUpdateTime = time;
+
+  if (!mRadioInterface->start()) {
+    LOG(ALERT) << "Device failed to start";
+    return false;
+  }
+
+  /* Device is running - launch I/O threads */
+  mRxLowerLoopThread = new Thread(32768);
+  mTxLowerLoopThread = new Thread(32768);
+  mTxLowerLoopThread->start((void * (*)(void*))
+                            TxLowerLoopAdapter,(void*) this);
+  mRxLowerLoopThread->start((void * (*)(void*))
+                            RxLowerLoopAdapter,(void*) this);
+
+  /* Launch uplink and downlink burst processing threads */
+  for (size_t i = 0; i < mChans; i++) {
+    TransceiverChannel *chan = new TransceiverChannel(this, i);
+    mRxServiceLoopThreads[i] = new Thread(32768);
+    mRxServiceLoopThreads[i]->start((void * (*)(void*))
+                            RxUpperLoopAdapter, (void*) chan);
+
+    chan = new TransceiverChannel(this, i);
+    mTxPriorityQueueServiceLoopThreads[i] = new Thread(32768);
+    mTxPriorityQueueServiceLoopThreads[i]->start((void * (*)(void*))
+                            TxUpperLoopAdapter, (void*) chan);
+  }
+
+  writeClockInterface();
+  mOn = true;
+  return true;
+}
+
+/*
+ * Stop the transceiver
+ *
+ * Perform stopping by disabling receive streaming and issuing cancellation
+ * requests to running threads. Most threads will timeout and terminate once
+ * device is disabled, but the transmit loop may block waiting on the central
+ * radio clock. Explicitly signal the clock to make sure that the transmit loop
+ * makes it to the thread cancellation point.
+ */
+void Transceiver::stop()
+{
+  ScopedLock lock(mLock);
+
+  if (!mOn)
+    return;
+
+  LOG(NOTICE) << "Stopping the transceiver";
+  mTxLowerLoopThread->cancel();
+  mRxLowerLoopThread->cancel();
+
+  for (size_t i = 0; i < mChans; i++) {
+    mRxServiceLoopThreads[i]->cancel();
+    mTxPriorityQueueServiceLoopThreads[i]->cancel();
+  }
+
+  LOG(INFO) << "Stopping the device";
+  mRadioInterface->stop();
+
+  for (size_t i = 0; i < mChans; i++) {
+    mRxServiceLoopThreads[i]->join();
+    mTxPriorityQueueServiceLoopThreads[i]->join();
+    delete mRxServiceLoopThreads[i];
+    delete mTxPriorityQueueServiceLoopThreads[i];
+
+    mTxPriorityQueues[i].clear();
+  }
+
+  mTxLowerLoopThread->join();
+  mRxLowerLoopThread->join();
+  delete mTxLowerLoopThread;
+  delete mRxLowerLoopThread;
+
+  mOn = false;
+  LOG(NOTICE) << "Transceiver stopped";
+}
+
 void Transceiver::addRadioVector(size_t chan, BitVector &bits,
                                  int RSSI, GSM::Time &wTime)
 {
@@ -525,17 +637,6 @@
   return bits;
 }
 
-void Transceiver::start()
-{
-  TransceiverChannel *chan;
-
-  for (size_t i = 0; i < mControlServiceLoopThreads.size(); i++) {
-    chan = new TransceiverChannel(this, i);
-    mControlServiceLoopThreads[i]->start((void * (*)(void*))
-                                 ControlServiceLoopAdapter, (void*) chan);
-  }
-}
-
 void Transceiver::reset()
 {
   for (size_t i = 0; i < mTxPriorityQueues.size(); i++)
@@ -574,39 +675,14 @@
   LOG(INFO) << "command is " << buffer;
 
   if (strcmp(command,"POWEROFF")==0) {
-    // turn off transmitter/demod
-    sprintf(response,"RSP POWEROFF 0"); 
+    stop();
+    sprintf(response,"RSP POWEROFF 0");
   }
   else if (strcmp(command,"POWERON")==0) {
-    // turn on transmitter/demod
-    if (!mTxFreq || !mRxFreq) 
+    if (!start())
       sprintf(response,"RSP POWERON 1");
-    else {
+    else
       sprintf(response,"RSP POWERON 0");
-      if (!chan && !mOn) {
-        // Prepare for thread start
-        mRadioInterface->start();
-
-        // Start radio interface threads.
-        mTxLowerLoopThread->start((void * (*)(void*))
-                                  TxLowerLoopAdapter,(void*) this);
-        mRxLowerLoopThread->start((void * (*)(void*))
-                                  RxLowerLoopAdapter,(void*) this);
-
-        for (size_t i = 0; i < mChans; i++) {
-          TransceiverChannel *chan = new TransceiverChannel(this, i);
-          mRxServiceLoopThreads[i]->start((void * (*)(void*))
-                                  RxUpperLoopAdapter, (void*) chan);
-
-          chan = new TransceiverChannel(this, i);
-          mTxPriorityQueueServiceLoopThreads[i]->start((void * (*)(void*))
-                                  TxUpperLoopAdapter, (void*) chan);
-        }
-
-        writeClockInterface();
-        mOn = true;
-      }
-    }
   }
   else if (strcmp(command,"SETMAXDLY")==0) {
     //set expected maximum time-of-arrival
@@ -855,7 +931,7 @@
 
   LOG(INFO) << "ClockInterface: sending " << command;
 
-  mClockSocket->write(command, strlen(command) + 1);
+  mClockSocket.write(command, strlen(command) + 1);
 
   mLastClockUpdateTime = mTransmitDeadlineClock;
 
@@ -923,15 +999,7 @@
   trx->setPriority(0.40);
 
   while (1) {
-    bool stale = false;
-    // Flush the UDP packets until a successful transfer.
-    while (!trx->driveTxPriorityQueue(num)) {
-      stale = true;
-    }
-    if (!num && stale) {
-      // If a packet was stale, remind the GSM stack of the clock.
-      trx->writeClockInterface();
-    }
+    trx->driveTxPriorityQueue(num);
     pthread_testcancel();
   }
   return NULL;
diff --git a/Transceiver52M/Transceiver.h b/Transceiver52M/Transceiver.h
index 56f9115..0b81511 100644
--- a/Transceiver52M/Transceiver.h
+++ b/Transceiver52M/Transceiver.h
@@ -91,12 +91,10 @@
 private:
   int mBasePort;
   std::string mAddr;
-  GSM::Time mTransmitLatency;     ///< latency between basestation clock and transmit deadline clock
-  GSM::Time mLatencyUpdateTime;   ///< last time latency was updated
 
   std::vector<UDPSocket *> mDataSockets;  ///< socket for writing to/reading from GSM core
   std::vector<UDPSocket *> mCtrlSockets;  ///< socket for writing/reading control commands from GSM core
-  UDPSocket *mClockSocket;                ///< socket for writing clock updates to GSM core
+  UDPSocket mClockSocket;                 ///< socket for writing clock updates to GSM core
 
   std::vector<VectorQueue> mTxPriorityQueues;   ///< priority queue of transmit bursts received from GSM core
   std::vector<VectorFIFO *>  mReceiveFIFO;      ///< radioInterface FIFO of receive bursts
@@ -107,6 +105,8 @@
   std::vector<Thread *> mControlServiceLoopThreads;         ///< thread to process control messages from GSM core
   std::vector<Thread *> mTxPriorityQueueServiceLoopThreads; ///< thread to process transmit bursts from GSM core
 
+  GSM::Time mTransmitLatency;             ///< latency between basestation clock and transmit deadline clock
+  GSM::Time mLatencyUpdateTime;           ///< last time latency was updated
   GSM::Time mTransmitDeadlineClock;       ///< deadline for pushing bursts into transmit FIFO 
   GSM::Time mLastClockUpdateTime;         ///< last time clock update was sent up to core
 
@@ -173,6 +173,13 @@
 
   std::vector<TransceiverState> mStates;
 
+  /** Start and stop I/O threads through the control socket API */
+  bool start();
+  void stop();
+
+  /** Protect the destructor-accessible stop call */
+  Mutex mLock;
+
 public:
 
   /** Transceiver constructor 
@@ -191,8 +198,7 @@
   /** Destructor */
   ~Transceiver();
 
-  /** start the Transceiver */
-  void start();
+  /** Start the control loop */
   bool init(bool filler);
 
   /** attach the radioInterface receive FIFO */
diff --git a/Transceiver52M/UHDDevice.cpp b/Transceiver52M/UHDDevice.cpp
index c914868..cbfc2e4 100644
--- a/Transceiver52M/UHDDevice.cpp
+++ b/Transceiver52M/UHDDevice.cpp
@@ -40,6 +40,14 @@
 #define TX_AMPL          0.3
 #define SAMPLE_BUF_SZ    (1 << 20)
 
+/*
+ * UHD timeout value on streaming (re)start
+ *
+ * Allow some time for streaming to commence after the start command is issued,
+ * but consider a wait beyond one second to be a definite error condition.
+ */
+#define UHD_RESTART_TIMEOUT     1.0
+
 enum uhd_dev_type {
 	USRP1,
 	USRP2,
@@ -268,7 +276,7 @@
 	int open(const std::string &args, bool extref);
 	bool start();
 	bool stop();
-	void restart();
+	bool restart();
 	void setPriority(float prio);
 	enum TxWindowType getWindowType() { return tx_window; }
 
@@ -313,8 +321,9 @@
 
 	enum err_code {
 		ERROR_TIMING = -1,
-		ERROR_UNRECOVERABLE = -2,
-		ERROR_UNHANDLED = -3,
+		ERROR_TIMEOUT = -2,
+		ERROR_UNRECOVERABLE = -3,
+		ERROR_UNHANDLED = -4,
 	};
 
 private:
@@ -358,7 +367,7 @@
 	uhd::tune_request_t select_freq(double wFreq, size_t chan, bool tx);
 	bool set_freq(double freq, size_t chan, bool tx);
 
-	Thread async_event_thrd;
+	Thread *async_event_thrd;
 	bool diversity;
 };
 
@@ -396,6 +405,12 @@
 	}
 }
 
+static void thread_enable_cancel(bool cancel)
+{
+	cancel ? pthread_setcancelstate(PTHREAD_CANCEL_ENABLE, NULL) :
+		 pthread_setcancelstate(PTHREAD_CANCEL_DISABLE, NULL);
+}
+
 uhd_device::uhd_device(size_t sps, size_t chans, bool diversity, double offset)
 	: tx_gain_min(0.0), tx_gain_max(0.0),
 	  rx_gain_min(0.0), rx_gain_max(0.0),
@@ -727,7 +742,7 @@
 {
 	uhd::rx_metadata_t md;
 	size_t num_smpls;
-	float timeout = 0.1f;
+	float timeout = UHD_RESTART_TIMEOUT;
 
 	std::vector<std::vector<short> >
 		pkt_bufs(chans, std::vector<short>(2 * rx_spp));
@@ -743,6 +758,8 @@
 		if (!num_smpls) {
 			switch (md.error_code) {
 			case uhd::rx_metadata_t::ERROR_CODE_TIMEOUT:
+				LOG(ALERT) << "Device timed out";
+				return false;
 			default:
 				continue;
 			}
@@ -756,7 +773,7 @@
 	return true;
 }
 
-void uhd_device::restart()
+bool uhd_device::restart()
 {
 	/* Allow 100 ms delay to align multi-channel streams */
 	double delay = 0.1;
@@ -771,7 +788,7 @@
 
 	usrp_dev->issue_stream_cmd(cmd);
 
-	flush_recv(1);
+	return flush_recv(10);
 }
 
 bool uhd_device::start()
@@ -787,10 +804,12 @@
 	uhd::msg::register_handler(&uhd_msg_handler);
 
 	// Start asynchronous event (underrun check) loop
-	async_event_thrd.start((void * (*)(void*))async_event_loop, (void*)this);
+	async_event_thrd = new Thread();
+	async_event_thrd->start((void * (*)(void*))async_event_loop, (void*)this);
 
 	// Start streaming
-	restart();
+	if (!restart())
+		return false;
 
 	// Display usrp time
 	double time_now = usrp_dev->get_time_now().get_real_secs();
@@ -810,6 +829,10 @@
 
 	usrp_dev->issue_stream_cmd(stream_cmd);
 
+	async_event_thrd->cancel();
+	async_event_thrd->join();
+	delete async_event_thrd;
+
 	started = false;
 	return true;
 }
@@ -830,6 +853,7 @@
 		switch (md.error_code) {
 		case uhd::rx_metadata_t::ERROR_CODE_TIMEOUT:
 			LOG(ALERT) << "UHD: Receive timed out";
+			return ERROR_TIMEOUT;
 		case uhd::rx_metadata_t::ERROR_CODE_OVERFLOW:
 		case uhd::rx_metadata_t::ERROR_CODE_LATE_COMMAND:
 		case uhd::rx_metadata_t::ERROR_CODE_BROKEN_CHAIN:
@@ -899,8 +923,11 @@
 
 	// Receive samples from the usrp until we have enough
 	while (rx_buffers[0]->avail_smpls(timestamp) < len) {
+		thread_enable_cancel(false);
 		size_t num_smpls = rx_stream->recv(pkt_ptrs, rx_spp,
 						   metadata, 0.1, true);
+		thread_enable_cancel(true);
+
 		rx_pkt_cnt++;
 
 		// Check for errors 
@@ -910,6 +937,9 @@
 			LOG(ALERT) << "UHD: Version " << uhd::get_version_string();
 			LOG(ALERT) << "UHD: Unrecoverable error, exiting...";
 			exit(-1);
+		case ERROR_TIMEOUT:
+			// Assume stopping condition
+			return 0;
 		case ERROR_TIMING:
 			restart();
 		case ERROR_UNHANDLED:
@@ -988,7 +1018,10 @@
 		}
 	}
 
+	thread_enable_cancel(false);
 	size_t num_smpls = tx_stream->send(bufs, len, metadata);
+	thread_enable_cancel(true);
+
 	if (num_smpls != (unsigned) len) {
 		LOG(ALERT) << "UHD: Device send timed out";
 	}
@@ -1124,7 +1157,11 @@
 bool uhd_device::recv_async_msg()
 {
 	uhd::async_metadata_t md;
-	if (!usrp_dev->get_device()->recv_async_msg(md))
+
+	thread_enable_cancel(false);
+	bool rc = usrp_dev->get_device()->recv_async_msg(md);
+	thread_enable_cancel(true);
+	if (!rc)
 		return false;
 
 	// Assume that any error requires resynchronization
diff --git a/Transceiver52M/osmo-trx.cpp b/Transceiver52M/osmo-trx.cpp
index 9215fa5..db0b2b1 100644
--- a/Transceiver52M/osmo-trx.cpp
+++ b/Transceiver52M/osmo-trx.cpp
@@ -391,8 +391,6 @@
 	if (!trx)
 		goto shutdown;
 
-	trx->start();
-
 	chans = trx->numChans();
 	std::cout << "-- Transceiver active with "
 		  << chans << " channel(s)" << std::endl;
diff --git a/Transceiver52M/radioClock.cpp b/Transceiver52M/radioClock.cpp
index 710018a..505bb01 100644
--- a/Transceiver52M/radioClock.cpp
+++ b/Transceiver52M/radioClock.cpp
@@ -23,32 +23,27 @@
 
 void RadioClock::set(const GSM::Time& wTime)
 {
-	mLock.lock();
+	ScopedLock lock(mLock);
 	mClock = wTime;
 	updateSignal.signal();
-	mLock.unlock();
 }
 
 void RadioClock::incTN()
 {
-	mLock.lock();
+	ScopedLock lock(mLock);
 	mClock.incTN();
 	updateSignal.signal();
-	mLock.unlock();
 }
 
 GSM::Time RadioClock::get()
 {
-	mLock.lock();
+	ScopedLock lock(mLock);
 	GSM::Time retVal = mClock;
-	mLock.unlock();
-
 	return retVal;
 }
 
 void RadioClock::wait()
 {
-	mLock.lock();
+	ScopedLock lock(mLock);
 	updateSignal.wait(mLock,1);
-	mLock.unlock();
 }
diff --git a/Transceiver52M/radioInterface.cpp b/Transceiver52M/radioInterface.cpp
index d67b486..369f2ac 100644
--- a/Transceiver52M/radioInterface.cpp
+++ b/Transceiver52M/radioInterface.cpp
@@ -171,15 +171,20 @@
   return mRadio->setRxFreq(freq, chan);
 }
 
-
-void RadioInterface::start()
+bool RadioInterface::start()
 {
-  LOG(INFO) << "Starting radio";
+  if (mOn)
+    return true;
+
+  LOG(INFO) << "Starting radio device";
 #ifdef USRP1
   mAlignRadioServiceLoopThread.start((void * (*)(void*))AlignRadioServiceLoopAdapter,
                                      (void*)this);
 #endif
-  mRadio->start();
+
+  if (!mRadio->start())
+    return false;
+
   writeTimestamp = mRadio->initialWriteTimestamp();
   readTimestamp = mRadio->initialReadTimestamp();
 
@@ -188,6 +193,23 @@
 
   mOn = true;
   LOG(INFO) << "Radio started";
+  return true;
+}
+
+/*
+ * Stop the radio device
+ *
+ * This is a pass-through call to the device interface. Because the underlying
+ * stop command issuance generally doesn't return confirmation on device status,
+ * this call will only return false if the device is already stopped.
+ */
+bool RadioInterface::stop()
+{
+  if (!mOn || !mRadio->stop())
+    return false;
+
+  mOn = false;
+  return true;
 }
 
 #ifdef USRP1
diff --git a/Transceiver52M/radioInterface.h b/Transceiver52M/radioInterface.h
index 877102f..b359cbd 100644
--- a/Transceiver52M/radioInterface.h
+++ b/Transceiver52M/radioInterface.h
@@ -78,7 +78,8 @@
 public:
 
   /** start the interface */
-  void start();
+  bool start();
+  bool stop();
 
   /** intialization */
   virtual bool init(int type);