ccid: Fix AIO handling of IN and IRQ endpoint

Even though functionfs forces us to use Linux AIO, we actually don't
want to submit multiple concurrent pendign writes, but we simply want
to emulate the poll/select interface.  So we must introduce a write
(message) queue for the IN and IRQ endpoint, and we must always only
pull one message off the queue, submit the iocb.  On completion, we
dequeue the next message, and so on.

Change-Id: I8f7026f13cf542185cfec4d4780c64965d1706e2
diff --git a/ccid/ccid_main_functionfs.c b/ccid/ccid_main_functionfs.c
index 717a6ae..a289865 100644
--- a/ccid/ccid_main_functionfs.c
+++ b/ccid/ccid_main_functionfs.c
@@ -127,6 +127,7 @@
 #include <stdlib.h>
 #include <stdio.h>
 #include <unistd.h>
+#include <string.h>
 #include <assert.h>
 #include <fcntl.h>
 #include <sys/stat.h>
@@ -153,6 +154,8 @@
 	struct osmo_fd ep_in;
 	struct osmo_fd ep_out;
 	struct osmo_fd ep_int;
+	struct llist_head ep_in_queue;
+	struct llist_head ep_int_queue;
 #ifndef FUNCTIONFS_SUPPORTS_POLL
 	struct osmo_fd aio_evfd;
 	io_context_t aio_ctx;
@@ -184,7 +187,6 @@
 		}
 		msgb_put(msg, rc);
 		ccid_handle_out(uh->ccid_handle, msg);
-		msgb_free(msg);
 	}
 	return 0;
 }
@@ -245,23 +247,72 @@
 
 #ifndef FUNCTIONFS_SUPPORTS_POLL
 
+/* an AIO read (OUT) has just completed, let's refill the transfer */
 static void aio_refill_out(struct ufunc_handle *uh)
 {
 	int rc;
 	struct aio_help *ah = &uh->aio_out;
 
 	LOGP(DUSB, LOGL_DEBUG, "%s\n", __func__);
-	msgb_reset(ah->msg);
+	OSMO_ASSERT(!ah->msg);
+	ah->msg = msgb_alloc(512, "OUT-Rx-AIO");
+	OSMO_ASSERT(ah->msg);
 	io_prep_pread(ah->iocb, uh->ep_out.fd, msgb_data(ah->msg), msgb_tailroom(ah->msg), 0);
 	io_set_eventfd(ah->iocb, uh->aio_evfd.fd);
 	rc = io_submit(uh->aio_ctx, 1, &ah->iocb);
 	OSMO_ASSERT(rc >= 0);
 }
 
+/* dequeue the next msgb from ep_in_queue and set up AIO for it */
+static void dequeue_aio_write_in(struct ufunc_handle *uh)
+{
+	struct aio_help *ah = &uh->aio_in;
+	struct msgb *d;
+	int rc;
+
+	if (ah->msg)
+		return;
+
+	d = msgb_dequeue(&uh->ep_in_queue);
+	if (!d)
+		return;
+
+	OSMO_ASSERT(ah->iocb);
+	ah->msg = d;
+	io_prep_pwrite(ah->iocb, uh->ep_in.fd, msgb_data(d), msgb_length(d), 0);
+	io_set_eventfd(ah->iocb, uh->aio_evfd.fd);
+	rc = io_submit(uh->aio_ctx, 1, &ah->iocb);
+	OSMO_ASSERT(rc >= 0);
+
+}
+
+/* dequeue the next msgb from ep_int_queue and set up AIO for it */
+static void dequeue_aio_write_int(struct ufunc_handle *uh)
+{
+	struct aio_help *ah = &uh->aio_int;
+	struct msgb *d;
+	int rc;
+
+	if (ah->msg)
+		return;
+
+	d = msgb_dequeue(&uh->ep_int_queue);
+	if (!d)
+		return;
+
+	OSMO_ASSERT(ah->iocb);
+	ah->msg = d;
+	io_prep_pwrite(ah->iocb, uh->ep_int.fd, msgb_data(d), msgb_length(d), 0);
+	io_set_eventfd(ah->iocb, uh->aio_evfd.fd);
+	rc = io_submit(uh->aio_ctx, 1, &ah->iocb);
+	OSMO_ASSERT(rc >= 0);
+}
+
 static int evfd_cb(struct osmo_fd *ofd, unsigned int what)
 {
 	struct ufunc_handle *uh = (struct ufunc_handle *) ofd->data;
 	struct io_event evt[3];
+	struct msgb *msg;
 	uint64_t ev_cnt;
 	int i, rc;
 
@@ -282,20 +333,24 @@
 			LOGP(DUSB, LOGL_DEBUG, "IRQ AIO completed, free()ing msgb\n");
 			msgb_free(uh->aio_in.msg);
 			uh->aio_in.msg = NULL;
+			dequeue_aio_write_int(uh);
 		} else if (fd == uh->ep_in.fd) {
 			/* IN endpoint AIO has completed. This means the IN transfer which
 			 * we sent to the host has completed */
 			LOGP(DUSB, LOGL_DEBUG, "IN AIO completed, free()ing msgb\n");
 			msgb_free(uh->aio_in.msg);
 			uh->aio_in.msg = NULL;
+			dequeue_aio_write_in(uh);
 		} else if (fd == uh->ep_out.fd) {
 			/* OUT endpoint AIO has completed. This means the host has sent us
 			 * some OUT data */
 			LOGP(DUSB, LOGL_DEBUG, "OUT AIO completed, dispatching received msg\n");
 			msgb_put(uh->aio_out.msg, evt[i].res);
 			//printf("\t%s\n", msgb_hexdump(uh->aio_out.msg));
-			//ccid_handle_out(uh->ccid_handle, uh->aio_out.buf, evt[i].res);
-			ccid_handle_out(uh->ccid_handle, uh->aio_out.msg);
+			msg = uh->aio_out.msg;
+			uh->aio_out.msg = NULL;
+			/* CCID handler takes ownership of msgb */
+			ccid_handle_out(uh->ccid_handle, msg);
 			aio_refill_out(uh);
 		}
 	}
@@ -325,6 +380,7 @@
 	}
 
 	/* open other endpoint file descriptors */
+	INIT_LLIST_HEAD(&uh->ep_int_queue);
 	rc = open("ep1", O_RDWR);
 	assert(rc >= 0);
 	osmo_fd_setup(&uh->ep_int, rc, 0, &ep_int_cb, uh, 1);
@@ -339,6 +395,7 @@
 	osmo_fd_register(&uh->ep_out);
 #endif
 
+	INIT_LLIST_HEAD(&uh->ep_in_queue);
 	rc = open("ep3", O_RDWR);
 	assert(rc >= 0);
 	osmo_fd_setup(&uh->ep_in, rc, 0, &ep_in_cb, uh, 3);
@@ -362,9 +419,9 @@
 	osmo_fd_setup(&uh->aio_evfd, rc, BSC_FD_READ, &evfd_cb, uh, 0);
 	osmo_fd_register(&uh->aio_evfd);
 
-	uh->aio_out.msg = msgb_alloc(512, "OUT-Rx-AIO");
 	uh->aio_out.iocb = malloc(sizeof(struct iocb));
-
+	uh->aio_in.iocb = malloc(sizeof(struct iocb));
+	uh->aio_int.iocb = malloc(sizeof(struct iocb));
 #endif
 
 	return 0;
@@ -373,18 +430,32 @@
 static int ccid_ops_send_in(struct ccid_instance *ci, struct msgb *msg)
 {
 	struct ufunc_handle *uh = ci->priv;
-	struct aio_help *ah = &uh->aio_in;
-	int rc;
 
-	/* FIXME: does this work with multiple iocbs? probably not yet! */
-	ah->iocb = malloc(sizeof(struct iocb));
-	OSMO_ASSERT(ah->iocb);
-	ah->msg = msg;
-	io_prep_pwrite(ah->iocb, uh->ep_in.fd, msgb_data(msg), msgb_length(msg), 0);
-	io_set_eventfd(ah->iocb, uh->aio_evfd.fd);
-	rc = io_submit(uh->aio_ctx, 1, &ah->iocb);
-	OSMO_ASSERT(rc >= 0);
+	/* append to the queue */
+	msgb_enqueue(&uh->ep_in_queue, msg);
 
+	/* trigger, if needed */
+#ifndef FUNCTIONFS_SUPPORTS_POLL
+	dequeue_aio_write_in(uh);
+#else
+	uh->ep_in.when |= BSC_FD_WRITE;
+#endif
+	return 0;
+}
+
+static int ccid_ops_send_int(struct ccid_instance *ci, struct msgb *msg)
+{
+	struct ufunc_handle *uh = ci->priv;
+
+	/* append to the queue */
+	msgb_enqueue(&uh->ep_int_queue, msg);
+
+	/* trigger, if needed */
+#ifndef FUNCTIONFS_SUPPORTS_POLL
+	dequeue_aio_write_int(uh);
+#else
+	uh->ep_int.when |= BSC_FD_WRITE;
+#endif
 	return 0;
 }