blob: 1aa17d4f13fc0620c0fa67f4242e701232dbb48e [file] [log] [blame]
Daniel Willmannf91d2aa2023-01-04 18:20:55 +01001/*! \file osmo_io_uring.c
2 * io_uring backend for osmo_io.
3 *
4 * (C) 2022-2023 by sysmocom s.f.m.c.
5 * Author: Daniel Willmann <daniel@sysmocom.de>
6 *
7 * All Rights Reserved.
8 *
9 * SPDX-License-Identifier: GPL-2.0+
10 *
11 * This program is free software; you can redistribute it and/or modify
12 * it under the terms of the GNU General Public License as published by
13 * the Free Software Foundation; either version 2 of the License, or
14 * (at your option) any later version.
15 *
16 * This program is distributed in the hope that it will be useful,
17 * but WITHOUT ANY WARRANTY; without even the implied warranty of
18 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
19 * GNU General Public License for more details.
20 */
21
22/* TODO:
23 * Parameters:
24 * - number of simultaneous read/write in uring for given fd
25 *
26 */
27
28#include "../config.h"
29#if defined(__linux__)
30
31#include <stdio.h>
32#include <talloc.h>
33#include <unistd.h>
34#include <string.h>
35#include <stdbool.h>
36#include <errno.h>
37
38#include <sys/eventfd.h>
39#include <liburing.h>
40
41#include <osmocom/core/osmo_io.h>
42#include <osmocom/core/linuxlist.h>
43#include <osmocom/core/logging.h>
44#include <osmocom/core/msgb.h>
45#include <osmocom/core/select.h>
46#include <osmocom/core/talloc.h>
47#include <osmocom/core/utils.h>
48#include <osmocom/core/socket.h>
49
50#include "osmo_io_internal.h"
51
52#define IOFD_URING_ENTRIES 4096
53
/*! Per-thread state of the io_uring osmo_io backend: the ring itself plus an
 *  eventfd (wrapped in an osmo_fd) that ties ring completions into the
 *  osmocom select() main loop. */
struct osmo_io_uring {
	/* eventfd registered with the ring; becomes readable when CQEs are pending */
	struct osmo_fd event_ofd;
	/* the io_uring instance (submission + completion queues) */
	struct io_uring ring;
};

/* one ring per thread; hence no locking needed around SQE/CQE handling */
static __thread struct osmo_io_uring g_ring;

static void iofd_uring_cqe(struct io_uring *ring);
Harald Welte987a86a2023-11-18 18:46:24 +010062
63/*! read call-back for eventfd notifying us if entries are in the completion queue */
Daniel Willmannf91d2aa2023-01-04 18:20:55 +010064static int iofd_uring_poll_cb(struct osmo_fd *ofd, unsigned int what)
65{
66 struct io_uring *ring = ofd->data;
67 eventfd_t val;
68 int rc;
69
70 if (what & OSMO_FD_READ) {
71 rc = eventfd_read(ofd->fd, &val);
72 if (rc < 0) {
73 LOGP(DLIO, LOGL_ERROR, "eventfd_read() returned error\n");
74 return rc;
75 }
76
77 iofd_uring_cqe(ring);
78 }
79 if (what & OSMO_FD_WRITE)
80 OSMO_ASSERT(0);
81
82 return 0;
83}
84
85/*! initialize the uring and tie it into our event loop */
86void osmo_iofd_uring_init(void)
87{
88 int rc;
89 rc = io_uring_queue_init(IOFD_URING_ENTRIES, &g_ring.ring, 0);
90 if (rc < 0)
91 OSMO_ASSERT(0);
92
93 rc = eventfd(0, 0);
94 if (rc < 0) {
95 io_uring_queue_exit(&g_ring.ring);
96 OSMO_ASSERT(0);
97 }
98
99 osmo_fd_setup(&g_ring.event_ofd, rc, OSMO_FD_READ, iofd_uring_poll_cb, &g_ring.ring, 0);
100 osmo_fd_register(&g_ring.event_ofd);
101 io_uring_register_eventfd(&g_ring.ring, rc);
102}
103
104
105static void iofd_uring_submit_recv(struct osmo_io_fd *iofd, enum iofd_msg_action action)
106{
107 struct msgb *msg;
108 struct iofd_msghdr *msghdr;
109 struct io_uring_sqe *sqe;
110
111 msg = iofd_msgb_pending_or_alloc(iofd);
112 if (!msg) {
113 LOGPIO(iofd, LOGL_ERROR, "Could not allocate msgb for reading\n");
114 OSMO_ASSERT(0);
115 }
116
117 msghdr = iofd_msghdr_alloc(iofd, action, msg);
118 if (!msghdr) {
119 LOGPIO(iofd, LOGL_ERROR, "Could not allocate msghdr for reading\n");
120 OSMO_ASSERT(0);
121 }
122
123 msghdr->iov[0].iov_base = msg->tail;
124 msghdr->iov[0].iov_len = msgb_tailroom(msg);
125
126 switch (action) {
127 case IOFD_ACT_READ:
128 break;
129 case IOFD_ACT_RECVFROM:
130 msghdr->hdr.msg_iov = &msghdr->iov[0];
131 msghdr->hdr.msg_iovlen = 1;
132 msghdr->hdr.msg_name = &msghdr->osa.u.sa;
133 msghdr->hdr.msg_namelen = osmo_sockaddr_size(&msghdr->osa);
134 break;
135 default:
136 OSMO_ASSERT(0);
137 }
138
139 sqe = io_uring_get_sqe(&g_ring.ring);
140 if (!sqe) {
141 LOGPIO(iofd, LOGL_ERROR, "Could not get io_uring_sqe\n");
142 OSMO_ASSERT(0);
143 }
144
145 switch (action) {
146 case IOFD_ACT_READ:
147 io_uring_prep_readv(sqe, iofd->fd, msghdr->iov, 1, 0);
148 break;
149 case IOFD_ACT_RECVFROM:
150 io_uring_prep_recvmsg(sqe, iofd->fd, &msghdr->hdr, msghdr->flags);
151 break;
152 default:
153 OSMO_ASSERT(0);
154 }
155 io_uring_sqe_set_data(sqe, msghdr);
156
157 io_uring_submit(&g_ring.ring);
158 /* NOTE: This only works if we have one read per fd */
159 iofd->u.uring.read_msghdr = msghdr;
160}
161
/*! completion call-back for READ/RECVFROM */
static void iofd_uring_handle_recv(struct iofd_msghdr *msghdr, int rc)
{
	struct osmo_io_fd *iofd = msghdr->iofd;
	struct msgb *msg = msghdr->msg;

	/* positive rc is the number of bytes received; account for them in the msgb */
	if (rc > 0)
		msgb_put(msg, rc);

	if (!IOFD_FLAG_ISSET(iofd, IOFD_FLAG_CLOSED))
		iofd_handle_recv(iofd, msg, rc, msghdr);

	/* the user call-back invoked via iofd_handle_recv() may have closed the
	 * iofd, hence the CLOSED flag must be checked again before re-arming */
	if (iofd->u.uring.read_enabled && !IOFD_FLAG_ISSET(iofd, IOFD_FLAG_CLOSED))
		iofd_uring_submit_recv(iofd, msghdr->action);
	else
		iofd->u.uring.read_msghdr = NULL;

	/* re-submission above allocated a new msghdr; this one is done */
	iofd_msghdr_free(msghdr);
}
182
183static int iofd_uring_submit_tx(struct osmo_io_fd *iofd);
184
Harald Welte987a86a2023-11-18 18:46:24 +0100185/*! completion call-back for WRITE/SENDTO */
Daniel Willmannf91d2aa2023-01-04 18:20:55 +0100186static void iofd_uring_handle_tx(struct iofd_msghdr *msghdr, int rc)
187{
188 struct osmo_io_fd *iofd = msghdr->iofd;
189
Daniel Willmann84611882023-11-21 10:17:00 +0100190 if (OSMO_UNLIKELY(IOFD_FLAG_ISSET(iofd, IOFD_FLAG_CLOSED))) {
191 msgb_free(msghdr->msg);
192 iofd_msghdr_free(msghdr);
193 } else {
194 iofd_handle_send_completion(iofd, rc, msghdr);
Daniel Willmannf91d2aa2023-01-04 18:20:55 +0100195 }
196
Daniel Willmannf91d2aa2023-01-04 18:20:55 +0100197 iofd->u.uring.write_msghdr = NULL;
Harald Welte987a86a2023-11-18 18:46:24 +0100198 /* submit the next to-be-transmitted message for this file descriptor */
Daniel Willmannf91d2aa2023-01-04 18:20:55 +0100199 if (iofd->u.uring.write_enabled && !IOFD_FLAG_ISSET(iofd, IOFD_FLAG_CLOSED))
200 iofd_uring_submit_tx(iofd);
201}
202
Harald Welte987a86a2023-11-18 18:46:24 +0100203/*! handle completion of a single I/O message */
Daniel Willmannf91d2aa2023-01-04 18:20:55 +0100204static void iofd_uring_handle_completion(struct iofd_msghdr *msghdr, int res)
205{
206 struct osmo_io_fd *iofd = msghdr->iofd;
207
208 IOFD_FLAG_SET(iofd, IOFD_FLAG_IN_CALLBACK);
209
210 switch (msghdr->action) {
211 case IOFD_ACT_READ:
212 case IOFD_ACT_RECVFROM:
213 iofd_uring_handle_recv(msghdr, res);
214 break;
215 case IOFD_ACT_WRITE:
216 case IOFD_ACT_SENDTO:
217 iofd_uring_handle_tx(msghdr, res);
218 break;
219 default:
220 OSMO_ASSERT(0)
221 }
222
223 if (!iofd->u.uring.read_msghdr && !iofd->u.uring.write_msghdr)
224 IOFD_FLAG_UNSET(iofd, IOFD_FLAG_IN_CALLBACK);
225
226 if (IOFD_FLAG_ISSET(iofd, IOFD_FLAG_TO_FREE) && !iofd->u.uring.read_msghdr && !iofd->u.uring.write_msghdr)
227 talloc_free(iofd);
228}
229
Harald Welte987a86a2023-11-18 18:46:24 +0100230/*! process all pending completion queue entries in given io_uring */
Daniel Willmannf91d2aa2023-01-04 18:20:55 +0100231static void iofd_uring_cqe(struct io_uring *ring)
232{
233 int rc;
234 struct io_uring_cqe *cqe;
235 struct iofd_msghdr *msghdr;
236
237 while (io_uring_peek_cqe(ring, &cqe) == 0) {
238
239 msghdr = io_uring_cqe_get_data(cqe);
240 if (!msghdr) {
241 LOGP(DLIO, LOGL_DEBUG, "Cancellation returned\n");
242 io_uring_cqe_seen(ring, cqe);
243 continue;
244 }
245
246 rc = cqe->res;
247 /* Hand the entry back to the kernel before */
248 io_uring_cqe_seen(ring, cqe);
249
250 iofd_uring_handle_completion(msghdr, rc);
251
252 }
253}
254
Harald Welte987a86a2023-11-18 18:46:24 +0100255/*! will submit the next to-be-transmitted message for given iofd */
Daniel Willmannf91d2aa2023-01-04 18:20:55 +0100256static int iofd_uring_submit_tx(struct osmo_io_fd *iofd)
257{
258 struct io_uring_sqe *sqe;
259 struct iofd_msghdr *msghdr;
260
261 msghdr = iofd_txqueue_dequeue(iofd);
262 if (!msghdr)
263 return -ENODATA;
264
265 sqe = io_uring_get_sqe(&g_ring.ring);
266 if (!sqe) {
267 LOGPIO(iofd, LOGL_ERROR, "Could not get io_uring_sqe\n");
268 OSMO_ASSERT(0);
269 }
270
271 io_uring_sqe_set_data(sqe, msghdr);
272
273 switch (msghdr->action) {
274 case IOFD_ACT_WRITE:
275 case IOFD_ACT_SENDTO:
276 io_uring_prep_sendmsg(sqe, msghdr->iofd->fd, &msghdr->hdr, msghdr->flags);
277 break;
278 default:
279 OSMO_ASSERT(0);
280 }
281
282 io_uring_submit(&g_ring.ring);
283 iofd->u.uring.write_msghdr = msghdr;
284
285 return 0;
286}
287
288static void iofd_uring_write_enable(struct osmo_io_fd *iofd);
289static void iofd_uring_read_enable(struct osmo_io_fd *iofd);
290
/*! no-op: the io_uring backend needs no per-fd registration; SQEs are only
 *  submitted once reading/writing gets enabled. \returns 0 always */
static int iofd_uring_register(struct osmo_io_fd *iofd)
{
	return 0;
}
295
296static int iofd_uring_unregister(struct osmo_io_fd *iofd)
297{
298 struct io_uring_sqe *sqe;
299 if (iofd->u.uring.read_msghdr) {
300 sqe = io_uring_get_sqe(&g_ring.ring);
301 OSMO_ASSERT(sqe != NULL);
302 io_uring_sqe_set_data(sqe, NULL);
303 LOGPIO(iofd, LOGL_DEBUG, "Cancelling read\n");
304 io_uring_prep_cancel(sqe, iofd->u.uring.read_msghdr, 0);
305 }
306
307 if (iofd->u.uring.write_msghdr) {
308 sqe = io_uring_get_sqe(&g_ring.ring);
309 OSMO_ASSERT(sqe != NULL);
310 io_uring_sqe_set_data(sqe, NULL);
311 LOGPIO(iofd, LOGL_DEBUG, "Cancelling write\n");
312 io_uring_prep_cancel(sqe, iofd->u.uring.write_msghdr, 0);
313 }
314 io_uring_submit(&g_ring.ring);
315
316 return 0;
317}
318
319static void iofd_uring_write_enable(struct osmo_io_fd *iofd)
320{
321 iofd->u.uring.write_enabled = true;
322
323 if (iofd->u.uring.write_msghdr)
324 return;
325
326 if (osmo_iofd_txqueue_len(iofd) > 0)
327 iofd_uring_submit_tx(iofd);
328 else if (iofd->mode == OSMO_IO_FD_MODE_READ_WRITE) {
329 /* Empty write request to check when the socket is connected */
330 struct iofd_msghdr *msghdr;
331 struct io_uring_sqe *sqe;
332 struct msgb *msg = msgb_alloc_headroom(0, 0, "io_uring write dummy");
333 if (!msg) {
334 LOGPIO(iofd, LOGL_ERROR, "Could not allocate msgb for writing\n");
335 OSMO_ASSERT(0);
336 }
337 msghdr = iofd_msghdr_alloc(iofd, IOFD_ACT_WRITE, msg);
338 if (!msghdr) {
339 LOGPIO(iofd, LOGL_ERROR, "Could not allocate msghdr for writing\n");
340 OSMO_ASSERT(0);
341 }
342
343 msghdr->iov[0].iov_base = msgb_data(msg);
344 msghdr->iov[0].iov_len = msgb_tailroom(msg);
345
346 sqe = io_uring_get_sqe(&g_ring.ring);
347 if (!sqe) {
348 LOGPIO(iofd, LOGL_ERROR, "Could not get io_uring_sqe\n");
349 OSMO_ASSERT(0);
350 }
351 // Prep msgb/iov
352 io_uring_prep_writev(sqe, iofd->fd, msghdr->iov, 1, 0);
353 io_uring_sqe_set_data(sqe, msghdr);
354
355 io_uring_submit(&g_ring.ring);
356 iofd->u.uring.write_msghdr = msghdr;
357 }
358}
359
/*! disable writing: no new write SQE will be submitted once the currently
 *  in-flight one (if any) completes; an already submitted write cannot be
 *  withdrawn here. */
static void iofd_uring_write_disable(struct osmo_io_fd *iofd)
{
	iofd->u.uring.write_enabled = false;
}
364
365static void iofd_uring_read_enable(struct osmo_io_fd *iofd)
366{
367 iofd->u.uring.read_enabled = true;
368
369 if (iofd->u.uring.read_msghdr)
370 return;
371
372 switch (iofd->mode) {
373 case OSMO_IO_FD_MODE_READ_WRITE:
374 iofd_uring_submit_recv(iofd, IOFD_ACT_READ);
375 break;
376 case OSMO_IO_FD_MODE_RECVFROM_SENDTO:
377 iofd_uring_submit_recv(iofd, IOFD_ACT_RECVFROM);
378 break;
379 default:
380 OSMO_ASSERT(0);
381 }
382}
383
/*! disable reading: the read pipeline will not be re-armed after the
 *  currently pending read (if any) completes. */
static void iofd_uring_read_disable(struct osmo_io_fd *iofd)
{
	iofd->u.uring.read_enabled = false;
}
388
/*! close the iofd: disable further I/O, cancel pending submissions and
 *  close the underlying file descriptor.
 *  \returns result of close(2) on the fd */
static int iofd_uring_close(struct osmo_io_fd *iofd)
{
	iofd_uring_read_disable(iofd);
	iofd_uring_write_disable(iofd);
	iofd_uring_unregister(iofd);
	return close(iofd->fd);
}
396
/*! call-back table plugging the io_uring backend into the generic osmo_io layer */
const struct iofd_backend_ops iofd_uring_ops = {
	.register_fd = iofd_uring_register,
	.unregister_fd = iofd_uring_unregister,
	.close = iofd_uring_close,
	.write_enable = iofd_uring_write_enable,
	.write_disable = iofd_uring_write_disable,
	.read_enable = iofd_uring_read_enable,
	.read_disable = iofd_uring_read_disable,
};
406
407#endif /* defined(__linux__) */