osmo_io: sendmsg/recvmsg support

Add support for osmo_io operations resembling sendmsg() and recvmsg() socket
operations.  This is what will enable the implementation of higher-layer
functions like equivalents of sctp_recvmsg() and sctp_send() in
libosmo-netif and/or other users.

Change-Id: I89eb519b22d21011d61a7855b2364bc3c295df82
Related: OS#5751
diff --git a/src/core/libosmocore.map b/src/core/libosmocore.map
index b66e37d..a50a9ed 100644
--- a/src/core/libosmocore.map
+++ b/src/core/libosmocore.map
@@ -266,7 +266,10 @@
 osmo_iofd_ops;
 osmo_iofd_register;
 osmo_iofd_sendto_msgb;
+osmo_iofd_sctp_send_msgb;
+osmo_iofd_sendmsg_msgb;
 osmo_iofd_set_alloc_info;
+osmo_iofd_set_cmsg_size;
 osmo_iofd_set_data;
 osmo_iofd_set_ioops;
 osmo_iofd_set_priv_nr;
diff --git a/src/core/osmo_io.c b/src/core/osmo_io.c
index 71249cf..5d25f66 100644
--- a/src/core/osmo_io.c
+++ b/src/core/osmo_io.c
@@ -1,8 +1,8 @@
 /*! \file osmo_io.c
  * New osmocom async I/O API.
  *
- * (C) 2022 by Harald Welte <laforge@osmocom.org>
- * (C) 2022-2023 by sysmocom - s.f.m.c. GmbH <info@sysmocom.de>
+ * (C) 2022-2024 by Harald Welte <laforge@osmocom.org>
+ * (C) 2022-2024 by sysmocom - s.f.m.c. GmbH <info@sysmocom.de>
  * Author: Daniel Willmann <dwillmann@sysmocom.de>
  *
  * All Rights Reserved.
@@ -105,8 +105,10 @@
  *  \param[in] iofd the osmo_io file structure
  *  \param[in] action the action this msg(hdr) is for (read, write, ..)
  *  \param[in] msg the msg buffer to use. Will allocate a new one if NULL
+ *  \param[in] cmsg_size size (in bytes) of iofd_msghdr.cmsg buffer. Can be 0 if cmsg is not used.
  *  \returns the newly allocated msghdr or NULL in case of error */
-struct iofd_msghdr *iofd_msghdr_alloc(struct osmo_io_fd *iofd, enum iofd_msg_action action, struct msgb *msg)
+struct iofd_msghdr *iofd_msghdr_alloc(struct osmo_io_fd *iofd, enum iofd_msg_action action, struct msgb *msg,
+				      size_t cmsg_size)
 {
 	bool free_msg = false;
 	struct iofd_msghdr *hdr;
@@ -120,7 +122,7 @@
 		talloc_steal(iofd, msg);
 	}
 
-	hdr = talloc_zero(iofd, struct iofd_msghdr);
+	hdr = talloc_zero_size(iofd, sizeof(struct iofd_msghdr) + cmsg_size);
 	if (!hdr) {
 		if (free_msg)
 			talloc_free(msg);
@@ -339,8 +341,10 @@
 	case OSMO_IO_FD_MODE_RECVFROM_SENDTO:
 		iofd->io_ops.recvfrom_cb(iofd, rc, msg, &hdr->osa);
 		break;
-	case OSMO_IO_FD_MODE_SCTP_RECVMSG_SEND:
-		/* TODO Implement */
+	case OSMO_IO_FD_MODE_RECVMSG_SENDMSG:
+		iofd->io_ops.recvmsg_cb(iofd, rc, msg, &hdr->hdr);
+		break;
+	default:
 		OSMO_ASSERT(false);
 		break;
 	}
@@ -378,6 +382,9 @@
 	case IOFD_ACT_SENDTO:
 		iofd->io_ops.sendto_cb(iofd, rc, msg, &msghdr->osa);
 		break;
+	case IOFD_ACT_SENDMSG:
+		iofd->io_ops.sendmsg_cb(iofd, rc, msg);
+		break;
 	default:
 		OSMO_ASSERT(0);
 	}
@@ -408,7 +415,7 @@
 		return -EINVAL;
 	}
 
-	struct iofd_msghdr *msghdr = iofd_msghdr_alloc(iofd, IOFD_ACT_WRITE, msg);
+	struct iofd_msghdr *msghdr = iofd_msghdr_alloc(iofd, IOFD_ACT_WRITE, msg, 0);
 	if (!msghdr)
 		return -ENOMEM;
 
@@ -450,7 +457,7 @@
 		return -EINVAL;
 	}
 
-	struct iofd_msghdr *msghdr = iofd_msghdr_alloc(iofd, IOFD_ACT_SENDTO, msg);
+	struct iofd_msghdr *msghdr = iofd_msghdr_alloc(iofd, IOFD_ACT_SENDTO, msg, 0);
 	if (!msghdr)
 		return -ENOMEM;
 
@@ -475,16 +482,97 @@
 	return 0;
 }
 
+/*! osmo_io equivalent of the sendmsg(2) socket API call
+ *
+ *  Appends the message to the internal transmit queue.
+ *  If the function returns success (0), it will take ownership of the msgb and
+ *  internally call msgb_free() after the write request completes.
+ *  In case of an error the msgb needs to be freed by the caller.
+ *  \param[in] iofd file descriptor to write to; must be in OSMO_IO_FD_MODE_RECVMSG_SENDMSG mode
+ *  \param[in] msg message buffer to send; its data becomes the single iov entry of the queued message
+ *  \param[in] sendmsg_flags Flags to pass to the send call
+ *  \param[in] msgh 'struct msghdr' supplying optional name and control data. msg_iovlen must be 0!
+ *  \returns 0 in case of success; a negative value in case of error (-EINVAL, -ENOMEM or the enqueue error)
+ */
+int osmo_iofd_sendmsg_msgb(struct osmo_io_fd *iofd, struct msgb *msg, int sendmsg_flags, const struct msghdr *msgh)
+{
+	int rc;
+	struct iofd_msghdr *msghdr;
+
+	OSMO_ASSERT(iofd->mode == OSMO_IO_FD_MODE_RECVMSG_SENDMSG);
+	if (OSMO_UNLIKELY(!iofd->io_ops.sendmsg_cb)) {
+		LOGPIO(iofd, LOGL_ERROR, "sendmsg_cb not set, Rejecting msgb\n");
+		return -EINVAL;
+	}
+
+	if (OSMO_UNLIKELY(msgh->msg_namelen > sizeof(msghdr->osa))) {
+		LOGPIO(iofd, LOGL_ERROR, "osmo_iofd_sendmsg msg_namelen (%u) > supported %zu bytes\n",
+			msgh->msg_namelen, sizeof(msghdr->osa));
+		return -EINVAL;
+	}
+
+	if (OSMO_UNLIKELY(msgh->msg_iovlen)) {
+		LOGPIO(iofd, LOGL_ERROR, "osmo_iofd_sendmsg must have all in 'struct msgb', not in 'msg_iov'\n");
+		return -EINVAL;
+	}
+
+	msghdr = iofd_msghdr_alloc(iofd, IOFD_ACT_SENDMSG, msg, msgh->msg_controllen);
+	if (!msghdr)
+		return -ENOMEM;
+
+	/* copy over optional address into our own storage, so the caller's buffer need not outlive this call */
+	if (msgh->msg_name) {
+		memcpy(&msghdr->osa, msgh->msg_name, msgh->msg_namelen);
+		msghdr->hdr.msg_name = &msghdr->osa.u.sa;
+		msghdr->hdr.msg_namelen = msgh->msg_namelen;
+	}
+
+	/* build iov from msgb */
+	msghdr->iov[0].iov_base = msgb_data(msghdr->msg);
+	msghdr->iov[0].iov_len = msgb_length(msghdr->msg);
+	msghdr->hdr.msg_iov = &msghdr->iov[0];
+	msghdr->hdr.msg_iovlen = 1;
+
+	/* copy over the cmsg from the msghdr into the trailing cmsg[] buffer allocated above */
+	if (msgh->msg_control && msgh->msg_controllen) {
+		msghdr->hdr.msg_control = msghdr->cmsg;
+		msghdr->hdr.msg_controllen = msgh->msg_controllen;
+		memcpy(msghdr->cmsg, msgh->msg_control, msgh->msg_controllen);
+	}
+
+	/* the send call is issued with msghdr->flags; sendmsg(2) ignores hdr.msg_flags, so set both */
+	msghdr->flags = sendmsg_flags;
+	msghdr->hdr.msg_flags = sendmsg_flags;
+	rc = iofd_txqueue_enqueue(iofd, msghdr);
+	if (rc < 0) {
+		iofd_msghdr_free(msghdr);
+		LOGPIO(iofd, LOGL_ERROR, "enqueueing message failed (%d). Rejecting msgb\n", rc);
+		return rc;
+	}
+
+	return 0;
+}
+
 static int check_mode_callback_compat(enum osmo_io_fd_mode mode, const struct osmo_io_ops *ops)
 {
 	switch (mode) {
 	case OSMO_IO_FD_MODE_READ_WRITE:
 		if (ops->recvfrom_cb || ops->sendto_cb)
 			return false;
+		if (ops->recvmsg_cb || ops->sendmsg_cb)
+			return false;
 		break;
 	case OSMO_IO_FD_MODE_RECVFROM_SENDTO:
 		if (ops->read_cb || ops->write_cb)
 			return false;
+		if (ops->recvmsg_cb || ops->sendmsg_cb)
+			return false;
+		break;
+	case OSMO_IO_FD_MODE_RECVMSG_SENDMSG:
+		if (ops->recvfrom_cb || ops->sendto_cb)
+			return false;
+		if (ops->read_cb || ops->write_cb)
+			return false;
 		break;
 	default:
 		break;
@@ -511,6 +599,7 @@
 	switch (mode) {
 	case OSMO_IO_FD_MODE_READ_WRITE:
 	case OSMO_IO_FD_MODE_RECVFROM_SENDTO:
+	case OSMO_IO_FD_MODE_RECVMSG_SENDMSG:
 		break;
 	default:
 		return NULL;
@@ -547,6 +636,16 @@
 	return iofd;
 }
 
+/*! Configure the control message buffer size used for recvmsg; gives 0, or -EINVAL if iofd is not in RECVMSG_SENDMSG mode */
+int osmo_iofd_set_cmsg_size(struct osmo_io_fd *iofd, size_t cmsg_size)
+{
+	if (iofd->mode == OSMO_IO_FD_MODE_RECVMSG_SENDMSG) {
+		iofd->cmsg_size = cmsg_size;
+		return 0;
+	}
+	return -EINVAL;
+}
+
 /*! Register the fd with the underlying backend.
  *
  *  \param[in] iofd the iofd file descriptor
@@ -567,7 +666,8 @@
 
 	IOFD_FLAG_UNSET(iofd, IOFD_FLAG_CLOSED);
 	if ((iofd->mode == OSMO_IO_FD_MODE_READ_WRITE && iofd->io_ops.read_cb) ||
-	    (iofd->mode == OSMO_IO_FD_MODE_RECVFROM_SENDTO && iofd->io_ops.recvfrom_cb)) {
+	    (iofd->mode == OSMO_IO_FD_MODE_RECVFROM_SENDTO && iofd->io_ops.recvfrom_cb) ||
+	    (iofd->mode == OSMO_IO_FD_MODE_RECVMSG_SENDMSG && iofd->io_ops.recvmsg_cb)) {
 		osmo_iofd_ops.read_enable(iofd);
 	}
 
@@ -767,7 +867,12 @@
 		else
 			osmo_iofd_ops.read_disable(iofd);
 		break;
-	case OSMO_IO_FD_MODE_SCTP_RECVMSG_SEND:
+	case OSMO_IO_FD_MODE_RECVMSG_SENDMSG:
+		if (iofd->io_ops.recvmsg_cb)
+			osmo_iofd_ops.read_enable(iofd);
+		else
+			osmo_iofd_ops.read_disable(iofd);
+		break;
 	default:
 		OSMO_ASSERT(0);
 	}
@@ -780,7 +885,8 @@
  *  \param[in] iofd the file descriptor */
 void osmo_iofd_notify_connected(struct osmo_io_fd *iofd)
 {
-	OSMO_ASSERT(iofd->mode == OSMO_IO_FD_MODE_READ_WRITE);
+	OSMO_ASSERT(iofd->mode == OSMO_IO_FD_MODE_READ_WRITE ||
+		    iofd->mode == OSMO_IO_FD_MODE_RECVMSG_SENDMSG);
 	OSMO_ASSERT(osmo_iofd_ops.notify_connected);
 	osmo_iofd_ops.notify_connected(iofd);
 }
diff --git a/src/core/osmo_io_internal.h b/src/core/osmo_io_internal.h
index 9c86e05..af47a3d 100644
--- a/src/core/osmo_io_internal.h
+++ b/src/core/osmo_io_internal.h
@@ -4,6 +4,7 @@
 
 #include <unistd.h>
 #include <stdbool.h>
+#include <netinet/sctp.h>
 
 #include <osmocom/core/osmo_io.h>
 #include <osmocom/core/linuxlist.h>
@@ -72,6 +73,9 @@
 	/*! private number, extending \a data */
 	unsigned int priv_nr;
 
+	/*! size of iofd_msghdr.cmsg[] when allocated in recvmsg path */
+	size_t cmsg_size;
+
 	struct {
 		/*! talloc context from which to allocate msgb when reading */
 		const void *ctx;
@@ -109,7 +113,8 @@
 	IOFD_ACT_WRITE,
 	IOFD_ACT_RECVFROM,
 	IOFD_ACT_SENDTO,
-	// TODO: SCTP_*
+	IOFD_ACT_RECVMSG,
+	IOFD_ACT_SENDMSG,
 };
 
 
@@ -132,6 +137,9 @@
 	struct msgb *msg;
 	/*! I/O file descriptor on which we perform this I/O operation */
 	struct osmo_io_fd *iofd;
+
+	/*! control message buffer for passing sctp_sndrcvinfo along */
+	char cmsg[0]; /* size is determined by iofd->cmsg_size on recvmsg, and by msghdr->msg_controllen on sendmsg */
 };
 
 enum iofd_seg_act {
@@ -140,7 +148,7 @@
 	IOFD_SEG_ACT_DEFER,
 };
 
-struct iofd_msghdr *iofd_msghdr_alloc(struct osmo_io_fd *iofd, enum iofd_msg_action action, struct msgb *msg);
+struct iofd_msghdr *iofd_msghdr_alloc(struct osmo_io_fd *iofd, enum iofd_msg_action action, struct msgb *msg, size_t cmsg_size);
 void iofd_msghdr_free(struct iofd_msghdr *msghdr);
 
 struct msgb *iofd_msgb_alloc(struct osmo_io_fd *iofd);
diff --git a/src/core/osmo_io_poll.c b/src/core/osmo_io_poll.c
index 8398a30..52e806d 100644
--- a/src/core/osmo_io_poll.c
+++ b/src/core/osmo_io_poll.c
@@ -49,6 +49,7 @@
 
 	if (what & OSMO_FD_READ) {
 		struct iofd_msghdr hdr;
+
 		msg = iofd_msgb_pending_or_alloc(iofd);
 		if (!msg) {
 			LOGPIO(iofd, LOGL_ERROR, "Could not allocate msgb for reading\n");
@@ -64,6 +65,10 @@
 			.msg_name = &hdr.osa.u.sa,
 			.msg_namelen = sizeof(struct osmo_sockaddr),
 		};
+		if (iofd->mode == OSMO_IO_FD_MODE_RECVMSG_SENDMSG) {
+			hdr.hdr.msg_control = alloca(iofd->cmsg_size);
+			hdr.hdr.msg_controllen = iofd->cmsg_size;
+		}
 
 		rc = recvmsg(ofd->fd, &hdr.hdr, flags);
 		if (rc > 0)
@@ -90,13 +95,15 @@
 			case OSMO_IO_FD_MODE_RECVFROM_SENDTO:
 				iofd->io_ops.sendto_cb(iofd, 0, NULL, NULL);
 				break;
+			case OSMO_IO_FD_MODE_RECVMSG_SENDMSG:
+				iofd->io_ops.sendmsg_cb(iofd, 0, NULL);
+				break;
 			default:
 				break;
 			}
 			if (osmo_iofd_txqueue_len(iofd) == 0)
 				iofd_poll_ops.write_disable(iofd);
 		}
-
 	}
 }
 
diff --git a/src/core/osmo_io_uring.c b/src/core/osmo_io_uring.c
index 24d1e08..aa9df85 100644
--- a/src/core/osmo_io_uring.c
+++ b/src/core/osmo_io_uring.c
@@ -3,6 +3,7 @@
  *
  * (C) 2022-2023 by sysmocom s.f.m.c.
  * Author: Daniel Willmann <daniel@sysmocom.de>
+ * (C) 2023-2024 by Harald Welte <laforge@osmocom.org>
  *
  * All Rights Reserved.
  *
@@ -35,6 +36,8 @@
 #include <stdbool.h>
 #include <errno.h>
 
+#include <netinet/in.h>
+#include <netinet/sctp.h>
 #include <sys/eventfd.h>
 #include <liburing.h>
 
@@ -114,7 +117,7 @@
 		OSMO_ASSERT(0);
 	}
 
-	msghdr = iofd_msghdr_alloc(iofd, action, msg);
+	msghdr = iofd_msghdr_alloc(iofd, action, msg, iofd->cmsg_size);
 	if (!msghdr) {
 		LOGPIO(iofd, LOGL_ERROR, "Could not allocate msghdr for reading\n");
 		OSMO_ASSERT(0);
@@ -126,6 +129,10 @@
 	switch (action) {
 	case IOFD_ACT_READ:
 		break;
+	case IOFD_ACT_RECVMSG:
+		msghdr->hdr.msg_control = msghdr->cmsg;
+		msghdr->hdr.msg_controllen = iofd->cmsg_size;
+		/* fall-through */
 	case IOFD_ACT_RECVFROM:
 		msghdr->hdr.msg_iov = &msghdr->iov[0];
 		msghdr->hdr.msg_iovlen = 1;
@@ -146,6 +153,7 @@
 	case IOFD_ACT_READ:
 		io_uring_prep_readv(sqe, iofd->fd, msghdr->iov, 1, 0);
 		break;
+	case IOFD_ACT_RECVMSG:
 	case IOFD_ACT_RECVFROM:
 		io_uring_prep_recvmsg(sqe, iofd->fd, &msghdr->hdr, msghdr->flags);
 		break;
@@ -210,10 +218,12 @@
 	switch (msghdr->action) {
 	case IOFD_ACT_READ:
 	case IOFD_ACT_RECVFROM:
+	case IOFD_ACT_RECVMSG:
 		iofd_uring_handle_recv(msghdr, res);
 		break;
 	case IOFD_ACT_WRITE:
 	case IOFD_ACT_SENDTO:
+	case IOFD_ACT_SENDMSG:
 		iofd_uring_handle_tx(msghdr, res);
 		break;
 	default:
@@ -273,6 +283,7 @@
 	switch (msghdr->action) {
 	case IOFD_ACT_WRITE:
 	case IOFD_ACT_SENDTO:
+	case IOFD_ACT_SENDMSG:
 		io_uring_prep_sendmsg(sqe, msghdr->iofd->fd, &msghdr->hdr, msghdr->flags);
 		break;
 	default:
@@ -334,21 +345,20 @@
 			LOGPIO(iofd, LOGL_ERROR, "Could not allocate msgb for writing\n");
 			OSMO_ASSERT(0);
 		}
-		msghdr = iofd_msghdr_alloc(iofd, IOFD_ACT_WRITE, msg);
+		msghdr = iofd_msghdr_alloc(iofd, IOFD_ACT_WRITE, msg, 0);
 		if (!msghdr) {
 			LOGPIO(iofd, LOGL_ERROR, "Could not allocate msghdr for writing\n");
 			OSMO_ASSERT(0);
 		}
 
 		msghdr->iov[0].iov_base = msgb_data(msg);
-		msghdr->iov[0].iov_len = msgb_tailroom(msg);
+		msghdr->iov[0].iov_len = msgb_length(msg);
 
 		sqe = io_uring_get_sqe(&g_ring.ring);
 		if (!sqe) {
 			LOGPIO(iofd, LOGL_ERROR, "Could not get io_uring_sqe\n");
 			OSMO_ASSERT(0);
 		}
-		// Prep msgb/iov
 		io_uring_prep_writev(sqe, iofd->fd, msghdr->iov, 1, 0);
 		io_uring_sqe_set_data(sqe, msghdr);
 
@@ -376,6 +386,9 @@
 	case OSMO_IO_FD_MODE_RECVFROM_SENDTO:
 		iofd_uring_submit_recv(iofd, IOFD_ACT_RECVFROM);
 		break;
+	case OSMO_IO_FD_MODE_RECVMSG_SENDMSG:
+		iofd_uring_submit_recv(iofd, IOFD_ACT_RECVMSG);
+		break;
 	default:
 		OSMO_ASSERT(0);
 	}