blob: 8d10e36c477c15bc065154b7d2daf62f4db5af2e [file] [log] [blame]
Erice03e34f2021-09-06 20:07:24 +02001/*
2 * (C) 2021 by sysmocom s.f.m.c. GmbH <info@sysmocom.de>
3 * All Rights Reserved
4 *
5 * Author: Eric Wild
6 *
7 * This program is free software; you can redistribute it and/or modify
8 * it under the terms of the GNU Affero General Public License as published by
9 * the Free Software Foundation; either version 3 of the License, or
10 * (at your option) any later version.
11 *
12 * This program is distributed in the hope that it will be useful,
13 * but WITHOUT ANY WARRANTY; without even the implied warranty of
14 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
15 * GNU Affero General Public License for more details.
16 *
17 * You should have received a copy of the GNU Affero General Public License
18 * along with this program. If not, see <http://www.gnu.org/licenses/>.
19 *
20 */
21
22#include <inttypes.h>
23#include <stdatomic.h>
24#include <stdbool.h>
25#include <stdint.h>
26#include <stdlib.h>
27#include <string.h>
28#include <sys/eventfd.h>
29#include <sys/types.h>
30#include <unistd.h>
31#include <talloc.h>
32
33#include <osmocom/mgcp/mgcp_threads_queue.h>
34
35/*
36classic lamport circular lockfree spsc queue:
37every "side" only writes its own ptr, but may read the other sides ptr
38
39notify reader using eventfd as soon as element is added, reader then reads until
40read fails
41-> reader pops in a loop until FALSE and might get spurious events because it
42read before it was notified, which is fine
43-> writing pushes *the same data* in a loop until TRUE, blocks
44
45shutting this down requires
461) to stop reading and pushing
472) ONE side to take care of the eventfds
48*/
49
50static struct spsc *spsc_init(void *talloc_ctx, unsigned int count, unsigned int size_per_buf, bool blockr, bool blockw)
51{
52 struct spsc *q = talloc_zero_size(talloc_ctx, sizeof(struct spsc) + sizeof(uintptr_t) * count);
53 atomic_init(&q->readptr, 0);
54 atomic_init(&q->writeptr, 0);
55 q->efd_r = eventfd(0, blockr ? 0 : EFD_NONBLOCK);
56 q->efd_w = eventfd(1, blockw ? 0 : EFD_NONBLOCK);
57 q->count = count;
58 q->size_per_buf = size_per_buf;
59 q->buf = talloc_zero_size(q, size_per_buf * count);
60
61 for (int i = 0; i < count; i++)
62 q->data[i] = (uintptr_t)q->buf + i * size_per_buf;
63 return q;
64}
65
66static void spsc_deinit(struct spsc *q)
67{
68 talloc_free(q->buf);
69 close(q->efd_r);
70 close(q->efd_w);
71 talloc_free(q);
72}
73
74static ssize_t spsc_check_r(struct spsc *q)
75{
76 uint64_t efdr;
77 return read(q->efd_r, &efdr, sizeof(uint64_t));
78}
79static ssize_t spsc_check_w(struct spsc *q)
80{
81 uint64_t efdr;
82 return read(q->efd_w, &efdr, sizeof(uint64_t));
83}
84static void spsc_notify_r(struct spsc *q)
85{
86 uint64_t efdu = 1;
87 write(q->efd_r, &efdu, sizeof(uint64_t));
88}
89static void spsc_notify_w(struct spsc *q)
90{
91 uint64_t efdu = 1;
92 write(q->efd_w, &efdu, sizeof(uint64_t));
93}
94
95/*! Adds element to the queue by copying the data.
96 * \param[in] q queue.
97 * \param[in] elem input buffer, must match the originally configured queue buffer size!.
98 * \returns true if queue was not full and element was successfully pushed */
99bool spsc_push(struct spsc *q, void *elem)
100{
101 size_t cur_wp, cur_rp;
102 cur_wp = atomic_load_explicit(&q->writeptr, memory_order_relaxed);
103 cur_rp = atomic_load_explicit(&q->readptr, memory_order_acquire);
104 if ((cur_wp + 1) % q->count == cur_rp) {
105 spsc_check_w(q); /* blocks, ensures next (!) call succeeds */
106 return false;
107 }
108 memcpy((void *)q->data[cur_wp], elem, q->size_per_buf);
109 atomic_store_explicit(&q->writeptr, (cur_wp + 1) % q->count, memory_order_release);
110 spsc_notify_r(q); /* fine after release */
111 return true;
112}
113
114/*! Reads the read-fd of the queue, which, depending on settings passed on queue creation, blocks.
115 * This function can be used to deliberately wait for a non-empty queue on the read side.
116 * \param[in] q queue.
117 * \returns result of reading the fd. */
118ssize_t spsc_prep_pop(struct spsc *q)
119{
120 return spsc_check_r(q);
121}
122
123/*! Removes element from the queue by copying the data.
124 * \param[in] q queue.
125 * \param[in] elem output buffer, must match the originally configured queue buffer size!.
126 * \returns true if queue was not empty and element was successfully removed */
127bool spsc_pop(struct spsc *q, void *elem)
128{
129 size_t cur_wp, cur_rp;
130 cur_wp = atomic_load_explicit(&q->writeptr, memory_order_acquire);
131 cur_rp = atomic_load_explicit(&q->readptr, memory_order_relaxed);
132
133 if (cur_wp == cur_rp) /* blocks via prep_pop */
134 return false;
135 memcpy(elem, (void *)q->data[cur_rp], q->size_per_buf);
136 atomic_store_explicit(&q->readptr, (cur_rp + 1) % q->count, memory_order_release);
137 spsc_notify_w(q);
138 return true;
139}
140
141/*! Creates a bidirectional queue channel that consists of two queues, one in each direction,
142 * commonly referred to as a and b side.
143 * \param[in] talloc_ctx allocation context.
144 * \param[in] count number of buffers per queue.
145 * \param[in] size_per_buf size of buffers per queue.
146 * \param[in] blockr_a should reading the a-side read fd block?.
147 * \param[in] blockw_a should reading the a-side write fd block?.
148 * \param[in] blockr_b should reading the b-side read fd block?.
149 * \param[in] blockw_b should reading the b-side write fd block?.
150 * \returns queue channel */
151struct qchan spsc_chan_init_ex(void *talloc_ctx, unsigned int count, unsigned int size_per_buf, bool blockr_a,
152 bool blockw_a, bool blockr_b, bool blockw_b)
153{
154 struct qchan q;
155 q.a = spsc_init(talloc_ctx, count, size_per_buf, blockr_a, blockw_a);
156 q.b = spsc_init(talloc_ctx, count, size_per_buf, blockr_b, blockw_b);
157 return q;
158}
159
160/*! Creates a bidirectional queue channel that consists of two queues, one in each direction,
161 * commonly referred to as a and b side.
162 * \param[in] talloc_ctx allocation context.
163 * \param[in] count number of buffers per queue.
164 * \param[in] size_per_buf size of buffers per queue.
165 * \returns queue channel */
166struct qchan spsc_chan_init(void *talloc_ctx, unsigned int count, unsigned int size_per_buf)
167{
168 return spsc_chan_init_ex(talloc_ctx, count, size_per_buf, false, true, false, true);
169}
170
171/*! Closes a bidirectional queue channel.
172 * \param[in] q queue */
173void spsc_chan_close(struct qchan *q)
174{
175 spsc_deinit(q->a);
176 spsc_deinit(q->b);
177 free(q);
178}
179
180/*! Gets queue channel read/write fd for a/b side according to function name.
181 * \param[in] q queue channel.
182 * \returns fd */
183int spsc_get_a_rdfd(struct qchan *q)
184{
185 return q->a->efd_r;
186}
187/*! Gets queue channel read/write fd for a/b side according to function name.
188 * \param[in] q queue channel.
189 * \returns fd */
190int spsc_get_b_rdfd(struct qchan *q)
191{
192 return q->b->efd_r;
193}
194/*! Gets queue channel read/write fd for a/b side according to function name.
195 * \param[in] q queue channel.
196 * \returns fd */
197int spsc_get_a_wrfd(struct qchan *q)
198{
199 return q->a->efd_w;
200}
201/*! Gets queue channel read/write fd for a/b side according to function name.
202 * \param[in] q queue channel.
203 * \returns fd */
204int spsc_get_b_wrfd(struct qchan *q)
205{
206 return q->b->efd_w;
207}