stats: Support statsd Multi-Metric Packets

If an MTU is given, combine several messages into a single UDP
packet until the size limit is reached. Flush all reporters after
the values have been scanned.

New vty commands (node config-stats):
  mtu <100-65535>     Enable multi-metric packets and set the maximum
                      packet size (in bytes)
  no mtu              Disable multi-metric packets

Note that any single message longer than the given MTU minus 28
octets of protocol overhead (20 IP + 8 UDP) will be dropped.

Sponsored-by: On-Waves ehf
diff --git a/src/stats.c b/src/stats.c
index ef4be82..8faed89 100644
--- a/src/stats.c
+++ b/src/stats.c
@@ -38,11 +38,13 @@
 #include <osmocom/core/stat_item.h>
 #include <osmocom/core/timer.h>
 #include <osmocom/core/statistics.h>
+#include <osmocom/core/msgb.h>
 
 /* TODO: register properly */
 #define DSTATS DLGLOBAL
 
 #define STATS_DEFAULT_INTERVAL 5 /* secs */
+#define STATS_DEFAULT_STATSD_BUFLEN 256
 
 static LLIST_HEAD(stats_reporter_list);
 static void *stats_ctx = NULL;
@@ -60,6 +62,7 @@
 static int stats_reporter_statsd_close(struct stats_reporter *srep);
 static int stats_reporter_send(struct stats_reporter *srep, const char *data,
 	int data_len);
+static int stats_reporter_send_buffer(struct stats_reporter *srep);
 
 static int update_srep_config(struct stats_reporter *srep)
 {
@@ -213,6 +216,16 @@
 	return update_srep_config(srep);
 }
 
+int stats_reporter_set_mtu(struct stats_reporter *srep, int mtu)
+{
+	if (mtu < 0)
+		return -EINVAL;
+
+	srep->mtu = mtu;
+
+	return update_srep_config(srep);
+}
+
 int stats_set_interval(int interval)
 {
 	if (interval <= 0)
@@ -261,6 +274,21 @@
 	return rc;
 }
 
+static int stats_reporter_send_buffer(struct stats_reporter *srep)
+{
+	int rc;
+
+	if (!srep->buffer || msgb_length(srep->buffer) == 0)
+		return 0;
+
+	rc = stats_reporter_send(srep,
+		(const char *)msgb_data(srep->buffer), msgb_length(srep->buffer));
+
+	msgb_trim(srep->buffer, 0);
+
+	return rc;
+}
+
 /*** statsd reporter ***/
 
 struct stats_reporter *stats_reporter_create_statsd(const char *name)
@@ -275,6 +303,7 @@
 {
 	int sock;
 	int rc;
+	int buffer_size = STATS_DEFAULT_STATSD_BUFLEN;
 
 	if (srep->fd != -1)
 		stats_reporter_statsd_close(srep);
@@ -291,6 +320,13 @@
 
 	srep->fd = sock;
 
+	if (srep->mtu > 0) {
+		buffer_size = srep->mtu - 20 /* IP */ - 8 /* UDP */;
+		srep->agg_enabled = 1;
+	}
+
+	srep->buffer = msgb_alloc(buffer_size, "stats buffer");
+
 	return 0;
 
 failed:
@@ -306,8 +342,12 @@
 	if (srep->fd == -1)
 		return -EBADF;
 
+	stats_reporter_send_buffer(srep);
+
 	rc = close(srep->fd);
 	srep->fd = -1;
+	msgb_free(srep->buffer);
+	srep->buffer = NULL;
 	return rc == -1 ? -errno : 0;
 }
 
@@ -315,30 +355,62 @@
 	const char *name1, int index1, const char *name2, int value,
 	const char *unit)
 {
-	char buf[256];
-	int nchars, rc;
+	char *buf;
+	int buf_size;
+	int nchars, rc = 0;
 	char *fmt = NULL;
+	int old_len = msgb_length(srep->buffer);
 
 	if (name1) {
 		if (index1 > 0)
-			fmt = "%1$s.%2$s.%3$d.%4$s:%5$d|%6$s";
+			fmt = "%1$s.%2$s.%6$d.%3$s:%4$d|%5$s";
 		else
-			fmt = "%1$s.%2$s.%4$s:%5$d|%6$s";
+			fmt = "%1$s.%2$s.%3$s:%4$d|%5$s";
 	} else {
-		fmt = "%1$s.%4$s:%5$d|%6$s";
+		fmt = "%1$s.%2$0.0s%3$s:%4$d|%5$s";
 	}
 	if (!srep->name_prefix)
 		fmt += 5; /* skip prefix part */
 
-	nchars = snprintf(buf, sizeof(buf), fmt,
-		srep->name_prefix, name1, index1, name2,
-		value, unit);
+	if (srep->agg_enabled) {
+		if (msgb_length(srep->buffer) > 0 &&
+			msgb_tailroom(srep->buffer) > 0)
+		{
+			msgb_put_u8(srep->buffer, '\n');
+		}
+	}
 
-	if (nchars >= sizeof(buf))
+	buf = (char *)msgb_put(srep->buffer, 0);
+	buf_size = msgb_tailroom(srep->buffer);
+
+	nchars = snprintf(buf, buf_size, fmt,
+		srep->name_prefix, name1, name2,
+		value, unit, index1);
+
+	if (nchars >= buf_size) {
 		/* Truncated */
-		return -EMSGSIZE;
+		/* Restore original buffer (without trailing LF) */
+		msgb_trim(srep->buffer, old_len);
+		/* Send it */
+		rc = stats_reporter_send_buffer(srep);
 
-	rc = stats_reporter_send(srep, buf, nchars);
+		/* Try again */
+		buf = (char *)msgb_put(srep->buffer, 0);
+		buf_size = msgb_tailroom(srep->buffer);
+
+		nchars = snprintf(buf, buf_size, fmt,
+			srep->name_prefix, name1, name2,
+			value, unit, index1);
+
+		if (nchars >= buf_size)
+			return -EMSGSIZE;
+	}
+
+	if (nchars > 0)
+		msgb_trim(srep->buffer, msgb_length(srep->buffer) + nchars);
+
+	if (!srep->agg_enabled)
+		rc = stats_reporter_send_buffer(srep);
 
 	return rc;
 }
@@ -498,11 +570,25 @@
 
 /*** main reporting function ***/
 
+static void flush_all_reporters()
+{
+	struct stats_reporter *srep;
+
+	llist_for_each_entry(srep, &stats_reporter_list, list) {
+		if (!srep->running)
+			continue;
+
+		stats_reporter_send_buffer(srep);
+	}
+}
+
 int stats_report()
 {
 	osmo_counters_for_each(handle_counter, NULL);
 	rate_ctr_for_each_group(rate_ctr_group_handler, NULL);
 	stat_item_for_each_group(stat_item_group_handler, NULL);
 
+	flush_all_reporters();
+
 	return 0;
 }