osmo_io: Factor out and use common send function from backend
This handles reenqueuing a message on EAGAIN and incomplete write
Change-Id: I6da2653d32aedd0e7872be0cf90a841b56462e59
diff --git a/src/core/osmo_io.c b/src/core/osmo_io.c
index 8507f46..e059f87 100644
--- a/src/core/osmo_io.c
+++ b/src/core/osmo_io.c
@@ -344,6 +344,46 @@
}
}
+/*! completion handler: Calld by osmo_io backend after a given I/O operation has completed
+ * \param[in] iofd I/O file-descriptor on which I/O has completed
+ * \param[in] rc return value of the I/O operation
+ * \param[in] msghdr serialized msghdr containing state of completed I/O
+ */
+void iofd_handle_send_completion(struct osmo_io_fd *iofd, int rc, struct iofd_msghdr *msghdr)
+{
+ struct msgb *msg = msghdr->msg;
+
+ /* Incomplete write */
+ if (rc > 0 && rc < msgb_length(msg)) {
+ /* Re-enqueue remaining data */
+ msgb_pull(msg, rc);
+ msghdr->iov[0].iov_len = msgb_length(msg);
+ iofd_txqueue_enqueue_front(iofd, msghdr);
+ return;
+ }
+
+ /* Reenqueue the complete msgb */
+ if (rc == -EAGAIN) {
+ iofd_txqueue_enqueue_front(iofd, msghdr);
+ return;
+ }
+
+ /* All other failure and success cases are handled here */
+ switch (msghdr->action) {
+ case IOFD_ACT_WRITE:
+ iofd->io_ops.write_cb(iofd, rc, msg);
+ break;
+ case IOFD_ACT_SENDTO:
+ iofd->io_ops.sendto_cb(iofd, rc, msg, &msghdr->osa);
+ break;
+ default:
+ OSMO_ASSERT(0);
+ }
+
+ msgb_free(msghdr->msg);
+ iofd_msghdr_free(msghdr);
+}
+
/* Public functions */
/*! Send a message through a connected socket.
diff --git a/src/core/osmo_io_internal.h b/src/core/osmo_io_internal.h
index 73a81e1..e8f4ea2 100644
--- a/src/core/osmo_io_internal.h
+++ b/src/core/osmo_io_internal.h
@@ -146,6 +146,7 @@
struct msgb *iofd_msgb_pending_or_alloc(struct osmo_io_fd *iofd);
void iofd_handle_recv(struct osmo_io_fd *iofd, struct msgb *msg, int rc, struct iofd_msghdr *msghdr);
+void iofd_handle_send_completion(struct osmo_io_fd *iofd, int rc, struct iofd_msghdr *msghdr);
void iofd_handle_segmented_read(struct osmo_io_fd *iofd, struct msgb *msg, int rc);
int iofd_txqueue_enqueue(struct osmo_io_fd *iofd, struct iofd_msghdr *msghdr);
diff --git a/src/core/osmo_io_poll.c b/src/core/osmo_io_poll.c
index 2c1e422..5000dca 100644
--- a/src/core/osmo_io_poll.c
+++ b/src/core/osmo_io_poll.c
@@ -78,33 +78,8 @@
if (what & OSMO_FD_WRITE) {
struct iofd_msghdr *msghdr = iofd_txqueue_dequeue(iofd);
if (msghdr) {
- msg = msghdr->msg;
-
rc = sendmsg(ofd->fd, &msghdr->hdr, msghdr->flags);
- if (rc > 0 && rc < msgb_length(msg)) {
- msgb_pull(msg, rc);
- iofd_txqueue_enqueue_front(iofd, msghdr);
- return;
- }
- if (rc == -EAGAIN) {
- iofd_txqueue_enqueue_front(iofd, msghdr);
- return;
- }
-
- switch (iofd->mode) {
- case OSMO_IO_FD_MODE_READ_WRITE:
- iofd->io_ops.write_cb(iofd, rc, msg);
- break;
- case OSMO_IO_FD_MODE_RECVFROM_SENDTO:
- iofd->io_ops.sendto_cb(iofd, rc, msg, &msghdr->osa);
- break;
- case OSMO_IO_FD_MODE_SCTP_RECVMSG_SEND:
- OSMO_ASSERT(false);
- break;
- }
-
- talloc_free(msghdr);
- msgb_free(msg);
+ iofd_handle_send_completion(iofd, rc, msghdr);
} else {
if (iofd->mode == OSMO_IO_FD_MODE_READ_WRITE)
/* Socket is writable, but we have no data to send. A non-blocking/async
diff --git a/src/core/osmo_io_uring.c b/src/core/osmo_io_uring.c
index abeea79..1aa17d4 100644
--- a/src/core/osmo_io_uring.c
+++ b/src/core/osmo_io_uring.c
@@ -186,43 +186,14 @@
static void iofd_uring_handle_tx(struct iofd_msghdr *msghdr, int rc)
{
struct osmo_io_fd *iofd = msghdr->iofd;
- struct msgb *msg = msghdr->msg;
- if (IOFD_FLAG_ISSET(iofd, IOFD_FLAG_CLOSED))
- goto out_free;
-
- /* Error during write */
- if (rc < 0) {
- if (msghdr->action == IOFD_ACT_WRITE)
- iofd->io_ops.write_cb(iofd, rc, msg);
- else if (msghdr->action == IOFD_ACT_SENDTO)
- iofd->io_ops.sendto_cb(iofd, rc, msg, &msghdr->osa);
- else
- OSMO_ASSERT(0);
- goto out_free;
+ if (OSMO_UNLIKELY(IOFD_FLAG_ISSET(iofd, IOFD_FLAG_CLOSED))) {
+ msgb_free(msghdr->msg);
+ iofd_msghdr_free(msghdr);
+ } else {
+ iofd_handle_send_completion(iofd, rc, msghdr);
}
- /* Incomplete write */
- if (rc < msgb_length(msg)) {
- /* Re-enqueue remaining data */
- msgb_pull(msg, rc);
- msghdr->iov[0].iov_len = msgb_length(msg);
- iofd_txqueue_enqueue_front(iofd, msghdr);
- goto out;
- }
-
- if (msghdr->action == IOFD_ACT_WRITE)
- iofd->io_ops.write_cb(iofd, rc, msg);
- else if (msghdr->action == IOFD_ACT_SENDTO)
- iofd->io_ops.sendto_cb(iofd, rc, msg, &msghdr->osa);
- else
- OSMO_ASSERT(0);
-
-out_free:
- msgb_free(msghdr->msg);
- iofd_msghdr_free(msghdr);
-
-out:
iofd->u.uring.write_msghdr = NULL;
/* submit the next to-be-transmitted message for this file descriptor */
if (iofd->u.uring.write_enabled && !IOFD_FLAG_ISSET(iofd, IOFD_FLAG_CLOSED))