ms: rearrange code to allow clean exits
This allows gracefully terminating the application by introducing queue
timeouts.
Change-Id: I0b8deebc63cf4d936666fd68e1666d1917e89a5d
diff --git a/Transceiver52M/ms/bladerf_specific.h b/Transceiver52M/ms/bladerf_specific.h
index 3dc4777..e32d77c 100644
--- a/Transceiver52M/ms/bladerf_specific.h
+++ b/Transceiver52M/ms/bladerf_specific.h
@@ -192,7 +192,7 @@
struct bladerf_stream *rx_stream;
struct bladerf_stream *tx_stream;
// using pkt2buf = blade_otw_buffer<2, blade_speed_buffer_type::SS>;
- using tx_buf_q_type = spsc_cond<BLADE_NUM_BUFFERS, dev_buf_t *, true, false>;
+ using tx_buf_q_type = spsc_cond_timeout<BLADE_NUM_BUFFERS, dev_buf_t *, true, false>;
const unsigned int rxFullScale, txFullScale;
const int rxtxdelay;
diff --git a/Transceiver52M/ms/itrq.h b/Transceiver52M/ms/itrq.h
index 1d9e217..69ff515 100644
--- a/Transceiver52M/ms/itrq.h
+++ b/Transceiver52M/ms/itrq.h
@@ -29,7 +29,58 @@
namespace spsc_detail
{
-template <bool block_read, bool block_write> class spsc_cond_detail {
+template <bool block_read, bool block_write>
+class spsc_cond_timeout_detail {
+ std::condition_variable cond_r, cond_w;
+ std::mutex lr, lw;
+ std::atomic_int r_flag, w_flag;
+ const int timeout_ms = 200;
+
+ public:
+ explicit spsc_cond_timeout_detail() : r_flag(0), w_flag(0)
+ {
+ }
+
+ ~spsc_cond_timeout_detail()
+ {
+ }
+
+ ssize_t spsc_check_r()
+ {
+ std::unique_lock<std::mutex> lk(lr);
+ if (cond_r.wait_for(lk, std::chrono::milliseconds(timeout_ms), [&] { return r_flag != 0; })) {
+ r_flag--;
+ return 1;
+ } else {
+ return 0;
+ }
+ }
+ ssize_t spsc_check_w()
+ {
+ std::unique_lock<std::mutex> lk(lw);
+ if (cond_w.wait_for(lk, std::chrono::milliseconds(timeout_ms), [&] { return w_flag != 0; })) {
+ w_flag--;
+ return 1;
+ } else {
+ return 0;
+ }
+ }
+ void spsc_notify_r()
+ {
+ std::unique_lock<std::mutex> lk(lr);
+ r_flag++;
+ cond_r.notify_one();
+ }
+ void spsc_notify_w()
+ {
+ std::unique_lock<std::mutex> lk(lw);
+ w_flag++;
+ cond_w.notify_one();
+ }
+};
+
+template <bool block_read, bool block_write>
+class spsc_cond_detail {
std::condition_variable cond_r, cond_w;
std::mutex lr, lw;
std::atomic_int r_flag, w_flag;
@@ -74,7 +125,8 @@
};
// originally designed for select loop integration
-template <bool block_read, bool block_write> class spsc_efd_detail {
+template <bool block_read, bool block_write>
+class spsc_efd_detail {
int efd_r, efd_w; /* eventfds used to block/notify readers/writers */
public:
@@ -191,4 +243,7 @@
template <unsigned int SZ, typename ELEM, bool block_read, bool block_write>
class spsc_evfd : public spsc_detail::spsc<SZ, ELEM, block_read, block_write, spsc_detail::spsc_efd_detail> {};
template <unsigned int SZ, typename ELEM, bool block_read, bool block_write>
-class spsc_cond : public spsc_detail::spsc<SZ, ELEM, block_read, block_write, spsc_detail::spsc_cond_detail> {};
\ No newline at end of file
+class spsc_cond : public spsc_detail::spsc<SZ, ELEM, block_read, block_write, spsc_detail::spsc_cond_detail> {};
+template <unsigned int SZ, typename ELEM, bool block_read, bool block_write>
+class spsc_cond_timeout
+ : public spsc_detail::spsc<SZ, ELEM, block_read, block_write, spsc_detail::spsc_cond_timeout_detail> {};
\ No newline at end of file
diff --git a/Transceiver52M/ms/ms.cpp b/Transceiver52M/ms/ms.cpp
index 2e91cae..e587c05 100644
--- a/Transceiver52M/ms/ms.cpp
+++ b/Transceiver52M/ms/ms.cpp
@@ -78,7 +78,7 @@
};
}
-void ms_trx::start()
+void ms_trx::start_lower_ms()
{
if (stop_lower_threads_flag)
return;
diff --git a/Transceiver52M/ms/ms.h b/Transceiver52M/ms/ms.h
index efccffc..8ca9b02 100644
--- a/Transceiver52M/ms/ms.h
+++ b/Transceiver52M/ms/ms.h
@@ -117,7 +117,7 @@
};
};
-using rx_queue_t = spsc_cond<8 * NUM_RXQ_FRAMES, one_burst, true, false>;
+using rx_queue_t = spsc_cond_timeout<8 * NUM_RXQ_FRAMES, one_burst, true, false>;
enum class SCH_STATE { SEARCHING, FOUND };
@@ -267,7 +267,7 @@
sched_params::target hw_target;
single_thread_pool worker_thread;
- void start();
+ void start_lower_ms();
std::atomic<bool> upper_is_ready;
void set_upper_ready(bool is_ready);
diff --git a/Transceiver52M/ms/ms_rx_lower.cpp b/Transceiver52M/ms/ms_rx_lower.cpp
index 4d6ce18..26ee131 100644
--- a/Transceiver52M/ms/ms_rx_lower.cpp
+++ b/Transceiver52M/ms/ms_rx_lower.cpp
@@ -142,10 +142,8 @@
memcpy(brst.sch_bits, sch_demod_bits, sizeof(sch_demod_bits));
}
- if (upper_is_ready) { // this is blocking, so only submit if there is a reader - only if upper exists!
- while (!rxqueue.spsc_push(&brst))
- ;
- }
+ while (upper_is_ready && !rxqueue.spsc_push(&brst))
+ ;
if (do_auto_gain)
maybe_update_gain(brst);
diff --git a/Transceiver52M/ms/ms_upper.cpp b/Transceiver52M/ms/ms_upper.cpp
index a10d542..4b2f919 100644
--- a/Transceiver52M/ms/ms_upper.cpp
+++ b/Transceiver52M/ms/ms_upper.cpp
@@ -80,37 +80,16 @@
while (!g_exit_flag) {
driveControl();
}
- std::cerr << "exit control!" << std::endl;
+ std::cerr << "exit U control!" << std::endl;
});
- msleep(1);
thr_tx = std::thread([this] {
set_name_aff_sched(sched_params::thread_names::U_TX);
while (!g_exit_flag) {
driveTx();
}
- std::cerr << "exit tx U!" << std::endl;
+ std::cerr << "exit U tx!" << std::endl;
});
- // atomic ensures data is not written to q until loop reads
- start_lower_ms();
-
- set_name_aff_sched(sched_params::thread_names::U_RX);
- while (!g_exit_flag) {
- // set_upper_ready(true) needs to happen during cmd handling:
- // the main loop is driven by rx, so unless rx is on AND transceiver is on we get stuck..
- driveReceiveFIFO();
- osmo_select_main(1);
-
- trxcon_phyif_rsp r;
- if (cmdq_from_phy.spsc_pop(&r)) {
- DBGLG() << "HAVE RESP:" << r.type << std::endl;
- trxcon_phyif_handle_rsp(g_trxcon, &r);
- }
- }
- set_upper_ready(false);
- std::cerr << "exit rx U!" << std::endl;
- mOn = false;
-
#ifdef LSANDEBUG
std::thread([this] {
set_name_aff_sched(sched_params::thread_names::LEAKCHECK);
@@ -123,9 +102,23 @@
#endif
}
-void upper_trx::start_lower_ms()
+void upper_trx::main_loop()
{
- ms_trx::start();
+ set_name_aff_sched(sched_params::thread_names::U_RX);
+ set_upper_ready(true);
+ while (!g_exit_flag) {
+ driveReceiveFIFO();
+ osmo_select_main(1);
+
+ trxcon_phyif_rsp r;
+ if (cmdq_from_phy.spsc_pop(&r)) {
+ DBGLG() << "HAVE RESP:" << r.type << std::endl;
+ trxcon_phyif_handle_rsp(g_trxcon, &r);
+ }
+ }
+ set_upper_ready(false);
+ std::cerr << "exit U rx!" << std::endl;
+ mOn = false;
}
// signalvector is owning despite claiming not to, but we can pretend, too..
@@ -346,7 +339,7 @@
case TRXCON_PHYIF_CMDT_POWERON:
if (!mOn) {
mOn = true;
- set_upper_ready(true);
+ start_lower_ms();
}
break;
case TRXCON_PHYIF_CMDT_POWEROFF:
@@ -430,7 +423,7 @@
// blocking, will return when global exit is requested
trx->start_threads();
-
+ trx->main_loop();
trx->stop_threads();
trx->stop_upper_threads();
diff --git a/Transceiver52M/ms/ms_upper.h b/Transceiver52M/ms/ms_upper.h
index bc9bd14..2362365 100644
--- a/Transceiver52M/ms/ms_upper.h
+++ b/Transceiver52M/ms/ms_upper.h
@@ -41,7 +41,7 @@
public:
void start_threads();
- void start_lower_ms();
+ void main_loop();
void stop_upper_threads();
upper_trx(){};