blob: aa9df85faa00d31a687ac17203a87b2e8c4863ef [file] [log] [blame]
Daniel Willmannf91d2aa2023-01-04 18:20:55 +01001/*! \file osmo_io_uring.c
2 * io_uring backend for osmo_io.
3 *
4 * (C) 2022-2023 by sysmocom s.f.m.c.
5 * Author: Daniel Willmann <daniel@sysmocom.de>
Harald Welte1047ed72023-11-18 18:51:58 +01006 * (C) 2023-2024 by Harald Welte <laforge@osmocom.org>
Daniel Willmannf91d2aa2023-01-04 18:20:55 +01007 *
8 * All Rights Reserved.
9 *
10 * SPDX-License-Identifier: GPL-2.0+
11 *
12 * This program is free software; you can redistribute it and/or modify
13 * it under the terms of the GNU General Public License as published by
14 * the Free Software Foundation; either version 2 of the License, or
15 * (at your option) any later version.
16 *
17 * This program is distributed in the hope that it will be useful,
18 * but WITHOUT ANY WARRANTY; without even the implied warranty of
19 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
20 * GNU General Public License for more details.
21 */
22
/* TODO:
 * Parameters:
 * - number of simultaneous read/write requests in the uring for a given fd
 *   (currently at most one read and one write request per fd are in flight)
 */
28
29#include "../config.h"
30#if defined(__linux__)
31
32#include <stdio.h>
33#include <talloc.h>
34#include <unistd.h>
35#include <string.h>
36#include <stdbool.h>
37#include <errno.h>
38
Harald Welte1047ed72023-11-18 18:51:58 +010039#include <netinet/in.h>
40#include <netinet/sctp.h>
Daniel Willmannf91d2aa2023-01-04 18:20:55 +010041#include <sys/eventfd.h>
42#include <liburing.h>
43
44#include <osmocom/core/osmo_io.h>
45#include <osmocom/core/linuxlist.h>
46#include <osmocom/core/logging.h>
47#include <osmocom/core/msgb.h>
48#include <osmocom/core/select.h>
49#include <osmocom/core/talloc.h>
50#include <osmocom/core/utils.h>
51#include <osmocom/core/socket.h>
52
53#include "osmo_io_internal.h"
54
55#define IOFD_URING_ENTRIES 4096
56
57struct osmo_io_uring {
58 struct osmo_fd event_ofd;
59 struct io_uring ring;
60};
61
62static __thread struct osmo_io_uring g_ring;
63
64static void iofd_uring_cqe(struct io_uring *ring);
Harald Welte987a86a2023-11-18 18:46:24 +010065
66/*! read call-back for eventfd notifying us if entries are in the completion queue */
Daniel Willmannf91d2aa2023-01-04 18:20:55 +010067static int iofd_uring_poll_cb(struct osmo_fd *ofd, unsigned int what)
68{
69 struct io_uring *ring = ofd->data;
70 eventfd_t val;
71 int rc;
72
73 if (what & OSMO_FD_READ) {
74 rc = eventfd_read(ofd->fd, &val);
75 if (rc < 0) {
76 LOGP(DLIO, LOGL_ERROR, "eventfd_read() returned error\n");
77 return rc;
78 }
79
80 iofd_uring_cqe(ring);
81 }
82 if (what & OSMO_FD_WRITE)
83 OSMO_ASSERT(0);
84
85 return 0;
86}
87
88/*! initialize the uring and tie it into our event loop */
89void osmo_iofd_uring_init(void)
90{
91 int rc;
92 rc = io_uring_queue_init(IOFD_URING_ENTRIES, &g_ring.ring, 0);
93 if (rc < 0)
94 OSMO_ASSERT(0);
95
96 rc = eventfd(0, 0);
97 if (rc < 0) {
98 io_uring_queue_exit(&g_ring.ring);
99 OSMO_ASSERT(0);
100 }
101
102 osmo_fd_setup(&g_ring.event_ofd, rc, OSMO_FD_READ, iofd_uring_poll_cb, &g_ring.ring, 0);
103 osmo_fd_register(&g_ring.event_ofd);
104 io_uring_register_eventfd(&g_ring.ring, rc);
105}
106
107
108static void iofd_uring_submit_recv(struct osmo_io_fd *iofd, enum iofd_msg_action action)
109{
110 struct msgb *msg;
111 struct iofd_msghdr *msghdr;
112 struct io_uring_sqe *sqe;
113
114 msg = iofd_msgb_pending_or_alloc(iofd);
115 if (!msg) {
116 LOGPIO(iofd, LOGL_ERROR, "Could not allocate msgb for reading\n");
117 OSMO_ASSERT(0);
118 }
119
Harald Welte1047ed72023-11-18 18:51:58 +0100120 msghdr = iofd_msghdr_alloc(iofd, action, msg, iofd->cmsg_size);
Daniel Willmannf91d2aa2023-01-04 18:20:55 +0100121 if (!msghdr) {
122 LOGPIO(iofd, LOGL_ERROR, "Could not allocate msghdr for reading\n");
123 OSMO_ASSERT(0);
124 }
125
126 msghdr->iov[0].iov_base = msg->tail;
127 msghdr->iov[0].iov_len = msgb_tailroom(msg);
128
129 switch (action) {
130 case IOFD_ACT_READ:
131 break;
Harald Welte1047ed72023-11-18 18:51:58 +0100132 case IOFD_ACT_RECVMSG:
133 msghdr->hdr.msg_control = msghdr->cmsg;
134 msghdr->hdr.msg_controllen = iofd->cmsg_size;
135 /* fall-through */
Daniel Willmannf91d2aa2023-01-04 18:20:55 +0100136 case IOFD_ACT_RECVFROM:
137 msghdr->hdr.msg_iov = &msghdr->iov[0];
138 msghdr->hdr.msg_iovlen = 1;
139 msghdr->hdr.msg_name = &msghdr->osa.u.sa;
140 msghdr->hdr.msg_namelen = osmo_sockaddr_size(&msghdr->osa);
141 break;
142 default:
143 OSMO_ASSERT(0);
144 }
145
146 sqe = io_uring_get_sqe(&g_ring.ring);
147 if (!sqe) {
148 LOGPIO(iofd, LOGL_ERROR, "Could not get io_uring_sqe\n");
149 OSMO_ASSERT(0);
150 }
151
152 switch (action) {
153 case IOFD_ACT_READ:
154 io_uring_prep_readv(sqe, iofd->fd, msghdr->iov, 1, 0);
155 break;
Harald Welte1047ed72023-11-18 18:51:58 +0100156 case IOFD_ACT_RECVMSG:
Daniel Willmannf91d2aa2023-01-04 18:20:55 +0100157 case IOFD_ACT_RECVFROM:
158 io_uring_prep_recvmsg(sqe, iofd->fd, &msghdr->hdr, msghdr->flags);
159 break;
160 default:
161 OSMO_ASSERT(0);
162 }
163 io_uring_sqe_set_data(sqe, msghdr);
164
165 io_uring_submit(&g_ring.ring);
166 /* NOTE: This only works if we have one read per fd */
167 iofd->u.uring.read_msghdr = msghdr;
168}
169
Harald Welte987a86a2023-11-18 18:46:24 +0100170/*! completion call-back for READ/RECVFROM */
Daniel Willmannf91d2aa2023-01-04 18:20:55 +0100171static void iofd_uring_handle_recv(struct iofd_msghdr *msghdr, int rc)
172{
173 struct osmo_io_fd *iofd = msghdr->iofd;
174 struct msgb *msg = msghdr->msg;
175
176 if (rc > 0)
177 msgb_put(msg, rc);
178
179 if (!IOFD_FLAG_ISSET(iofd, IOFD_FLAG_CLOSED))
180 iofd_handle_recv(iofd, msg, rc, msghdr);
181
182 if (iofd->u.uring.read_enabled && !IOFD_FLAG_ISSET(iofd, IOFD_FLAG_CLOSED))
183 iofd_uring_submit_recv(iofd, msghdr->action);
184 else
185 iofd->u.uring.read_msghdr = NULL;
186
187
188 iofd_msghdr_free(msghdr);
189}
190
191static int iofd_uring_submit_tx(struct osmo_io_fd *iofd);
192
Harald Welte987a86a2023-11-18 18:46:24 +0100193/*! completion call-back for WRITE/SENDTO */
Daniel Willmannf91d2aa2023-01-04 18:20:55 +0100194static void iofd_uring_handle_tx(struct iofd_msghdr *msghdr, int rc)
195{
196 struct osmo_io_fd *iofd = msghdr->iofd;
197
Daniel Willmann84611882023-11-21 10:17:00 +0100198 if (OSMO_UNLIKELY(IOFD_FLAG_ISSET(iofd, IOFD_FLAG_CLOSED))) {
199 msgb_free(msghdr->msg);
200 iofd_msghdr_free(msghdr);
201 } else {
202 iofd_handle_send_completion(iofd, rc, msghdr);
Daniel Willmannf91d2aa2023-01-04 18:20:55 +0100203 }
204
Daniel Willmannf91d2aa2023-01-04 18:20:55 +0100205 iofd->u.uring.write_msghdr = NULL;
Harald Welte987a86a2023-11-18 18:46:24 +0100206 /* submit the next to-be-transmitted message for this file descriptor */
Daniel Willmannf91d2aa2023-01-04 18:20:55 +0100207 if (iofd->u.uring.write_enabled && !IOFD_FLAG_ISSET(iofd, IOFD_FLAG_CLOSED))
208 iofd_uring_submit_tx(iofd);
209}
210
Harald Welte987a86a2023-11-18 18:46:24 +0100211/*! handle completion of a single I/O message */
Daniel Willmannf91d2aa2023-01-04 18:20:55 +0100212static void iofd_uring_handle_completion(struct iofd_msghdr *msghdr, int res)
213{
214 struct osmo_io_fd *iofd = msghdr->iofd;
215
216 IOFD_FLAG_SET(iofd, IOFD_FLAG_IN_CALLBACK);
217
218 switch (msghdr->action) {
219 case IOFD_ACT_READ:
220 case IOFD_ACT_RECVFROM:
Harald Welte1047ed72023-11-18 18:51:58 +0100221 case IOFD_ACT_RECVMSG:
Daniel Willmannf91d2aa2023-01-04 18:20:55 +0100222 iofd_uring_handle_recv(msghdr, res);
223 break;
224 case IOFD_ACT_WRITE:
225 case IOFD_ACT_SENDTO:
Harald Welte1047ed72023-11-18 18:51:58 +0100226 case IOFD_ACT_SENDMSG:
Daniel Willmannf91d2aa2023-01-04 18:20:55 +0100227 iofd_uring_handle_tx(msghdr, res);
228 break;
229 default:
230 OSMO_ASSERT(0)
231 }
232
233 if (!iofd->u.uring.read_msghdr && !iofd->u.uring.write_msghdr)
234 IOFD_FLAG_UNSET(iofd, IOFD_FLAG_IN_CALLBACK);
235
236 if (IOFD_FLAG_ISSET(iofd, IOFD_FLAG_TO_FREE) && !iofd->u.uring.read_msghdr && !iofd->u.uring.write_msghdr)
237 talloc_free(iofd);
238}
239
Harald Welte987a86a2023-11-18 18:46:24 +0100240/*! process all pending completion queue entries in given io_uring */
Daniel Willmannf91d2aa2023-01-04 18:20:55 +0100241static void iofd_uring_cqe(struct io_uring *ring)
242{
243 int rc;
244 struct io_uring_cqe *cqe;
245 struct iofd_msghdr *msghdr;
246
247 while (io_uring_peek_cqe(ring, &cqe) == 0) {
248
249 msghdr = io_uring_cqe_get_data(cqe);
250 if (!msghdr) {
251 LOGP(DLIO, LOGL_DEBUG, "Cancellation returned\n");
252 io_uring_cqe_seen(ring, cqe);
253 continue;
254 }
255
256 rc = cqe->res;
257 /* Hand the entry back to the kernel before */
258 io_uring_cqe_seen(ring, cqe);
259
260 iofd_uring_handle_completion(msghdr, rc);
261
262 }
263}
264
Harald Welte987a86a2023-11-18 18:46:24 +0100265/*! will submit the next to-be-transmitted message for given iofd */
Daniel Willmannf91d2aa2023-01-04 18:20:55 +0100266static int iofd_uring_submit_tx(struct osmo_io_fd *iofd)
267{
268 struct io_uring_sqe *sqe;
269 struct iofd_msghdr *msghdr;
270
271 msghdr = iofd_txqueue_dequeue(iofd);
272 if (!msghdr)
273 return -ENODATA;
274
275 sqe = io_uring_get_sqe(&g_ring.ring);
276 if (!sqe) {
277 LOGPIO(iofd, LOGL_ERROR, "Could not get io_uring_sqe\n");
278 OSMO_ASSERT(0);
279 }
280
281 io_uring_sqe_set_data(sqe, msghdr);
282
283 switch (msghdr->action) {
284 case IOFD_ACT_WRITE:
285 case IOFD_ACT_SENDTO:
Harald Welte1047ed72023-11-18 18:51:58 +0100286 case IOFD_ACT_SENDMSG:
Daniel Willmannf91d2aa2023-01-04 18:20:55 +0100287 io_uring_prep_sendmsg(sqe, msghdr->iofd->fd, &msghdr->hdr, msghdr->flags);
288 break;
289 default:
290 OSMO_ASSERT(0);
291 }
292
293 io_uring_submit(&g_ring.ring);
294 iofd->u.uring.write_msghdr = msghdr;
295
296 return 0;
297}
298
static void iofd_uring_write_enable(struct osmo_io_fd *iofd);
static void iofd_uring_read_enable(struct osmo_io_fd *iofd);

/*! Backend hook called when an iofd is registered.
 *  Nothing to do for io_uring: requests are only queued once reading or
 *  writing gets enabled.
 *  \returns always 0 */
static int iofd_uring_register(struct osmo_io_fd *iofd)
{
	(void)iofd;
	return 0;
}
306
307static int iofd_uring_unregister(struct osmo_io_fd *iofd)
308{
309 struct io_uring_sqe *sqe;
310 if (iofd->u.uring.read_msghdr) {
311 sqe = io_uring_get_sqe(&g_ring.ring);
312 OSMO_ASSERT(sqe != NULL);
313 io_uring_sqe_set_data(sqe, NULL);
314 LOGPIO(iofd, LOGL_DEBUG, "Cancelling read\n");
315 io_uring_prep_cancel(sqe, iofd->u.uring.read_msghdr, 0);
316 }
317
318 if (iofd->u.uring.write_msghdr) {
319 sqe = io_uring_get_sqe(&g_ring.ring);
320 OSMO_ASSERT(sqe != NULL);
321 io_uring_sqe_set_data(sqe, NULL);
322 LOGPIO(iofd, LOGL_DEBUG, "Cancelling write\n");
323 io_uring_prep_cancel(sqe, iofd->u.uring.write_msghdr, 0);
324 }
325 io_uring_submit(&g_ring.ring);
326
327 return 0;
328}
329
330static void iofd_uring_write_enable(struct osmo_io_fd *iofd)
331{
332 iofd->u.uring.write_enabled = true;
333
334 if (iofd->u.uring.write_msghdr)
335 return;
336
337 if (osmo_iofd_txqueue_len(iofd) > 0)
338 iofd_uring_submit_tx(iofd);
339 else if (iofd->mode == OSMO_IO_FD_MODE_READ_WRITE) {
340 /* Empty write request to check when the socket is connected */
341 struct iofd_msghdr *msghdr;
342 struct io_uring_sqe *sqe;
343 struct msgb *msg = msgb_alloc_headroom(0, 0, "io_uring write dummy");
344 if (!msg) {
345 LOGPIO(iofd, LOGL_ERROR, "Could not allocate msgb for writing\n");
346 OSMO_ASSERT(0);
347 }
Harald Welte1047ed72023-11-18 18:51:58 +0100348 msghdr = iofd_msghdr_alloc(iofd, IOFD_ACT_WRITE, msg, 0);
Daniel Willmannf91d2aa2023-01-04 18:20:55 +0100349 if (!msghdr) {
350 LOGPIO(iofd, LOGL_ERROR, "Could not allocate msghdr for writing\n");
351 OSMO_ASSERT(0);
352 }
353
354 msghdr->iov[0].iov_base = msgb_data(msg);
Harald Welte1047ed72023-11-18 18:51:58 +0100355 msghdr->iov[0].iov_len = msgb_length(msg);
Daniel Willmannf91d2aa2023-01-04 18:20:55 +0100356
357 sqe = io_uring_get_sqe(&g_ring.ring);
358 if (!sqe) {
359 LOGPIO(iofd, LOGL_ERROR, "Could not get io_uring_sqe\n");
360 OSMO_ASSERT(0);
361 }
Daniel Willmannf91d2aa2023-01-04 18:20:55 +0100362 io_uring_prep_writev(sqe, iofd->fd, msghdr->iov, 1, 0);
363 io_uring_sqe_set_data(sqe, msghdr);
364
365 io_uring_submit(&g_ring.ring);
366 iofd->u.uring.write_msghdr = msghdr;
367 }
368}
369
370static void iofd_uring_write_disable(struct osmo_io_fd *iofd)
371{
372 iofd->u.uring.write_enabled = false;
373}
374
375static void iofd_uring_read_enable(struct osmo_io_fd *iofd)
376{
377 iofd->u.uring.read_enabled = true;
378
379 if (iofd->u.uring.read_msghdr)
380 return;
381
382 switch (iofd->mode) {
383 case OSMO_IO_FD_MODE_READ_WRITE:
384 iofd_uring_submit_recv(iofd, IOFD_ACT_READ);
385 break;
386 case OSMO_IO_FD_MODE_RECVFROM_SENDTO:
387 iofd_uring_submit_recv(iofd, IOFD_ACT_RECVFROM);
388 break;
Harald Welte1047ed72023-11-18 18:51:58 +0100389 case OSMO_IO_FD_MODE_RECVMSG_SENDMSG:
390 iofd_uring_submit_recv(iofd, IOFD_ACT_RECVMSG);
391 break;
Daniel Willmannf91d2aa2023-01-04 18:20:55 +0100392 default:
393 OSMO_ASSERT(0);
394 }
395}
396
397static void iofd_uring_read_disable(struct osmo_io_fd *iofd)
398{
399 iofd->u.uring.read_enabled = false;
400}
401
402static int iofd_uring_close(struct osmo_io_fd *iofd)
403{
404 iofd_uring_read_disable(iofd);
405 iofd_uring_write_disable(iofd);
406 iofd_uring_unregister(iofd);
407 return close(iofd->fd);
408}
409
410const struct iofd_backend_ops iofd_uring_ops = {
411 .register_fd = iofd_uring_register,
412 .unregister_fd = iofd_uring_unregister,
413 .close = iofd_uring_close,
414 .write_enable = iofd_uring_write_enable,
415 .write_disable = iofd_uring_write_disable,
416 .read_enable = iofd_uring_read_enable,
417 .read_disable = iofd_uring_read_disable,
Andreas Eversberg848faf92024-02-09 12:38:17 +0100418 .notify_connected = iofd_uring_write_enable,
Daniel Willmannf91d2aa2023-01-04 18:20:55 +0100419};
420
421#endif /* defined(__linux__) */