blob: 810dc9034962e46d15f187799f8fb8b6b9e26091 [file] [log] [blame]
Harald Weltee4cd2672019-08-06 19:56:16 +02001/*! \file it_q.c
2 * Osmocom Inter-Thread queue implementation */
3/* (C) 2019 by Harald Welte <laforge@gnumonks.org>
4 * All Rights Reserved.
5 *
6 * SPDX-License-Identifier: GPL-2.0+
7 *
8 * This program is free software; you can redistribute it and/or modify
9 * it under the terms of the GNU General Public License as published by
10 * the Free Software Foundation; either version 2 of the License, or
11 * (at your option) any later version.
12 *
13 * This program is distributed in the hope that it will be useful,
14 * but WITHOUT ANY WARRANTY; without even the implied warranty of
15 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
16 * GNU General Public License for more details.
Harald Weltee4cd2672019-08-06 19:56:16 +020017 */
18
19/*! \addtogroup it_q
20 * @{
21 * Inter-Thread Message Queue.
22 *
23 * This implements a general-purpose queue between threads. It uses
24 * user-provided data types (containing a llist_head as initial member)
25 * as elements in the queue and an eventfd-based notification mechanism.
26 * Hence, it can be used for pretty much anything, including but not
27 * limited to msgbs, including msgb-wrapped osmo_prim.
28 *
29 * The idea is that the sending thread simply calls osmo_it_q_enqueue().
30 * The receiving thread is woken up from its osmo_select_main() loop by eventfd,
31 * and a general osmo_fd callback function for the eventfd will dequeue each item
32 * and call a queue-specific callback function.
33 */
34
Pau Espin Pedrol88955fb2023-01-18 18:54:00 +010035#include "config.h"
Harald Weltee4cd2672019-08-06 19:56:16 +020036
37#ifdef HAVE_SYS_EVENTFD_H
38
39#include <pthread.h>
40#include <unistd.h>
41#include <string.h>
42#include <errno.h>
43#include <sys/eventfd.h>
44
45#include <osmocom/core/linuxlist.h>
46#include <osmocom/core/talloc.h>
47#include <osmocom/core/utils.h>
48#include <osmocom/core/it_q.h>
49
50/* "increment" the eventfd by specified 'inc' */
51static int eventfd_increment(int fd, uint64_t inc)
52{
53 int rc;
54
55 rc = write(fd, &inc, sizeof(inc));
56 if (rc != sizeof(inc))
57 return -1;
58
59 return 0;
60}
61
62/* global (for all threads) list of message queues in a program + associated lock */
63static LLIST_HEAD(it_queues);
64static pthread_rwlock_t it_queues_rwlock = PTHREAD_RWLOCK_INITIALIZER;
65
66/* resolve it-queue by its [globally unique] name; must be called with rwlock held */
67static struct osmo_it_q *_osmo_it_q_by_name(const char *name)
68{
69 struct osmo_it_q *q;
70 llist_for_each_entry(q, &it_queues, entry) {
71 if (!strcmp(q->name, name))
72 return q;
73 }
74 return NULL;
75}
76
77/*! resolve it-queue by its [globally unique] name */
78struct osmo_it_q *osmo_it_q_by_name(const char *name)
79{
80 struct osmo_it_q *q;
81 pthread_rwlock_rdlock(&it_queues_rwlock);
82 q = _osmo_it_q_by_name(name);
83 pthread_rwlock_unlock(&it_queues_rwlock);
84 return q;
85}
86
87/* osmo_fd call-back when eventfd is readable */
88static int osmo_it_q_fd_cb(struct osmo_fd *ofd, unsigned int what)
89{
90 struct osmo_it_q *q = (struct osmo_it_q *) ofd->data;
91 uint64_t val;
92 int i, rc;
93
94 if (!(what & OSMO_FD_READ))
95 return 0;
96
97 rc = read(ofd->fd, &val, sizeof(val));
98 if (rc < sizeof(val))
99 return rc;
100
101 for (i = 0; i < val; i++) {
102 struct llist_head *item = _osmo_it_q_dequeue(q);
103 /* in case the user might have called osmo_it_q_flush() we may
104 * end up in the eventfd-dispatch but without any messages left in the queue,
105 * otherwise I'd have loved to OSMO_ASSERT(msg) here. */
106 if (!item)
107 break;
108 q->read_cb(q, item);
109 }
110
111 return 0;
112}
113
114/*! Allocate a new inter-thread message queue.
115 * \param[in] ctx talloc context from which to allocate the queue
116 * \param[in] name human-readable string name of the queue; function creates a copy.
117 * \param[in] read_cb call-back function to be called for each de-queued message; may be
118 * NULL in case you don't want eventfd/osmo_select integration and
119 * will manually take care of noticing if and when to dequeue.
120 * \returns a newly-allocated inter-thread message queue; NULL in case of error */
121struct osmo_it_q *osmo_it_q_alloc(void *ctx, const char *name, unsigned int max_length,
122 void (*read_cb)(struct osmo_it_q *q, struct llist_head *item),
123 void *data)
124{
125 struct osmo_it_q *q;
126 int fd;
127
128 q = talloc_zero(ctx, struct osmo_it_q);
129 if (!q)
130 return NULL;
131 q->data = data;
132 q->name = talloc_strdup(q, name);
133 q->current_length = 0;
134 q->max_length = max_length;
135 q->read_cb = read_cb;
136 INIT_LLIST_HEAD(&q->list);
137 pthread_mutex_init(&q->mutex, NULL);
138 q->event_ofd.fd = -1;
139
140 if (q->read_cb) {
141 /* create eventfd *if* the user has provided a read_cb function */
142 fd = eventfd(0, 0);
143 if (fd < 0) {
144 talloc_free(q);
145 return NULL;
146 }
147
148 /* initialize BUT NOT REGISTER the osmo_fd. The receiving thread must
149 * take are to select/poll/read/... on it */
150 osmo_fd_setup(&q->event_ofd, fd, OSMO_FD_READ, osmo_it_q_fd_cb, q, 0);
151 }
152
153 /* add to global list of queues, checking for duplicate names */
154 pthread_rwlock_wrlock(&it_queues_rwlock);
155 if (_osmo_it_q_by_name(q->name)) {
156 pthread_rwlock_unlock(&it_queues_rwlock);
157 if (q->event_ofd.fd >= 0)
158 osmo_fd_close(&q->event_ofd);
159 talloc_free(q);
160 return NULL;
161 }
162 llist_add_tail(&q->entry, &it_queues);
163 pthread_rwlock_unlock(&it_queues_rwlock);
164
165 return q;
166}
167
168static void *item_dequeue(struct llist_head *queue)
169{
170 struct llist_head *lh;
171
172 if (llist_empty(queue))
173 return NULL;
174
175 lh = queue->next;
176 if (lh) {
177 llist_del(lh);
178 return lh;
179 } else
180 return NULL;
181}
182
183/*! Flush all messages currently present in queue */
184static void _osmo_it_q_flush(struct osmo_it_q *q)
185{
186 void *item;
187 while ((item = item_dequeue(&q->list))) {
188 talloc_free(item);
189 }
190 q->current_length = 0;
191}
192
193/*! Flush all messages currently present in queue */
194void osmo_it_q_flush(struct osmo_it_q *q)
195{
196 OSMO_ASSERT(q);
197
198 pthread_mutex_lock(&q->mutex);
199 _osmo_it_q_flush(q);
200 pthread_mutex_unlock(&q->mutex);
201}
202
203/*! Destroy a message queue */
204void osmo_it_q_destroy(struct osmo_it_q *q)
205{
206 OSMO_ASSERT(q);
207
208 /* first remove from global list of queues */
209 pthread_rwlock_wrlock(&it_queues_rwlock);
210 llist_del(&q->entry);
211 pthread_rwlock_unlock(&it_queues_rwlock);
212 /* next, close the eventfd */
213 if (q->event_ofd.fd >= 0)
214 osmo_fd_close(&q->event_ofd);
215 /* flush all messages still present */
216 osmo_it_q_flush(q);
217 pthread_mutex_destroy(&q->mutex);
218 /* and finally release memory */
219 talloc_free(q);
220}
221
222/*! Thread-safe en-queue to an inter-thread message queue.
223 * \param[in] queue Inter-thread queue on which to enqueue
224 * \param[in] item Item to enqueue. Must have llist_head as first member!
225 * \returns 0 on success; negative on error */
226int _osmo_it_q_enqueue(struct osmo_it_q *queue, struct llist_head *item)
227{
228 OSMO_ASSERT(queue);
229 OSMO_ASSERT(item);
230
231 pthread_mutex_lock(&queue->mutex);
232 if (queue->current_length+1 > queue->max_length) {
233 pthread_mutex_unlock(&queue->mutex);
234 return -ENOSPC;
235 }
236 llist_add_tail(item, &queue->list);
237 queue->current_length++;
238 pthread_mutex_unlock(&queue->mutex);
239 /* increment eventfd counter by one */
240 if (queue->event_ofd.fd >= 0)
241 eventfd_increment(queue->event_ofd.fd, 1);
242 return 0;
243}
244
245
246/*! Thread-safe de-queue from an inter-thread message queue.
247 * \param[in] queue Inter-thread queue from which to dequeue
Vadim Yanitskiy9c972932023-12-09 02:56:12 +0700248 * \returns llist_head of dequeued message; NULL if none available
Harald Weltee4cd2672019-08-06 19:56:16 +0200249 */
250struct llist_head *_osmo_it_q_dequeue(struct osmo_it_q *queue)
251{
252 struct llist_head *l;
253 OSMO_ASSERT(queue);
254
255 pthread_mutex_lock(&queue->mutex);
256
Vadim Yanitskiy9c972932023-12-09 02:56:12 +0700257 l = item_dequeue(&queue->list);
258 if (l != NULL)
259 queue->current_length--;
Harald Weltee4cd2672019-08-06 19:56:16 +0200260
261 pthread_mutex_unlock(&queue->mutex);
262
263 return l;
264}
265
266
267#endif /* HAVE_SYS_EVENTFD_H */
268
269/*! @} */