stats: Use a global index for stat item values

Currently each stat item has a separate index value which basically
counts each single value added to the item and which can be used by
a reporter to get all new values that have not been reported yet.
The drawback is, that such an index must be stored for each stat
item.

This commit introduces a global index which is incremented for each
new stat item value. This index is then stored together with the item
value. So a single stored index per reporter is sufficient to make
sure that only new values are reported.

Sponsored-by: On-Waves ehf
diff --git a/include/osmocom/core/stat_item.h b/include/osmocom/core/stat_item.h
index e166579..003c9e0 100644
--- a/include/osmocom/core/stat_item.h
+++ b/include/osmocom/core/stat_item.h
@@ -12,6 +12,13 @@
 
 struct stat_item_desc;
 
+#define STAT_ITEM_NOVALUE_ID 0
+
+struct stat_item_value {
+	int32_t id;
+	int32_t value;
+};
+
 /*! \brief data we keep for each actual value */
 struct stat_item {
 	const struct stat_item_desc *desc;
@@ -20,7 +27,7 @@
 	/*! \brief offset to the freshest value in the value fifo */
 	int16_t last_offs;
 	/*! \brief value fifo */
-	int32_t values[0];
+	struct stat_item_value values[0];
 };
 
 /*! \brief statistics value description */
@@ -61,9 +68,9 @@
 	const struct stat_item_group_desc *desc,
 	unsigned int idx);
 
-void stat_item_group_free(struct stat_item_group *grp);
+void stat_item_group_free(struct stat_item_group *statg);
 
-void stat_item_set(struct stat_item *val, int32_t value);
+void stat_item_set(struct stat_item *item, int32_t value);
 
 int stat_item_init(void *tall_ctx);
 
@@ -71,7 +78,7 @@
 	const char *name, const unsigned int idx);
 
 const struct stat_item *stat_item_get_by_name(
-	const struct stat_item_group *valg, const char *name);
+	const struct stat_item_group *statg, const char *name);
 
 /*! \brief Retrieve the next value from the stat_item object.
  * If a new value has been set, it is returned. The idx is used to decide
@@ -89,16 +96,19 @@
  *           1: one value has been taken,
  *           (1+n): n values have been skipped, one has been taken)
  */
-int stat_item_get_next(const struct stat_item *val, int32_t *idx, int32_t *value);
+int stat_item_get_next(const struct stat_item *item, int32_t *idx, int32_t *value);
 
 /*! \brief Get the last (freshest) value */
-static int32_t stat_item_get_last(const struct stat_item *val);
+static int32_t stat_item_get_last(const struct stat_item *item);
 
 /*! \brief Skip all values of the item and update idx accordingly */
-int stat_item_discard(const struct stat_item *val, int32_t *idx);
+int stat_item_discard(const struct stat_item *item, int32_t *idx);
 
-static inline int32_t stat_item_get_last(const struct stat_item *val)
+/*! \brief Skip all values of all items and update idx accordingly */
+int stat_item_discard_all(int32_t *idx);
+
+static inline int32_t stat_item_get_last(const struct stat_item *item)
 {
-	return val->values[val->last_offs];
+	return item->values[item->last_offs].value;
 }
 /*! @} */
diff --git a/src/stat_item.c b/src/stat_item.c
index 7b169ea..1e283d4 100644
--- a/src/stat_item.c
+++ b/src/stat_item.c
@@ -38,6 +38,7 @@
 #include <osmocom/core/stat_item.h>
 
 static LLIST_HEAD(stat_item_groups);
+static int32_t global_value_id = 0;
 
 static void *tall_stat_item_ctx;
 
@@ -74,7 +75,8 @@
 	for (item_idx = 0; item_idx < desc->num_items; item_idx++) {
 		unsigned int size;
 		size = sizeof(struct stat_item) +
-			sizeof(int32_t) * desc->item_desc[item_idx].num_values;
+			sizeof(struct stat_item_value) *
+			desc->item_desc[item_idx].num_values;
 		/* Align to pointer size */
 		size = (size + sizeof(void *) - 1) & ~(sizeof(void *) - 1);
 
@@ -101,8 +103,10 @@
 		item->last_value_index = -1;
 		item->desc = &desc->item_desc[item_idx];
 
-		for (i = 0; i <= item->last_offs; i++)
-			item->values[i] = desc->item_desc[item_idx].default_value;
+		for (i = 0; i <= item->last_offs; i++) {
+			item->values[i].value = desc->item_desc[item_idx].default_value;
+			item->values[i].id = STAT_ITEM_NOVALUE_ID;
+		}
 	}
 
 	llist_add(&group->list, &stat_item_groups);
@@ -123,49 +127,68 @@
 	if (item->last_offs >= item->desc->num_values)
 		item->last_offs = 0;
 
-	item->last_value_index += 1;
+	global_value_id += 1;
+	if (global_value_id == STAT_ITEM_NOVALUE_ID)
+		global_value_id += 1;
 
-	item->values[item->last_offs] = value;
+	item->values[item->last_offs].value = value;
+	item->values[item->last_offs].id    = global_value_id;
 }
 
 int stat_item_get_next(const struct stat_item *item, int32_t *next_idx,
 	int32_t *value)
 {
-	int32_t delta = item->last_value_index + 1 - *next_idx;
-	int n_values = 0;
+	const struct stat_item_value *next_value;
+	const struct stat_item_value *item_value = NULL;
+	int idx_delta;
 	int next_offs;
 
-	if (delta == 0)
+	next_offs = item->last_offs;
+	next_value = &item->values[next_offs];
+
+	while (next_value->id - *next_idx >= 0 &&
+		next_value->id != STAT_ITEM_NOVALUE_ID)
+	{
+		item_value = next_value;
+
+		next_offs -= 1;
+		if (next_offs < 0)
+			next_offs = item->desc->num_values - 1;
+		if (next_offs == item->last_offs)
+			break;
+		next_value = &item->values[next_offs];
+	}
+
+	if (!item_value)
 		/* All items have been read */
 		return 0;
 
-	if (delta < 0 || delta > item->desc->num_values) {
-		n_values = delta - item->desc->num_values;
-		delta = item->desc->num_values;
-	}
+	*value = item_value->value;
 
-	next_offs = item->last_offs + 1 - delta;
-	if (next_offs < 0)
-		next_offs += item->desc->num_values;
+	idx_delta = item_value->id + 1 - *next_idx;
 
-	*value = item->values[next_offs];
+	*next_idx = item_value->id + 1;
 
-	n_values += 1;
-	delta -= 1;
-	*next_idx = item->last_value_index + 1 - delta;
-
-	return n_values;
+	return idx_delta;
 }
 
-/*! \brief Skip all values and update idx accordingly */
+/*! \brief Skip all values of this item and update idx accordingly */
 int stat_item_discard(const struct stat_item *item, int32_t *idx)
 {
-	int discarded = item->last_value_index + 1 - *idx;
-	*idx = item->last_value_index + 1;
+	int discarded = item->values[item->last_offs].id + 1 - *idx;
+	*idx = item->values[item->last_offs].id + 1;
 
 	return discarded;
 }
 
+/*! \brief Skip all values of all items and update idx accordingly */
+int stat_item_discard_all(int32_t *idx)
+{
+	int discarded = global_value_id + 1 - *idx;
+	*idx = global_value_id + 1;
+
+	return discarded;
+}
 
 /*! \brief Initialize the stat item module */
 int stat_item_init(void *tall_ctx)
diff --git a/tests/stats/stats_test.c b/tests/stats/stats_test.c
index b414385..9da49a4 100644
--- a/tests/stats/stats_test.c
+++ b/tests/stats/stats_test.c
@@ -91,7 +91,7 @@
 	OSMO_ASSERT(value == 1);
 
 	rc = stat_item_get_next(statg->items[TEST_A_ITEM], &rd_a, &value);
-	OSMO_ASSERT(rc == 1);
+	OSMO_ASSERT(rc > 0);
 	OSMO_ASSERT(value == 1);
 
 	rc = stat_item_get_next(statg->items[TEST_A_ITEM], &rd_a, &value);
@@ -102,11 +102,11 @@
 		stat_item_set(statg->items[TEST_B_ITEM], 1000 + i);
 
 		rc = stat_item_get_next(statg->items[TEST_A_ITEM], &rd_a, &value);
-		OSMO_ASSERT(rc == 1);
+		OSMO_ASSERT(rc > 0);
 		OSMO_ASSERT(value == i);
 
 		rc = stat_item_get_next(statg->items[TEST_B_ITEM], &rd_b, &value);
-		OSMO_ASSERT(rc == 1);
+		OSMO_ASSERT(rc > 0);
 		OSMO_ASSERT(value == 1000 + i);
 	}
 
@@ -119,20 +119,20 @@
 		stat_item_set(statg->items[TEST_B_ITEM], 1000 + i);
 
 		rc = stat_item_get_next(statg->items[TEST_A_ITEM], &rd_a, &value);
-		OSMO_ASSERT(rc == 1);
+		OSMO_ASSERT(rc > 0);
 		OSMO_ASSERT(value == i-1);
 
 		rc = stat_item_get_next(statg->items[TEST_B_ITEM], &rd_b, &value);
-		OSMO_ASSERT(rc == 1);
+		OSMO_ASSERT(rc > 0);
 		OSMO_ASSERT(value == 1000 + i-1);
 	}
 
 	rc = stat_item_get_next(statg->items[TEST_A_ITEM], &rd_a, &value);
-	OSMO_ASSERT(rc == 1);
+	OSMO_ASSERT(rc > 0);
 	OSMO_ASSERT(value == 64);
 
 	rc = stat_item_get_next(statg->items[TEST_B_ITEM], &rd_b, &value);
-	OSMO_ASSERT(rc == 1);
+	OSMO_ASSERT(rc > 0);
 	OSMO_ASSERT(value == 1000 + 64);
 
 	/* Overrun FIFOs */
@@ -142,29 +142,29 @@
 	}
 
 	rc = stat_item_get_next(statg->items[TEST_A_ITEM], &rd_a, &value);
-	OSMO_ASSERT(rc == 93 - 65 + 1);
+	OSMO_ASSERT(rc > 0);
 	OSMO_ASSERT(value == 93);
 
 	for (i = 94; i <= 96; i++) {
 		rc = stat_item_get_next(statg->items[TEST_A_ITEM], &rd_a, &value);
-		OSMO_ASSERT(rc == 1);
+		OSMO_ASSERT(rc > 0);
 		OSMO_ASSERT(value == i);
 	}
 
 	rc = stat_item_get_next(statg->items[TEST_B_ITEM], &rd_b, &value);
-	OSMO_ASSERT(rc == 90 - 65 + 1);
+	OSMO_ASSERT(rc > 0);
 	OSMO_ASSERT(value == 1000 + 90);
 
 	for (i = 91; i <= 96; i++) {
 		rc = stat_item_get_next(statg->items[TEST_B_ITEM], &rd_b, &value);
-		OSMO_ASSERT(rc == 1);
+		OSMO_ASSERT(rc > 0);
 		OSMO_ASSERT(value == 1000 + i);
 	}
 
-	/* Test Discard */
+	/* Test Discard (single item) */
 	stat_item_set(statg->items[TEST_A_ITEM], 97);
 	rc = stat_item_discard(statg->items[TEST_A_ITEM], &rd_a);
-	OSMO_ASSERT(rc == 1);
+	OSMO_ASSERT(rc > 0);
 
 	rc = stat_item_discard(statg->items[TEST_A_ITEM], &rd_a);
 	OSMO_ASSERT(rc == 0);
@@ -174,12 +174,27 @@
 
 	stat_item_set(statg->items[TEST_A_ITEM], 98);
 	rc = stat_item_get_next(statg->items[TEST_A_ITEM], &rd_a, &value);
-	OSMO_ASSERT(rc == 1);
+	OSMO_ASSERT(rc > 0);
 	OSMO_ASSERT(value == 98);
 
 	rc = stat_item_get_next(statg->items[TEST_A_ITEM], &rd_a, &value);
 	OSMO_ASSERT(rc == 0);
 
+	/* Test Discard (all items) */
+	stat_item_set(statg->items[TEST_A_ITEM], 99);
+	stat_item_set(statg->items[TEST_A_ITEM], 100);
+	stat_item_set(statg->items[TEST_A_ITEM], 101);
+	stat_item_set(statg->items[TEST_B_ITEM], 99);
+	stat_item_set(statg->items[TEST_B_ITEM], 100);
+
+	rc = stat_item_discard_all(&rd_a);
+	rc = stat_item_discard_all(&rd_b);
+
+	rc = stat_item_get_next(statg->items[TEST_A_ITEM], &rd_a, &value);
+	OSMO_ASSERT(rc == 0);
+	rc = stat_item_get_next(statg->items[TEST_B_ITEM], &rd_b, &value);
+	OSMO_ASSERT(rc == 0);
+
 	stat_item_group_free(statg);
 
 	sgrp2 = stat_item_get_group_by_name_idx("test.one", 0);