stats: Support statsd Multi-Metric Packets

If the MTU is given, combine several messages into a single UDP
packet until the limit is reached. Flush all reporters after the
values have been scanned.

New vty commands (node config-stats):
  mtu <100-65535>     Enable multi-metric packets and set the maximum
                      packet size (in byte)
  no mtu              Disable multi-metric packets

Note that single messages that are longer than the given MTU (minus
28 octets protocol overhead) will be dropped.

Sponsored-by: On-Waves ehf
diff --git a/include/osmocom/core/stats.h b/include/osmocom/core/stats.h
index 489e0e4..ed461dd 100644
--- a/include/osmocom/core/stats.h
+++ b/include/osmocom/core/stats.h
@@ -22,6 +22,8 @@
 #include <sys/socket.h>
 #include <osmocom/core/linuxlist.h>
 
+struct msgb;
+
 enum stats_reporter_type {
 	STATS_REPORTER_STATSD,
 };
@@ -45,6 +47,8 @@
 	struct sockaddr bind_addr;
 	int bind_addr_len;
 	int fd;
+	struct msgb *buffer;
+	int agg_enabled;
 
 	struct llist_head list;
 };
@@ -71,6 +75,7 @@
 int stats_reporter_set_remote_addr(struct stats_reporter *srep, const char *addr);
 int stats_reporter_set_remote_port(struct stats_reporter *srep, int port);
 int stats_reporter_set_local_addr(struct stats_reporter *srep, const char *addr);
+int stats_reporter_set_mtu(struct stats_reporter *srep, int mtu);
 int stats_reporter_set_name_prefix(struct stats_reporter *srep, const char *prefix);
 int stats_reporter_enable(struct stats_reporter *srep);
 int stats_reporter_disable(struct stats_reporter *srep);
diff --git a/src/stats.c b/src/stats.c
index ef4be82..8faed89 100644
--- a/src/stats.c
+++ b/src/stats.c
@@ -38,11 +38,13 @@
 #include <osmocom/core/stat_item.h>
 #include <osmocom/core/timer.h>
 #include <osmocom/core/statistics.h>
+#include <osmocom/core/msgb.h>
 
 /* TODO: register properly */
 #define DSTATS DLGLOBAL
 
 #define STATS_DEFAULT_INTERVAL 5 /* secs */
+#define STATS_DEFAULT_STATSD_BUFLEN 256
 
 static LLIST_HEAD(stats_reporter_list);
 static void *stats_ctx = NULL;
@@ -60,6 +62,7 @@
 static int stats_reporter_statsd_close(struct stats_reporter *srep);
 static int stats_reporter_send(struct stats_reporter *srep, const char *data,
 	int data_len);
+static int stats_reporter_send_buffer(struct stats_reporter *srep);
 
 static int update_srep_config(struct stats_reporter *srep)
 {
@@ -213,6 +216,16 @@
 	return update_srep_config(srep);
 }
 
+int stats_reporter_set_mtu(struct stats_reporter *srep, int mtu)
+{
+	if (mtu < 0)
+		return -EINVAL;
+
+	srep->mtu = mtu;
+
+	return update_srep_config(srep);
+}
+
 int stats_set_interval(int interval)
 {
 	if (interval <= 0)
@@ -261,6 +274,21 @@
 	return rc;
 }
 
+static int stats_reporter_send_buffer(struct stats_reporter *srep)
+{
+	int rc;
+
+	if (!srep->buffer || msgb_length(srep->buffer) == 0)
+		return 0;
+
+	rc = stats_reporter_send(srep,
+		(const char *)msgb_data(srep->buffer), msgb_length(srep->buffer));
+
+	msgb_trim(srep->buffer, 0);
+
+	return rc;
+}
+
 /*** statsd reporter ***/
 
 struct stats_reporter *stats_reporter_create_statsd(const char *name)
@@ -275,6 +303,7 @@
 {
 	int sock;
 	int rc;
+	int buffer_size = STATS_DEFAULT_STATSD_BUFLEN;
 
 	if (srep->fd != -1)
 		stats_reporter_statsd_close(srep);
@@ -291,6 +320,13 @@
 
 	srep->fd = sock;
 
+	if (srep->mtu > 0) {
+		buffer_size = srep->mtu - 20 /* IP */ - 8 /* UDP */;
+		srep->agg_enabled = 1;
+	}
+
+	srep->buffer = msgb_alloc(buffer_size, "stats buffer");
+
 	return 0;
 
 failed:
@@ -306,8 +342,12 @@
 	if (srep->fd == -1)
 		return -EBADF;
 
+	stats_reporter_send_buffer(srep);
+
 	rc = close(srep->fd);
 	srep->fd = -1;
+	msgb_free(srep->buffer);
+	srep->buffer = NULL;
 	return rc == -1 ? -errno : 0;
 }
 
@@ -315,30 +355,62 @@
 	const char *name1, int index1, const char *name2, int value,
 	const char *unit)
 {
-	char buf[256];
-	int nchars, rc;
+	char *buf;
+	int buf_size;
+	int nchars, rc = 0;
 	char *fmt = NULL;
+	int old_len = msgb_length(srep->buffer);
 
 	if (name1) {
 		if (index1 > 0)
-			fmt = "%1$s.%2$s.%3$d.%4$s:%5$d|%6$s";
+			fmt = "%1$s.%2$s.%6$d.%3$s:%4$d|%5$s";
 		else
-			fmt = "%1$s.%2$s.%4$s:%5$d|%6$s";
+			fmt = "%1$s.%2$s.%3$s:%4$d|%5$s";
 	} else {
-		fmt = "%1$s.%4$s:%5$d|%6$s";
+		fmt = "%1$s.%2$0.0s%3$s:%4$d|%5$s";
 	}
 	if (!srep->name_prefix)
 		fmt += 5; /* skip prefix part */
 
-	nchars = snprintf(buf, sizeof(buf), fmt,
-		srep->name_prefix, name1, index1, name2,
-		value, unit);
+	if (srep->agg_enabled) {
+		if (msgb_length(srep->buffer) > 0 &&
+			msgb_tailroom(srep->buffer) > 0)
+		{
+			msgb_put_u8(srep->buffer, '\n');
+		}
+	}
 
-	if (nchars >= sizeof(buf))
+	buf = (char *)msgb_put(srep->buffer, 0);
+	buf_size = msgb_tailroom(srep->buffer);
+
+	nchars = snprintf(buf, buf_size, fmt,
+		srep->name_prefix, name1, name2,
+		value, unit, index1);
+
+	if (nchars >= buf_size) {
 		/* Truncated */
-		return -EMSGSIZE;
+		/* Restore original buffer (without trailing LF) */
+		msgb_trim(srep->buffer, old_len);
+		/* Send it */
+		rc = stats_reporter_send_buffer(srep);
 
-	rc = stats_reporter_send(srep, buf, nchars);
+		/* Try again */
+		buf = (char *)msgb_put(srep->buffer, 0);
+		buf_size = msgb_tailroom(srep->buffer);
+
+		nchars = snprintf(buf, buf_size, fmt,
+			srep->name_prefix, name1, name2,
+			value, unit, index1);
+
+		if (nchars >= buf_size)
+			return -EMSGSIZE;
+	}
+
+	if (nchars > 0)
+		msgb_trim(srep->buffer, msgb_length(srep->buffer) + nchars);
+
+	if (!srep->agg_enabled)
+		rc = stats_reporter_send_buffer(srep);
 
 	return rc;
 }
@@ -498,11 +570,25 @@
 
 /*** main reporting function ***/
 
+static void flush_all_reporters()
+{
+	struct stats_reporter *srep;
+
+	llist_for_each_entry(srep, &stats_reporter_list, list) {
+		if (!srep->running)
+			continue;
+
+		stats_reporter_send_buffer(srep);
+	}
+}
+
 int stats_report()
 {
 	osmo_counters_for_each(handle_counter, NULL);
 	rate_ctr_for_each_group(rate_ctr_group_handler, NULL);
 	stat_item_for_each_group(stat_item_group_handler, NULL);
 
+	flush_all_reporters();
+
 	return 0;
 }
diff --git a/src/vty/stats_vty.c b/src/vty/stats_vty.c
index a4fd7b0..775184c 100644
--- a/src/vty/stats_vty.c
+++ b/src/vty/stats_vty.c
@@ -128,6 +128,23 @@
 		argv[0], "remote port");
 }
 
+DEFUN(cfg_stats_reporter_mtu, cfg_stats_reporter_mtu_cmd,
+	"mtu <100-65535>",
+	"Set the maximum packet size\n"
+	"Size in byte\n")
+{
+	return set_srep_parameter_int(vty, stats_reporter_set_mtu,
+		argv[0], "mtu");
+}
+
+DEFUN(cfg_no_stats_reporter_mtu, cfg_no_stats_reporter_mtu_cmd,
+	"no mtu",
+	NO_STR "Set the maximum packet size\n")
+{
+	return set_srep_parameter_int(vty, stats_reporter_set_mtu,
+		0, "mtu");
+}
+
 DEFUN(cfg_stats_reporter_prefix, cfg_stats_reporter_prefix_cmd,
 	"prefix PREFIX",
 	"Set the item name prefix\n"
@@ -312,6 +329,8 @@
 	install_element(CFG_STATS_NODE, &cfg_no_stats_reporter_local_ip_cmd);
 	install_element(CFG_STATS_NODE, &cfg_stats_reporter_remote_ip_cmd);
 	install_element(CFG_STATS_NODE, &cfg_stats_reporter_remote_port_cmd);
+	install_element(CFG_STATS_NODE, &cfg_stats_reporter_mtu_cmd);
+	install_element(CFG_STATS_NODE, &cfg_no_stats_reporter_mtu_cmd);
 	install_element(CFG_STATS_NODE, &cfg_stats_reporter_prefix_cmd);
 	install_element(CFG_STATS_NODE, &cfg_no_stats_reporter_prefix_cmd);
 	install_element(CFG_STATS_NODE, &cfg_stats_reporter_enable_cmd);