ms: use single thread pool

...so we don't spawn threads all the time.
Used for gain avg/setting.

Change-Id: Id675550f55e8ccbbbe6b0d91fbffd01b6ede15f7
diff --git a/Transceiver52M/Makefile.am b/Transceiver52M/Makefile.am
index 875e9a1..296ff05 100644
--- a/Transceiver52M/Makefile.am
+++ b/Transceiver52M/Makefile.am
@@ -97,6 +97,7 @@
 	ms/ms_upper.h \
 	ms/itrq.h \
 	ms/sch.h \
+	ms/threadpool.h \
 	grgsm_vitac/viterbi_detector.h \
 	grgsm_vitac/constants.h \
 	grgsm_vitac/grgsm_vitac.h
diff --git a/Transceiver52M/ms/ms.h b/Transceiver52M/ms/ms.h
index 7381397..d92f4b7 100644
--- a/Transceiver52M/ms/ms.h
+++ b/Transceiver52M/ms/ms.h
@@ -41,6 +41,7 @@
 #include "Complex.h"
 #include "GSMCommon.h"
 #include "itrq.h"
+#include "threadpool.h"
 
 const unsigned int ONE_TS_BURST_LEN = (3 + 58 + 26 + 58 + 3 + 8.25) * 4 /*sps*/;
 const unsigned int NUM_RXQ_FRAMES = 1; // rx thread <-> upper rx queue
@@ -266,6 +267,7 @@
 	time_keeper timekeeper;
 	int hw_cpus;
 	sched_params::target hw_target;
+	single_thread_pool worker_thread;
 
 	void start();
 	std::atomic<bool> upper_is_ready;
@@ -291,6 +293,7 @@
 		  hw_target(hw_cpus > 4 ? sched_params::target::ODROID : sched_params::target::PI4)
 	{
 		std::cerr << "scheduling for: " << (hw_cpus > 4 ? "odroid" : "pi4") << std::endl;
+		set_name_aff_sched(worker_thread.get_handle(), sched_params::thread_names::SCH_SEARCH);
 	}
 
 	virtual ~ms_trx()
diff --git a/Transceiver52M/ms/ms_rx_lower.cpp b/Transceiver52M/ms/ms_rx_lower.cpp
index e8d8e0e..dc0d56d 100644
--- a/Transceiver52M/ms/ms_rx_lower.cpp
+++ b/Transceiver52M/ms/ms_rx_lower.cpp
@@ -30,6 +30,8 @@
 #include "ms.h"
 #include "grgsm_vitac/grgsm_vitac.h"
 
+#include "threadpool.h"
+
 extern "C" {
 #include "sch.h"
 }
@@ -126,8 +128,11 @@
 		gainoffset = runmean < (rxFullScale / 2 ? 2 : 1);
 		float newgain = runmean < rx_max_cutoff ? rxgain + gainoffset : rxgain - gainoffset;
 		// FIXME: gian cutoff
-		if (newgain != rxgain && newgain <= 60)
-			std::thread([this, newgain] { setRxGain(newgain); }).detach();
+		if (newgain != rxgain && newgain <= 60) {
+			auto gain_fun = [this, newgain] { setRxGain(newgain); };
+			worker_thread.add_task(gain_fun);
+		}
+
 		runmean = 0;
 	}
 	gain_check = (gain_check + 1) % avgburst_num;
@@ -217,26 +222,36 @@
 	return false;
 }
 
+/*
+accumulates a full big buffer consisting of 8*12 timeslots, then:
+either
+1) adjusts gain if necessary and starts over
+2) searches and finds SCH and is done
+*/
 SCH_STATE ms_trx::search_for_sch(dev_buf_t *rcd)
 {
 	static unsigned int sch_pos = 0;
+	auto to_copy = SCH_LEN_SPS - sch_pos;
+
 	if (sch_thread_done)
 		return SCH_STATE::FOUND;
 
 	if (rcv_done)
 		return SCH_STATE::SEARCHING;
 
-	auto to_copy = SCH_LEN_SPS - sch_pos;
-
-	if (SCH_LEN_SPS == to_copy) // first time
+	if (sch_pos == 0) // keep first ts for time delta calc
 		first_sch_buf_rcv_ts = rcd->get_first_ts();
 
-	if (!to_copy) {
+	if (to_copy) {
+		auto spsmax = rcd->actual_samples_per_buffer();
+		if (to_copy > (unsigned int)spsmax)
+			sch_pos += rcd->readall(first_sch_buf + sch_pos);
+		else
+			sch_pos += rcd->read_n(first_sch_buf + sch_pos, 0, to_copy);
+	} else { // (!to_copy)
 		sch_pos = 0;
 		rcv_done = true;
-		std::thread([this] {
-			set_name_aff_sched(sched_params::thread_names::SCH_SEARCH);
-
+		auto sch_search_fun = [this] {
 			auto ptr = reinterpret_cast<const int16_t *>(first_sch_buf);
 			const auto target_val = rxFullScale / 8;
 			float sum = 0;
@@ -255,16 +270,9 @@
 
 			if (!sch_thread_done)
 				rcv_done = false; // retry!
-			return (bool)sch_thread_done;
-		}).detach();
+		};
+		worker_thread.add_task(sch_search_fun);
 	}
-
-	auto spsmax = rcd->actual_samples_per_buffer();
-	if (to_copy > (unsigned int)spsmax)
-		sch_pos += rcd->readall(first_sch_buf + sch_pos);
-	else
-		sch_pos += rcd->read_n(first_sch_buf + sch_pos, 0, to_copy);
-
 	return SCH_STATE::SEARCHING;
 }
 
diff --git a/Transceiver52M/ms/threadpool.h b/Transceiver52M/ms/threadpool.h
new file mode 100644
index 0000000..a5dec97
--- /dev/null
+++ b/Transceiver52M/ms/threadpool.h
@@ -0,0 +1,92 @@
+#pragma once
+/*
+ * (C) 2023 by sysmocom s.f.m.c. GmbH <info@sysmocom.de>
+ * All Rights Reserved
+ *
+ * Author: Eric Wild <ewild@sysmocom.de>
+ *
+ * This program is free software; you can redistribute it and/or modify
+ * it under the terms of the GNU Affero General Public License as published by
+ * the Free Software Foundation; either version 3 of the License, or
+ * (at your option) any later version.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
+ * GNU Affero General Public License for more details.
+ *
+ * You should have received a copy of the GNU Affero General Public License
+ * along with this program.  If not, see <http://www.gnu.org/licenses/>.
+ *
+ */
+
+#include <functional>
+#include <thread>
+#include <atomic>
+#include <vector>
+#include <future>
+#include <mutex>
+#include <queue>
+
+struct single_thread_pool {
+	std::mutex m;
+	std::condition_variable cv;
+	std::atomic<bool> stop_flag;
+	std::atomic<bool> is_ready;
+	std::deque<std::function<void()>> wq;
+	std::thread worker_thread;
+
+	template <class F>
+	void add_task(F &&f)
+	{
+		std::unique_lock<std::mutex> l(m);
+		wq.emplace_back(std::forward<F>(f));
+		cv.notify_one();
+		return;
+	}
+
+	single_thread_pool() : worker_thread(std::thread([this] { thread_loop(); }))
+	{
+	}
+	~single_thread_pool()
+	{
+		stop();
+	}
+
+	std::thread::native_handle_type get_handle()
+	{
+		return worker_thread.native_handle();
+	}
+
+    private:
+	void stop()
+	{
+		{
+			std::unique_lock<std::mutex> l(m);
+			wq.clear();
+			stop_flag = true;
+			cv.notify_one();
+		}
+		worker_thread.join();
+	}
+
+	void thread_loop()
+	{
+		while (true) {
+			is_ready = true;
+			std::function<void()> f;
+			{
+				std::unique_lock<std::mutex> l(m);
+				if (wq.empty()) {
+					cv.wait(l, [&] { return !wq.empty() || stop_flag; });
+				}
+				if (stop_flag)
+					return;
+				is_ready = false;
+				f = std::move(wq.front());
+				wq.pop_front();
+			}
+			f();
+		}
+	}
+};
\ No newline at end of file