/*! \file osmo_io_uring.c
 * io_uring backend for osmo_io.
 *
 * (C) 2022-2023 by sysmocom s.f.m.c.
 * Author: Daniel Willmann <daniel@sysmocom.de>
 *
 * All Rights Reserved.
 *
 * SPDX-License-Identifier: GPL-2.0+
 *
 * This program is free software; you can redistribute it and/or modify
 * it under the terms of the GNU General Public License as published by
 * the Free Software Foundation; either version 2 of the License, or
 * (at your option) any later version.
 *
 * This program is distributed in the hope that it will be useful,
 * but WITHOUT ANY WARRANTY; without even the implied warranty of
 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
 * GNU General Public License for more details.
 */
21
/* TODO:
 * Parameters:
 * - number of simultaneous read/write requests in the uring for a given fd
 */
27
28#include "../config.h"
29#if defined(__linux__)
30
31#include <stdio.h>
32#include <talloc.h>
33#include <unistd.h>
34#include <string.h>
35#include <stdbool.h>
36#include <errno.h>
37
38#include <sys/eventfd.h>
39#include <liburing.h>
40
41#include <osmocom/core/osmo_io.h>
42#include <osmocom/core/linuxlist.h>
43#include <osmocom/core/logging.h>
44#include <osmocom/core/msgb.h>
45#include <osmocom/core/select.h>
46#include <osmocom/core/talloc.h>
47#include <osmocom/core/utils.h>
48#include <osmocom/core/socket.h>
49
50#include "osmo_io_internal.h"
51
52#define IOFD_URING_ENTRIES 4096
53
54struct osmo_io_uring {
55 struct osmo_fd event_ofd;
56 struct io_uring ring;
57};
58
59static __thread struct osmo_io_uring g_ring;
60
61static void iofd_uring_cqe(struct io_uring *ring);
62static int iofd_uring_poll_cb(struct osmo_fd *ofd, unsigned int what)
63{
64 struct io_uring *ring = ofd->data;
65 eventfd_t val;
66 int rc;
67
68 if (what & OSMO_FD_READ) {
69 rc = eventfd_read(ofd->fd, &val);
70 if (rc < 0) {
71 LOGP(DLIO, LOGL_ERROR, "eventfd_read() returned error\n");
72 return rc;
73 }
74
75 iofd_uring_cqe(ring);
76 }
77 if (what & OSMO_FD_WRITE)
78 OSMO_ASSERT(0);
79
80 return 0;
81}
82
83/*! initialize the uring and tie it into our event loop */
84void osmo_iofd_uring_init(void)
85{
86 int rc;
87 rc = io_uring_queue_init(IOFD_URING_ENTRIES, &g_ring.ring, 0);
88 if (rc < 0)
89 OSMO_ASSERT(0);
90
91 rc = eventfd(0, 0);
92 if (rc < 0) {
93 io_uring_queue_exit(&g_ring.ring);
94 OSMO_ASSERT(0);
95 }
96
97 osmo_fd_setup(&g_ring.event_ofd, rc, OSMO_FD_READ, iofd_uring_poll_cb, &g_ring.ring, 0);
98 osmo_fd_register(&g_ring.event_ofd);
99 io_uring_register_eventfd(&g_ring.ring, rc);
100}
101
102
103static void iofd_uring_submit_recv(struct osmo_io_fd *iofd, enum iofd_msg_action action)
104{
105 struct msgb *msg;
106 struct iofd_msghdr *msghdr;
107 struct io_uring_sqe *sqe;
108
109 msg = iofd_msgb_pending_or_alloc(iofd);
110 if (!msg) {
111 LOGPIO(iofd, LOGL_ERROR, "Could not allocate msgb for reading\n");
112 OSMO_ASSERT(0);
113 }
114
115 msghdr = iofd_msghdr_alloc(iofd, action, msg);
116 if (!msghdr) {
117 LOGPIO(iofd, LOGL_ERROR, "Could not allocate msghdr for reading\n");
118 OSMO_ASSERT(0);
119 }
120
121 msghdr->iov[0].iov_base = msg->tail;
122 msghdr->iov[0].iov_len = msgb_tailroom(msg);
123
124 switch (action) {
125 case IOFD_ACT_READ:
126 break;
127 case IOFD_ACT_RECVFROM:
128 msghdr->hdr.msg_iov = &msghdr->iov[0];
129 msghdr->hdr.msg_iovlen = 1;
130 msghdr->hdr.msg_name = &msghdr->osa.u.sa;
131 msghdr->hdr.msg_namelen = osmo_sockaddr_size(&msghdr->osa);
132 break;
133 default:
134 OSMO_ASSERT(0);
135 }
136
137 sqe = io_uring_get_sqe(&g_ring.ring);
138 if (!sqe) {
139 LOGPIO(iofd, LOGL_ERROR, "Could not get io_uring_sqe\n");
140 OSMO_ASSERT(0);
141 }
142
143 switch (action) {
144 case IOFD_ACT_READ:
145 io_uring_prep_readv(sqe, iofd->fd, msghdr->iov, 1, 0);
146 break;
147 case IOFD_ACT_RECVFROM:
148 io_uring_prep_recvmsg(sqe, iofd->fd, &msghdr->hdr, msghdr->flags);
149 break;
150 default:
151 OSMO_ASSERT(0);
152 }
153 io_uring_sqe_set_data(sqe, msghdr);
154
155 io_uring_submit(&g_ring.ring);
156 /* NOTE: This only works if we have one read per fd */
157 iofd->u.uring.read_msghdr = msghdr;
158}
159
160static void iofd_uring_handle_recv(struct iofd_msghdr *msghdr, int rc)
161{
162 struct osmo_io_fd *iofd = msghdr->iofd;
163 struct msgb *msg = msghdr->msg;
164
165 if (rc > 0)
166 msgb_put(msg, rc);
167
168 if (!IOFD_FLAG_ISSET(iofd, IOFD_FLAG_CLOSED))
169 iofd_handle_recv(iofd, msg, rc, msghdr);
170
171 if (iofd->u.uring.read_enabled && !IOFD_FLAG_ISSET(iofd, IOFD_FLAG_CLOSED))
172 iofd_uring_submit_recv(iofd, msghdr->action);
173 else
174 iofd->u.uring.read_msghdr = NULL;
175
176
177 iofd_msghdr_free(msghdr);
178}
179
180static int iofd_uring_submit_tx(struct osmo_io_fd *iofd);
181
182static void iofd_uring_handle_tx(struct iofd_msghdr *msghdr, int rc)
183{
184 struct osmo_io_fd *iofd = msghdr->iofd;
Daniel Willmannec7d4912023-08-30 17:18:34 +0200185 struct msgb *msg = msghdr->msg;
Daniel Willmannf91d2aa2023-01-04 18:20:55 +0100186
187 if (IOFD_FLAG_ISSET(iofd, IOFD_FLAG_CLOSED))
188 goto out_free;
189
190 /* Error during write */
191 if (rc < 0) {
192 if (msghdr->action == IOFD_ACT_WRITE)
Daniel Willmannec7d4912023-08-30 17:18:34 +0200193 iofd->io_ops.write_cb(iofd, rc, msg);
Daniel Willmannf91d2aa2023-01-04 18:20:55 +0100194 else if (msghdr->action == IOFD_ACT_SENDTO)
Daniel Willmannec7d4912023-08-30 17:18:34 +0200195 iofd->io_ops.sendto_cb(iofd, rc, msg, &msghdr->osa);
Daniel Willmannf91d2aa2023-01-04 18:20:55 +0100196 else
197 OSMO_ASSERT(0);
198 goto out_free;
199 }
200
201 /* Incomplete write */
Daniel Willmannec7d4912023-08-30 17:18:34 +0200202 if (rc < msgb_length(msg)) {
Daniel Willmannf91d2aa2023-01-04 18:20:55 +0100203 /* Re-enqueue remaining data */
Daniel Willmannec7d4912023-08-30 17:18:34 +0200204 msgb_pull(msg, rc);
205 msghdr->iov[0].iov_len = msgb_length(msg);
Daniel Willmannf91d2aa2023-01-04 18:20:55 +0100206 iofd_txqueue_enqueue_front(iofd, msghdr);
207 goto out;
208 }
209
210 if (msghdr->action == IOFD_ACT_WRITE)
Daniel Willmannec7d4912023-08-30 17:18:34 +0200211 iofd->io_ops.write_cb(iofd, rc, msg);
Daniel Willmannf91d2aa2023-01-04 18:20:55 +0100212 else if (msghdr->action == IOFD_ACT_SENDTO)
Daniel Willmannec7d4912023-08-30 17:18:34 +0200213 iofd->io_ops.sendto_cb(iofd, rc, msg, &msghdr->osa);
Daniel Willmannf91d2aa2023-01-04 18:20:55 +0100214 else
215 OSMO_ASSERT(0);
216
217out_free:
218 msgb_free(msghdr->msg);
219 iofd_msghdr_free(msghdr);
220
221out:
222 iofd->u.uring.write_msghdr = NULL;
223 if (iofd->u.uring.write_enabled && !IOFD_FLAG_ISSET(iofd, IOFD_FLAG_CLOSED))
224 iofd_uring_submit_tx(iofd);
225}
226
227static void iofd_uring_handle_completion(struct iofd_msghdr *msghdr, int res)
228{
229 struct osmo_io_fd *iofd = msghdr->iofd;
230
231 IOFD_FLAG_SET(iofd, IOFD_FLAG_IN_CALLBACK);
232
233 switch (msghdr->action) {
234 case IOFD_ACT_READ:
235 case IOFD_ACT_RECVFROM:
236 iofd_uring_handle_recv(msghdr, res);
237 break;
238 case IOFD_ACT_WRITE:
239 case IOFD_ACT_SENDTO:
240 iofd_uring_handle_tx(msghdr, res);
241 break;
242 default:
243 OSMO_ASSERT(0)
244 }
245
246 if (!iofd->u.uring.read_msghdr && !iofd->u.uring.write_msghdr)
247 IOFD_FLAG_UNSET(iofd, IOFD_FLAG_IN_CALLBACK);
248
249 if (IOFD_FLAG_ISSET(iofd, IOFD_FLAG_TO_FREE) && !iofd->u.uring.read_msghdr && !iofd->u.uring.write_msghdr)
250 talloc_free(iofd);
251}
252
253static void iofd_uring_cqe(struct io_uring *ring)
254{
255 int rc;
256 struct io_uring_cqe *cqe;
257 struct iofd_msghdr *msghdr;
258
259 while (io_uring_peek_cqe(ring, &cqe) == 0) {
260
261 msghdr = io_uring_cqe_get_data(cqe);
262 if (!msghdr) {
263 LOGP(DLIO, LOGL_DEBUG, "Cancellation returned\n");
264 io_uring_cqe_seen(ring, cqe);
265 continue;
266 }
267
268 rc = cqe->res;
269 /* Hand the entry back to the kernel before */
270 io_uring_cqe_seen(ring, cqe);
271
272 iofd_uring_handle_completion(msghdr, rc);
273
274 }
275}
276
277static int iofd_uring_submit_tx(struct osmo_io_fd *iofd)
278{
279 struct io_uring_sqe *sqe;
280 struct iofd_msghdr *msghdr;
281
282 msghdr = iofd_txqueue_dequeue(iofd);
283 if (!msghdr)
284 return -ENODATA;
285
286 sqe = io_uring_get_sqe(&g_ring.ring);
287 if (!sqe) {
288 LOGPIO(iofd, LOGL_ERROR, "Could not get io_uring_sqe\n");
289 OSMO_ASSERT(0);
290 }
291
292 io_uring_sqe_set_data(sqe, msghdr);
293
294 switch (msghdr->action) {
295 case IOFD_ACT_WRITE:
296 case IOFD_ACT_SENDTO:
297 io_uring_prep_sendmsg(sqe, msghdr->iofd->fd, &msghdr->hdr, msghdr->flags);
298 break;
299 default:
300 OSMO_ASSERT(0);
301 }
302
303 io_uring_submit(&g_ring.ring);
304 iofd->u.uring.write_msghdr = msghdr;
305
306 return 0;
307}
308
static void iofd_uring_write_enable(struct osmo_io_fd *iofd);
static void iofd_uring_read_enable(struct osmo_io_fd *iofd);

/*! Register a file descriptor with this backend.
 *  Nothing to do for io_uring: requests are submitted on demand by the
 *  read/write enable functions.  Always returns 0. */
static int iofd_uring_register(struct osmo_io_fd *iofd)
{
	return 0;
}
316
317static int iofd_uring_unregister(struct osmo_io_fd *iofd)
318{
319 struct io_uring_sqe *sqe;
320 if (iofd->u.uring.read_msghdr) {
321 sqe = io_uring_get_sqe(&g_ring.ring);
322 OSMO_ASSERT(sqe != NULL);
323 io_uring_sqe_set_data(sqe, NULL);
324 LOGPIO(iofd, LOGL_DEBUG, "Cancelling read\n");
325 io_uring_prep_cancel(sqe, iofd->u.uring.read_msghdr, 0);
326 }
327
328 if (iofd->u.uring.write_msghdr) {
329 sqe = io_uring_get_sqe(&g_ring.ring);
330 OSMO_ASSERT(sqe != NULL);
331 io_uring_sqe_set_data(sqe, NULL);
332 LOGPIO(iofd, LOGL_DEBUG, "Cancelling write\n");
333 io_uring_prep_cancel(sqe, iofd->u.uring.write_msghdr, 0);
334 }
335 io_uring_submit(&g_ring.ring);
336
337 return 0;
338}
339
340static void iofd_uring_write_enable(struct osmo_io_fd *iofd)
341{
342 iofd->u.uring.write_enabled = true;
343
344 if (iofd->u.uring.write_msghdr)
345 return;
346
347 if (osmo_iofd_txqueue_len(iofd) > 0)
348 iofd_uring_submit_tx(iofd);
349 else if (iofd->mode == OSMO_IO_FD_MODE_READ_WRITE) {
350 /* Empty write request to check when the socket is connected */
351 struct iofd_msghdr *msghdr;
352 struct io_uring_sqe *sqe;
353 struct msgb *msg = msgb_alloc_headroom(0, 0, "io_uring write dummy");
354 if (!msg) {
355 LOGPIO(iofd, LOGL_ERROR, "Could not allocate msgb for writing\n");
356 OSMO_ASSERT(0);
357 }
358 msghdr = iofd_msghdr_alloc(iofd, IOFD_ACT_WRITE, msg);
359 if (!msghdr) {
360 LOGPIO(iofd, LOGL_ERROR, "Could not allocate msghdr for writing\n");
361 OSMO_ASSERT(0);
362 }
363
364 msghdr->iov[0].iov_base = msgb_data(msg);
365 msghdr->iov[0].iov_len = msgb_tailroom(msg);
366
367 sqe = io_uring_get_sqe(&g_ring.ring);
368 if (!sqe) {
369 LOGPIO(iofd, LOGL_ERROR, "Could not get io_uring_sqe\n");
370 OSMO_ASSERT(0);
371 }
372 // Prep msgb/iov
373 io_uring_prep_writev(sqe, iofd->fd, msghdr->iov, 1, 0);
374 io_uring_sqe_set_data(sqe, msghdr);
375
376 io_uring_submit(&g_ring.ring);
377 iofd->u.uring.write_msghdr = msghdr;
378 }
379}
380
/*! Disable transmission on the given iofd.
 *  An already in-flight write still completes, but its completion handler
 *  will not submit another request (see iofd_uring_handle_tx()). */
static void iofd_uring_write_disable(struct osmo_io_fd *iofd)
{
	iofd->u.uring.write_enabled = false;
}
385
386static void iofd_uring_read_enable(struct osmo_io_fd *iofd)
387{
388 iofd->u.uring.read_enabled = true;
389
390 if (iofd->u.uring.read_msghdr)
391 return;
392
393 switch (iofd->mode) {
394 case OSMO_IO_FD_MODE_READ_WRITE:
395 iofd_uring_submit_recv(iofd, IOFD_ACT_READ);
396 break;
397 case OSMO_IO_FD_MODE_RECVFROM_SENDTO:
398 iofd_uring_submit_recv(iofd, IOFD_ACT_RECVFROM);
399 break;
400 default:
401 OSMO_ASSERT(0);
402 }
403}
404
/*! Disable reception on the given iofd.
 *  An already in-flight read still completes, but its completion handler
 *  will not re-arm another one (see iofd_uring_handle_recv()). */
static void iofd_uring_read_disable(struct osmo_io_fd *iofd)
{
	iofd->u.uring.read_enabled = false;
}
409
/*! Close the given iofd: disable reading and writing, request cancellation
 *  of any in-flight requests, then close() the underlying fd.
 *  \returns the result of close(). */
static int iofd_uring_close(struct osmo_io_fd *iofd)
{
	iofd_uring_read_disable(iofd);
	iofd_uring_write_disable(iofd);
	iofd_uring_unregister(iofd);
	return close(iofd->fd);
}
417
/*! Operation table of the io_uring osmo_io backend. */
const struct iofd_backend_ops iofd_uring_ops = {
	.register_fd = iofd_uring_register,
	.unregister_fd = iofd_uring_unregister,
	.close = iofd_uring_close,
	.write_enable = iofd_uring_write_enable,
	.write_disable = iofd_uring_write_disable,
	.read_enable = iofd_uring_read_enable,
	.read_disable = iofd_uring_read_disable,
};
427
428#endif /* defined(__linux__) */