ms: rearrange code to allow clean exits

This allows gracefully terminating the application by introducing queue
timeouts.

Change-Id: I0b8deebc63cf4d936666fd68e1666d1917e89a5d
diff --git a/Transceiver52M/ms/bladerf_specific.h b/Transceiver52M/ms/bladerf_specific.h
index 3dc4777..e32d77c 100644
--- a/Transceiver52M/ms/bladerf_specific.h
+++ b/Transceiver52M/ms/bladerf_specific.h
@@ -192,7 +192,7 @@
 	struct bladerf_stream *rx_stream;
 	struct bladerf_stream *tx_stream;
 	// using pkt2buf = blade_otw_buffer<2, blade_speed_buffer_type::SS>;
-	using tx_buf_q_type = spsc_cond<BLADE_NUM_BUFFERS, dev_buf_t *, true, false>;
+	using tx_buf_q_type = spsc_cond_timeout<BLADE_NUM_BUFFERS, dev_buf_t *, true, false>;
 	const unsigned int rxFullScale, txFullScale;
 	const int rxtxdelay;
 
diff --git a/Transceiver52M/ms/itrq.h b/Transceiver52M/ms/itrq.h
index 1d9e217..69ff515 100644
--- a/Transceiver52M/ms/itrq.h
+++ b/Transceiver52M/ms/itrq.h
@@ -29,7 +29,58 @@
 
 namespace spsc_detail
 {
-template <bool block_read, bool block_write> class spsc_cond_detail {
+template <bool block_read, bool block_write>
+class spsc_cond_timeout_detail {
+	std::condition_variable cond_r, cond_w;
+	std::mutex lr, lw;
+	std::atomic_int r_flag, w_flag;
+	const int timeout_ms = 200;
+
+    public:
+	explicit spsc_cond_timeout_detail() : r_flag(0), w_flag(0)
+	{
+	}
+
+	~spsc_cond_timeout_detail()
+	{
+	}
+
+	ssize_t spsc_check_r()
+	{
+		std::unique_lock<std::mutex> lk(lr);
+		if (cond_r.wait_for(lk, std::chrono::milliseconds(timeout_ms), [&] { return r_flag != 0; })) {
+			r_flag--;
+			return 1;
+		} else {
+			return 0;
+		}
+	}
+	ssize_t spsc_check_w()
+	{
+		std::unique_lock<std::mutex> lk(lw);
+		if (cond_w.wait_for(lk, std::chrono::milliseconds(timeout_ms), [&] { return w_flag != 0; })) {
+			w_flag--;
+			return 1;
+		} else {
+			return 0;
+		}
+	}
+	void spsc_notify_r()
+	{
+		std::unique_lock<std::mutex> lk(lr);
+		r_flag++;
+		cond_r.notify_one();
+	}
+	void spsc_notify_w()
+	{
+		std::unique_lock<std::mutex> lk(lw);
+		w_flag++;
+		cond_w.notify_one();
+	}
+};
+
+template <bool block_read, bool block_write>
+class spsc_cond_detail {
 	std::condition_variable cond_r, cond_w;
 	std::mutex lr, lw;
 	std::atomic_int r_flag, w_flag;
@@ -74,7 +125,8 @@
 };
 
 // originally designed for select loop integration
-template <bool block_read, bool block_write> class spsc_efd_detail {
+template <bool block_read, bool block_write>
+class spsc_efd_detail {
 	int efd_r, efd_w; /* eventfds used to block/notify readers/writers */
 
     public:
@@ -191,4 +243,7 @@
 template <unsigned int SZ, typename ELEM, bool block_read, bool block_write>
 class spsc_evfd : public spsc_detail::spsc<SZ, ELEM, block_read, block_write, spsc_detail::spsc_efd_detail> {};
 template <unsigned int SZ, typename ELEM, bool block_read, bool block_write>
-class spsc_cond : public spsc_detail::spsc<SZ, ELEM, block_read, block_write, spsc_detail::spsc_cond_detail> {};
\ No newline at end of file
+class spsc_cond : public spsc_detail::spsc<SZ, ELEM, block_read, block_write, spsc_detail::spsc_cond_detail> {};
+template <unsigned int SZ, typename ELEM, bool block_read, bool block_write>
+class spsc_cond_timeout
+	: public spsc_detail::spsc<SZ, ELEM, block_read, block_write, spsc_detail::spsc_cond_timeout_detail> {};
\ No newline at end of file
diff --git a/Transceiver52M/ms/ms.cpp b/Transceiver52M/ms/ms.cpp
index 2e91cae..e587c05 100644
--- a/Transceiver52M/ms/ms.cpp
+++ b/Transceiver52M/ms/ms.cpp
@@ -78,7 +78,7 @@
 	};
 }
 
-void ms_trx::start()
+void ms_trx::start_lower_ms()
 {
 	if (stop_lower_threads_flag)
 		return;
diff --git a/Transceiver52M/ms/ms.h b/Transceiver52M/ms/ms.h
index efccffc..8ca9b02 100644
--- a/Transceiver52M/ms/ms.h
+++ b/Transceiver52M/ms/ms.h
@@ -117,7 +117,7 @@
 	};
 };
 
-using rx_queue_t = spsc_cond<8 * NUM_RXQ_FRAMES, one_burst, true, false>;
+using rx_queue_t = spsc_cond_timeout<8 * NUM_RXQ_FRAMES, one_burst, true, false>;
 
 enum class SCH_STATE { SEARCHING, FOUND };
 
@@ -267,7 +267,7 @@
 	sched_params::target hw_target;
 	single_thread_pool worker_thread;
 
-	void start();
+	void start_lower_ms();
 	std::atomic<bool> upper_is_ready;
 	void set_upper_ready(bool is_ready);
 
diff --git a/Transceiver52M/ms/ms_rx_lower.cpp b/Transceiver52M/ms/ms_rx_lower.cpp
index 4d6ce18..26ee131 100644
--- a/Transceiver52M/ms/ms_rx_lower.cpp
+++ b/Transceiver52M/ms/ms_rx_lower.cpp
@@ -142,10 +142,8 @@
 		memcpy(brst.sch_bits, sch_demod_bits, sizeof(sch_demod_bits));
 	}
 
-	if (upper_is_ready) { // this is blocking, so only submit if there is a reader - only if upper exists!
-		while (!rxqueue.spsc_push(&brst))
-			;
-	}
+	while (upper_is_ready && !rxqueue.spsc_push(&brst))
+		;
 
 	if (do_auto_gain)
 		maybe_update_gain(brst);
diff --git a/Transceiver52M/ms/ms_upper.cpp b/Transceiver52M/ms/ms_upper.cpp
index a10d542..4b2f919 100644
--- a/Transceiver52M/ms/ms_upper.cpp
+++ b/Transceiver52M/ms/ms_upper.cpp
@@ -80,37 +80,16 @@
 		while (!g_exit_flag) {
 			driveControl();
 		}
-		std::cerr << "exit control!" << std::endl;
+		std::cerr << "exit U control!" << std::endl;
 	});
-	msleep(1);
 	thr_tx = std::thread([this] {
 		set_name_aff_sched(sched_params::thread_names::U_TX);
 		while (!g_exit_flag) {
 			driveTx();
 		}
-		std::cerr << "exit tx U!" << std::endl;
+		std::cerr << "exit U tx!" << std::endl;
 	});
 
-	// atomic ensures data is not written to q until loop reads
-	start_lower_ms();
-
-	set_name_aff_sched(sched_params::thread_names::U_RX);
-	while (!g_exit_flag) {
-		// set_upper_ready(true) needs to happen during cmd handling:
-		// the main loop is driven by rx, so unless rx is on AND transceiver is on we get stuck..
-		driveReceiveFIFO();
-		osmo_select_main(1);
-
-		trxcon_phyif_rsp r;
-		if (cmdq_from_phy.spsc_pop(&r)) {
-			DBGLG() << "HAVE RESP:" << r.type << std::endl;
-			trxcon_phyif_handle_rsp(g_trxcon, &r);
-		}
-	}
-	set_upper_ready(false);
-	std::cerr << "exit rx U!" << std::endl;
-	mOn = false;
-
 #ifdef LSANDEBUG
 	std::thread([this] {
 		set_name_aff_sched(sched_params::thread_names::LEAKCHECK);
@@ -123,9 +102,23 @@
 #endif
 }
 
-void upper_trx::start_lower_ms()
+void upper_trx::main_loop()
 {
-	ms_trx::start();
+	set_name_aff_sched(sched_params::thread_names::U_RX);
+	set_upper_ready(true);
+	while (!g_exit_flag) {
+		driveReceiveFIFO();
+		osmo_select_main(1);
+
+		trxcon_phyif_rsp r;
+		if (cmdq_from_phy.spsc_pop(&r)) {
+			DBGLG() << "HAVE RESP:" << r.type << std::endl;
+			trxcon_phyif_handle_rsp(g_trxcon, &r);
+		}
+	}
+	set_upper_ready(false);
+	std::cerr << "exit U rx!" << std::endl;
+	mOn = false;
 }
 
 // signalvector is owning despite claiming not to, but we can pretend, too..
@@ -346,7 +339,7 @@
 	case TRXCON_PHYIF_CMDT_POWERON:
 		if (!mOn) {
 			mOn = true;
-			set_upper_ready(true);
+			start_lower_ms();
 		}
 		break;
 	case TRXCON_PHYIF_CMDT_POWEROFF:
@@ -430,7 +423,7 @@
 
 	// blocking, will return when global exit is requested
 	trx->start_threads();
-
+	trx->main_loop();
 	trx->stop_threads();
 	trx->stop_upper_threads();
 
diff --git a/Transceiver52M/ms/ms_upper.h b/Transceiver52M/ms/ms_upper.h
index bc9bd14..2362365 100644
--- a/Transceiver52M/ms/ms_upper.h
+++ b/Transceiver52M/ms/ms_upper.h
@@ -41,7 +41,7 @@
 
     public:
 	void start_threads();
-	void start_lower_ms();
+	void main_loop();
 	void stop_upper_threads();
 
 	upper_trx(){};