Eric | 07cfdf7 | 2022-07-19 21:12:58 +0200 | [diff] [blame] | 1 | /* |
| 2 | * (C) 2022 by sysmocom s.f.m.c. GmbH <info@sysmocom.de> |
| 3 | * All Rights Reserved |
| 4 | * |
| 5 | * Author: Eric Wild <ewild@sysmocom.de> |
| 6 | * |
| 7 | * This program is free software; you can redistribute it and/or modify |
| 8 | * it under the terms of the GNU Affero General Public License as published by |
| 9 | * the Free Software Foundation; either version 3 of the License, or |
| 10 | * (at your option) any later version. |
| 11 | * |
| 12 | * This program is distributed in the hope that it will be useful, |
| 13 | * but WITHOUT ANY WARRANTY; without even the implied warranty of |
| 14 | * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the |
| 15 | * GNU Affero General Public License for more details. |
| 16 | * |
| 17 | * You should have received a copy of the GNU Affero General Public License |
| 18 | * along with this program. If not, see <http://www.gnu.org/licenses/>. |
| 19 | * |
| 20 | */ |
Eric | 1b65c92 | 2022-07-21 02:35:17 +0200 | [diff] [blame] | 21 | |
Eric | 07cfdf7 | 2022-07-19 21:12:58 +0200 | [diff] [blame] | 22 | #pragma once |
| 23 | |
| 24 | #include <atomic> |
| 25 | #include <complex> |
| 26 | #include <cassert> |
Eric | 2f7b82f | 2022-10-09 15:13:18 +0200 | [diff] [blame] | 27 | #include <deque> |
Eric | 1b65c92 | 2022-07-21 02:35:17 +0200 | [diff] [blame] | 28 | #include <mutex> |
Eric | 2f7b82f | 2022-10-09 15:13:18 +0200 | [diff] [blame] | 29 | #include <vector> |
| 30 | |
| 31 | #include "shmif.h" |
Eric | 1b65c92 | 2022-07-21 02:35:17 +0200 | [diff] [blame] | 32 | |
/* Capacity, in samples, of the uplink and downlink sample buffers. */
constexpr int max_ul_rdlen = 10 * 1024;
constexpr int max_dl_rdlen = 10 * 1024;

/* One complex baseband sample: 16 bit signed real and imaginary parts. */
typedef std::complex<int16_t> sample_t;
/*
 * Layout of the data exchanged through the shm::shm<shm_if> mapping owned by
 * trxmsif. It holds one sample buffer plus bookkeeping per direction.
 *
 * Per direction, as used by trxmsif::read() and the writers in this file:
 *  - the reader stores the timestamp it wants in ts_req, sets w and waits on r
 *  - the writer fills buffer, stores ts and len_written_sps, and releases r
 *
 * NOTE(review): the exact semantics of shm::sema are defined in shmif.h, which
 * is not visible here - confirm before relying on the description above.
 */
struct shm_if {
	// set to true by trxmsif::connect(); write_dl() is a no-op while it is false
	std::atomic<bool> ms_connected;
	struct {
		shm::sema r; // reader waits on this for a freshly written buffer
		shm::sema w; // set by the reader to request a new buffer
		std::atomic<uint64_t> ts; // timestamp stored by the writer for buffer[0]
		std::atomic<uint64_t> ts_req; // timestamp requested by the reader
		std::atomic<size_t> len_written_sps; // number of valid samples in buffer
		sample_t buffer[max_ul_rdlen];
	} ul;
	struct {
		shm::sema r; // reader waits on this for a freshly written buffer
		shm::sema w; // set by the reader to request a new buffer
		std::atomic<uint64_t> ts; // timestamp stored by the writer for buffer[0]
		std::atomic<uint64_t> ts_req; // timestamp requested by the reader
		std::atomic<size_t> len_written_sps; // number of valid samples in buffer
		sample_t buffer[max_dl_rdlen];
	} dl;
};
| 55 | |
/*
 * Signed distance a - b between two wrapping unsigned counters.
 *
 * The result is unique as long as the true distance fits into the signed
 * counterpart of A, i.e. the counters are less than half the value range
 * apart. Example for uint8_t/int8_t: (250, 0) = -6 and (0, 250) = 6.
 *
 * The subtraction is done modulo 2^N in A and then reinterpreted as signed,
 * which never negates the most negative value (that would be undefined
 * behaviour for int-sized and wider types).
 *
 * a: minuend
 * b: subtrahend
 * returns: a - b as the signed type matching A
 */
template <typename A> auto unsigned_diff(A a, A b) -> typename std::make_signed<A>::type
{
	using stype = typename std::make_signed<A>::type;
	// narrow back to A first: small types are promoted to int by the subtraction
	return static_cast<stype>(static_cast<A>(a - b));
}
| 63 | |
Eric | 2f7b82f | 2022-10-09 15:13:18 +0200 | [diff] [blame] | 64 | constexpr inline int samp2byte(int v) |
| 65 | { |
| 66 | return v * sizeof(sample_t); |
| 67 | } |
| 68 | constexpr inline int byte2samp(int v) |
| 69 | { |
| 70 | return v / sizeof(sample_t); |
| 71 | } |
Eric | 07cfdf7 | 2022-07-19 21:12:58 +0200 | [diff] [blame] | 72 | |
Eric | 2f7b82f | 2022-10-09 15:13:18 +0200 | [diff] [blame] | 73 | struct ulentry { |
| 74 | bool done; |
| 75 | uint64_t ts; |
| 76 | unsigned int len_in_sps; |
| 77 | unsigned int read_pos_in_sps; |
| 78 | sample_t buf[1000]; |
| 79 | }; |
/*
 Uplink burst queue, design sketch:

 write: starting at the read index, walk forward until a slot marked free is
        found; that slot is the "end" of the current list.

 read, for each slot:
   is it within [begin, end) AND not free?
     yes: copy it (possibly only a chunk);
          if only a chunk was copied, advance the read position in the burst.
     no:  move on to the next slot, advancing and removing old entries.
*/
| 90 | template <unsigned int num_bursts> class ulburstprovider { |
| 91 | std::mutex ul_q_m; |
| 92 | // std::deque<ulentry> ul_q; |
| 93 | |
| 94 | // classic circular buffer |
| 95 | ulentry foo[num_bursts]; |
| 96 | int current_index; // % num_bursts |
| 97 | |
| 98 | void cur_buf_done() |
Eric | 07cfdf7 | 2022-07-19 21:12:58 +0200 | [diff] [blame] | 99 | { |
Eric | 2f7b82f | 2022-10-09 15:13:18 +0200 | [diff] [blame] | 100 | foo[current_index].done = true; |
| 101 | current_index = current_index + 1 % num_bursts; |
| 102 | } |
| 103 | bool is_empty() |
| 104 | { |
| 105 | return foo[current_index].done = true; |
| 106 | } |
| 107 | void reset() |
| 108 | { |
| 109 | for (auto &i : foo) |
| 110 | i = {}; |
| 111 | current_index = 0; |
| 112 | } |
| 113 | ulentry &find_free_at_end() |
| 114 | { |
| 115 | for (int i = current_index, max_to_search = 0; max_to_search < num_bursts; |
| 116 | i = (i + 1 % num_bursts), max_to_search++) { |
| 117 | if (foo[i].done) |
| 118 | return foo[i]; |
| 119 | } |
| 120 | return foo[0]; // FIXME actually broken, q full, wat do? |
| 121 | } |
| 122 | |
| 123 | void push_back(ulentry &e) |
| 124 | { |
| 125 | auto free_buf = find_free_at_end(); |
| 126 | free_buf = e; |
| 127 | e.done = false; |
Eric | 07cfdf7 | 2022-07-19 21:12:58 +0200 | [diff] [blame] | 128 | } |
| 129 | |
| 130 | public: |
Eric | 2f7b82f | 2022-10-09 15:13:18 +0200 | [diff] [blame] | 131 | void add(ulentry &e) |
| 132 | { |
| 133 | std::lock_guard<std::mutex> foo(ul_q_m); |
| 134 | push_back(e); |
| 135 | } |
| 136 | void get(uint64_t requested_ts, unsigned int req_len_in_sps, sample_t *buf, unsigned int max_buf_write_len) |
| 137 | { |
| 138 | std::lock_guard<std::mutex> g(ul_q_m); |
| 139 | |
| 140 | /* |
| 141 | 1) if empty return |
| 142 | 2) if not empty prune stale bursts |
| 143 | 3) if only future bursts also return and zero buf |
| 144 | */ |
| 145 | for (int i = current_index, max_to_search = 0; max_to_search < num_bursts; |
| 146 | i = (i + 1 % num_bursts), max_to_search++) { |
| 147 | auto cur_entry = foo[i]; |
| 148 | if (is_empty()) { // might be empty due to advance below! |
| 149 | memset(buf, 0, samp2byte(req_len_in_sps)); |
| 150 | return; |
| 151 | } |
| 152 | |
| 153 | if (cur_entry.ts + cur_entry.len_in_sps < requested_ts) { // remove late bursts |
| 154 | if (i == current_index) // only advance if we are at the front |
| 155 | cur_buf_done(); |
| 156 | else |
| 157 | assert(true); |
| 158 | } else if (cur_entry.ts >= requested_ts + byte2samp(max_buf_write_len)) { // not in range |
| 159 | memset(buf, 0, samp2byte(req_len_in_sps)); |
| 160 | return; |
| 161 | |
| 162 | // FIXME: what about requested_ts <= entry.ts <= ts + reqlen? |
| 163 | } else { |
| 164 | // requested_ts <= cur_entry.ts <= requested_ts + byte2samp(max_write_len) |
| 165 | |
| 166 | auto before_sps = unsigned_diff(cur_entry.ts, requested_ts); |
| 167 | |
| 168 | // at least one whole buffer before our most recent "head" burst? |
| 169 | // set 0, return. |
| 170 | if (-before_sps >= byte2samp(max_buf_write_len)) { |
| 171 | memset(buf, 0, samp2byte(req_len_in_sps)); |
| 172 | return; |
| 173 | } |
| 174 | // less than one full buffer before: pad 0 |
| 175 | auto to_pad_sps = -before_sps; |
| 176 | memset(buf, 0, samp2byte(to_pad_sps)); |
| 177 | requested_ts += to_pad_sps; |
| 178 | req_len_in_sps -= to_pad_sps; |
| 179 | |
| 180 | if (!req_len_in_sps) |
| 181 | return; |
| 182 | |
| 183 | // actual burst data after possible 0 pad |
| 184 | auto max_sps_to_write = std::min(cur_entry.len_in_sps, req_len_in_sps); |
| 185 | memcpy(&buf[samp2byte(to_pad_sps)], cur_entry.buf, samp2byte(max_sps_to_write)); |
| 186 | requested_ts += max_sps_to_write; |
| 187 | req_len_in_sps -= max_sps_to_write; |
| 188 | cur_entry.read_pos_in_sps += max_sps_to_write; |
| 189 | |
| 190 | //this buf is done... |
| 191 | if (cur_entry.read_pos_in_sps == cur_entry.len_in_sps) { |
| 192 | cur_buf_done(); |
| 193 | } |
| 194 | |
| 195 | if (!req_len_in_sps) |
| 196 | return; |
| 197 | } |
| 198 | } |
| 199 | } |
| 200 | }; |
| 201 | |
| 202 | class trxmsif { |
| 203 | shm::shm<shm_if> m; |
| 204 | shm_if *ptr; |
| 205 | |
| 206 | ulburstprovider<10> p; |
| 207 | |
| 208 | template <typename T> void read(T &direction, size_t howmany_sps, uint64_t *read_ts, sample_t *outbuf) |
| 209 | { |
| 210 | static int readoffset_sps; |
| 211 | // auto &direction = ptr->dl; |
| 212 | auto buf = &direction.buffer[0]; |
| 213 | size_t len_avail_sps = direction.len_written_sps.load(); |
| 214 | |
| 215 | auto left_to_read = len_avail_sps - readoffset_sps; |
| 216 | |
| 217 | shm::mtx_log::print_guard() << "\tr @" << direction.ts.load() << " " << readoffset_sps << std::endl; |
| 218 | |
| 219 | // no data, wait for new buffer, maybe some data left afterwards |
| 220 | if (!left_to_read) { |
| 221 | assert(readoffset_sps == len_avail_sps); |
| 222 | readoffset_sps = 0; |
| 223 | direction.r.reset_unsafe(); |
| 224 | direction.ts_req = (*read_ts); |
| 225 | direction.w.set(1); |
| 226 | direction.r.wait_and_reset(1); |
| 227 | assert(*read_ts != direction.ts.load()); |
| 228 | // shm::sema_guard g(dl.r, dl.w); |
| 229 | *read_ts = direction.ts.load(); |
| 230 | len_avail_sps = direction.len_written_sps.load(); |
| 231 | readoffset_sps += howmany_sps; |
| 232 | assert(len_avail_sps >= howmany_sps); |
| 233 | memcpy(outbuf, buf, samp2byte(howmany_sps)); |
| 234 | |
| 235 | shm::mtx_log::print_guard() << "\tr+ " << *read_ts << " " << howmany_sps << std::endl; |
| 236 | return; |
| 237 | } |
| 238 | |
| 239 | *read_ts = direction.ts.load() + readoffset_sps; |
| 240 | left_to_read = len_avail_sps - readoffset_sps; |
| 241 | |
| 242 | // data left from prev read |
| 243 | if (left_to_read >= howmany_sps) { |
| 244 | memcpy(outbuf, &buf[readoffset_sps], samp2byte(howmany_sps)); |
| 245 | readoffset_sps += howmany_sps; |
| 246 | |
| 247 | shm::mtx_log::print_guard() << "\tr++ " << *read_ts << " " << howmany_sps << std::endl; |
| 248 | return; |
| 249 | } else { |
| 250 | memcpy(outbuf, &buf[readoffset_sps], samp2byte(left_to_read)); |
| 251 | readoffset_sps = 0; |
| 252 | auto still_left_to_read = howmany_sps - left_to_read; |
| 253 | { |
| 254 | direction.r.reset_unsafe(); |
| 255 | direction.ts_req = (*read_ts); |
| 256 | direction.w.set(1); |
| 257 | direction.r.wait_and_reset(1); |
| 258 | assert(*read_ts != direction.ts.load()); |
| 259 | len_avail_sps = direction.len_written_sps.load(); |
| 260 | assert(len_avail_sps >= still_left_to_read); |
| 261 | memcpy(&outbuf[left_to_read], buf, samp2byte(still_left_to_read)); |
| 262 | readoffset_sps += still_left_to_read; |
| 263 | shm::mtx_log::print_guard() |
| 264 | << "\tr+++2 " << *read_ts << " " << howmany_sps << " " << still_left_to_read |
| 265 | << " new @" << direction.ts.load() << std::endl; |
| 266 | } |
| 267 | } |
| 268 | } |
| 269 | |
| 270 | public: |
| 271 | trxmsif() : m("trx-ms-if") |
Eric | 07cfdf7 | 2022-07-19 21:12:58 +0200 | [diff] [blame] | 272 | { |
| 273 | } |
| 274 | |
| 275 | bool create() |
| 276 | { |
| 277 | m.create(); |
| 278 | ptr = m.p(); |
| 279 | return m.isgood(); |
| 280 | } |
| 281 | bool connect() |
| 282 | { |
| 283 | m.open(); |
| 284 | ptr = m.p(); |
| 285 | ptr->ms_connected = true; |
Eric | 1b65c92 | 2022-07-21 02:35:17 +0200 | [diff] [blame] | 286 | ptr->dl.w.set(1); |
Eric | 07cfdf7 | 2022-07-19 21:12:58 +0200 | [diff] [blame] | 287 | return m.isgood(); |
| 288 | } |
| 289 | bool good() |
| 290 | { |
| 291 | return m.isgood(); |
| 292 | } |
Eric | 1b65c92 | 2022-07-21 02:35:17 +0200 | [diff] [blame] | 293 | bool is_connected() |
| 294 | { |
| 295 | return ptr->ms_connected == true; |
| 296 | } |
Eric | 07cfdf7 | 2022-07-19 21:12:58 +0200 | [diff] [blame] | 297 | |
Eric | 2f7b82f | 2022-10-09 15:13:18 +0200 | [diff] [blame] | 298 | /* is being read from ms side */ |
| 299 | void read_dl(size_t howmany_sps, uint64_t *read_ts, sample_t *outbuf) |
| 300 | { |
| 301 | return read(ptr->dl, howmany_sps, read_ts, outbuf); |
| 302 | } |
| 303 | |
| 304 | /* is being read from trx/network side */ |
| 305 | void read_ul(size_t howmany_sps, uint64_t *read_ts, sample_t *outbuf) |
| 306 | { |
| 307 | // if (ptr->ms_connected != true) { |
| 308 | memset(outbuf, 0, samp2byte(howmany_sps)); |
| 309 | // return; |
| 310 | // } |
| 311 | // return read(ptr->ul, howmany_sps, read_ts, outbuf); |
| 312 | } |
| 313 | |
| 314 | void write_dl(size_t howmany_sps, uint64_t write_ts, sample_t *inbuf) |
Eric | 07cfdf7 | 2022-07-19 21:12:58 +0200 | [diff] [blame] | 315 | { |
| 316 | auto &dl = ptr->dl; |
| 317 | auto buf = &dl.buffer[0]; |
Eric | 2f7b82f | 2022-10-09 15:13:18 +0200 | [diff] [blame] | 318 | if (ptr->ms_connected != true) |
| 319 | return; |
Eric | 07cfdf7 | 2022-07-19 21:12:58 +0200 | [diff] [blame] | 320 | |
Eric | 2f7b82f | 2022-10-09 15:13:18 +0200 | [diff] [blame] | 321 | assert(sizeof(dl.buffer) >= samp2byte(howmany_sps)); |
Eric | 1b65c92 | 2022-07-21 02:35:17 +0200 | [diff] [blame] | 322 | // print_guard() << "####w " << std::endl; |
Eric | 07cfdf7 | 2022-07-19 21:12:58 +0200 | [diff] [blame] | 323 | |
| 324 | { |
Eric | 1b65c92 | 2022-07-21 02:35:17 +0200 | [diff] [blame] | 325 | shm::sema_wait_guard g(dl.w, dl.r); |
Eric | 07cfdf7 | 2022-07-19 21:12:58 +0200 | [diff] [blame] | 326 | |
Eric | 2f7b82f | 2022-10-09 15:13:18 +0200 | [diff] [blame] | 327 | memcpy(buf, inbuf, samp2byte(howmany_sps)); |
Eric | 1b65c92 | 2022-07-21 02:35:17 +0200 | [diff] [blame] | 328 | dl.ts.store(write_ts); |
Eric | 2f7b82f | 2022-10-09 15:13:18 +0200 | [diff] [blame] | 329 | dl.len_written_sps.store(howmany_sps); |
Eric | 07cfdf7 | 2022-07-19 21:12:58 +0200 | [diff] [blame] | 330 | } |
Eric | d027037 | 2022-07-21 15:10:45 +0200 | [diff] [blame] | 331 | shm::mtx_log::print_guard() << std::endl |
Eric | 2f7b82f | 2022-10-09 15:13:18 +0200 | [diff] [blame] | 332 | << "####w+ " << write_ts << " " << howmany_sps << std::endl |
Eric | d027037 | 2022-07-21 15:10:45 +0200 | [diff] [blame] | 333 | << std::endl; |
Eric | 07cfdf7 | 2022-07-19 21:12:58 +0200 | [diff] [blame] | 334 | } |
| 335 | |
Eric | 2f7b82f | 2022-10-09 15:13:18 +0200 | [diff] [blame] | 336 | void write_ul(size_t howmany_sps_sps, uint64_t write_ts, sample_t *inbuf) |
| 337 | { |
| 338 | auto &ul = ptr->ul; |
| 339 | assert(sizeof(ul.buffer) >= samp2byte(howmany_sps_sps)); |
| 340 | // print_guard() << "####w " << std::endl; |
| 341 | |
| 342 | ulentry e; |
| 343 | e.ts = write_ts; |
| 344 | e.len_in_sps = howmany_sps_sps; |
| 345 | e.done = false; |
| 346 | e.read_pos_in_sps = 0; |
| 347 | assert(sizeof(e.buf) >= samp2byte(howmany_sps_sps)); |
| 348 | memcpy(e.buf, inbuf, samp2byte(howmany_sps_sps)); |
| 349 | p.add(e); |
| 350 | |
| 351 | shm::mtx_log::print_guard() << std::endl |
| 352 | << "####q+ " << write_ts << " " << howmany_sps_sps << std::endl |
| 353 | << std::endl; |
| 354 | } |
| 355 | |
| 356 | void drive_tx() |
| 357 | { |
| 358 | auto &ul = ptr->ul; |
| 359 | auto buf = &ul.buffer[0]; |
| 360 | const auto max_write_len = sizeof(ul.buffer); |
| 361 | |
| 362 | // ul_q_m.lock(); |
| 363 | // ul_q.push_front(e); |
| 364 | // ul_q_m.unlock(); |
| 365 | // ul.w.wait_and_reset(); |
| 366 | |
| 367 | // no read waiting for a write |
| 368 | if (!ul.w.check_unsafe(1)) |
| 369 | return; |
| 370 | |
| 371 | // FIXME: store written, notify after get! |
| 372 | |
| 373 | auto requested_ts = ul.ts_req.load(); |
| 374 | |
| 375 | p.get(requested_ts, byte2samp(max_write_len), buf, max_write_len); |
| 376 | |
| 377 | // memset(buf, 0, max_write_len); |
| 378 | ul.ts.store(requested_ts); |
| 379 | ul.len_written_sps.store(byte2samp(max_write_len)); |
| 380 | ul.w.reset_unsafe(); |
| 381 | ul.r.set(1); |
| 382 | } |
| 383 | |
Eric | 1b65c92 | 2022-07-21 02:35:17 +0200 | [diff] [blame] | 384 | void signal_read_start() |
| 385 | { /* nop */ |
| 386 | } |
Eric | 07cfdf7 | 2022-07-19 21:12:58 +0200 | [diff] [blame] | 387 | }; |