sms_queue: Introduce rate_ctr / stat_item

This allows us to monitor the load of the SMS queue.

Change-Id: I8c00b5b0d33695fbb5f89fd2a4c8e35c9f7df6ac
diff --git a/src/libmsc/sms_queue.c b/src/libmsc/sms_queue.c
index 63d9631..1d14ff0 100644
--- a/src/libmsc/sms_queue.c
+++ b/src/libmsc/sms_queue.c
@@ -40,9 +40,71 @@
 #include <osmocom/msc/vlr.h>
 
 #include <osmocom/core/talloc.h>
+#include <osmocom/core/utils.h>
+#include <osmocom/core/rate_ctr.h>
+#include <osmocom/core/stat_item.h>
 
 #include <osmocom/vty/vty.h>
 
+enum smsq_stat_item_idx {	/* indices into smsq_stat_item_desc[] */
+	SMSQ_STAT_SMS_RAM_PENDING,	/* adjusted alongside smsq->pending: SMSs in the in-RAM pending delivery queue */
+};
+
+static const struct osmo_stat_item_desc smsq_stat_item_desc[] = {	/* one entry per enum smsq_stat_item_idx */
+	[SMSQ_STAT_SMS_RAM_PENDING] = { .name = "ram:pending",
+		.description = "Number of SMSs in the in-RAM pending delivery queue" },
+};
+
+static const struct osmo_stat_item_group_desc smsq_statg_desc = {	/* describes the stat item group of the SMS queue */
+	.group_name_prefix = "sms_queue",
+	.group_description = "SMS queue",
+	.class_id = OSMO_STATS_CLASS_GLOBAL,
+	.num_items = ARRAY_SIZE(smsq_stat_item_desc),
+	.item_desc = smsq_stat_item_desc,
+};
+
+enum smsq_rate_ctr_idx {	/* indices into smsq_ctr_desc[] */
+	SMSQ_CTR_SMS_DELIVERY_ATTEMPTS,	/* bumped by _gsm411_send_sms() before each send */
+	SMSQ_CTR_SMS_DELIVERY_ACK,	/* bumped on S_SMS_DELIVERED */
+	SMSQ_CTR_SMS_DELIVERY_ERR,	/* bumped on a failed delivery with sig_sms->paging_result set */
+	SMSQ_CTR_SMS_DELIVERY_NOMEM,	/* bumped on S_SMS_MEM_EXCEEDED */
+	SMSQ_CTR_SMS_DELIVERY_TIMEOUT,	/* bumped on a failed delivery with sig_sms->paging_result unset */
+};
+
+static const struct rate_ctr_desc smsq_ctr_desc[] = {	/* name + description per counter, indexed by enum smsq_rate_ctr_idx */
+	[SMSQ_CTR_SMS_DELIVERY_ATTEMPTS]	= { "deliver:attempts",	/* shares the "deliver:" prefix of the other counters */
+		"Attempted MT SMS deliveries to subscriber" },
+	[SMSQ_CTR_SMS_DELIVERY_ACK]		= { "deliver:ack",
+		"Successful MT SMS delivery to subscriber" },
+	[SMSQ_CTR_SMS_DELIVERY_ERR]		= { "deliver:error",
+		"Erroneous MT SMS delivery" },
+	[SMSQ_CTR_SMS_DELIVERY_NOMEM]		= { "deliver:no_memory",
+		"Failed MT SMS delivery due to no memory on MS" },
+	[SMSQ_CTR_SMS_DELIVERY_TIMEOUT]		= { "deliver:paging_timeout",
+		"Failed MT SMS delivery due to paging timeout (MS gone?)" },
+};
+
+static const struct rate_ctr_group_desc smsq_ctrg_desc = {
+	"sms_queue",
+	"SMS queue",
+	OSMO_STATS_CLASS_GLOBAL,
+	ARRAY_SIZE(smsq_ctr_desc),
+	smsq_ctr_desc,
+};
+
+#define smsq_rate_ctr_inc(smsq, idx) \
+	rate_ctr_inc(rate_ctr_group_get_ctr((smsq)->ctrg, idx))	/* rate_ctr_inc() on counter idx of (smsq)->ctrg */
+#define smsq_rate_ctr_add(smsq, idx, val) \
+	rate_ctr_add(rate_ctr_group_get_ctr((smsq)->ctrg, idx), val)	/* rate_ctr_add() of val on counter idx of (smsq)->ctrg */
+
+#define smsq_stat_item_inc(smsq, idx) \
+	osmo_stat_item_inc(osmo_stat_item_group_get_item((smsq)->statg, idx), 1)	/* raise item idx of (smsq)->statg by 1 */
+#define smsq_stat_item_dec(smsq, idx) \
+	osmo_stat_item_dec(osmo_stat_item_group_get_item((smsq)->statg, idx), 1)	/* lower item idx of (smsq)->statg by 1 */
+#define smsq_stat_item_set(smsq, idx, val) \
+	osmo_stat_item_set(osmo_stat_item_group_get_item((smsq)->statg, idx), val)	/* set item idx of (smsq)->statg to val */
+
+
 /* One in-RAM record of a "pending SMS".  This is not the SMS itself, but merely
  * a pointer to the database record.  It holds a reference on the vlr_subscriber
  * and some counters.  While this object exists in RAM, we are regularly attempting
@@ -70,8 +132,19 @@
 
 	/* last MSISDN for which we read SMS from the database and created gsm_sms_pending records */
 	char last_msisdn[GSM23003_MSISDN_MAX_DIGITS+1];
+
+	/* statistics / counters */
+	struct osmo_stat_item_group *statg;
+	struct rate_ctr_group *ctrg;
 };
 
+/* Private wrapper around gsm411_send_sms(): bumps the delivery-attempts counter of net->sms_queue first, so every send routed through here is counted */
+static void _gsm411_send_sms(struct gsm_network *net, struct vlr_subscr *vsub, struct gsm_sms *sms)
+{
+	smsq_rate_ctr_inc(net->sms_queue, SMSQ_CTR_SMS_DELIVERY_ATTEMPTS);	/* counted before the send is attempted */
+	gsm411_send_sms(net, vsub, sms);	/* nothing is handed back to the caller (wrapper is void) */
+}
+
 static int sms_subscr_cb(unsigned int, unsigned int, void *, void *);
 static int sms_sms_cb(unsigned int, unsigned int, void *, void *);
 
@@ -176,12 +249,13 @@
 
 	sms_pending_free(pending);
 	smsq->pending -= 1;
+	smsq_stat_item_dec(smsq, SMSQ_STAT_SMS_RAM_PENDING);
 }
 
 /* Resend all SMS that are scheduled for a resend. This is done to
  * avoid an immediate failure.  This iterates over all the (in RAM)
  * pending_sms records, checks for resend == true, reads them from the
- * DB and attempts to send them via gsm411_send_sms() */
+ * DB and attempts to send them via _gsm411_send_sms() */
 static void sms_resend_pending(void *_data)
 {
 	struct gsm_sms_pending *pending, *tmp;
@@ -198,10 +272,11 @@
 		if (!sms) {
 			sms_pending_free(pending);
 			smsq->pending -= 1;
+			smsq_stat_item_dec(smsq, SMSQ_STAT_SMS_RAM_PENDING);
 			sms_queue_trigger(smsq);
 		} else {
 			pending->resend = 0;
-			gsm411_send_sms(smsq->network, sms->receiver, sms);
+			_gsm411_send_sms(smsq->network, sms->receiver, sms);
 		}
 	}
 }
@@ -333,8 +408,9 @@
 
 		attempted += 1;
 		smsq->pending += 1;
+		smsq_stat_item_inc(smsq, SMSQ_STAT_SMS_RAM_PENDING);
 		llist_add_tail(&pending->entry, &smsq->pending_sms);
-		gsm411_send_sms(smsq->network, sms->receiver, sms);
+		_gsm411_send_sms(smsq->network, sms->receiver, sms);
 	} while (attempted < attempts && rounds < 1000);
 
 	LOGP(DLSMS, LOGL_DEBUG, "SMSqueue added %d messages in %d rounds\n", attempted, rounds);
@@ -372,8 +448,9 @@
 	}
 
 	smsq->pending += 1;
+	smsq_stat_item_inc(smsq, SMSQ_STAT_SMS_RAM_PENDING);
 	llist_add_tail(&pending->entry, &smsq->pending_sms);
-	gsm411_send_sms(smsq->network, sms->receiver, sms);
+	_gsm411_send_sms(smsq->network, sms->receiver, sms);
 	return;
 
 no_pending_sms:
@@ -402,8 +479,13 @@
 		return -1;
 	}
 
-	osmo_signal_register_handler(SS_SUBSCR, sms_subscr_cb, network);
-	osmo_signal_register_handler(SS_SMS, sms_sms_cb, network);
+	sms->statg = osmo_stat_item_group_alloc(sms, &smsq_statg_desc, 0);
+	if (!sms->statg)
+		goto err_free;
+
+	sms->ctrg = rate_ctr_group_alloc(sms, &smsq_ctrg_desc, 0);
+	if (!sms->ctrg)
+		goto err_statg;
 
 	network->sms_queue = sms;
 	INIT_LLIST_HEAD(&sms->pending_sms);
@@ -413,9 +495,19 @@
 	osmo_timer_setup(&sms->push_queue, sms_submit_pending, sms);
 	osmo_timer_setup(&sms->resend_pending, sms_resend_pending, sms);
 
+	osmo_signal_register_handler(SS_SUBSCR, sms_subscr_cb, network);
+	osmo_signal_register_handler(SS_SMS, sms_sms_cb, network);
+
 	sms_submit_pending(sms);
 
 	return 0;
+
+err_statg:
+	osmo_stat_item_group_free(sms->statg);
+err_free:
+	talloc_free(sms);
+
+	return -ENOMEM;
 }
 
 /* call-back: Given subscriber is now ready for short messages. */
@@ -452,7 +544,7 @@
 	if (!sms)
 		return -1;
 
-	gsm411_send_sms(net, vsub, sms);
+	_gsm411_send_sms(net, vsub, sms);
 	return 0;
 }
 
@@ -500,8 +592,10 @@
 
 	switch (signal) {
 	case S_SMS_DELIVERED:
+		smsq_rate_ctr_inc(network->sms_queue, SMSQ_CTR_SMS_DELIVERY_ACK);
 		/* Remember the subscriber and clear the pending entry */
 		network->sms_queue->pending -= 1;
+		smsq_stat_item_dec(network->sms_queue, SMSQ_STAT_SMS_RAM_PENDING);
 		vsub = pending->vsub;
 		vlr_subscr_get(vsub, __func__);
 		db_sms_delete_sent_message_by_id(pending->sms_id);
@@ -511,7 +605,9 @@
 		vlr_subscr_put(vsub, __func__);
 		break;
 	case S_SMS_MEM_EXCEEDED:
+		smsq_rate_ctr_inc(network->sms_queue, SMSQ_CTR_SMS_DELIVERY_NOMEM);
 		network->sms_queue->pending -= 1;
+		smsq_stat_item_dec(network->sms_queue, SMSQ_STAT_SMS_RAM_PENDING);
 		sms_pending_free(pending);
 		sms_queue_trigger(network->sms_queue);
 		break;
@@ -527,10 +623,12 @@
 		 * should flag the SMS as bad.
 		 */
 		if (sig_sms->paging_result) {
+			smsq_rate_ctr_inc(network->sms_queue, SMSQ_CTR_SMS_DELIVERY_ERR);
 			/* BAD SMS? */
 			db_sms_inc_deliver_attempts(sig_sms->sms);
 			sms_pending_failed(pending, 0);
 		} else {
+			smsq_rate_ctr_inc(network->sms_queue, SMSQ_CTR_SMS_DELIVERY_TIMEOUT);
 			sms_pending_failed(pending, 1);
 		}
 		break;
@@ -587,5 +685,6 @@
 	}
 
 	smsq->pending = 0;
+	smsq_stat_item_set(smsq, SMSQ_STAT_SMS_RAM_PENDING, 0);
 	return 0;
 }