/*! \file osmo_io_uring.c
 * io_uring backend for osmo_io.
 *
 * (C) 2022-2023 by sysmocom s.f.m.c.
 * Author: Daniel Willmann <daniel@sysmocom.de>
 *
 * All Rights Reserved.
 *
 * SPDX-License-Identifier: GPL-2.0+
 *
 * This program is free software; you can redistribute it and/or modify
 * it under the terms of the GNU General Public License as published by
 * the Free Software Foundation; either version 2 of the License, or
 * (at your option) any later version.
 *
 * This program is distributed in the hope that it will be useful,
 * but WITHOUT ANY WARRANTY; without even the implied warranty of
 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
 * GNU General Public License for more details.
 */

/* TODO:
 * Parameters:
 * - number of simultaneous read/write in uring for given fd
 */
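
/* Illustrative usage sketch (not part of this file): applications never call
 * this backend directly.  Assuming the LIBOSMO_IO_BACKEND environment variable
 * and the generic osmo_io entry points, the backend is selected at start-up
 * and then reached through the iofd_backend_ops vtable at the end of this file:
 *
 *	$ LIBOSMO_IO_BACKEND=IO_URING ./my-osmo-app
 *
 *	static const struct osmo_io_ops ops = {
 *		.read_cb = my_read_cb,
 *		.write_cb = my_write_cb,
 *	};
 *	struct osmo_io_fd *iofd = osmo_iofd_setup(ctx, fd, "example",
 *						  OSMO_IO_FD_MODE_READ_WRITE, &ops, NULL);
 *	osmo_iofd_register(iofd, fd);
 */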

#include "../config.h"
#if defined(__linux__)

#include <stdio.h>
#include <talloc.h>
#include <unistd.h>
#include <string.h>
#include <stdbool.h>
#include <errno.h>

#include <sys/eventfd.h>
#include <liburing.h>

#include <osmocom/core/osmo_io.h>
#include <osmocom/core/linuxlist.h>
#include <osmocom/core/logging.h>
#include <osmocom/core/msgb.h>
#include <osmocom/core/select.h>
#include <osmocom/core/talloc.h>
#include <osmocom/core/utils.h>
#include <osmocom/core/socket.h>

#include "osmo_io_internal.h"
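
/* Size of the submission queue (SQ).  The kernel rounds this up to a power of
 * two and, by default, sizes the completion queue (CQ) at twice the SQ size. */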
#define IOFD_URING_ENTRIES 4096

struct osmo_io_uring {
	struct osmo_fd event_ofd;
	struct io_uring ring;
};

static __thread struct osmo_io_uring g_ring;

static void iofd_uring_cqe(struct io_uring *ring);
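
/* Overview of the I/O flow implemented below:
 * 1. read/write-enable (or a completed request) places an SQE into the ring
 *    and calls io_uring_submit().
 * 2. The kernel signals the registered eventfd once completions are pending.
 * 3. The osmo_fd main loop dispatches iofd_uring_poll_cb(), which drains the
 *    eventfd and walks the completion queue in iofd_uring_cqe().
 * 4. Each CQE is routed via iofd_uring_handle_completion() to the read or
 *    write handler, which invokes the user call-back and re-submits as needed.
 */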

/*! read call-back for the eventfd that notifies us when entries are in the completion queue */
static int iofd_uring_poll_cb(struct osmo_fd *ofd, unsigned int what)
{
	struct io_uring *ring = ofd->data;
	eventfd_t val;
	int rc;

	if (what & OSMO_FD_READ) {
		/* drain the eventfd counter, then process all pending completions */
		rc = eventfd_read(ofd->fd, &val);
		if (rc < 0) {
			LOGP(DLIO, LOGL_ERROR, "eventfd_read() returned error\n");
			return rc;
		}

		iofd_uring_cqe(ring);
	}
	if (what & OSMO_FD_WRITE)
		OSMO_ASSERT(0);

	return 0;
}

/*! initialize the uring and tie it into our event loop */
void osmo_iofd_uring_init(void)
{
	int rc;
	rc = io_uring_queue_init(IOFD_URING_ENTRIES, &g_ring.ring, 0);
	if (rc < 0)
		OSMO_ASSERT(0);

	rc = eventfd(0, 0);
	if (rc < 0) {
		io_uring_queue_exit(&g_ring.ring);
		OSMO_ASSERT(0);
	}

	osmo_fd_setup(&g_ring.event_ofd, rc, OSMO_FD_READ, iofd_uring_poll_cb, &g_ring.ring, 0);
	osmo_fd_register(&g_ring.event_ofd);
	io_uring_register_eventfd(&g_ring.ring, rc);
}
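
/* Design note: instead of blocking in io_uring_wait_cqe() on a dedicated
 * thread, completions are bridged into the regular osmo_fd select/poll main
 * loop via the eventfd registered above.  This keeps the io_uring backend a
 * drop-in alternative to the poll backend. */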

static void iofd_uring_submit_recv(struct osmo_io_fd *iofd, enum iofd_msg_action action)
{
	struct msgb *msg;
	struct iofd_msghdr *msghdr;
	struct io_uring_sqe *sqe;

	msg = iofd_msgb_pending_or_alloc(iofd);
	if (!msg) {
		LOGPIO(iofd, LOGL_ERROR, "Could not allocate msgb for reading\n");
		OSMO_ASSERT(0);
	}

	msghdr = iofd_msghdr_alloc(iofd, action, msg);
	if (!msghdr) {
		LOGPIO(iofd, LOGL_ERROR, "Could not allocate msghdr for reading\n");
		OSMO_ASSERT(0);
	}

	/* receive into the tailroom of the msgb */
	msghdr->iov[0].iov_base = msg->tail;
	msghdr->iov[0].iov_len = msgb_tailroom(msg);

	switch (action) {
	case IOFD_ACT_READ:
		break;
	case IOFD_ACT_RECVFROM:
		msghdr->hdr.msg_iov = &msghdr->iov[0];
		msghdr->hdr.msg_iovlen = 1;
		msghdr->hdr.msg_name = &msghdr->osa.u.sa;
		msghdr->hdr.msg_namelen = osmo_sockaddr_size(&msghdr->osa);
		break;
	default:
		OSMO_ASSERT(0);
	}

	sqe = io_uring_get_sqe(&g_ring.ring);
	if (!sqe) {
		LOGPIO(iofd, LOGL_ERROR, "Could not get io_uring_sqe\n");
		OSMO_ASSERT(0);
	}

	switch (action) {
	case IOFD_ACT_READ:
		io_uring_prep_readv(sqe, iofd->fd, msghdr->iov, 1, 0);
		break;
	case IOFD_ACT_RECVFROM:
		io_uring_prep_recvmsg(sqe, iofd->fd, &msghdr->hdr, msghdr->flags);
		break;
	default:
		OSMO_ASSERT(0);
	}
	io_uring_sqe_set_data(sqe, msghdr);

	io_uring_submit(&g_ring.ring);
	/* NOTE: This only works if we have one read per fd */
	iofd->u.uring.read_msghdr = msghdr;
}
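
/* Only a single read request is ever in flight per file descriptor; the
 * pointer stored in read_msghdr above both enforces that invariant and lets
 * iofd_uring_unregister() cancel the request later on. */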

/*! completion call-back for READ/RECVFROM */
static void iofd_uring_handle_recv(struct iofd_msghdr *msghdr, int rc)
{
	struct osmo_io_fd *iofd = msghdr->iofd;
	struct msgb *msg = msghdr->msg;

	if (rc > 0)
		msgb_put(msg, rc);

	if (!IOFD_FLAG_ISSET(iofd, IOFD_FLAG_CLOSED))
		iofd_handle_recv(iofd, msg, rc, msghdr);

	if (iofd->u.uring.read_enabled && !IOFD_FLAG_ISSET(iofd, IOFD_FLAG_CLOSED))
		iofd_uring_submit_recv(iofd, msghdr->action);
	else
		iofd->u.uring.read_msghdr = NULL;

	iofd_msghdr_free(msghdr);
}

static int iofd_uring_submit_tx(struct osmo_io_fd *iofd);

/*! completion call-back for WRITE/SENDTO */
static void iofd_uring_handle_tx(struct iofd_msghdr *msghdr, int rc)
{
	struct osmo_io_fd *iofd = msghdr->iofd;
	struct msgb *msg = msghdr->msg;

	if (IOFD_FLAG_ISSET(iofd, IOFD_FLAG_CLOSED))
		goto out_free;

	/* Error during write */
	if (rc < 0) {
		if (msghdr->action == IOFD_ACT_WRITE)
			iofd->io_ops.write_cb(iofd, rc, msg);
		else if (msghdr->action == IOFD_ACT_SENDTO)
			iofd->io_ops.sendto_cb(iofd, rc, msg, &msghdr->osa);
		else
			OSMO_ASSERT(0);
		goto out_free;
	}

	/* Incomplete write */
	if (rc < msgb_length(msg)) {
		/* Re-enqueue remaining data */
		msgb_pull(msg, rc);
		/* msgb_pull() advanced msg->data, so the iov base must be refreshed as well */
		msghdr->iov[0].iov_base = msgb_data(msg);
		msghdr->iov[0].iov_len = msgb_length(msg);
		iofd_txqueue_enqueue_front(iofd, msghdr);
		goto out;
	}

	if (msghdr->action == IOFD_ACT_WRITE)
		iofd->io_ops.write_cb(iofd, rc, msg);
	else if (msghdr->action == IOFD_ACT_SENDTO)
		iofd->io_ops.sendto_cb(iofd, rc, msg, &msghdr->osa);
	else
		OSMO_ASSERT(0);

out_free:
	msgb_free(msghdr->msg);
	iofd_msghdr_free(msghdr);

out:
	iofd->u.uring.write_msghdr = NULL;
	/* submit the next to-be-transmitted message for this file descriptor */
	if (iofd->u.uring.write_enabled && !IOFD_FLAG_ISSET(iofd, IOFD_FLAG_CLOSED))
		iofd_uring_submit_tx(iofd);
}
231
Harald Welte987a86a2023-11-18 18:46:24 +0100232/*! handle completion of a single I/O message */
Daniel Willmannf91d2aa2023-01-04 18:20:55 +0100233static void iofd_uring_handle_completion(struct iofd_msghdr *msghdr, int res)
234{
235 struct osmo_io_fd *iofd = msghdr->iofd;
236
237 IOFD_FLAG_SET(iofd, IOFD_FLAG_IN_CALLBACK);
238
239 switch (msghdr->action) {
240 case IOFD_ACT_READ:
241 case IOFD_ACT_RECVFROM:
242 iofd_uring_handle_recv(msghdr, res);
243 break;
244 case IOFD_ACT_WRITE:
245 case IOFD_ACT_SENDTO:
246 iofd_uring_handle_tx(msghdr, res);
247 break;
248 default:
249 OSMO_ASSERT(0)
250 }
251
252 if (!iofd->u.uring.read_msghdr && !iofd->u.uring.write_msghdr)
253 IOFD_FLAG_UNSET(iofd, IOFD_FLAG_IN_CALLBACK);
254
255 if (IOFD_FLAG_ISSET(iofd, IOFD_FLAG_TO_FREE) && !iofd->u.uring.read_msghdr && !iofd->u.uring.write_msghdr)
256 talloc_free(iofd);
257}
258
Harald Welte987a86a2023-11-18 18:46:24 +0100259/*! process all pending completion queue entries in given io_uring */
Daniel Willmannf91d2aa2023-01-04 18:20:55 +0100260static void iofd_uring_cqe(struct io_uring *ring)
261{
262 int rc;
263 struct io_uring_cqe *cqe;
264 struct iofd_msghdr *msghdr;
265
266 while (io_uring_peek_cqe(ring, &cqe) == 0) {
267
268 msghdr = io_uring_cqe_get_data(cqe);
269 if (!msghdr) {
270 LOGP(DLIO, LOGL_DEBUG, "Cancellation returned\n");
271 io_uring_cqe_seen(ring, cqe);
272 continue;
273 }
274
275 rc = cqe->res;
276 /* Hand the entry back to the kernel before */
277 io_uring_cqe_seen(ring, cqe);
278
279 iofd_uring_handle_completion(msghdr, rc);
280
281 }
282}
283
Harald Welte987a86a2023-11-18 18:46:24 +0100284/*! will submit the next to-be-transmitted message for given iofd */
Daniel Willmannf91d2aa2023-01-04 18:20:55 +0100285static int iofd_uring_submit_tx(struct osmo_io_fd *iofd)
286{
287 struct io_uring_sqe *sqe;
288 struct iofd_msghdr *msghdr;
289
290 msghdr = iofd_txqueue_dequeue(iofd);
291 if (!msghdr)
292 return -ENODATA;
293
294 sqe = io_uring_get_sqe(&g_ring.ring);
295 if (!sqe) {
296 LOGPIO(iofd, LOGL_ERROR, "Could not get io_uring_sqe\n");
297 OSMO_ASSERT(0);
298 }
299
300 io_uring_sqe_set_data(sqe, msghdr);
301
302 switch (msghdr->action) {
303 case IOFD_ACT_WRITE:
304 case IOFD_ACT_SENDTO:
305 io_uring_prep_sendmsg(sqe, msghdr->iofd->fd, &msghdr->hdr, msghdr->flags);
306 break;
307 default:
308 OSMO_ASSERT(0);
309 }
310
311 io_uring_submit(&g_ring.ring);
312 iofd->u.uring.write_msghdr = msghdr;
313
314 return 0;
315}
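
/* Note: sendmsg() serves both IOFD_ACT_WRITE and IOFD_ACT_SENDTO; for plain
 * writes the generic osmo_io code is assumed to have left hdr.msg_name NULL,
 * in which case sendmsg() behaves like write() on a connected socket. */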

static void iofd_uring_write_enable(struct osmo_io_fd *iofd);
static void iofd_uring_read_enable(struct osmo_io_fd *iofd);

static int iofd_uring_register(struct osmo_io_fd *iofd)
{
	return 0;
}
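
/* Registration is a no-op in this backend: SQEs are only created once reading
 * or writing is actually enabled, so there is no per-fd state to set up here. */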

static int iofd_uring_unregister(struct osmo_io_fd *iofd)
{
	struct io_uring_sqe *sqe;
	if (iofd->u.uring.read_msghdr) {
		sqe = io_uring_get_sqe(&g_ring.ring);
		OSMO_ASSERT(sqe != NULL);
		io_uring_sqe_set_data(sqe, NULL);
		LOGPIO(iofd, LOGL_DEBUG, "Cancelling read\n");
		io_uring_prep_cancel(sqe, iofd->u.uring.read_msghdr, 0);
	}

	if (iofd->u.uring.write_msghdr) {
		sqe = io_uring_get_sqe(&g_ring.ring);
		OSMO_ASSERT(sqe != NULL);
		io_uring_sqe_set_data(sqe, NULL);
		LOGPIO(iofd, LOGL_DEBUG, "Cancelling write\n");
		io_uring_prep_cancel(sqe, iofd->u.uring.write_msghdr, 0);
	}
	io_uring_submit(&g_ring.ring);

	return 0;
}
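
/* The cancel SQEs above carry NULL user_data, so their own completions are
 * skipped in iofd_uring_cqe().  The cancelled read/write requests still
 * complete (typically with -ECANCELED) and are cleaned up through the normal
 * completion path. */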

static void iofd_uring_write_enable(struct osmo_io_fd *iofd)
{
	iofd->u.uring.write_enabled = true;

	if (iofd->u.uring.write_msghdr)
		return;

	if (osmo_iofd_txqueue_len(iofd) > 0)
		iofd_uring_submit_tx(iofd);
	else if (iofd->mode == OSMO_IO_FD_MODE_READ_WRITE) {
		/* Empty write request to check when the socket is connected */
		struct iofd_msghdr *msghdr;
		struct io_uring_sqe *sqe;
		struct msgb *msg = msgb_alloc_headroom(0, 0, "io_uring write dummy");
		if (!msg) {
			LOGPIO(iofd, LOGL_ERROR, "Could not allocate msgb for writing\n");
			OSMO_ASSERT(0);
		}
		msghdr = iofd_msghdr_alloc(iofd, IOFD_ACT_WRITE, msg);
		if (!msghdr) {
			LOGPIO(iofd, LOGL_ERROR, "Could not allocate msghdr for writing\n");
			OSMO_ASSERT(0);
		}

		msghdr->iov[0].iov_base = msgb_data(msg);
		msghdr->iov[0].iov_len = msgb_tailroom(msg);

		sqe = io_uring_get_sqe(&g_ring.ring);
		if (!sqe) {
			LOGPIO(iofd, LOGL_ERROR, "Could not get io_uring_sqe\n");
			OSMO_ASSERT(0);
		}
		/* prepare the zero-length writev from the dummy msgb */
		io_uring_prep_writev(sqe, iofd->fd, msghdr->iov, 1, 0);
		io_uring_sqe_set_data(sqe, msghdr);

		io_uring_submit(&g_ring.ring);
		iofd->u.uring.write_msghdr = msghdr;
	}
}
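
/* The zero-length write above mirrors what the poll backend learns from a
 * write-readiness event: once a non-blocking connect() has finished, the
 * dummy request completes and its write_cb reports the connection result. */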

static void iofd_uring_write_disable(struct osmo_io_fd *iofd)
{
	iofd->u.uring.write_enabled = false;
}

static void iofd_uring_read_enable(struct osmo_io_fd *iofd)
{
	iofd->u.uring.read_enabled = true;

	if (iofd->u.uring.read_msghdr)
		return;

	switch (iofd->mode) {
	case OSMO_IO_FD_MODE_READ_WRITE:
		iofd_uring_submit_recv(iofd, IOFD_ACT_READ);
		break;
	case OSMO_IO_FD_MODE_RECVFROM_SENDTO:
		iofd_uring_submit_recv(iofd, IOFD_ACT_RECVFROM);
		break;
	default:
		OSMO_ASSERT(0);
	}
}

static void iofd_uring_read_disable(struct osmo_io_fd *iofd)
{
	iofd->u.uring.read_enabled = false;
}

static int iofd_uring_close(struct osmo_io_fd *iofd)
{
	iofd_uring_read_disable(iofd);
	iofd_uring_write_disable(iofd);
	iofd_uring_unregister(iofd);
	return close(iofd->fd);
}

const struct iofd_backend_ops iofd_uring_ops = {
	.register_fd = iofd_uring_register,
	.unregister_fd = iofd_uring_unregister,
	.close = iofd_uring_close,
	.write_enable = iofd_uring_write_enable,
	.write_disable = iofd_uring_write_disable,
	.read_enable = iofd_uring_read_enable,
	.read_disable = iofd_uring_read_disable,
};
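
/* This vtable is the backend's only export besides osmo_iofd_uring_init();
 * the generic osmo_io core dispatches through it once the io_uring backend
 * has been selected at start-up. */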

#endif /* defined(__linux__) */