osmo_io: Add io_uring backend

Change-Id: I5152129eb84b31ccc9e02bc2a5c5bdb046d331bc
diff --git a/configure.ac b/configure.ac
index ef15f22..2fc9895 100644
--- a/configure.ac
+++ b/configure.ac
@@ -175,6 +175,20 @@
 
 PKG_CHECK_MODULES(TALLOC, [talloc >= 2.1.0])
 
+AC_ARG_ENABLE([uring], [AS_HELP_STRING([--disable-uring], [Build without io_uring support])],
+    [
+        ENABLE_URING=$enableval
+    ],
+    [
+        ENABLE_URING="yes"
+    ])
+AS_IF([test "x$ENABLE_URING" = "xyes"], [
+	PKG_CHECK_MODULES(URING, [liburing >= 0.7])
+	AC_DEFINE([HAVE_URING],[1],[Build with io_uring support])
+])
+AM_CONDITIONAL(ENABLE_URING, test "x$ENABLE_URING" = "xyes")
+AC_SUBST(ENABLE_URING)
+
 AC_ARG_ENABLE([pcsc], [AS_HELP_STRING([--disable-pcsc], [Build without PC/SC support])],
     [
         ENABLE_PCSC=$enableval
diff --git a/contrib/libosmocore.spec.in b/contrib/libosmocore.spec.in
index fdd1a65..3fde143 100644
--- a/contrib/libosmocore.spec.in
+++ b/contrib/libosmocore.spec.in
@@ -32,6 +32,7 @@
 BuildRequires:  pkgconfig(talloc) >= 2.1.0
 BuildRequires:  pkgconfig(libmnl)
 BuildRequires:  pkgconfig(libsystemd)
+BuildRequires:  pkgconfig(liburing)
 
 %description
 libosmocore is a package with various utility functions that were
diff --git a/include/osmocom/core/osmo_io.h b/include/osmocom/core/osmo_io.h
index 8f3c060..b780d9a 100644
--- a/include/osmocom/core/osmo_io.h
+++ b/include/osmocom/core/osmo_io.h
@@ -27,6 +27,7 @@
 
 enum osmo_io_backend {
 	OSMO_IO_BACKEND_POLL,
+	OSMO_IO_BACKEND_IO_URING,
 };
 
 extern const struct value_string osmo_io_backend_names[];
diff --git a/src/core/Makefile.am b/src/core/Makefile.am
index 80ee458..2f2fc19 100644
--- a/src/core/Makefile.am
+++ b/src/core/Makefile.am
@@ -4,7 +4,7 @@
 LIBVERSION=20:0:0
 
 AM_CPPFLAGS = -I$(top_srcdir)/include -I$(top_builddir)/include -I$(top_builddir)
-AM_CFLAGS = -Wall $(TALLOC_CFLAGS) $(PTHREAD_CFLAGS) $(LIBSCTP_CFLAGS) $(LIBMNL_CFLAGS)
+AM_CFLAGS = -Wall $(TALLOC_CFLAGS) $(PTHREAD_CFLAGS) $(LIBSCTP_CFLAGS) $(LIBMNL_CFLAGS) $(URING_CFLAGS)
 
 if ENABLE_PSEUDOTALLOC
 AM_CPPFLAGS += -I$(top_srcdir)/src/pseudotalloc
@@ -18,6 +18,7 @@
 	$(LIBRARY_RT) \
 	$(PTHREAD_LIBS) \
 	$(LIBSCTP_LIBS) \
+	$(URING_LIBS) \
 	$(NULL)
 
 libosmocore_la_SOURCES = \
@@ -156,5 +157,9 @@
 libosmocore_la_LIBADD += probes.lo
 endif
 
+if ENABLE_URING
+libosmocore_la_SOURCES += osmo_io_uring.c
+endif
+
 crc%gen.c: crcXXgen.c.tpl
 	$(AM_V_GEN)sed -e's/XX/$*/g' $< > $@
diff --git a/src/core/osmo_io.c b/src/core/osmo_io.c
index bccf7af..e176099 100644
--- a/src/core/osmo_io.c
+++ b/src/core/osmo_io.c
@@ -47,6 +47,7 @@
 
 const struct value_string osmo_io_backend_names[] = {
 	{ OSMO_IO_BACKEND_POLL, "poll" },
+	{ OSMO_IO_BACKEND_IO_URING, "io_uring" },
 	{ 0, NULL }
 };
 
@@ -55,12 +56,21 @@
 /* Used by some tests, can't be static */
 struct iofd_backend_ops osmo_iofd_ops;
 
+#if defined(HAVE_URING)
+void osmo_iofd_uring_init(void);
+#endif
+
 /*! initialize osmo_io for the current thread */
 void osmo_iofd_init(void)
 {
 	switch (g_io_backend) {
 	case OSMO_IO_BACKEND_POLL:
 		break;
+#if defined(HAVE_URING)
+	case OSMO_IO_BACKEND_IO_URING:
+		osmo_iofd_uring_init();
+		break;
+#endif
 	default:
 		OSMO_ASSERT(0);
 		break;
@@ -78,6 +88,11 @@
 	if (!strcmp("POLL", backend)) {
 		g_io_backend = OSMO_IO_BACKEND_POLL;
 		osmo_iofd_ops = iofd_poll_ops;
+#if defined(HAVE_URING)
+	} else if (!strcmp("IO_URING", backend)) {
+		g_io_backend = OSMO_IO_BACKEND_IO_URING;
+		osmo_iofd_ops = iofd_uring_ops;
+#endif
 	} else {
 		fprintf(stderr, "Invalid osmo_io backend requested: \"%s\"\nCheck the environment variable %s\n", backend, OSMO_IO_BACKEND_ENV);
 		exit(1);
diff --git a/src/core/osmo_io_internal.h b/src/core/osmo_io_internal.h
index 0f0465d..5b7ab90 100644
--- a/src/core/osmo_io_internal.h
+++ b/src/core/osmo_io_internal.h
@@ -19,6 +19,10 @@
 extern const struct iofd_backend_ops iofd_poll_ops;
 #define OSMO_IO_BACKEND_DEFAULT "POLL"
 
+#if defined(HAVE_URING)
+extern const struct iofd_backend_ops iofd_uring_ops;
+#endif
+
 struct iofd_backend_ops {
 	int (*register_fd)(struct osmo_io_fd *iofd);
 	int (*unregister_fd)(struct osmo_io_fd *iofd);
@@ -90,9 +94,9 @@
 		} poll;
 		struct {
 			bool read_enabled;
-			bool read_pending;
-			bool write_pending;
 			bool write_enabled;
+			void *read_msghdr;
+			void *write_msghdr;
 			/* TODO: index into array of registered fd's? */
 		} uring;
 	} u;
diff --git a/src/core/osmo_io_uring.c b/src/core/osmo_io_uring.c
new file mode 100644
index 0000000..84b7b4c
--- /dev/null
+++ b/src/core/osmo_io_uring.c
@@ -0,0 +1,427 @@
+/*! \file osmo_io_uring.c
+ * io_uring backend for osmo_io.
+ *
+ * (C) 2022-2023 by sysmocom s.f.m.c.
+ * Author: Daniel Willmann <daniel@sysmocom.de>
+ *
+ * All Rights Reserved.
+ *
+ * SPDX-License-Identifier: GPL-2.0+
+ *
+ *  This program is free software; you can redistribute it and/or modify
+ *  it under the terms of the GNU General Public License as published by
+ *  the Free Software Foundation; either version 2 of the License, or
+ *  (at your option) any later version.
+ *
+ *  This program is distributed in the hope that it will be useful,
+ *  but WITHOUT ANY WARRANTY; without even the implied warranty of
+ *  MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
+ *  GNU General Public License for more details.
+ */
+
+/* TODO:
+ * Parameters:
+ * - number of simultaneous read/write in uring for given fd
+ *
+ */
+
+#include "../config.h"
+#if defined(__linux__)
+
+#include <stdio.h>
+#include <talloc.h>
+#include <unistd.h>
+#include <string.h>
+#include <stdbool.h>
+#include <errno.h>
+
+#include <sys/eventfd.h>
+#include <liburing.h>
+
+#include <osmocom/core/osmo_io.h>
+#include <osmocom/core/linuxlist.h>
+#include <osmocom/core/logging.h>
+#include <osmocom/core/msgb.h>
+#include <osmocom/core/select.h>
+#include <osmocom/core/talloc.h>
+#include <osmocom/core/utils.h>
+#include <osmocom/core/socket.h>
+
+#include "osmo_io_internal.h"
+
+#define IOFD_URING_ENTRIES 4096
+
+struct osmo_io_uring {
+	struct osmo_fd event_ofd;
+	struct io_uring ring;
+};
+
+static __thread struct osmo_io_uring g_ring;
+
+static void iofd_uring_cqe(struct io_uring *ring);
+static int iofd_uring_poll_cb(struct osmo_fd *ofd, unsigned int what)
+{
+	struct io_uring *ring = ofd->data;
+	eventfd_t val;
+	int rc;
+
+	if (what & OSMO_FD_READ) {
+		rc = eventfd_read(ofd->fd, &val);
+		if (rc < 0) {
+			LOGP(DLIO, LOGL_ERROR, "eventfd_read() returned error\n");
+			return rc;
+		}
+
+		iofd_uring_cqe(ring);
+	}
+	if (what & OSMO_FD_WRITE)
+		OSMO_ASSERT(0);
+
+	return 0;
+}
+
+/*! initialize the uring and tie it into our event loop */
+void osmo_iofd_uring_init(void)
+{
+	int rc;
+	rc = io_uring_queue_init(IOFD_URING_ENTRIES, &g_ring.ring, 0);
+	if (rc < 0)
+		OSMO_ASSERT(0);
+
+	rc = eventfd(0, 0);
+	if (rc < 0) {
+		io_uring_queue_exit(&g_ring.ring);
+		OSMO_ASSERT(0);
+	}
+
+	osmo_fd_setup(&g_ring.event_ofd, rc, OSMO_FD_READ, iofd_uring_poll_cb, &g_ring.ring, 0);
+	osmo_fd_register(&g_ring.event_ofd);
+	io_uring_register_eventfd(&g_ring.ring, rc);
+}
+
+
+static void iofd_uring_submit_recv(struct osmo_io_fd *iofd, enum iofd_msg_action action)
+{
+	struct msgb *msg;
+	struct iofd_msghdr *msghdr;
+	struct io_uring_sqe *sqe;
+
+	msg = iofd_msgb_pending_or_alloc(iofd);
+	if (!msg) {
+		LOGPIO(iofd, LOGL_ERROR, "Could not allocate msgb for reading\n");
+		OSMO_ASSERT(0);
+	}
+
+	msghdr = iofd_msghdr_alloc(iofd, action, msg);
+	if (!msghdr) {
+		LOGPIO(iofd, LOGL_ERROR, "Could not allocate msghdr for reading\n");
+		OSMO_ASSERT(0);
+	}
+
+	msghdr->iov[0].iov_base = msg->tail;
+	msghdr->iov[0].iov_len = msgb_tailroom(msg);
+
+	switch (action) {
+	case IOFD_ACT_READ:
+		break;
+	case IOFD_ACT_RECVFROM:
+		msghdr->hdr.msg_iov = &msghdr->iov[0];
+		msghdr->hdr.msg_iovlen = 1;
+		msghdr->hdr.msg_name = &msghdr->osa.u.sa;
+		msghdr->hdr.msg_namelen = osmo_sockaddr_size(&msghdr->osa);
+		break;
+	default:
+		OSMO_ASSERT(0);
+	}
+
+	sqe = io_uring_get_sqe(&g_ring.ring);
+	if (!sqe) {
+		LOGPIO(iofd, LOGL_ERROR, "Could not get io_uring_sqe\n");
+		OSMO_ASSERT(0);
+	}
+
+	switch (action) {
+	case IOFD_ACT_READ:
+		io_uring_prep_readv(sqe, iofd->fd, msghdr->iov, 1, 0);
+		break;
+	case IOFD_ACT_RECVFROM:
+		io_uring_prep_recvmsg(sqe, iofd->fd, &msghdr->hdr, msghdr->flags);
+		break;
+	default:
+		OSMO_ASSERT(0);
+	}
+	io_uring_sqe_set_data(sqe, msghdr);
+
+	io_uring_submit(&g_ring.ring);
+	/* NOTE: This only works if we have one read per fd */
+	iofd->u.uring.read_msghdr = msghdr;
+}
+
+static void iofd_uring_handle_recv(struct iofd_msghdr *msghdr, int rc)
+{
+	struct osmo_io_fd *iofd = msghdr->iofd;
+	struct msgb *msg = msghdr->msg;
+
+	if (rc > 0)
+		msgb_put(msg, rc);
+
+	if (!IOFD_FLAG_ISSET(iofd, IOFD_FLAG_CLOSED))
+		iofd_handle_recv(iofd, msg, rc, msghdr);
+
+	if (iofd->u.uring.read_enabled && !IOFD_FLAG_ISSET(iofd, IOFD_FLAG_CLOSED))
+		iofd_uring_submit_recv(iofd, msghdr->action);
+	else
+		iofd->u.uring.read_msghdr = NULL;
+
+
+	iofd_msghdr_free(msghdr);
+}
+
+static int iofd_uring_submit_tx(struct osmo_io_fd *iofd);
+
+static void iofd_uring_handle_tx(struct iofd_msghdr *msghdr, int rc)
+{
+	struct osmo_io_fd *iofd = msghdr->iofd;
+
+	if (IOFD_FLAG_ISSET(iofd, IOFD_FLAG_CLOSED))
+		goto out_free;
+
+	/* Error during write */
+	if (rc < 0) {
+		if (msghdr->action == IOFD_ACT_WRITE)
+			iofd->io_ops.write_cb(iofd, rc, msghdr->msg);
+		else if (msghdr->action == IOFD_ACT_SENDTO)
+			iofd->io_ops.sendto_cb(iofd, rc, msghdr->msg, &msghdr->osa);
+		else
+			OSMO_ASSERT(0);
+		goto out_free;
+	}
+
+	/* Incomplete write */
+	if (rc < msgb_length(msghdr->msg)) {
+		/* Re-enqueue remaining data */
+		msgb_pull(msghdr->msg, rc);
+		msghdr->iov[0].iov_len = msgb_length(msghdr->msg);
+		iofd_txqueue_enqueue_front(iofd, msghdr);
+		goto out;
+	}
+
+	if (msghdr->action == IOFD_ACT_WRITE)
+		iofd->io_ops.write_cb(iofd, rc, msghdr->msg);
+	else if (msghdr->action == IOFD_ACT_SENDTO)
+		iofd->io_ops.sendto_cb(iofd, rc, msghdr->msg, &msghdr->osa);
+	else
+		OSMO_ASSERT(0);
+
+out_free:
+	msgb_free(msghdr->msg);
+	iofd_msghdr_free(msghdr);
+
+out:
+	iofd->u.uring.write_msghdr = NULL;
+	if (iofd->u.uring.write_enabled && !IOFD_FLAG_ISSET(iofd, IOFD_FLAG_CLOSED))
+		iofd_uring_submit_tx(iofd);
+}
+
+static void iofd_uring_handle_completion(struct iofd_msghdr *msghdr, int res)
+{
+	struct osmo_io_fd *iofd = msghdr->iofd;
+
+	IOFD_FLAG_SET(iofd, IOFD_FLAG_IN_CALLBACK);
+
+	switch (msghdr->action) {
+	case IOFD_ACT_READ:
+	case IOFD_ACT_RECVFROM:
+		iofd_uring_handle_recv(msghdr, res);
+		break;
+	case IOFD_ACT_WRITE:
+	case IOFD_ACT_SENDTO:
+		iofd_uring_handle_tx(msghdr, res);
+		break;
+	default:
+		OSMO_ASSERT(0)
+	}
+
+	if (!iofd->u.uring.read_msghdr && !iofd->u.uring.write_msghdr)
+		IOFD_FLAG_UNSET(iofd, IOFD_FLAG_IN_CALLBACK);
+
+	if (IOFD_FLAG_ISSET(iofd, IOFD_FLAG_TO_FREE) && !iofd->u.uring.read_msghdr && !iofd->u.uring.write_msghdr)
+		talloc_free(iofd);
+}
+
+static void iofd_uring_cqe(struct io_uring *ring)
+{
+	int rc;
+	struct io_uring_cqe *cqe;
+	struct iofd_msghdr *msghdr;
+
+	while (io_uring_peek_cqe(ring, &cqe) == 0) {
+
+		msghdr = io_uring_cqe_get_data(cqe);
+		if (!msghdr) {
+			LOGP(DLIO, LOGL_DEBUG, "Cancellation returned\n");
+			io_uring_cqe_seen(ring, cqe);
+			continue;
+		}
+
+		rc = cqe->res;
+		/* Hand the entry back to the kernel before */
+		io_uring_cqe_seen(ring, cqe);
+
+		iofd_uring_handle_completion(msghdr, rc);
+
+	}
+}
+
+static int iofd_uring_submit_tx(struct osmo_io_fd *iofd)
+{
+	struct io_uring_sqe *sqe;
+	struct iofd_msghdr *msghdr;
+
+	msghdr = iofd_txqueue_dequeue(iofd);
+	if (!msghdr)
+		return -ENODATA;
+
+	sqe = io_uring_get_sqe(&g_ring.ring);
+	if (!sqe) {
+		LOGPIO(iofd, LOGL_ERROR, "Could not get io_uring_sqe\n");
+		OSMO_ASSERT(0);
+	}
+
+	io_uring_sqe_set_data(sqe, msghdr);
+
+	switch (msghdr->action) {
+	case IOFD_ACT_WRITE:
+	case IOFD_ACT_SENDTO:
+		io_uring_prep_sendmsg(sqe, msghdr->iofd->fd, &msghdr->hdr, msghdr->flags);
+		break;
+	default:
+		OSMO_ASSERT(0);
+	}
+
+	io_uring_submit(&g_ring.ring);
+	iofd->u.uring.write_msghdr = msghdr;
+
+	return 0;
+}
+
+static void iofd_uring_write_enable(struct osmo_io_fd *iofd);
+static void iofd_uring_read_enable(struct osmo_io_fd *iofd);
+
+static int iofd_uring_register(struct osmo_io_fd *iofd)
+{
+	return 0;
+}
+
+static int iofd_uring_unregister(struct osmo_io_fd *iofd)
+{
+	struct io_uring_sqe *sqe;
+	if (iofd->u.uring.read_msghdr) {
+		sqe = io_uring_get_sqe(&g_ring.ring);
+		OSMO_ASSERT(sqe != NULL);
+		io_uring_sqe_set_data(sqe, NULL);
+		LOGPIO(iofd, LOGL_DEBUG, "Cancelling read\n");
+		io_uring_prep_cancel(sqe, iofd->u.uring.read_msghdr, 0);
+	}
+
+	if (iofd->u.uring.write_msghdr) {
+		sqe = io_uring_get_sqe(&g_ring.ring);
+		OSMO_ASSERT(sqe != NULL);
+		io_uring_sqe_set_data(sqe, NULL);
+		LOGPIO(iofd, LOGL_DEBUG, "Cancelling write\n");
+		io_uring_prep_cancel(sqe, iofd->u.uring.write_msghdr, 0);
+	}
+	io_uring_submit(&g_ring.ring);
+
+	return 0;
+}
+
+static void iofd_uring_write_enable(struct osmo_io_fd *iofd)
+{
+	iofd->u.uring.write_enabled = true;
+
+	if (iofd->u.uring.write_msghdr)
+		return;
+
+	if (osmo_iofd_txqueue_len(iofd) > 0)
+		iofd_uring_submit_tx(iofd);
+	else if (iofd->mode == OSMO_IO_FD_MODE_READ_WRITE) {
+		/* Empty write request to check when the socket is connected */
+		struct iofd_msghdr *msghdr;
+		struct io_uring_sqe *sqe;
+		struct msgb *msg = msgb_alloc_headroom(0, 0, "io_uring write dummy");
+		if (!msg) {
+			LOGPIO(iofd, LOGL_ERROR, "Could not allocate msgb for writing\n");
+			OSMO_ASSERT(0);
+		}
+		msghdr = iofd_msghdr_alloc(iofd, IOFD_ACT_WRITE, msg);
+		if (!msghdr) {
+			LOGPIO(iofd, LOGL_ERROR, "Could not allocate msghdr for writing\n");
+			OSMO_ASSERT(0);
+		}
+
+		msghdr->iov[0].iov_base = msgb_data(msg);
+		msghdr->iov[0].iov_len = msgb_tailroom(msg);
+
+		sqe = io_uring_get_sqe(&g_ring.ring);
+		if (!sqe) {
+			LOGPIO(iofd, LOGL_ERROR, "Could not get io_uring_sqe\n");
+			OSMO_ASSERT(0);
+		}
+		// Prep msgb/iov
+		io_uring_prep_writev(sqe, iofd->fd, msghdr->iov, 1, 0);
+		io_uring_sqe_set_data(sqe, msghdr);
+
+		io_uring_submit(&g_ring.ring);
+		iofd->u.uring.write_msghdr = msghdr;
+	}
+}
+
+static void iofd_uring_write_disable(struct osmo_io_fd *iofd)
+{
+	iofd->u.uring.write_enabled = false;
+}
+
+static void iofd_uring_read_enable(struct osmo_io_fd *iofd)
+{
+	iofd->u.uring.read_enabled = true;
+
+	if (iofd->u.uring.read_msghdr)
+		return;
+
+	switch (iofd->mode) {
+	case OSMO_IO_FD_MODE_READ_WRITE:
+		iofd_uring_submit_recv(iofd, IOFD_ACT_READ);
+		break;
+	case OSMO_IO_FD_MODE_RECVFROM_SENDTO:
+		iofd_uring_submit_recv(iofd, IOFD_ACT_RECVFROM);
+		break;
+	default:
+		OSMO_ASSERT(0);
+	}
+}
+
+static void iofd_uring_read_disable(struct osmo_io_fd *iofd)
+{
+	iofd->u.uring.read_enabled = false;
+}
+
+static int iofd_uring_close(struct osmo_io_fd *iofd)
+{
+	iofd_uring_read_disable(iofd);
+	iofd_uring_write_disable(iofd);
+	iofd_uring_unregister(iofd);
+	return close(iofd->fd);
+}
+
+const struct iofd_backend_ops iofd_uring_ops = {
+	.register_fd = iofd_uring_register,
+	.unregister_fd = iofd_uring_unregister,
+	.close = iofd_uring_close,
+	.write_enable = iofd_uring_write_enable,
+	.write_disable = iofd_uring_write_disable,
+	.read_enable = iofd_uring_read_enable,
+	.read_disable = iofd_uring_read_disable,
+};
+
+#endif /* defined(__linux__) */
diff --git a/tests/osmo_io/osmo_io_test.c b/tests/osmo_io/osmo_io_test.c
index cff594b..93beef4 100644
--- a/tests/osmo_io/osmo_io_test.c
+++ b/tests/osmo_io/osmo_io_test.c
@@ -95,6 +95,9 @@
 
 	osmo_iofd_free(iofd1);
 	osmo_iofd_free(iofd2);
+
+	for (int i = 0; i < 128; i++)
+		osmo_select_main(1);
 }
 
 static void recvfrom_cb(struct osmo_io_fd *iofd, int rc, struct msgb *msg,
@@ -147,6 +150,9 @@
 
 	osmo_iofd_free(iofd1);
 	osmo_iofd_free(iofd2);
+
+	for (int i = 0; i < 128; i++)
+		osmo_select_main(1);
 }
 static const struct log_info_cat default_categories[] = {
 };