migrate mgcp_client from osmo_wqueue to osmo_io
The new osmo_io framework means that we can [optionally] make use
of the io_uring backend, which greatly reduces the syscall load
compared to the legacy osmo_wqueue + osmo_select_main + read/write.
We only use features already present in the intiial osmo_io support
of libosmocore 1.9.0, so no entry in TODO-RELEASE is needed.
Closes: OS#5754
Related: OS#5755
Change-Id: I766224da4691695c023d4d08d042a4bbeba05e47
diff --git a/src/libosmo-mgcp-client/mgcp_client.c b/src/libosmo-mgcp-client/mgcp_client.c
index 489ce69..8b311d3 100644
--- a/src/libosmo-mgcp-client/mgcp_client.c
+++ b/src/libosmo-mgcp-client/mgcp_client.c
@@ -728,7 +728,7 @@
/* Feed an MGCP message into the receive processing.
* Parse the head and call any callback registered for the transaction id found
* in the MGCP message. This is normally called directly from the internal
- * mgcp_do_read that reads from the socket connected to the MGCP gateway. This
+ * mgcp_read_cb that reads from the socket connected to the MGCP gateway. This
* function is published mainly to be able to feed data from the test suite.
*/
int mgcp_client_rx(struct mgcp_client *mgcp, struct msgb *msg)
@@ -781,55 +781,54 @@
return rc;
}
-static int mgcp_do_read(struct osmo_fd *fd)
+static void mgcp_read_cb(struct osmo_io_fd *iofd, int res, struct msgb *msg)
{
- struct mgcp_client *mgcp = fd->data;
- struct msgb *msg;
- int ret;
+ struct mgcp_client *mgcp = osmo_iofd_get_data(iofd);
- msg = msgb_alloc_headroom(4096, 128, "mgcp_from_gw");
- if (!msg) {
- LOGPMGW(mgcp, LOGL_ERROR, "Failed to allocate MGCP message.\n");
- return -1;
- }
-
- /* msgb_tailroom() is basically (4096 - 128); -1 is for '\0' */
- ret = read(fd->fd, msg->data, msgb_tailroom(msg) - 1);
- if (ret <= 0) {
+ if (res <= 0) {
LOGPMGW(mgcp, LOGL_ERROR, "Failed to read: %s: %d='%s'\n",
- osmo_sock_get_name2(fd->fd), errno, strerror(errno));
+ osmo_iofd_get_name(iofd), res, strerror(res));
msgb_free(msg);
- return -1;
+ return;
}
- msg->l2h = msgb_put(msg, ret);
- ret = mgcp_client_rx(mgcp, msg);
+ msg->l2h = msg->head;
+ mgcp_client_rx(mgcp, msg);
talloc_free(msg);
- return ret;
}
-static int mgcp_do_write(struct osmo_fd *fd, struct msgb *msg)
+static int mgcp_do_write(struct mgcp_client *mgcp, struct msgb *msg)
{
int ret;
- struct mgcp_client *mgcp = fd->data;
LOGPMGW(mgcp, LOGL_DEBUG, "Tx MGCP: %s: len=%u '%s'...\n",
- osmo_sock_get_name2(fd->fd), msg->len,
+ osmo_iofd_get_name(mgcp->iofd), msg->len,
osmo_escape_str((const char *)msg->data, OSMO_MIN(42, msg->len)));
- ret = write(fd->fd, msg->data, msg->len);
- if (OSMO_UNLIKELY(ret != msg->len))
- LOGPMGW(mgcp, LOGL_ERROR, "Failed to Tx MGCP: %s: %d='%s'; msg: len=%u '%s'...\n",
- osmo_sock_get_name2(fd->fd), errno, strerror(errno),
- msg->len, osmo_escape_str((const char *)msg->data, OSMO_MIN(42, msg->len)));
+ ret = osmo_iofd_write_msgb(mgcp->iofd, msg);
+ if (ret < 0)
+ msgb_free(msg);
/* Re-arm the keepalive Tx timer: */
if (mgcp->actual.keepalive.req_interval_sec > 0)
osmo_timer_schedule(&mgcp->keepalive_tx_timer, mgcp->actual.keepalive.req_interval_sec, 0);
+
return ret;
}
+static void mgcp_write_cb(struct osmo_io_fd *iofd, int res, struct msgb *msg)
+{
+ struct mgcp_client *mgcp = osmo_iofd_get_data(iofd);
+
+ if (OSMO_UNLIKELY(res != msg->len)) {
+ LOGPMGW(mgcp, LOGL_ERROR, "Failed to Tx MGCP: %s: %d='%s'; msg: len=%u '%s'...\n",
+ osmo_iofd_get_name(mgcp->iofd), res, strerror(res),
+ msg->len, osmo_escape_str((const char *)msg->data, OSMO_MIN(42, msg->len)));
+ }
+}
+
+
static const char *_mgcp_client_name_append_domain(const struct mgcp_client *mgcp, const char *name)
{
static char endpoint[MGCP_ENDPOINT_MAXLEN];
@@ -941,11 +940,6 @@
if (conf->description)
mgcp->actual.description = talloc_strdup(mgcp, conf->description);
- osmo_wqueue_init(&mgcp->wq, 1024);
- mgcp->wq.read_cb = mgcp_do_read;
- mgcp->wq.write_cb = mgcp_do_write;
- osmo_fd_setup(&mgcp->wq.bfd, -1, OSMO_FD_READ, osmo_wqueue_bfd_cb, mgcp, 0);
-
memcpy(&mgcp->actual.keepalive, &conf->keepalive, sizeof(conf->keepalive));
osmo_timer_setup(&mgcp->keepalive_tx_timer, mgcp_client_keepalive_tx_timer_cb, mgcp);
osmo_timer_setup(&mgcp->keepalive_rx_timer, mgcp_client_keepalive_rx_timer_cb, mgcp);
@@ -953,6 +947,11 @@
return mgcp;
}
+static const struct osmo_io_ops mgcp_clnt_ioops = {
+ .read_cb = mgcp_read_cb,
+ .write_cb = mgcp_write_cb,
+};
+
/*! Initialize client connection (opens socket)
* \param[in,out] mgcp MGCP client descriptor.
* \returns 0 on success, -EINVAL on error. */
@@ -968,19 +967,28 @@
return -EINVAL;
}
- rc = osmo_sock_init2_ofd(&mgcp->wq.bfd, AF_UNSPEC, SOCK_DGRAM, IPPROTO_UDP, mgcp->actual.local_addr,
- mgcp->actual.local_port, mgcp->actual.remote_addr, mgcp->actual.remote_port,
- OSMO_SOCK_F_BIND | OSMO_SOCK_F_CONNECT);
+ rc = osmo_sock_init2(AF_UNSPEC, SOCK_DGRAM, IPPROTO_UDP, mgcp->actual.local_addr,
+ mgcp->actual.local_port, mgcp->actual.remote_addr, mgcp->actual.remote_port,
+ OSMO_SOCK_F_BIND | OSMO_SOCK_F_CONNECT);
if (rc < 0) {
LOGPMGW(mgcp, LOGL_FATAL,
"Failed to initialize socket %s:%u -> %s:%u for MGW: %s\n",
mgcp->actual.local_addr ? mgcp->actual.local_addr : "(any)", mgcp->actual.local_port,
mgcp->actual.remote_addr ? mgcp->actual.local_addr : "(any)", mgcp->actual.remote_port,
strerror(errno));
- goto error_close_fd;
+ goto error_free;
}
- LOGPMGW(mgcp, LOGL_INFO, "MGW connection: %s\n", osmo_sock_get_name2(mgcp->wq.bfd.fd));
+ mgcp->iofd = osmo_iofd_setup(mgcp, rc, osmo_sock_get_name2(rc), OSMO_IO_FD_MODE_READ_WRITE,
+ &mgcp_clnt_ioops, mgcp);
+ if (!mgcp->iofd)
+ goto error_close_fd;
+
+ LOGPMGW(mgcp, LOGL_INFO, "MGW connection: %s\n", osmo_iofd_get_name(mgcp->iofd));
+
+ osmo_iofd_register(mgcp->iofd, -1);
+ osmo_iofd_set_alloc_info(mgcp->iofd, 4096, 128);
+ osmo_iofd_set_txqueue_max_length(mgcp->iofd, 1024);
/* If configured, send a DLCX message to the endpoints that are configured to
* be reset on startup. Usually this is a wildcarded endpoint. */
@@ -1006,9 +1014,10 @@
osmo_timer_schedule(&mgcp->keepalive_rx_timer, mgcp->actual.keepalive.timeout_sec, 0);
return 0;
+
error_close_fd:
- close(mgcp->wq.bfd.fd);
- mgcp->wq.bfd.fd = -1;
+ close(rc);
+error_free:
return rc;
}
@@ -1025,8 +1034,6 @@
* \returns 0 on success, -EINVAL on error. */
void mgcp_client_disconnect(struct mgcp_client *mgcp)
{
- struct osmo_wqueue *wq;
-
if (!mgcp) {
LOGP(DLMGCP, LOGL_FATAL, "MGCP client not initialized properly\n");
return;
@@ -1037,13 +1044,9 @@
osmo_timer_del(&mgcp->keepalive_tx_timer);
mgcp->conn_up = false;
- wq = &mgcp->wq;
- osmo_wqueue_clear(wq);
- LOGPMGW(mgcp, LOGL_INFO, "MGCP association: %s -- closed!\n", osmo_sock_get_name2(wq->bfd.fd));
- if (osmo_fd_is_registered(&wq->bfd))
- osmo_fd_unregister(&wq->bfd);
- close(wq->bfd.fd);
- wq->bfd.fd = -1;
+ osmo_iofd_txqueue_clear(mgcp->iofd);
+ LOGPMGW(mgcp, LOGL_INFO, "MGCP association: %s -- closed!\n", osmo_iofd_get_name(mgcp->iofd));
+ osmo_iofd_free(mgcp->iofd);
}
/*! Get the IP-Aaddress of the associated MGW as string.
@@ -1197,10 +1200,9 @@
goto mgcp_tx_error;
}
- rc = osmo_wqueue_enqueue(&mgcp->wq, msg);
+ rc = mgcp_do_write(mgcp, msg);
if (rc) {
LOGPMGW(mgcp, LOGL_FATAL, "Could not queue message to MGW\n");
- msgb_free(msg);
goto mgcp_tx_error;
} else
LOGPMGW(mgcp, LOGL_DEBUG, "Queued %u bytes for MGW\n",
diff --git a/src/libosmo-mgcp-client/mgcp_client_vty.c b/src/libosmo-mgcp-client/mgcp_client_vty.c
index d57447c..9acb621 100644
--- a/src/libosmo-mgcp-client/mgcp_client_vty.c
+++ b/src/libosmo-mgcp-client/mgcp_client_vty.c
@@ -322,7 +322,7 @@
/* If client already exists, apply the change immediately if possible: */
mgcp->actual.keepalive.req_interval_sec = atoi(argv[0]);
- if (mgcp->wq.bfd.fd != -1) { /* UDP MGCP socket connected */
+ if (mgcp->iofd) { /* UDP MGCP socket connected */
if (mgcp->actual.keepalive.req_interval_sec > 0) {
/* Re-schedule: */
osmo_timer_schedule(&mgcp->keepalive_tx_timer, mgcp->actual.keepalive.req_interval_sec, 0);
@@ -375,7 +375,7 @@
/* If client already exists, apply the change immediately if possible: */
mgcp->actual.keepalive.timeout_sec = atoi(argv[0]);
- if (mgcp->wq.bfd.fd != -1) { /* UDP MGCP socket connected */
+ if (mgcp->iofd) { /* UDP MGCP socket connected */
if (mgcp->actual.keepalive.timeout_sec > 0) {
/* Re-schedule: */
osmo_timer_schedule(&mgcp->keepalive_rx_timer, mgcp->actual.keepalive.timeout_sec, 0);
@@ -680,7 +680,7 @@
const struct mgcp_client *cli = pool_member->client;
vty_out(vty, "%% MGW %s%s", mgcp_client_pool_member_name(pool_member), VTY_NEWLINE);
vty_out(vty, "%% MGCP link: %s,%s%s",
- cli && cli->wq.bfd.fd != -1 ? "connected" : "disconnected",
+ cli && cli->iofd ? "connected" : "disconnected",
cli && cli->conn_up ?
((cli->actual.keepalive.timeout_sec > 0) ? "UP" : "MAYBE") :
"DOWN",