ms: fix startup & shutdown of blade

One of the mystery bugs was that the blade ms needed two starts
after powercycling the bladerf due to transfer timeouts.
This is now fixed.

Change-Id: I1cd8790191790f4861a70bc55c8f4c9993fa10c8
diff --git a/Transceiver52M/ms/bladerf_specific.h b/Transceiver52M/ms/bladerf_specific.h
index 78c51ab..85a2ea0 100644
--- a/Transceiver52M/ms/bladerf_specific.h
+++ b/Transceiver52M/ms/bladerf_specific.h
@@ -293,10 +293,10 @@
 		setRxGain(rxgain, 0);
 		setTxGain(txgain, 0);
 		usleep(1000);
-		blade_check(bladerf_enable_module, dev, BLADERF_MODULE_RX, true);
-		usleep(1000);
-		blade_check(bladerf_enable_module, dev, BLADERF_MODULE_TX, true);
-		usleep(1000);
+
+		bladerf_set_stream_timeout(dev, BLADERF_TX, 10);
+		bladerf_set_stream_timeout(dev, BLADERF_RX, 10);
+
 		blade_check(bladerf_init_stream, &rx_stream, dev, getrxcb(rxh), &buf_mgmt.rx_samples, BLADE_NUM_BUFFERS,
 			    BLADERF_FORMAT_SC16_Q11_META, BLADE_BUFFER_SIZE, NUM_TRANSFERS, (void *)this);
 
@@ -308,15 +308,16 @@
 			buf_mgmt.bufptrqueue.spsc_push(&cur_buffer[i]);
 		}
 
-
-		usleep(1000);
-
-		// bladerf_set_stream_timeout(dev, BLADERF_TX, 4);
-		// bladerf_set_stream_timeout(dev, BLADERF_RX, 4);
-
 		return 0;
 	}
 
+	void actually_enable_streams()
+	{
+		blade_check(bladerf_enable_module, dev, BLADERF_MODULE_RX, true);
+		usleep(1000);
+		blade_check(bladerf_enable_module, dev, BLADERF_MODULE_TX, true);
+	}
+
 	bool tuneTx(double freq, size_t chan = 0)
 	{
 		msleep(15);
@@ -418,8 +419,9 @@
 	auto get_rx_burst_handler_fn(bh_fn_t burst_handler)
 	{
 		auto fn = [this] {
-			int status;
-			status = bladerf_stream(rx_stream, BLADERF_RX_X1);
+			int status = 0;
+			if (!stop_me_flag)
+				status = bladerf_stream(rx_stream, BLADERF_RX_X1);
 			if (status < 0)
 				std::cerr << "rx stream error! " << bladerf_strerror(status) << std::endl;
 
@@ -430,8 +432,9 @@
 	auto get_tx_burst_handler_fn(bh_fn_t burst_handler)
 	{
 		auto fn = [this] {
-			int status;
-			status = bladerf_stream(tx_stream, BLADERF_TX_X1);
+			int status = 0;
+			if (!stop_me_flag)
+				status = bladerf_stream(tx_stream, BLADERF_TX_X1);
 			if (status < 0)
 				std::cerr << "rx stream error! " << bladerf_strerror(status) << std::endl;
 
@@ -442,9 +445,15 @@
 
 	void submit_burst_ts(blade_sample_type *buffer, int len, uint64_t ts)
 	{
-		//get empty bufer from list
 		tx_buf_q_type::elem_t rcd;
 
+		// exit by submitting a dummy buffer to assure the libbladerf stream mutex is happy (thread!)
+		if (!buffer) {
+			bladerf_submit_stream_buffer(tx_stream, (void *)BLADERF_STREAM_SHUTDOWN, 1000);
+			return;
+		}
+
+		//get empty bufer from list
 		while (!buf_mgmt.bufptrqueue.spsc_pop(&rcd))
 			buf_mgmt.bufptrqueue.spsc_prep_pop();
 		assert(rcd != nullptr);
diff --git a/Transceiver52M/ms/ms.cpp b/Transceiver52M/ms/ms.cpp
index 1dc2587..f338225 100644
--- a/Transceiver52M/ms/ms.cpp
+++ b/Transceiver52M/ms/ms.cpp
@@ -276,6 +276,8 @@
 
 void ms_trx::start()
 {
+	if (stop_me_flag)
+		return;
 	auto fn = get_rx_burst_handler_fn(rx_bh());
 	rx_task = std::thread(fn);
 	set_name_aff_sched(rx_task.native_handle(), sched_params::thread_names::RXRUN);
@@ -285,6 +287,7 @@
 	tx_task = std::thread(fn2);
 	set_name_aff_sched(tx_task.native_handle(), sched_params::thread_names::TXRUN);
 
+	actually_enable_streams();
 }
 
 void ms_trx::set_upper_ready(bool is_ready)
@@ -294,11 +297,14 @@
 
 void ms_trx::stop_threads()
 {
-	std::cerr << "killing threads...\r\n" << std::endl;
+	std::cerr << "killing threads..." << std::endl;
 	stop_me_flag = true;
 	close_device();
+	std::cerr << "dev closed..." << std::endl;
 	rx_task.join();
+	std::cerr << "L rx dead..." << std::endl;
 	tx_task.join();
+	std::cerr << "L tx dead..." << std::endl;
 }
 
 void ms_trx::submit_burst(blade_sample_type *buffer, int len, GSM::Time target)
diff --git a/Transceiver52M/ms/ms_upper.cpp b/Transceiver52M/ms/ms_upper.cpp
index 3cb27e3..5b213f7 100644
--- a/Transceiver52M/ms/ms_upper.cpp
+++ b/Transceiver52M/ms/ms_upper.cpp
@@ -94,6 +94,17 @@
 
 std::atomic<bool> g_exit_flag;
 
+void upper_trx::stop_upper_threads()
+{
+	g_exit_flag = true;
+
+	if (thr_control.joinable())
+		thr_control.join();
+	if (thr_rx.joinable())
+		thr_rx.join();
+	if (thr_tx.joinable())
+		thr_tx.join();
+}
 void upper_trx::start_threads()
 {
 	thr_control = std::thread([this] {
@@ -101,6 +112,7 @@
 		while (!g_exit_flag) {
 			driveControl();
 		}
+		std::cerr << "exit control!" << std::endl;
 	});
 	msleep(1);
 	thr_tx = std::thread([this] {
@@ -108,6 +120,7 @@
 		while (!g_exit_flag) {
 			driveTx();
 		}
+		std::cerr << "exit tx U!" << std::endl;
 	});
 
 	// atomic ensures data is not written to q until loop reads
@@ -115,7 +128,8 @@
 
 	set_name_aff_sched(sched_params::thread_names::U_RX);
 	while (!g_exit_flag) {
-		// set_upper_ready(true);
+		// set_upper_ready(true) needs to happen during cmd handling:
+		// the main loop is driven by rx, so unless rx is on AND transceiver is on we get stuck..
 		driveReceiveFIFO();
 		trxcon::osmo_select_main(1);
 
@@ -125,6 +139,9 @@
 			trxcon_phyif_handle_rsp(trxcon::g_trxcon, &r);
 		}
 	}
+	set_upper_ready(false);
+	std::cerr << "exit rx U!" << std::endl;
+	mOn = false;
 
 #ifdef LSANDEBUG
 	std::thread([this] {
@@ -267,8 +284,7 @@
 
 	// ensure our tx cb is tickled and can exit
 	if (g_exit_flag) {
-		blade_sample_type dummy[10] = {};
-		submit_burst_ts(dummy, 10, 1);
+		submit_burst_ts(0, 1337, 1);
 		return;
 	}
 
@@ -360,10 +376,9 @@
 		set_ta(0);
 		break;
 	case trxcon::TRXCON_PHYIF_CMDT_POWERON:
-
 		if (!mOn) {
-			set_upper_ready(true);
 			mOn = true;
+			set_upper_ready(true);
 		}
 		break;
 	case trxcon::TRXCON_PHYIF_CMDT_POWEROFF:
@@ -444,7 +459,7 @@
 void sighandler(int sigset)
 {
 	// we might get a sigpipe in case the l1ctl ud socket disconnects because mobile quits
-	if (sigset == SIGPIPE) {
+	if (sigset == SIGPIPE || sigset == SIGINT) {
 		g_exit_flag = true;
 
 		// we know the flag is atomic and it prevents the trxcon cb handlers from writing
@@ -453,6 +468,7 @@
 		trxcon::internal_q_tx_buf b = {};
 		trxcon::txq.spsc_push(&b);
 		trxcon::cmdq_to_phy.spsc_push(&cmd);
+		msleep(200);
 
 		return;
 	}
@@ -462,6 +478,7 @@
 {
 	auto tall_trxcon_ctx = talloc_init("trxcon context");
 	signal(SIGPIPE, sighandler);
+	signal(SIGINT, sighandler);
 
 	trxcon::msgb_talloc_ctx_init(tall_trxcon_ctx, 0);
 	trxc_log_init(tall_trxcon_ctx);
@@ -492,8 +509,11 @@
 		return -1;
 	}
 
+	// blocking, will return when global exit is requested
 	trx->start_threads();
+
 	trx->stop_threads();
+	trx->stop_upper_threads();
 
 	return status;
 }
diff --git a/Transceiver52M/ms/ms_upper.h b/Transceiver52M/ms/ms_upper.h
index 49516e6..fe20f6d 100644
--- a/Transceiver52M/ms/ms_upper.h
+++ b/Transceiver52M/ms/ms_upper.h
@@ -43,6 +43,7 @@
     public:
 	void start_threads();
 	void start_lower_ms();
+	void stop_upper_threads();
 
 	upper_trx(){};
 };
diff --git a/Transceiver52M/ms/uhd_specific.h b/Transceiver52M/ms/uhd_specific.h
index 726bb95..07fc449 100644
--- a/Transceiver52M/ms/uhd_specific.h
+++ b/Transceiver52M/ms/uhd_specific.h
@@ -185,6 +185,11 @@
 		return 0;
 	}
 
+	void actually_enable_streams()
+	{
+		// nop: stream cmd in handler
+	}
+
 	void *rx_cb(bh_fn_t burst_handler)
 	{
 		void *ret = nullptr;