blob: 72f4c28c2a3c968131553b07c51ab7913fd1681c [file] [log] [blame]
Eric07cfdf72022-07-19 21:12:58 +02001/*
2 * (C) 2022 by sysmocom s.f.m.c. GmbH <info@sysmocom.de>
3 * All Rights Reserved
4 *
5 * Author: Eric Wild <ewild@sysmocom.de>
6 *
7 * This program is free software; you can redistribute it and/or modify
8 * it under the terms of the GNU Affero General Public License as published by
9 * the Free Software Foundation; either version 3 of the License, or
10 * (at your option) any later version.
11 *
12 * This program is distributed in the hope that it will be useful,
13 * but WITHOUT ANY WARRANTY; without even the implied warranty of
14 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
15 * GNU Affero General Public License for more details.
16 *
17 * You should have received a copy of the GNU Affero General Public License
18 * along with this program. If not, see <http://www.gnu.org/licenses/>.
19 *
20 */
Eric1b65c922022-07-21 02:35:17 +020021
Eric07cfdf72022-07-19 21:12:58 +020022#pragma once
23
#include <algorithm>
#include <atomic>
#include <cassert>
#include <complex>
#include <cstddef>
#include <cstdint>
#include <cstring>
#include <deque>
#include <mutex>
#include <type_traits>
#include <vector>

#include "shmif.h"
Eric1b65c922022-07-21 02:35:17 +020032
/* capacity, in samples, of the shared uplink and downlink buffers */
constexpr int max_ul_rdlen = 10 * 1024;
constexpr int max_dl_rdlen = 10 * 1024;

/* one complex sample with 16 bit integer real and imaginary parts */
using sample_t = std::complex<int16_t>;
36struct shm_if {
37 std::atomic<bool> ms_connected;
38 struct {
Eric1b65c922022-07-21 02:35:17 +020039 shm::sema r;
40 shm::sema w;
Eric07cfdf72022-07-19 21:12:58 +020041 std::atomic<uint64_t> ts;
Eric2f7b82f2022-10-09 15:13:18 +020042 std::atomic<uint64_t> ts_req;
43 std::atomic<size_t> len_written_sps; // ->
Eric07cfdf72022-07-19 21:12:58 +020044 sample_t buffer[max_ul_rdlen];
45 } ul;
46 struct {
Eric1b65c922022-07-21 02:35:17 +020047 shm::sema r;
48 shm::sema w;
Eric07cfdf72022-07-19 21:12:58 +020049 std::atomic<uint64_t> ts;
Eric2f7b82f2022-10-09 15:13:18 +020050 std::atomic<uint64_t> ts_req;
51 std::atomic<size_t> len_written_sps;
Eric07cfdf72022-07-19 21:12:58 +020052 sample_t buffer[max_dl_rdlen];
53 } dl;
54};
55
// Signed distance (a - b) between two wrapping unsigned counters.
// unique up to signed_type/2 diff
// ex: uint8/int8 (250, 0) = -6
//
// a, b: counter values of the same unsigned type A
// returns: a - b as the signed counterpart of A; positive if a is "ahead" of b
template <typename A> auto unsigned_diff(A a, A b) -> typename std::make_signed<A>::type
{
	using stype = typename std::make_signed<A>::type;
	// Unsigned subtraction wraps modulo 2^N, so one subtraction covers both
	// orderings and never negates the most negative value (which would overflow).
	// The cast to A undoes the integer promotion applied to narrow types.
	return static_cast<stype>(static_cast<A>(a - b));
}
63
Eric2f7b82f2022-10-09 15:13:18 +020064constexpr inline int samp2byte(int v)
65{
66 return v * sizeof(sample_t);
67}
68constexpr inline int byte2samp(int v)
69{
70 return v / sizeof(sample_t);
71}
Eric07cfdf72022-07-19 21:12:58 +020072
Eric2f7b82f2022-10-09 15:13:18 +020073struct ulentry {
74 bool done;
75 uint64_t ts;
76 unsigned int len_in_sps;
77 unsigned int read_pos_in_sps;
78 sample_t buf[1000];
79};
80/*
81 write: find read index +.. until marked free = "end" of current list
82
83 check:
84 within begin, end AND not free?
85 y:
86 copy (chunk)
87 if chunk advance burst buf ptr
88 n: next, advance, remove old.
89 */
90template <unsigned int num_bursts> class ulburstprovider {
91 std::mutex ul_q_m;
92 // std::deque<ulentry> ul_q;
93
94 // classic circular buffer
95 ulentry foo[num_bursts];
96 int current_index; // % num_bursts
97
98 void cur_buf_done()
Eric07cfdf72022-07-19 21:12:58 +020099 {
Eric2f7b82f2022-10-09 15:13:18 +0200100 foo[current_index].done = true;
101 current_index = current_index + 1 % num_bursts;
102 }
103 bool is_empty()
104 {
105 return foo[current_index].done = true;
106 }
107 void reset()
108 {
109 for (auto &i : foo)
110 i = {};
111 current_index = 0;
112 }
113 ulentry &find_free_at_end()
114 {
115 for (int i = current_index, max_to_search = 0; max_to_search < num_bursts;
116 i = (i + 1 % num_bursts), max_to_search++) {
117 if (foo[i].done)
118 return foo[i];
119 }
120 return foo[0]; // FIXME actually broken, q full, wat do?
121 }
122
123 void push_back(ulentry &e)
124 {
125 auto free_buf = find_free_at_end();
126 free_buf = e;
127 e.done = false;
Eric07cfdf72022-07-19 21:12:58 +0200128 }
129
130 public:
Eric2f7b82f2022-10-09 15:13:18 +0200131 void add(ulentry &e)
132 {
133 std::lock_guard<std::mutex> foo(ul_q_m);
134 push_back(e);
135 }
136 void get(uint64_t requested_ts, unsigned int req_len_in_sps, sample_t *buf, unsigned int max_buf_write_len)
137 {
138 std::lock_guard<std::mutex> g(ul_q_m);
139
140 /*
141 1) if empty return
142 2) if not empty prune stale bursts
143 3) if only future bursts also return and zero buf
144 */
145 for (int i = current_index, max_to_search = 0; max_to_search < num_bursts;
146 i = (i + 1 % num_bursts), max_to_search++) {
147 auto cur_entry = foo[i];
148 if (is_empty()) { // might be empty due to advance below!
149 memset(buf, 0, samp2byte(req_len_in_sps));
150 return;
151 }
152
153 if (cur_entry.ts + cur_entry.len_in_sps < requested_ts) { // remove late bursts
154 if (i == current_index) // only advance if we are at the front
155 cur_buf_done();
156 else
157 assert(true);
158 } else if (cur_entry.ts >= requested_ts + byte2samp(max_buf_write_len)) { // not in range
159 memset(buf, 0, samp2byte(req_len_in_sps));
160 return;
161
162 // FIXME: what about requested_ts <= entry.ts <= ts + reqlen?
163 } else {
164 // requested_ts <= cur_entry.ts <= requested_ts + byte2samp(max_write_len)
165
166 auto before_sps = unsigned_diff(cur_entry.ts, requested_ts);
167
168 // at least one whole buffer before our most recent "head" burst?
169 // set 0, return.
170 if (-before_sps >= byte2samp(max_buf_write_len)) {
171 memset(buf, 0, samp2byte(req_len_in_sps));
172 return;
173 }
174 // less than one full buffer before: pad 0
175 auto to_pad_sps = -before_sps;
176 memset(buf, 0, samp2byte(to_pad_sps));
177 requested_ts += to_pad_sps;
178 req_len_in_sps -= to_pad_sps;
179
180 if (!req_len_in_sps)
181 return;
182
183 // actual burst data after possible 0 pad
184 auto max_sps_to_write = std::min(cur_entry.len_in_sps, req_len_in_sps);
185 memcpy(&buf[samp2byte(to_pad_sps)], cur_entry.buf, samp2byte(max_sps_to_write));
186 requested_ts += max_sps_to_write;
187 req_len_in_sps -= max_sps_to_write;
188 cur_entry.read_pos_in_sps += max_sps_to_write;
189
190 //this buf is done...
191 if (cur_entry.read_pos_in_sps == cur_entry.len_in_sps) {
192 cur_buf_done();
193 }
194
195 if (!req_len_in_sps)
196 return;
197 }
198 }
199 }
200};
201
202class trxmsif {
203 shm::shm<shm_if> m;
204 shm_if *ptr;
205
206 ulburstprovider<10> p;
207
208 template <typename T> void read(T &direction, size_t howmany_sps, uint64_t *read_ts, sample_t *outbuf)
209 {
210 static int readoffset_sps;
211 // auto &direction = ptr->dl;
212 auto buf = &direction.buffer[0];
213 size_t len_avail_sps = direction.len_written_sps.load();
214
215 auto left_to_read = len_avail_sps - readoffset_sps;
216
217 shm::mtx_log::print_guard() << "\tr @" << direction.ts.load() << " " << readoffset_sps << std::endl;
218
219 // no data, wait for new buffer, maybe some data left afterwards
220 if (!left_to_read) {
221 assert(readoffset_sps == len_avail_sps);
222 readoffset_sps = 0;
223 direction.r.reset_unsafe();
224 direction.ts_req = (*read_ts);
225 direction.w.set(1);
226 direction.r.wait_and_reset(1);
227 assert(*read_ts != direction.ts.load());
228 // shm::sema_guard g(dl.r, dl.w);
229 *read_ts = direction.ts.load();
230 len_avail_sps = direction.len_written_sps.load();
231 readoffset_sps += howmany_sps;
232 assert(len_avail_sps >= howmany_sps);
233 memcpy(outbuf, buf, samp2byte(howmany_sps));
234
235 shm::mtx_log::print_guard() << "\tr+ " << *read_ts << " " << howmany_sps << std::endl;
236 return;
237 }
238
239 *read_ts = direction.ts.load() + readoffset_sps;
240 left_to_read = len_avail_sps - readoffset_sps;
241
242 // data left from prev read
243 if (left_to_read >= howmany_sps) {
244 memcpy(outbuf, &buf[readoffset_sps], samp2byte(howmany_sps));
245 readoffset_sps += howmany_sps;
246
247 shm::mtx_log::print_guard() << "\tr++ " << *read_ts << " " << howmany_sps << std::endl;
248 return;
249 } else {
250 memcpy(outbuf, &buf[readoffset_sps], samp2byte(left_to_read));
251 readoffset_sps = 0;
252 auto still_left_to_read = howmany_sps - left_to_read;
253 {
254 direction.r.reset_unsafe();
255 direction.ts_req = (*read_ts);
256 direction.w.set(1);
257 direction.r.wait_and_reset(1);
258 assert(*read_ts != direction.ts.load());
259 len_avail_sps = direction.len_written_sps.load();
260 assert(len_avail_sps >= still_left_to_read);
261 memcpy(&outbuf[left_to_read], buf, samp2byte(still_left_to_read));
262 readoffset_sps += still_left_to_read;
263 shm::mtx_log::print_guard()
264 << "\tr+++2 " << *read_ts << " " << howmany_sps << " " << still_left_to_read
265 << " new @" << direction.ts.load() << std::endl;
266 }
267 }
268 }
269
270 public:
271 trxmsif() : m("trx-ms-if")
Eric07cfdf72022-07-19 21:12:58 +0200272 {
273 }
274
275 bool create()
276 {
277 m.create();
278 ptr = m.p();
279 return m.isgood();
280 }
281 bool connect()
282 {
283 m.open();
284 ptr = m.p();
285 ptr->ms_connected = true;
Eric1b65c922022-07-21 02:35:17 +0200286 ptr->dl.w.set(1);
Eric07cfdf72022-07-19 21:12:58 +0200287 return m.isgood();
288 }
289 bool good()
290 {
291 return m.isgood();
292 }
Eric1b65c922022-07-21 02:35:17 +0200293 bool is_connected()
294 {
295 return ptr->ms_connected == true;
296 }
Eric07cfdf72022-07-19 21:12:58 +0200297
Eric2f7b82f2022-10-09 15:13:18 +0200298 /* is being read from ms side */
299 void read_dl(size_t howmany_sps, uint64_t *read_ts, sample_t *outbuf)
300 {
301 return read(ptr->dl, howmany_sps, read_ts, outbuf);
302 }
303
304 /* is being read from trx/network side */
305 void read_ul(size_t howmany_sps, uint64_t *read_ts, sample_t *outbuf)
306 {
307 // if (ptr->ms_connected != true) {
308 memset(outbuf, 0, samp2byte(howmany_sps));
309 // return;
310 // }
311 // return read(ptr->ul, howmany_sps, read_ts, outbuf);
312 }
313
314 void write_dl(size_t howmany_sps, uint64_t write_ts, sample_t *inbuf)
Eric07cfdf72022-07-19 21:12:58 +0200315 {
316 auto &dl = ptr->dl;
317 auto buf = &dl.buffer[0];
Eric2f7b82f2022-10-09 15:13:18 +0200318 if (ptr->ms_connected != true)
319 return;
Eric07cfdf72022-07-19 21:12:58 +0200320
Eric2f7b82f2022-10-09 15:13:18 +0200321 assert(sizeof(dl.buffer) >= samp2byte(howmany_sps));
Eric1b65c922022-07-21 02:35:17 +0200322 // print_guard() << "####w " << std::endl;
Eric07cfdf72022-07-19 21:12:58 +0200323
324 {
Eric1b65c922022-07-21 02:35:17 +0200325 shm::sema_wait_guard g(dl.w, dl.r);
Eric07cfdf72022-07-19 21:12:58 +0200326
Eric2f7b82f2022-10-09 15:13:18 +0200327 memcpy(buf, inbuf, samp2byte(howmany_sps));
Eric1b65c922022-07-21 02:35:17 +0200328 dl.ts.store(write_ts);
Eric2f7b82f2022-10-09 15:13:18 +0200329 dl.len_written_sps.store(howmany_sps);
Eric07cfdf72022-07-19 21:12:58 +0200330 }
Ericd0270372022-07-21 15:10:45 +0200331 shm::mtx_log::print_guard() << std::endl
Eric2f7b82f2022-10-09 15:13:18 +0200332 << "####w+ " << write_ts << " " << howmany_sps << std::endl
Ericd0270372022-07-21 15:10:45 +0200333 << std::endl;
Eric07cfdf72022-07-19 21:12:58 +0200334 }
335
Eric2f7b82f2022-10-09 15:13:18 +0200336 void write_ul(size_t howmany_sps_sps, uint64_t write_ts, sample_t *inbuf)
337 {
338 auto &ul = ptr->ul;
339 assert(sizeof(ul.buffer) >= samp2byte(howmany_sps_sps));
340 // print_guard() << "####w " << std::endl;
341
342 ulentry e;
343 e.ts = write_ts;
344 e.len_in_sps = howmany_sps_sps;
345 e.done = false;
346 e.read_pos_in_sps = 0;
347 assert(sizeof(e.buf) >= samp2byte(howmany_sps_sps));
348 memcpy(e.buf, inbuf, samp2byte(howmany_sps_sps));
349 p.add(e);
350
351 shm::mtx_log::print_guard() << std::endl
352 << "####q+ " << write_ts << " " << howmany_sps_sps << std::endl
353 << std::endl;
354 }
355
356 void drive_tx()
357 {
358 auto &ul = ptr->ul;
359 auto buf = &ul.buffer[0];
360 const auto max_write_len = sizeof(ul.buffer);
361
362 // ul_q_m.lock();
363 // ul_q.push_front(e);
364 // ul_q_m.unlock();
365 // ul.w.wait_and_reset();
366
367 // no read waiting for a write
368 if (!ul.w.check_unsafe(1))
369 return;
370
371 // FIXME: store written, notify after get!
372
373 auto requested_ts = ul.ts_req.load();
374
375 p.get(requested_ts, byte2samp(max_write_len), buf, max_write_len);
376
377 // memset(buf, 0, max_write_len);
378 ul.ts.store(requested_ts);
379 ul.len_written_sps.store(byte2samp(max_write_len));
380 ul.w.reset_unsafe();
381 ul.r.set(1);
382 }
383
Eric1b65c922022-07-21 02:35:17 +0200384 void signal_read_start()
385 { /* nop */
386 }
Eric07cfdf72022-07-19 21:12:58 +0200387};