/*! \file osmo_io_uring.c
 * io_uring backend for osmo_io.
 *
 * (C) 2022-2023 by sysmocom s.f.m.c.
 * Author: Daniel Willmann <daniel@sysmocom.de>
 *
 * All Rights Reserved.
 *
 * SPDX-License-Identifier: GPL-2.0+
 *
 * This program is free software; you can redistribute it and/or modify
 * it under the terms of the GNU General Public License as published by
 * the Free Software Foundation; either version 2 of the License, or
 * (at your option) any later version.
 *
 * This program is distributed in the hope that it will be useful,
 * but WITHOUT ANY WARRANTY; without even the implied warranty of
 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
 * GNU General Public License for more details.
 */

/* TODO:
 * Parameters:
 * - number of simultaneous read/write in uring for given fd
 *
 */
27
28#include "../config.h"
29#if defined(__linux__)
30
31#include <stdio.h>
32#include <talloc.h>
33#include <unistd.h>
34#include <string.h>
35#include <stdbool.h>
36#include <errno.h>
37
38#include <sys/eventfd.h>
39#include <liburing.h>
40
41#include <osmocom/core/osmo_io.h>
42#include <osmocom/core/linuxlist.h>
43#include <osmocom/core/logging.h>
44#include <osmocom/core/msgb.h>
45#include <osmocom/core/select.h>
46#include <osmocom/core/talloc.h>
47#include <osmocom/core/utils.h>
48#include <osmocom/core/socket.h>
49
50#include "osmo_io_internal.h"
51
52#define IOFD_URING_ENTRIES 4096
53
/*! Per-thread io_uring state: the ring itself plus the eventfd through
 *  which completion readiness is fed into the osmo_fd event loop. */
struct osmo_io_uring {
	struct osmo_fd event_ofd;	/*!< eventfd wrapped as osmo_fd; kernel signals it when CQEs arrive */
	struct io_uring ring;		/*!< the submission/completion ring shared by all iofds of this thread */
};
58
59static __thread struct osmo_io_uring g_ring;
60
61static void iofd_uring_cqe(struct io_uring *ring);
62static int iofd_uring_poll_cb(struct osmo_fd *ofd, unsigned int what)
63{
64 struct io_uring *ring = ofd->data;
65 eventfd_t val;
66 int rc;
67
68 if (what & OSMO_FD_READ) {
69 rc = eventfd_read(ofd->fd, &val);
70 if (rc < 0) {
71 LOGP(DLIO, LOGL_ERROR, "eventfd_read() returned error\n");
72 return rc;
73 }
74
75 iofd_uring_cqe(ring);
76 }
77 if (what & OSMO_FD_WRITE)
78 OSMO_ASSERT(0);
79
80 return 0;
81}
82
83/*! initialize the uring and tie it into our event loop */
84void osmo_iofd_uring_init(void)
85{
86 int rc;
87 rc = io_uring_queue_init(IOFD_URING_ENTRIES, &g_ring.ring, 0);
88 if (rc < 0)
89 OSMO_ASSERT(0);
90
91 rc = eventfd(0, 0);
92 if (rc < 0) {
93 io_uring_queue_exit(&g_ring.ring);
94 OSMO_ASSERT(0);
95 }
96
97 osmo_fd_setup(&g_ring.event_ofd, rc, OSMO_FD_READ, iofd_uring_poll_cb, &g_ring.ring, 0);
98 osmo_fd_register(&g_ring.event_ofd);
99 io_uring_register_eventfd(&g_ring.ring, rc);
100}
101
102
/*! Submit a single read/recvfrom request for the given iofd.
 *  Allocates (or reuses a pending) msgb as receive buffer plus a
 *  iofd_msghdr as completion context, then queues one SQE in the ring.
 *  \param[in] iofd the file descriptor to read from
 *  \param[in] action IOFD_ACT_READ (stream) or IOFD_ACT_RECVFROM (dgram) */
static void iofd_uring_submit_recv(struct osmo_io_fd *iofd, enum iofd_msg_action action)
{
	struct msgb *msg;
	struct iofd_msghdr *msghdr;
	struct io_uring_sqe *sqe;

	/* allocation failure is treated as fatal throughout this backend */
	msg = iofd_msgb_pending_or_alloc(iofd);
	if (!msg) {
		LOGPIO(iofd, LOGL_ERROR, "Could not allocate msgb for reading\n");
		OSMO_ASSERT(0);
	}

	msghdr = iofd_msghdr_alloc(iofd, action, msg);
	if (!msghdr) {
		LOGPIO(iofd, LOGL_ERROR, "Could not allocate msghdr for reading\n");
		OSMO_ASSERT(0);
	}

	/* receive into the unused tail of the msgb */
	msghdr->iov[0].iov_base = msg->tail;
	msghdr->iov[0].iov_len = msgb_tailroom(msg);

	switch (action) {
	case IOFD_ACT_READ:
		break;
	case IOFD_ACT_RECVFROM:
		/* recvmsg() additionally fills in the sender's address */
		msghdr->hdr.msg_iov = &msghdr->iov[0];
		msghdr->hdr.msg_iovlen = 1;
		msghdr->hdr.msg_name = &msghdr->osa.u.sa;
		msghdr->hdr.msg_namelen = osmo_sockaddr_size(&msghdr->osa);
		break;
	default:
		OSMO_ASSERT(0);
	}

	sqe = io_uring_get_sqe(&g_ring.ring);
	if (!sqe) {
		/* ring full: IOFD_URING_ENTRIES requests already outstanding */
		LOGPIO(iofd, LOGL_ERROR, "Could not get io_uring_sqe\n");
		OSMO_ASSERT(0);
	}

	switch (action) {
	case IOFD_ACT_READ:
		io_uring_prep_readv(sqe, iofd->fd, msghdr->iov, 1, 0);
		break;
	case IOFD_ACT_RECVFROM:
		io_uring_prep_recvmsg(sqe, iofd->fd, &msghdr->hdr, msghdr->flags);
		break;
	default:
		OSMO_ASSERT(0);
	}
	/* msghdr comes back as user data with the completion */
	io_uring_sqe_set_data(sqe, msghdr);

	io_uring_submit(&g_ring.ring);
	/* NOTE: This only works if we have one read per fd */
	iofd->u.uring.read_msghdr = msghdr;
}
159
/*! Handle completion of a read/recvfrom request: pass the data up to
 *  the osmo_io core and, while reading remains enabled, immediately
 *  submit the next read so exactly one request stays in flight.
 *  \param[in] msghdr completion context of the finished request
 *  \param[in] rc number of bytes received, or negative errno */
static void iofd_uring_handle_recv(struct iofd_msghdr *msghdr, int rc)
{
	struct osmo_io_fd *iofd = msghdr->iofd;
	struct msgb *msg = msghdr->msg;

	/* account for the received bytes in the msgb before delivery */
	if (rc > 0)
		msgb_put(msg, rc);

	/* don't deliver to a user that already closed the iofd */
	if (!IOFD_FLAG_ISSET(iofd, IOFD_FLAG_CLOSED))
		iofd_handle_recv(iofd, msg, rc, msghdr);

	if (iofd->u.uring.read_enabled && !IOFD_FLAG_ISSET(iofd, IOFD_FLAG_CLOSED))
		iofd_uring_submit_recv(iofd, msghdr->action);
	else
		iofd->u.uring.read_msghdr = NULL;


	iofd_msghdr_free(msghdr);
}
179
180static int iofd_uring_submit_tx(struct osmo_io_fd *iofd);
181
182static void iofd_uring_handle_tx(struct iofd_msghdr *msghdr, int rc)
183{
184 struct osmo_io_fd *iofd = msghdr->iofd;
185
186 if (IOFD_FLAG_ISSET(iofd, IOFD_FLAG_CLOSED))
187 goto out_free;
188
189 /* Error during write */
190 if (rc < 0) {
191 if (msghdr->action == IOFD_ACT_WRITE)
192 iofd->io_ops.write_cb(iofd, rc, msghdr->msg);
193 else if (msghdr->action == IOFD_ACT_SENDTO)
194 iofd->io_ops.sendto_cb(iofd, rc, msghdr->msg, &msghdr->osa);
195 else
196 OSMO_ASSERT(0);
197 goto out_free;
198 }
199
200 /* Incomplete write */
201 if (rc < msgb_length(msghdr->msg)) {
202 /* Re-enqueue remaining data */
203 msgb_pull(msghdr->msg, rc);
204 msghdr->iov[0].iov_len = msgb_length(msghdr->msg);
205 iofd_txqueue_enqueue_front(iofd, msghdr);
206 goto out;
207 }
208
209 if (msghdr->action == IOFD_ACT_WRITE)
210 iofd->io_ops.write_cb(iofd, rc, msghdr->msg);
211 else if (msghdr->action == IOFD_ACT_SENDTO)
212 iofd->io_ops.sendto_cb(iofd, rc, msghdr->msg, &msghdr->osa);
213 else
214 OSMO_ASSERT(0);
215
216out_free:
217 msgb_free(msghdr->msg);
218 iofd_msghdr_free(msghdr);
219
220out:
221 iofd->u.uring.write_msghdr = NULL;
222 if (iofd->u.uring.write_enabled && !IOFD_FLAG_ISSET(iofd, IOFD_FLAG_CLOSED))
223 iofd_uring_submit_tx(iofd);
224}
225
226static void iofd_uring_handle_completion(struct iofd_msghdr *msghdr, int res)
227{
228 struct osmo_io_fd *iofd = msghdr->iofd;
229
230 IOFD_FLAG_SET(iofd, IOFD_FLAG_IN_CALLBACK);
231
232 switch (msghdr->action) {
233 case IOFD_ACT_READ:
234 case IOFD_ACT_RECVFROM:
235 iofd_uring_handle_recv(msghdr, res);
236 break;
237 case IOFD_ACT_WRITE:
238 case IOFD_ACT_SENDTO:
239 iofd_uring_handle_tx(msghdr, res);
240 break;
241 default:
242 OSMO_ASSERT(0)
243 }
244
245 if (!iofd->u.uring.read_msghdr && !iofd->u.uring.write_msghdr)
246 IOFD_FLAG_UNSET(iofd, IOFD_FLAG_IN_CALLBACK);
247
248 if (IOFD_FLAG_ISSET(iofd, IOFD_FLAG_TO_FREE) && !iofd->u.uring.read_msghdr && !iofd->u.uring.write_msghdr)
249 talloc_free(iofd);
250}
251
252static void iofd_uring_cqe(struct io_uring *ring)
253{
254 int rc;
255 struct io_uring_cqe *cqe;
256 struct iofd_msghdr *msghdr;
257
258 while (io_uring_peek_cqe(ring, &cqe) == 0) {
259
260 msghdr = io_uring_cqe_get_data(cqe);
261 if (!msghdr) {
262 LOGP(DLIO, LOGL_DEBUG, "Cancellation returned\n");
263 io_uring_cqe_seen(ring, cqe);
264 continue;
265 }
266
267 rc = cqe->res;
268 /* Hand the entry back to the kernel before */
269 io_uring_cqe_seen(ring, cqe);
270
271 iofd_uring_handle_completion(msghdr, rc);
272
273 }
274}
275
276static int iofd_uring_submit_tx(struct osmo_io_fd *iofd)
277{
278 struct io_uring_sqe *sqe;
279 struct iofd_msghdr *msghdr;
280
281 msghdr = iofd_txqueue_dequeue(iofd);
282 if (!msghdr)
283 return -ENODATA;
284
285 sqe = io_uring_get_sqe(&g_ring.ring);
286 if (!sqe) {
287 LOGPIO(iofd, LOGL_ERROR, "Could not get io_uring_sqe\n");
288 OSMO_ASSERT(0);
289 }
290
291 io_uring_sqe_set_data(sqe, msghdr);
292
293 switch (msghdr->action) {
294 case IOFD_ACT_WRITE:
295 case IOFD_ACT_SENDTO:
296 io_uring_prep_sendmsg(sqe, msghdr->iofd->fd, &msghdr->hdr, msghdr->flags);
297 break;
298 default:
299 OSMO_ASSERT(0);
300 }
301
302 io_uring_submit(&g_ring.ring);
303 iofd->u.uring.write_msghdr = msghdr;
304
305 return 0;
306}
307
308static void iofd_uring_write_enable(struct osmo_io_fd *iofd);
309static void iofd_uring_read_enable(struct osmo_io_fd *iofd);
310
/*! Backend register hook: io_uring needs no per-fd registration, as
 *  every SQE references the fd directly.
 *  \returns always 0 */
static int iofd_uring_register(struct osmo_io_fd *iofd)
{
	return 0;
}
315
/*! Unregister the iofd from the uring: submit asynchronous cancellation
 *  of any in-flight read and write request.
 *  NOTE(review): the cancelled operation's own CQE still completes with
 *  the msghdr as user data, so buffer cleanup presumably happens in the
 *  normal completion path — verify against iofd_uring_cqe().
 *  \returns always 0 */
static int iofd_uring_unregister(struct osmo_io_fd *iofd)
{
	struct io_uring_sqe *sqe;
	if (iofd->u.uring.read_msghdr) {
		sqe = io_uring_get_sqe(&g_ring.ring);
		OSMO_ASSERT(sqe != NULL);
		/* NULL user data marks this CQE as a cancellation result */
		io_uring_sqe_set_data(sqe, NULL);
		LOGPIO(iofd, LOGL_DEBUG, "Cancelling read\n");
		io_uring_prep_cancel(sqe, iofd->u.uring.read_msghdr, 0);
	}

	if (iofd->u.uring.write_msghdr) {
		sqe = io_uring_get_sqe(&g_ring.ring);
		OSMO_ASSERT(sqe != NULL);
		io_uring_sqe_set_data(sqe, NULL);
		LOGPIO(iofd, LOGL_DEBUG, "Cancelling write\n");
		io_uring_prep_cancel(sqe, iofd->u.uring.write_msghdr, 0);
	}
	io_uring_submit(&g_ring.ring);

	return 0;
}
338
/*! Enable transmission on the iofd: submit the next queued message, or
 *  — for stream-mode fds with an empty queue — a zero-length write
 *  whose completion indicates the socket has become writable (e.g.
 *  after a non-blocking connect()). */
static void iofd_uring_write_enable(struct osmo_io_fd *iofd)
{
	iofd->u.uring.write_enabled = true;

	/* only ever keep one write request in flight per fd */
	if (iofd->u.uring.write_msghdr)
		return;

	if (osmo_iofd_txqueue_len(iofd) > 0)
		iofd_uring_submit_tx(iofd);
	else if (iofd->mode == OSMO_IO_FD_MODE_READ_WRITE) {
		/* Empty write request to check when the socket is connected */
		struct iofd_msghdr *msghdr;
		struct io_uring_sqe *sqe;
		struct msgb *msg = msgb_alloc_headroom(0, 0, "io_uring write dummy");
		if (!msg) {
			LOGPIO(iofd, LOGL_ERROR, "Could not allocate msgb for writing\n");
			OSMO_ASSERT(0);
		}
		msghdr = iofd_msghdr_alloc(iofd, IOFD_ACT_WRITE, msg);
		if (!msghdr) {
			LOGPIO(iofd, LOGL_ERROR, "Could not allocate msghdr for writing\n");
			OSMO_ASSERT(0);
		}

		/* zero-size msgb: tailroom is 0, so nothing is transmitted */
		msghdr->iov[0].iov_base = msgb_data(msg);
		msghdr->iov[0].iov_len = msgb_tailroom(msg);

		sqe = io_uring_get_sqe(&g_ring.ring);
		if (!sqe) {
			LOGPIO(iofd, LOGL_ERROR, "Could not get io_uring_sqe\n");
			OSMO_ASSERT(0);
		}
		/* Prep msgb/iov */
		io_uring_prep_writev(sqe, iofd->fd, msghdr->iov, 1, 0);
		io_uring_sqe_set_data(sqe, msghdr);

		io_uring_submit(&g_ring.ring);
		iofd->u.uring.write_msghdr = msghdr;
	}
}
379
/*! Disable transmission: an in-flight write is not cancelled here, but
 *  its completion handler checks write_enabled and will not resubmit. */
static void iofd_uring_write_disable(struct osmo_io_fd *iofd)
{
	iofd->u.uring.write_enabled = false;
}
384
385static void iofd_uring_read_enable(struct osmo_io_fd *iofd)
386{
387 iofd->u.uring.read_enabled = true;
388
389 if (iofd->u.uring.read_msghdr)
390 return;
391
392 switch (iofd->mode) {
393 case OSMO_IO_FD_MODE_READ_WRITE:
394 iofd_uring_submit_recv(iofd, IOFD_ACT_READ);
395 break;
396 case OSMO_IO_FD_MODE_RECVFROM_SENDTO:
397 iofd_uring_submit_recv(iofd, IOFD_ACT_RECVFROM);
398 break;
399 default:
400 OSMO_ASSERT(0);
401 }
402}
403
/*! Disable reception: an in-flight read is not cancelled here, but its
 *  completion handler checks read_enabled and will not resubmit. */
static void iofd_uring_read_disable(struct osmo_io_fd *iofd)
{
	iofd->u.uring.read_enabled = false;
}
408
/*! Close the iofd: disable rx/tx, request cancellation of in-flight
 *  uring operations, then close(2) the underlying file descriptor.
 *  NOTE(review): in-flight msgbs are presumably released via the
 *  cancellation completions — confirm in iofd_uring_cqe().
 *  \returns result of close(2) */
static int iofd_uring_close(struct osmo_io_fd *iofd)
{
	iofd_uring_read_disable(iofd);
	iofd_uring_write_disable(iofd);
	iofd_uring_unregister(iofd);
	return close(iofd->fd);
}
416
/*! osmo_io backend operation table for the io_uring implementation */
const struct iofd_backend_ops iofd_uring_ops = {
	.register_fd = iofd_uring_register,
	.unregister_fd = iofd_uring_unregister,
	.close = iofd_uring_close,
	.write_enable = iofd_uring_write_enable,
	.write_disable = iofd_uring_write_disable,
	.read_enable = iofd_uring_read_enable,
	.read_disable = iofd_uring_read_disable,
};
426
427#endif /* defined(__linux__) */