blob: 1bb0e1556b704842a88f3177a0e663f6b216cb9f [file] [log] [blame]
Harald Weltee4cd2672019-08-06 19:56:16 +02001/*! \file it_q.c
2 * Osmocom Inter-Thread queue implementation */
3/* (C) 2019 by Harald Welte <laforge@gnumonks.org>
4 * All Rights Reserved.
5 *
6 * SPDX-License-Identifier: GPL-2.0+
7 *
8 * This program is free software; you can redistribute it and/or modify
9 * it under the terms of the GNU General Public License as published by
10 * the Free Software Foundation; either version 2 of the License, or
11 * (at your option) any later version.
12 *
13 * This program is distributed in the hope that it will be useful,
14 * but WITHOUT ANY WARRANTY; without even the implied warranty of
15 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
16 * GNU General Public License for more details.
17 *
18 * You should have received a copy of the GNU General Public License
19 * along with this program; if not, write to the Free Software
20 * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston,
21 * MA 02110-1301, USA.
22 */
23
24/*! \addtogroup it_q
25 * @{
26 * Inter-Thread Message Queue.
27 *
28 * This implements a general-purpose queue between threads. It uses
29 * user-provided data types (containing a llist_head as initial member)
30 * as elements in the queue and an eventfd-based notification mechanism.
31 * Hence, it can be used for pretty much anything, including but not
32 * limited to msgbs, including msgb-wrapped osmo_prim.
33 *
34 * The idea is that the sending thread simply calls osmo_it_q_enqueue().
35 * The receiving thread is woken up from its osmo_select_main() loop by eventfd,
36 * and a general osmo_fd callback function for the eventfd will dequeue each item
37 * and call a queue-specific callback function.
38 */
39
40#include "../config.h"
41
42#ifdef HAVE_SYS_EVENTFD_H
43
44#include <pthread.h>
45#include <unistd.h>
46#include <string.h>
47#include <errno.h>
48#include <sys/eventfd.h>
49
50#include <osmocom/core/linuxlist.h>
51#include <osmocom/core/talloc.h>
52#include <osmocom/core/utils.h>
53#include <osmocom/core/it_q.h>
54
55/* "increment" the eventfd by specified 'inc' */
56static int eventfd_increment(int fd, uint64_t inc)
57{
58 int rc;
59
60 rc = write(fd, &inc, sizeof(inc));
61 if (rc != sizeof(inc))
62 return -1;
63
64 return 0;
65}
66
67/* global (for all threads) list of message queues in a program + associated lock */
68static LLIST_HEAD(it_queues);
69static pthread_rwlock_t it_queues_rwlock = PTHREAD_RWLOCK_INITIALIZER;
70
71/* resolve it-queue by its [globally unique] name; must be called with rwlock held */
72static struct osmo_it_q *_osmo_it_q_by_name(const char *name)
73{
74 struct osmo_it_q *q;
75 llist_for_each_entry(q, &it_queues, entry) {
76 if (!strcmp(q->name, name))
77 return q;
78 }
79 return NULL;
80}
81
82/*! resolve it-queue by its [globally unique] name */
83struct osmo_it_q *osmo_it_q_by_name(const char *name)
84{
85 struct osmo_it_q *q;
86 pthread_rwlock_rdlock(&it_queues_rwlock);
87 q = _osmo_it_q_by_name(name);
88 pthread_rwlock_unlock(&it_queues_rwlock);
89 return q;
90}
91
92/* osmo_fd call-back when eventfd is readable */
93static int osmo_it_q_fd_cb(struct osmo_fd *ofd, unsigned int what)
94{
95 struct osmo_it_q *q = (struct osmo_it_q *) ofd->data;
96 uint64_t val;
97 int i, rc;
98
99 if (!(what & OSMO_FD_READ))
100 return 0;
101
102 rc = read(ofd->fd, &val, sizeof(val));
103 if (rc < sizeof(val))
104 return rc;
105
106 for (i = 0; i < val; i++) {
107 struct llist_head *item = _osmo_it_q_dequeue(q);
108 /* in case the user might have called osmo_it_q_flush() we may
109 * end up in the eventfd-dispatch but without any messages left in the queue,
110 * otherwise I'd have loved to OSMO_ASSERT(msg) here. */
111 if (!item)
112 break;
113 q->read_cb(q, item);
114 }
115
116 return 0;
117}
118
119/*! Allocate a new inter-thread message queue.
120 * \param[in] ctx talloc context from which to allocate the queue
121 * \param[in] name human-readable string name of the queue; function creates a copy.
122 * \param[in] read_cb call-back function to be called for each de-queued message; may be
123 * NULL in case you don't want eventfd/osmo_select integration and
124 * will manually take care of noticing if and when to dequeue.
125 * \returns a newly-allocated inter-thread message queue; NULL in case of error */
126struct osmo_it_q *osmo_it_q_alloc(void *ctx, const char *name, unsigned int max_length,
127 void (*read_cb)(struct osmo_it_q *q, struct llist_head *item),
128 void *data)
129{
130 struct osmo_it_q *q;
131 int fd;
132
133 q = talloc_zero(ctx, struct osmo_it_q);
134 if (!q)
135 return NULL;
136 q->data = data;
137 q->name = talloc_strdup(q, name);
138 q->current_length = 0;
139 q->max_length = max_length;
140 q->read_cb = read_cb;
141 INIT_LLIST_HEAD(&q->list);
142 pthread_mutex_init(&q->mutex, NULL);
143 q->event_ofd.fd = -1;
144
145 if (q->read_cb) {
146 /* create eventfd *if* the user has provided a read_cb function */
147 fd = eventfd(0, 0);
148 if (fd < 0) {
149 talloc_free(q);
150 return NULL;
151 }
152
153 /* initialize BUT NOT REGISTER the osmo_fd. The receiving thread must
154 * take are to select/poll/read/... on it */
155 osmo_fd_setup(&q->event_ofd, fd, OSMO_FD_READ, osmo_it_q_fd_cb, q, 0);
156 }
157
158 /* add to global list of queues, checking for duplicate names */
159 pthread_rwlock_wrlock(&it_queues_rwlock);
160 if (_osmo_it_q_by_name(q->name)) {
161 pthread_rwlock_unlock(&it_queues_rwlock);
162 if (q->event_ofd.fd >= 0)
163 osmo_fd_close(&q->event_ofd);
164 talloc_free(q);
165 return NULL;
166 }
167 llist_add_tail(&q->entry, &it_queues);
168 pthread_rwlock_unlock(&it_queues_rwlock);
169
170 return q;
171}
172
173static void *item_dequeue(struct llist_head *queue)
174{
175 struct llist_head *lh;
176
177 if (llist_empty(queue))
178 return NULL;
179
180 lh = queue->next;
181 if (lh) {
182 llist_del(lh);
183 return lh;
184 } else
185 return NULL;
186}
187
188/*! Flush all messages currently present in queue */
189static void _osmo_it_q_flush(struct osmo_it_q *q)
190{
191 void *item;
192 while ((item = item_dequeue(&q->list))) {
193 talloc_free(item);
194 }
195 q->current_length = 0;
196}
197
198/*! Flush all messages currently present in queue */
199void osmo_it_q_flush(struct osmo_it_q *q)
200{
201 OSMO_ASSERT(q);
202
203 pthread_mutex_lock(&q->mutex);
204 _osmo_it_q_flush(q);
205 pthread_mutex_unlock(&q->mutex);
206}
207
208/*! Destroy a message queue */
209void osmo_it_q_destroy(struct osmo_it_q *q)
210{
211 OSMO_ASSERT(q);
212
213 /* first remove from global list of queues */
214 pthread_rwlock_wrlock(&it_queues_rwlock);
215 llist_del(&q->entry);
216 pthread_rwlock_unlock(&it_queues_rwlock);
217 /* next, close the eventfd */
218 if (q->event_ofd.fd >= 0)
219 osmo_fd_close(&q->event_ofd);
220 /* flush all messages still present */
221 osmo_it_q_flush(q);
222 pthread_mutex_destroy(&q->mutex);
223 /* and finally release memory */
224 talloc_free(q);
225}
226
227/*! Thread-safe en-queue to an inter-thread message queue.
228 * \param[in] queue Inter-thread queue on which to enqueue
229 * \param[in] item Item to enqueue. Must have llist_head as first member!
230 * \returns 0 on success; negative on error */
231int _osmo_it_q_enqueue(struct osmo_it_q *queue, struct llist_head *item)
232{
233 OSMO_ASSERT(queue);
234 OSMO_ASSERT(item);
235
236 pthread_mutex_lock(&queue->mutex);
237 if (queue->current_length+1 > queue->max_length) {
238 pthread_mutex_unlock(&queue->mutex);
239 return -ENOSPC;
240 }
241 llist_add_tail(item, &queue->list);
242 queue->current_length++;
243 pthread_mutex_unlock(&queue->mutex);
244 /* increment eventfd counter by one */
245 if (queue->event_ofd.fd >= 0)
246 eventfd_increment(queue->event_ofd.fd, 1);
247 return 0;
248}
249
250
251/*! Thread-safe de-queue from an inter-thread message queue.
252 * \param[in] queue Inter-thread queue from which to dequeue
253 * \returns dequeued message buffer; NULL if none available
254 */
255struct llist_head *_osmo_it_q_dequeue(struct osmo_it_q *queue)
256{
257 struct llist_head *l;
258 OSMO_ASSERT(queue);
259
260 pthread_mutex_lock(&queue->mutex);
261
262 if (llist_empty(&queue->list))
263 l = NULL;
264 l = queue->list.next;
265 OSMO_ASSERT(l);
266 llist_del(l);
267 queue->current_length--;
268
269 pthread_mutex_unlock(&queue->mutex);
270
271 return l;
272}
273
274
275#endif /* HAVE_SYS_EVENTFD_H */
276
277/*! @} */