Merge branch 'zecke/sms-queue'
diff --git a/openbsc/contrib/sms/fill-hlr.st b/openbsc/contrib/sms/fill-hlr.st
new file mode 100644
index 0000000..da0643e
--- /dev/null
+++ b/openbsc/contrib/sms/fill-hlr.st
@@ -0,0 +1,66 @@
+"I create output for some simple SQL statements for the HLR db"
+
+
+Eval [
+
+"Create tables if they don't exist"
+Transcript show: 'CREATE TABLE SMS (
+                    id INTEGER PRIMARY KEY AUTOINCREMENT,
+                    created TIMESTAMP NOT NULL,
+                    sent TIMESTAMP,
+                    sender_id INTEGER NOT NULL,
+                    receiver_id INTEGER NOT NULL,
+                    deliver_attempts INTEGER NOT NULL DEFAULT 0,
+                    valid_until TIMESTAMP,
+                    reply_path_req INTEGER NOT NULL,
+                    status_rep_req INTEGER NOT NULL,
+                    protocol_id INTEGER NOT NULL,
+                    data_coding_scheme INTEGER NOT NULL,
+                    ud_hdr_ind INTEGER NOT NULL,
+                    dest_addr TEXT,
+                    user_data BLOB,
+                    header BLOB,
+                    text TEXT);'; nl;
+	     show: 'CREATE TABLE Subscriber (
+                    id INTEGER PRIMARY KEY AUTOINCREMENT,
+                    created TIMESTAMP NOT NULL,
+                    updated TIMESTAMP NOT NULL,
+                    imsi NUMERIC UNIQUE NOT NULL,
+                    name TEXT,
+                    extension TEXT UNIQUE,
+                    authorized INTEGER NOT NULL DEFAULT 0,
+                    tmsi TEXT UNIQUE,
+                    lac INTEGER NOT NULL DEFAULT 0);'; nl.
+
+"Create some dummy subscribers"
+num_sub := 1000.
+num_sms := 30.
+lac := 1.
+
+Transcript show: 'BEGIN;'; nl.
+
+1 to: num_sub do: [:each |
+   Transcript show: 'INSERT INTO Subscriber
+                        (imsi, created, updated, authorized, lac, extension)
+                        VALUES
+                        (%1, datetime(''now''), datetime(''now''), 1, %2, %3);' %
+                        {(274090000000000 + each). lac. each}; nl.
+].
+
+1 to: num_sms do: [:sms |
+    1 to: num_sub do: [:sub |
+        Transcript show: 'INSERT INTO SMS
+                            (created, sender_id, receiver_id, valid_until,
+                             reply_path_req, status_rep_req, protocol_id,
+                             data_coding_scheme, ud_hdr_ind, dest_addr,
+                             text) VALUES
+                            (datetime(''now''), 1, %1, ''2222-2-2'',
+                             0, 0, 0,
+                             0, 0, ''123456'',
+                             ''abc'');' % {sub}; nl.
+    ]
+].
+
+Transcript show: 'COMMIT;'; nl.
+
+]
diff --git a/openbsc/contrib/sms/hlr-query.st b/openbsc/contrib/sms/hlr-query.st
new file mode 100644
index 0000000..bd3f97a
--- /dev/null
+++ b/openbsc/contrib/sms/hlr-query.st
@@ -0,0 +1,10 @@
+"Query for one SMS"
+
+Eval [
+1 to: 100 do: [:each |
+    Transcript show: 'SELECT SMS.* FROM SMS
+                        JOIN Subscriber ON SMS.receiver_id = Subscriber.id
+                            WHERE SMS.id >= 1 AND SMS.sent IS NULL AND Subscriber.lac > 0
+                            ORDER BY SMS.id LIMIT 1;'; nl.
+].
+]
diff --git a/openbsc/contrib/sms/sqlite-probe.tap.d b/openbsc/contrib/sms/sqlite-probe.tap.d
new file mode 100644
index 0000000..e75cdfc
--- /dev/null
+++ b/openbsc/contrib/sms/sqlite-probe.tap.d
@@ -0,0 +1,5 @@
+probe process("/usr/lib/libsqlite3.so.0.8.6").function("sqlite3_get_table")
+{
+  a = user_string($zSql);
+  printf("sqlite3_get_table called '%s'\n", a);
+}
diff --git a/openbsc/include/openbsc/Makefile.am b/openbsc/include/openbsc/Makefile.am
index 3dffe4b..6cf8c3b 100644
--- a/openbsc/include/openbsc/Makefile.am
+++ b/openbsc/include/openbsc/Makefile.am
@@ -11,7 +11,7 @@
 		gb_proxy.h gprs_sgsn.h gsm_04_08_gprs.h sgsn.h \
 		gprs_ns_frgre.h auth.h osmo_msc.h bsc_msc.h bsc_nat.h \
 		osmo_bsc_rf.h osmo_bsc.h network_listen.h bsc_nat_sccp.h \
-		osmo_msc_data.h osmo_bsc_grace.h
+		osmo_msc_data.h osmo_bsc_grace.h sms_queue.h
 
 openbsc_HEADERS = gsm_04_08.h meas_rep.h bsc_api.h
 openbscdir = $(includedir)/openbsc
diff --git a/openbsc/include/openbsc/db.h b/openbsc/include/openbsc/db.h
index ba7c5a7..5949f9c 100644
--- a/openbsc/include/openbsc/db.h
+++ b/openbsc/include/openbsc/db.h
@@ -63,8 +63,9 @@
 
 /* SMS store-and-forward */
 int db_sms_store(struct gsm_sms *sms);
+struct gsm_sms *db_sms_get(struct gsm_network *net, unsigned long long id);
 struct gsm_sms *db_sms_get_unsent(struct gsm_network *net, unsigned long long min_id);
-struct gsm_sms *db_sms_get_unsent_by_subscr(struct gsm_network *net, unsigned long long min_subscr_id);
+struct gsm_sms *db_sms_get_unsent_by_subscr(struct gsm_network *net, unsigned long long min_subscr_id, unsigned int failed);
 struct gsm_sms *db_sms_get_unsent_for_subscr(struct gsm_subscriber *subscr);
 int db_sms_mark_sent(struct gsm_sms *sms);
 int db_sms_inc_deliver_attempts(struct gsm_sms *sms);
diff --git a/openbsc/include/openbsc/gsm_04_11.h b/openbsc/include/openbsc/gsm_04_11.h
index d62a392..2078778 100644
--- a/openbsc/include/openbsc/gsm_04_11.h
+++ b/openbsc/include/openbsc/gsm_04_11.h
@@ -33,5 +33,7 @@
 void _gsm411_sms_trans_free(struct gsm_trans *trans);
 int gsm411_send_sms_subscr(struct gsm_subscriber *subscr,
 			   struct gsm_sms *sms);
+int gsm411_send_sms(struct gsm_subscriber_connection *conn,
+		    struct gsm_sms *sms);
 void gsm411_sapi_n_reject(struct gsm_subscriber_connection *conn);
 #endif
diff --git a/openbsc/include/openbsc/gsm_data.h b/openbsc/include/openbsc/gsm_data.h
index f42ae1b..4c87f83 100644
--- a/openbsc/include/openbsc/gsm_data.h
+++ b/openbsc/include/openbsc/gsm_data.h
@@ -5,6 +5,7 @@
 
 struct osmo_msc_data;
 struct osmo_bsc_sccp_con;
+struct gsm_sms_queue;
 
 enum gsm_phys_chan_config {
 	GSM_PCHAN_NONE,
@@ -83,6 +84,7 @@
 	GSM_PAGING_SUCCEEDED,
 	GSM_PAGING_EXPIRED,
 	GSM_PAGING_OOM,
+	GSM_PAGING_BUSY,
 };
 
 enum bts_gprs_mode {
@@ -733,6 +735,7 @@
 
 	/* subscriber related features */
 	int keep_subscr;
+	struct gsm_sms_queue *sms_queue;
 };
 
 #define SMS_HDR_SIZE	128
diff --git a/openbsc/include/openbsc/paging.h b/openbsc/include/openbsc/paging.h
index 68d8a6a..a78eb8e 100644
--- a/openbsc/include/openbsc/paging.h
+++ b/openbsc/include/openbsc/paging.h
@@ -46,6 +46,9 @@
 	/* Timer 3113: how long do we try to page? */
 	struct timer_list T3113;
 
+	/* How often did we ask the BTS to page? */
+	int attempts;
+
 	/* callback to be called in case paging completes */
 	gsm_cbfn *cbfn;
 	void *cbfn_param;
diff --git a/openbsc/include/openbsc/signal.h b/openbsc/include/openbsc/signal.h
index 40766db..dfb837d 100644
--- a/openbsc/include/openbsc/signal.h
+++ b/openbsc/include/openbsc/signal.h
@@ -61,6 +61,7 @@
 	S_SMS_DELIVERED,	/* A SMS has been successfully delivered to a MS */
 	S_SMS_SMMA,		/* A MS tells us it has more space available */
 	S_SMS_MEM_EXCEEDED,	/* A MS tells us it has no more space available */
+	S_SMS_UNKNOWN_ERROR,	/* A MS tells us it has an error */
 };
 
 /* SS_ABISIP signals */
@@ -140,6 +141,8 @@
 	struct gsm_subscriber *subscr;
 	struct gsm_bts *bts;
 
+	int paging_result;
+
 	/* NULL in case the paging didn't work */
 	struct gsm_subscriber_connection *conn;
 };
@@ -170,6 +173,15 @@
 	struct gsm_network *net;
 };
 
+struct sms_signal_data {
+	/* The transaction where this occured */
+	struct gsm_trans *trans;
+	/* Can be NULL for SMMA */
+	struct gsm_sms *sms;
+	/* int paging result. Only the ones with > 0 */
+	int paging_result;
+};
+
 enum signal_ns {
 	S_NS_RESET,
 	S_NS_BLOCK,
diff --git a/openbsc/include/openbsc/sms_queue.h b/openbsc/include/openbsc/sms_queue.h
new file mode 100644
index 0000000..2a8bd58
--- /dev/null
+++ b/openbsc/include/openbsc/sms_queue.h
@@ -0,0 +1,17 @@
+#ifndef SMS_QUEUE_H
+#define SMS_QUEUE_H
+
+struct gsm_network;
+struct gsm_sms_queue;
+struct vty;
+
+int sms_queue_start(struct gsm_network *, int in_flight);
+int sms_queue_trigger(struct gsm_sms_queue *);
+
+/* vty helper functions */
+int sms_queue_stats(struct gsm_sms_queue *, struct vty* vty);
+int sms_queue_set_max_pending(struct gsm_sms_queue *, int max);
+int sms_queue_set_max_failure(struct gsm_sms_queue *, int fail);
+int sms_queue_clear(struct gsm_sms_queue *);
+
+#endif
diff --git a/openbsc/src/Makefile.am b/openbsc/src/Makefile.am
index 2766d93..4f3bf9e 100644
--- a/openbsc/src/Makefile.am
+++ b/openbsc/src/Makefile.am
@@ -35,7 +35,7 @@
 
 libmgcp_a_SOURCES = mgcp/mgcp_protocol.c mgcp/mgcp_network.c mgcp/mgcp_vty.c
 
-bsc_hack_SOURCES = bsc_hack.c bsc_init.c bsc_vty.c vty_interface_layer3.c
+bsc_hack_SOURCES = bsc_hack.c bsc_init.c bsc_vty.c vty_interface_layer3.c sms_queue.c
 bsc_hack_LDADD = libmsc.a libbsc.a libvty.a libmsc.a \
 		-ldl -ldbi $(LIBCRYPT) $(LIBOSMOVTY_LIBS)
 
diff --git a/openbsc/src/bsc_hack.c b/openbsc/src/bsc_hack.c
index 4efc782..54c8942 100644
--- a/openbsc/src/bsc_hack.c
+++ b/openbsc/src/bsc_hack.c
@@ -274,6 +274,10 @@
 	signal(SIGUSR2, &signal_handler);
 	signal(SIGPIPE, SIG_IGN);
 
+	/* start the SMS queue */
+	if (sms_queue_start(bsc_gsmnet, 20) != 0)
+		return -1;
+
 	if (daemonize) {
 		rc = osmo_daemonize();
 		if (rc < 0) {
diff --git a/openbsc/src/db.c b/openbsc/src/db.c
index ec1e72c..fd5dd81 100644
--- a/openbsc/src/db.c
+++ b/openbsc/src/db.c
@@ -1063,6 +1063,28 @@
 	return sms;
 }
 
+struct gsm_sms *db_sms_get(struct gsm_network *net, unsigned long long id)
+{
+	dbi_result result;
+	struct gsm_sms *sms;
+
+	result = dbi_conn_queryf(conn,
+		"SELECT * FROM SMS WHERE SMS.id = %llu", id);
+	if (!result)
+		return NULL;
+
+	if (!dbi_result_next_row(result)) {
+		dbi_result_free(result);
+		return NULL;
+	}
+
+	sms = sms_from_result(net, result);
+
+	dbi_result_free(result);
+
+	return sms;
+}
+
 /* retrieve the next unsent SMS with ID >= min_id */
 struct gsm_sms *db_sms_get_unsent(struct gsm_network *net, unsigned long long min_id)
 {
@@ -1092,7 +1114,9 @@
 	return sms;
 }
 
-struct gsm_sms *db_sms_get_unsent_by_subscr(struct gsm_network *net, unsigned long long min_subscr_id)
+struct gsm_sms *db_sms_get_unsent_by_subscr(struct gsm_network *net,
+					    unsigned long long min_subscr_id,
+					    unsigned int failed)
 {
 	dbi_result result;
 	struct gsm_sms *sms;
@@ -1102,9 +1126,9 @@
 			"FROM SMS JOIN Subscriber ON "
 				"SMS.receiver_id = Subscriber.id "
 			"WHERE SMS.receiver_id >= %llu AND SMS.sent IS NULL "
-				"AND Subscriber.lac > 0 "
+				"AND Subscriber.lac > 0 AND SMS.deliver_attempts < %u "
 			"ORDER BY SMS.receiver_id, SMS.id LIMIT 1",
-		min_subscr_id);
+		min_subscr_id, failed);
 	if (!result)
 		return NULL;
 
diff --git a/openbsc/src/gsm_04_08.c b/openbsc/src/gsm_04_08.c
index 42dd1b7..92da6cc 100644
--- a/openbsc/src/gsm_04_08.c
+++ b/openbsc/src/gsm_04_08.c
@@ -1409,6 +1409,7 @@
 			gsm48_cc_tx_setup(transt, &transt->cc.msg);
 			break;
 		case GSM_PAGING_EXPIRED:
+		case GSM_PAGING_BUSY:
 			DEBUGP(DCC, "Paging subscr %s expired!\n",
 				subscr->extension);
 			/* Temporarily out of order */
diff --git a/openbsc/src/gsm_04_11.c b/openbsc/src/gsm_04_11.c
index c46f772..25fe467 100644
--- a/openbsc/src/gsm_04_11.c
+++ b/openbsc/src/gsm_04_11.c
@@ -101,7 +101,6 @@
 	{ 0, NULL }
 };
 
-static int gsm411_send_sms(struct gsm_subscriber_connection *conn, struct gsm_sms *sms);
 
 struct gsm_sms *sms_alloc(void)
 {
@@ -119,6 +118,18 @@
 	talloc_free(sms);
 }
 
+static void send_signal(int sig_no,
+			struct gsm_trans *trans,
+			struct gsm_sms *sms,
+			int paging_result)
+{
+	struct sms_signal_data sig;
+	sig.trans = trans;
+	sig.sms = sms;
+	sig.paging_result = paging_result;
+	dispatch_signal(SS_SMS, sig_no, &sig);
+}
+
 /*
  * This should be called whenever all SMS to a given subscriber
  * on a given connection has been sent. This will inform the higher
@@ -422,7 +433,7 @@
 		return GSM411_RP_CAUSE_MO_NET_OUT_OF_ORDER;
 	}
 	/* dispatch a signal to tell higher level about it */
-	dispatch_signal(SS_SMS, S_SMS_SUBMITTED, gsms);
+	send_signal(S_SMS_SUBMITTED, NULL, gsms, 0);
 
 	return 0;
 }
@@ -614,7 +625,8 @@
 
 	gsms->validity_minutes = gsm340_validity_period(sms_vpf, sms_vp);
 
-	dispatch_signal(SS_SMS, 0, gsms);
+	/* FIXME: This looks very wrong */
+	send_signal(0, NULL, gsms, 0);
 
 	/* determine gsms->receiver based on dialled number */
 	gsms->receiver = subscr_get_by_extension(conn->bts->network, gsms->dest_addr);
@@ -754,7 +766,7 @@
 	/* mark this SMS as sent in database */
 	db_sms_mark_sent(sms);
 
-	dispatch_signal(SS_SMS, S_SMS_DELIVERED, sms);
+	send_signal(S_SMS_DELIVERED, trans, sms, 0);
 
 	sms_free(sms);
 	trans->sms.sms = NULL;
@@ -807,12 +819,14 @@
 
 	if (cause == GSM411_RP_CAUSE_MT_MEM_EXCEEDED) {
 		/* MS has not enough memory to store the message.  We need
-		 * to store this in our database and wati for a SMMA message */
+		 * to store this in our database and wait for a SMMA message */
 		/* FIXME */
-		dispatch_signal(SS_SMS, S_SMS_MEM_EXCEEDED, trans->subscr);
+		send_signal(S_SMS_MEM_EXCEEDED, trans, sms, 0);
 		counter_inc(net->stats.sms.rp_err_mem);
-	} else
+	} else {
+		send_signal(S_SMS_UNKNOWN_ERROR, trans, sms, 0);
 		counter_inc(net->stats.sms.rp_err_other);
+	}
 
 	sms_free(sms);
 	trans->sms.sms = NULL;
@@ -832,7 +846,7 @@
 	/* MS tells us that it has memory for more SMS, we need
 	 * to check if we have any pending messages for it and then
 	 * transfer those */
-	dispatch_signal(SS_SMS, S_SMS_SMMA, trans->subscr);
+	send_signal(S_SMS_SMMA, trans, NULL, 0);
 
 	/* check for more messages for this subscriber */
 	sms = db_sms_get_unsent_for_subscr(trans->subscr);
@@ -1032,7 +1046,7 @@
 /* Take a SMS in gsm_sms structure and send it through an already
  * existing lchan. We also assume that the caller ensured this lchan already
  * has a SAPI3 RLL connection! */
-static int gsm411_send_sms(struct gsm_subscriber_connection *conn, struct gsm_sms *sms)
+int gsm411_send_sms(struct gsm_subscriber_connection *conn, struct gsm_sms *sms)
 {
 	struct msgb *msg = gsm411_msgb_alloc();
 	struct gsm_trans *trans;
@@ -1044,6 +1058,7 @@
 	transaction_id = trans_assign_trans_id(conn->subscr, GSM48_PDISC_SMS, 0);
 	if (transaction_id == -1) {
 		LOGP(DSMS, LOGL_ERROR, "No available transaction ids\n");
+		send_signal(S_SMS_UNKNOWN_ERROR, NULL, sms, 0);
 		sms_free(sms);
 		return -EBUSY;
 	}
@@ -1055,6 +1070,7 @@
 			    transaction_id, new_callref++);
 	if (!trans) {
 		LOGP(DSMS, LOGL_ERROR, "No memory for trans\n");
+		send_signal(S_SMS_UNKNOWN_ERROR, NULL, sms, 0);
 		sms_free(sms);
 		/* FIXME: send some error message */
 		return -ENOMEM;
@@ -1088,6 +1104,7 @@
 	/* generate the 03.40 TPDU */
 	rc = gsm340_gen_tpdu(msg, sms);
 	if (rc < 0) {
+		send_signal(S_SMS_UNKNOWN_ERROR, trans, sms, 0);
 		trans_free(trans);
 		sms_free(sms);
 		msgb_free(msg);
@@ -1126,6 +1143,8 @@
 		break;
 	case GSM_PAGING_EXPIRED:
 	case GSM_PAGING_OOM:
+	case GSM_PAGING_BUSY:
+		send_signal(S_SMS_UNKNOWN_ERROR, NULL, sms, event);
 		sms_free(sms);
 		rc = -ETIMEDOUT;
 		break;
@@ -1154,36 +1173,11 @@
 	return 0;
 }
 
-static int subscr_sig_cb(unsigned int subsys, unsigned int signal,
-			 void *handler_data, void *signal_data)
-{
-	struct gsm_subscriber *subscr;
-	struct gsm_subscriber_connection *conn;
-	struct gsm_sms *sms;
-
-	switch (signal) {
-	case S_SUBSCR_ATTACHED:
-		/* A subscriber has attached. Check if there are
-		 * any pending SMS for him to be delivered */
-		subscr = signal_data;
-		conn = connection_for_subscr(subscr);
-		if (!conn)
-			break;
-		sms = db_sms_get_unsent_for_subscr(subscr);
-		if (!sms)
-			break;
-		gsm411_send_sms(conn, sms);
-		break;
-	default:
-		break;
-	}
-	return 0;
-}
-
 void _gsm411_sms_trans_free(struct gsm_trans *trans)
 {
 	if (trans->sms.sms) {
 		LOGP(DSMS, LOGL_ERROR, "Transaction contains SMS.\n");
+		send_signal(S_SMS_UNKNOWN_ERROR, trans, trans->sms.sms, 0);
 		sms_free(trans->sms.sms);
 		trans->sms.sms = NULL;
 	}
@@ -1203,6 +1197,7 @@
 				continue;
 			}
 
+			send_signal(S_SMS_UNKNOWN_ERROR, trans, sms, 0);
 			sms_free(sms);
 			trans->sms.sms = NULL;
 			trans_free(trans);
@@ -1211,7 +1206,3 @@
 	gsm411_release_conn(conn);
 }
 
-static __attribute__((constructor)) void on_dso_load_sms(void)
-{
-	register_signal_handler(SS_SUBSCR, subscr_sig_cb, NULL);
-}
diff --git a/openbsc/src/gsm_subscriber.c b/openbsc/src/gsm_subscriber.c
index f066eca..dc7d638 100644
--- a/openbsc/src/gsm_subscriber.c
+++ b/openbsc/src/gsm_subscriber.c
@@ -84,6 +84,7 @@
 	sig_data.subscr = subscr;
 	sig_data.bts	= conn ? conn->bts : NULL;
 	sig_data.conn	= conn;
+	sig_data.paging_result = event;
 	dispatch_signal(
 		SS_PAGING,
 		event == GSM_PAGING_SUCCEEDED ?
@@ -169,7 +170,7 @@
 
 	/* paging failed, quit now */
 	if (rc <= 0) {
-		subscr_paging_cb(GSM_HOOK_RR_PAGING, GSM_PAGING_EXPIRED,
+		subscr_paging_cb(GSM_HOOK_RR_PAGING, GSM_PAGING_BUSY,
 				 NULL, NULL, subscr);
 	}
 }
@@ -293,6 +294,8 @@
 
 int subscr_update(struct gsm_subscriber *s, struct gsm_bts *bts, int reason)
 {
+	int rc;
+
 	/* FIXME: Migrate pending requests from one BSC to another */
 	switch (reason) {
 	case GSM_SUBSCRIBER_UPDATE_ATTACHED:
@@ -301,6 +304,8 @@
 		s->lac = bts->location_area_code;
 		LOGP(DMM, LOGL_INFO, "Subscriber %s ATTACHED LAC=%u\n",
 			subscr_name(s), s->lac);
+		rc = db_sync_subscriber(s);
+		db_subscriber_update(s);
 		dispatch_signal(SS_SUBSCR, S_SUBSCR_ATTACHED, s);
 		break;
 	case GSM_SUBSCRIBER_UPDATE_DETACHED:
@@ -308,14 +313,19 @@
 		if (bts->location_area_code == s->lac)
 			s->lac = GSM_LAC_RESERVED_DETACHED;
 		LOGP(DMM, LOGL_INFO, "Subscriber %s DETACHED\n", subscr_name(s));
+		rc = db_sync_subscriber(s);
+		db_subscriber_update(s);
 		dispatch_signal(SS_SUBSCR, S_SUBSCR_DETACHED, s);
 		break;
 	default:
 		fprintf(stderr, "subscr_update with unknown reason: %d\n",
 			reason);
+		rc = db_sync_subscriber(s);
+		db_subscriber_update(s);
 		break;
 	};
-	return db_sync_subscriber(s);
+
+	return rc;
 }
 
 void subscr_update_from_db(struct gsm_subscriber *sub)
diff --git a/openbsc/src/paging.c b/openbsc/src/paging.c
index 06e6860..bfe3020 100644
--- a/openbsc/src/paging.c
+++ b/openbsc/src/paging.c
@@ -209,6 +209,7 @@
 	/* handle the paging request now */
 	page_ms(request);
 	paging_bts->available_slots--;
+	request->attempts++;
 
 	/* take the current and add it to the back */
 	llist_del(&request->entry);
@@ -253,6 +254,7 @@
 	struct gsm_paging_request *req = (struct gsm_paging_request *)data;
 	void *cbfn_param;
 	gsm_cbfn *cbfn;
+	int msg;
 
 	LOGP(DPAG, LOGL_INFO, "T3113 expired for request %p (%s)\n",
 		req, req->subscr->imsi);
@@ -261,10 +263,15 @@
 	counter_inc(req->bts->network->stats.paging.expired);
 	cbfn_param = req->cbfn_param;
 	cbfn = req->cbfn;
+
+	/* did we ever manage to page the subscriber */
+	msg = req->attempts > 0 ? GSM_PAGING_EXPIRED : GSM_PAGING_BUSY;
+
+	/* destroy it now. Do not access req afterwards */
 	paging_remove_request(&req->bts->paging, req);
 
 	if (cbfn)
-		cbfn(GSM_HOOK_RR_PAGING, GSM_PAGING_EXPIRED, NULL, NULL,
+		cbfn(GSM_HOOK_RR_PAGING, msg, NULL, NULL,
 			  cbfn_param);
 }
 
diff --git a/openbsc/src/silent_call.c b/openbsc/src/silent_call.c
index 29ade00..2eb37ba 100644
--- a/openbsc/src/silent_call.c
+++ b/openbsc/src/silent_call.c
@@ -60,6 +60,7 @@
 		dispatch_signal(SS_SCALL, S_SCALL_SUCCESS, &sigdata);
 		break;
 	case GSM_PAGING_EXPIRED:
+	case GSM_PAGING_BUSY:
 		DEBUGP(DSMS, "expired\n");
 		dispatch_signal(SS_SCALL, S_SCALL_EXPIRED, &sigdata);
 		break;
diff --git a/openbsc/src/sms_queue.c b/openbsc/src/sms_queue.c
new file mode 100644
index 0000000..dda8ce9
--- /dev/null
+++ b/openbsc/src/sms_queue.c
@@ -0,0 +1,478 @@
+/* SMS queue to continously attempt to deliver SMS */
+/*
+ * (C) 2010 by Holger Hans Peter Freyther <zecke@selfish.org>
+ * All Rights Reserved
+ *
+ * This program is free software; you can redistribute it and/or modify
+ * it under the terms of the GNU General Public License as published by
+ * the Free Software Foundation; either version 2 of the License, or
+ * (at your option) any later version.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
+ * GNU General Public License for more details.
+ *
+ * You should have received a copy of the GNU General Public License along
+ * with this program; if not, write to the Free Software Foundation, Inc.,
+ * 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.
+ *
+ */
+
+/**
+ * The difficulty of such a queue is to send a lot of SMS without
+ * overloading the paging subsystem and the database and other users
+ * of the MSC. To make the best use we would need to know the number
+ * of pending paging requests, then throttle the number of SMS we
+ * want to send and such.
+ * We will start with a very simple SMS Queue and then try to speed
+ * things up by collecting data from other parts of the system.
+ */
+
+#include <openbsc/sms_queue.h>
+#include <openbsc/chan_alloc.h>
+#include <openbsc/db.h>
+#include <openbsc/debug.h>
+#include <openbsc/gsm_data.h>
+#include <openbsc/gsm_04_11.h>
+#include <openbsc/gsm_subscriber.h>
+#include <openbsc/signal.h>
+
+#include <osmocore/talloc.h>
+
+#include <osmocom/vty/vty.h>
+
+/*
+ * One pending SMS that we wait for.
+ */
+struct gsm_sms_pending {
+	struct llist_head entry;
+
+	struct gsm_subscriber *subscr;
+	unsigned long long sms_id;
+	int failed_attempts;
+	int resend;
+};
+
+struct gsm_sms_queue {
+	struct timer_list resend_pending;
+	struct timer_list push_queue;
+	struct gsm_network *network;
+	int max_fail;
+	int max_pending;
+	int pending;
+
+	struct llist_head pending_sms;
+	unsigned long long last_subscr_id;
+};
+
+static int sms_subscr_cb(unsigned int, unsigned int, void *, void *);
+static int sms_sms_cb(unsigned int, unsigned int, void *, void *);
+
+static struct gsm_sms_pending *sms_find_pending(struct gsm_sms_queue *smsq,
+						struct gsm_sms *sms)
+{
+	struct gsm_sms_pending *pending;
+
+	llist_for_each_entry(pending, &smsq->pending_sms, entry) {
+		if (pending->sms_id == sms->id)
+			return pending;
+	}
+
+	return NULL;
+}
+
+static int sms_is_in_pending(struct gsm_sms_queue *smsq, struct gsm_sms *sms)
+{
+	return sms_find_pending(smsq, sms) != NULL;
+}
+
+static int sms_subscriber_is_pending(struct gsm_sms_queue *smsq,
+				     struct gsm_subscriber *subscr)
+{
+	struct gsm_sms_pending *pending;
+
+	llist_for_each_entry(pending, &smsq->pending_sms, entry) {
+		if (pending->subscr == subscr)
+			return 1;
+	}
+
+	return 0;
+}
+
+static struct gsm_sms_pending *sms_pending_from(struct gsm_sms_queue *smsq,
+						struct gsm_sms *sms)
+{
+	struct gsm_sms_pending *pending;
+
+	pending = talloc_zero(smsq, struct gsm_sms_pending);
+	if (!pending)
+		return NULL;
+
+	pending->subscr = subscr_get(sms->receiver);
+	pending->sms_id = sms->id;
+	return pending;
+}
+
+static void sms_pending_free(struct gsm_sms_pending *pending)
+{
+	subscr_put(pending->subscr);
+	llist_del(&pending->entry);
+	talloc_free(pending);
+}
+
+static void sms_pending_resend(struct gsm_sms_pending *pending)
+{
+	struct gsm_sms_queue *smsq;
+	LOGP(DSMS, LOGL_DEBUG,
+	     "Scheduling resend of SMS %llu.\n", pending->sms_id);
+
+	pending->resend = 1;
+
+	smsq = pending->subscr->net->sms_queue;
+	if (bsc_timer_pending(&smsq->resend_pending))
+		return;
+
+	bsc_schedule_timer(&smsq->resend_pending, 1, 0);
+}
+
+static void sms_pending_failed(struct gsm_sms_pending *pending, int paging_error)
+{
+	struct gsm_sms_queue *smsq;
+
+	LOGP(DSMS, LOGL_NOTICE, "Sending SMS %llu failed %d times.\n",
+	     pending->sms_id, pending->failed_attempts);
+
+	smsq = pending->subscr->net->sms_queue;
+	if (++pending->failed_attempts < smsq->max_fail)
+		return sms_pending_resend(pending);
+
+	if (paging_error) {
+		LOGP(DSMS, LOGL_NOTICE,
+		     "Subscriber %llu is not reachable. Setting LAC=0.\n", pending->subscr->id);
+		pending->subscr->lac = GSM_LAC_RESERVED_DETACHED;
+		db_sync_subscriber(pending->subscr);
+
+		/* Workaround a failing sync */
+		db_subscriber_update(pending->subscr);
+	}
+
+	sms_pending_free(pending);
+	smsq->pending -= 1;
+	sms_queue_trigger(smsq);
+}
+
+/*
+ * Resend all SMS that are scheduled for a resend. This is done to
+ * avoid an immediate failure.
+ */
+static void sms_resend_pending(void *_data)
+{
+	struct gsm_sms_pending *pending, *tmp;
+	struct gsm_sms_queue *smsq = _data;
+
+	llist_for_each_entry_safe(pending, tmp, &smsq->pending_sms, entry) {
+		struct gsm_sms *sms;
+		if (!pending->resend)
+			continue;
+
+		sms = db_sms_get(smsq->network, pending->sms_id);
+
+		/* the sms is gone? Move to the next */
+		if (!sms) {
+			sms_pending_free(pending);
+			smsq->pending -= 1;
+			sms_queue_trigger(smsq);
+		} else {
+			pending->resend = 0;
+			gsm411_send_sms_subscr(sms->receiver, sms);
+		}
+	}
+}
+
+static struct gsm_sms *take_next_sms(struct gsm_sms_queue *smsq)
+{
+	struct gsm_sms *sms;
+
+	sms = db_sms_get_unsent_by_subscr(smsq->network, smsq->last_subscr_id, 10);
+	if (sms) {
+		smsq->last_subscr_id = sms->receiver->id + 1;
+		return sms;
+	}
+
+	/* need to wrap around */
+	smsq->last_subscr_id = 0;
+	sms = db_sms_get_unsent_by_subscr(smsq->network,
+					  smsq->last_subscr_id, 10);
+	if (sms)
+		smsq->last_subscr_id = sms->receiver->id + 1;
+	return sms;
+}
+
+/**
+ * I will submit up to max_pending - pending SMS to the
+ * subsystem.
+ */
+static void sms_submit_pending(void *_data)
+{
+	struct gsm_sms_queue *smsq = _data;
+	int attempts = smsq->max_pending - smsq->pending;
+	int initialized = 0;
+	unsigned long long first_sub = 0;
+	int attempted = 0, rounds = 0;
+
+	LOGP(DSMS, LOGL_NOTICE, "Attempting to send %d SMS\n", attempts);
+
+	do {
+		struct gsm_sms_pending *pending;
+		struct gsm_sms *sms;
+
+
+		sms = take_next_sms(smsq);
+		if (!sms)
+			break;
+
+		rounds += 1;
+
+		/*
+		 * This code needs to detect a loop. It assumes that no SMS
+		 * will vanish during the time this is executed. We will remember
+		 * the id of the first GSM subscriber we see and then will
+		 * compare this. The Database code should make sure that we will
+		 * see all other subscribers first before seeing this one again.
+		 *
+		 * It is always scary to have an infinite loop like this.
+		 */
+		if (!initialized) {
+			first_sub = sms->receiver->id;
+			initialized = 1;
+		} else if (first_sub == sms->receiver->id) {
+			sms_free(sms);
+			break;
+		}
+
+		/* no need to send a pending sms */
+		if (sms_is_in_pending(smsq, sms)) {
+			LOGP(DSMS, LOGL_DEBUG,
+			     "SMSqueue with pending sms: %llu\n. Skipping", sms->id);
+			sms_free(sms);
+			continue;
+		}
+
+		/* no need to send a SMS with the same receiver */
+		if (sms_subscriber_is_pending(smsq, sms->receiver)) {
+			LOGP(DSMS, LOGL_DEBUG,
+			     "SMSqueue with pending sub: %llu. Skipping\n", sms->receiver->id);
+			sms_free(sms);
+			continue;
+		}
+
+		pending = sms_pending_from(smsq, sms);
+		if (!pending) {
+			LOGP(DSMS, LOGL_ERROR,
+			     "Failed to create pending SMS entry.\n");
+			sms_free(sms);
+			continue;
+		}
+
+		attempted += 1;
+		smsq->pending += 1;
+		llist_add(&pending->entry, &smsq->pending_sms);
+		gsm411_send_sms_subscr(sms->receiver, sms);
+	} while (attempted < attempts && rounds < 1000);
+
+	LOGP(DSMS, LOGL_DEBUG, "SMSqueue added %d messages in %d rounds\n", attempted, rounds);
+}
+
+/*
+ * Kick off the queue again.
+ */
+int sms_queue_trigger(struct gsm_sms_queue *smsq)
+{
+	if (bsc_timer_pending(&smsq->push_queue))
+		return 0;
+
+	bsc_schedule_timer(&smsq->push_queue, 1, 0);
+	return 0;
+}
+
+int sms_queue_start(struct gsm_network *network, int max_pending)
+{
+	struct gsm_sms_queue *sms = talloc_zero(network, struct gsm_sms_queue);
+	if (!sms) {
+		LOGP(DMSC, LOGL_ERROR, "Failed to create the SMS queue.\n");
+		return -1;
+	}
+
+	register_signal_handler(SS_SUBSCR, sms_subscr_cb, network);
+	register_signal_handler(SS_SMS, sms_sms_cb, network);
+
+	network->sms_queue = sms;
+	INIT_LLIST_HEAD(&sms->pending_sms);
+	sms->max_fail = 1;
+	sms->network = network;
+	sms->max_pending = max_pending;
+	sms->push_queue.data = sms;
+	sms->push_queue.cb = sms_submit_pending;
+	sms->resend_pending.data = sms;
+	sms->resend_pending.cb = sms_resend_pending;
+
+	sms_submit_pending(sms);
+
+	return 0;
+}
+
+static int sub_ready_for_sm(struct gsm_subscriber *subscr)
+{
+	struct gsm_subscriber_connection *conn;
+	struct gsm_sms *sms;
+
+	/* A subscriber has attached. Check if there are
+	 * any pending SMS for him to be delivered */
+	conn = connection_for_subscr(subscr);
+	if (!conn)
+		return -1;
+	sms = db_sms_get_unsent_for_subscr(subscr);
+	if (!sms)
+		return -1;
+	gsm411_send_sms(conn, sms);
+	return 0;
+}
+
+static int sms_subscr_cb(unsigned int subsys, unsigned int signal,
+			 void *handler_data, void *signal_data)
+{
+	struct gsm_subscriber *subscr = signal_data;
+
+	if (signal != S_SUBSCR_ATTACHED)
+		return 0;
+
+	/* this is readyForSM */
+	return sub_ready_for_sm(subscr);
+}
+
+static int sms_sms_cb(unsigned int subsys, unsigned int signal,
+		      void *handler_data, void *signal_data)
+{
+	struct gsm_network *network = handler_data;
+	struct sms_signal_data *sig_sms = signal_data;
+	struct gsm_sms_pending *pending;
+
+	/* We got a new SMS and maybe should launch the queue again. */
+	if (signal == S_SMS_SUBMITTED || signal == S_SMS_SMMA) {
+		sms_queue_trigger(network->sms_queue);
+		return 0;
+	}
+
+	if (!sig_sms->sms)
+		return -1;
+
+
+	/*
+	 * Find the entry of our queue. The SMS subsystem will submit
+	 * sms that are not in our control as we just have a channel
+	 * open anyway.
+	 */
+	pending = sms_find_pending(network->sms_queue, sig_sms->sms);
+	if (!pending)
+		return 0;
+
+	switch (signal) {
+	case S_SMS_DELIVERED:
+		/*
+		 * Create place for a new SMS but keep the pending data
+		 * so we will not attempt to send the SMS for this subscriber
+		 * as we still have an open channel and will attempt to submit
+		 * SMS to it anyway.
+		 */
+		network->sms_queue->pending -= 1;
+		sms_submit_pending(network->sms_queue);
+		sms_pending_free(pending);
+		break;
+	case S_SMS_MEM_EXCEEDED:
+		network->sms_queue->pending -= 1;
+		sms_pending_free(pending);
+		sms_queue_trigger(network->sms_queue);
+		break;
+	case S_SMS_UNKNOWN_ERROR:
+		/*
+		 * There can be many reasons for this failure. E.g. the paging
+		 * timed out, the subscriber was not paged at all, or there was
+		 * a protocol error. The current strategy is to try sending the
+		 * next SMS for busy/oom and to retransmit when we have paged.
+		 *
+		 * When the paging expires three times we will disable the
+		 * subscriber. If we have some kind of other transmit error we
+		 * should flag the SMS as bad.
+		 */
+		switch (sig_sms->paging_result) {
+		case 0:
+			/* BAD SMS? */
+			db_sms_inc_deliver_attempts(sig_sms->sms);
+			sms_pending_failed(pending, 0);
+			break;
+		case GSM_PAGING_EXPIRED:
+			sms_pending_failed(pending, 1);
+			break;
+
+		case GSM_PAGING_OOM:
+		case GSM_PAGING_BUSY:
+			network->sms_queue->pending -= 1;
+			sms_pending_free(pending);
+			sms_queue_trigger(network->sms_queue);
+			break;
+		default:
+			LOGP(DSMS, LOGL_ERROR, "Unhandled result: %d\n",
+			     sig_sms->paging_result);
+		}
+		break;
+	default:
+		LOGP(DSMS, LOGL_ERROR, "Unhandled result: %d\n",
+		     sig_sms->paging_result);
+	}
+
+	return 0;
+}
+
+/* VTY helper functions */
+int sms_queue_stats(struct gsm_sms_queue *smsq, struct vty *vty)
+{
+	struct gsm_sms_pending *pending;
+
+	vty_out(vty, "SMSqueue with max_pending: %d pending: %d%s",
+		smsq->max_pending, smsq->pending, VTY_NEWLINE);
+
+	llist_for_each_entry(pending, &smsq->pending_sms, entry)
+		vty_out(vty, " SMS Pending for Subscriber: %llu%s",
+			pending->subscr->id, VTY_NEWLINE);
+	return 0;
+}
+
+int sms_queue_set_max_pending(struct gsm_sms_queue *smsq, int max_pending)
+{
+	LOGP(DSMS, LOGL_NOTICE, "SMSqueue old max: %d new: %d\n",
+	     smsq->max_pending, max_pending);
+	smsq->max_pending = max_pending;
+	return 0;
+}
+
+int sms_queue_set_max_failure(struct gsm_sms_queue *smsq, int max_fail)
+{
+	LOGP(DSMS, LOGL_NOTICE, "SMSqueue max failure old: %d new: %d\n",
+	     smsq->max_fail, max_fail);
+	smsq->max_fail = max_fail;
+	return 0;
+}
+
+int sms_queue_clear(struct gsm_sms_queue *smsq)
+{
+	struct gsm_sms_pending *pending, *tmp;
+
+	llist_for_each_entry_safe(pending, tmp, &smsq->pending_sms, entry) {
+		LOGP(DSMS, LOGL_NOTICE,
+		     "SMSqueue clearing for sub %llu\n", pending->subscr->id);
+		sms_pending_free(pending);
+	}
+
+	return 0;
+}
diff --git a/openbsc/src/token_auth.c b/openbsc/src/token_auth.c
index dc8cce2..9e67152 100644
--- a/openbsc/src/token_auth.c
+++ b/openbsc/src/token_auth.c
@@ -117,7 +117,8 @@
 static int token_sms_cb(unsigned int subsys, unsigned int signal,
 			void *handler_data, void *signal_data)
 {
-	struct gsm_sms *sms = signal_data;
+	struct sms_signal_data *sig = signal_data;
+	struct gsm_sms *sms = sig->sms;;
 	struct gsm_subscriber_connection *conn;
 	u_int8_t auth_rand[16];
 
diff --git a/openbsc/src/vty_interface_layer3.c b/openbsc/src/vty_interface_layer3.c
index 0552653..c01d9c6 100644
--- a/openbsc/src/vty_interface_layer3.c
+++ b/openbsc/src/vty_interface_layer3.c
@@ -20,6 +20,7 @@
  */
 
 #include <stdlib.h>
+#include <limits.h>
 #include <unistd.h>
 #include <sys/types.h>
 
@@ -45,6 +46,7 @@
 #include <openbsc/vty.h>
 #include <openbsc/gsm_04_80.h>
 #include <openbsc/chan_alloc.h>
+#include <openbsc/sms_queue.h>
 
 extern struct gsm_network *gsmnet_from_vty(struct vty *v);
 
@@ -61,6 +63,8 @@
 	if (subscr->extension)
 		vty_out(vty, "    Extension: %s%s", subscr->extension,
 			VTY_NEWLINE);
+	vty_out(vty, "    LAC: %d/0x%x%s",
+		subscr->lac, subscr->lac, VTY_NEWLINE);
 	if (subscr->imsi)
 		vty_out(vty, "    IMSI: %s%s", subscr->imsi, VTY_NEWLINE);
 	if (subscr->tmsi != GSM_RESERVED_TMSI)
@@ -123,7 +127,7 @@
 	int id = 0;
 
 	while (1) {
-		sms = db_sms_get_unsent_by_subscr(gsmnet, id);
+		sms = db_sms_get_unsent_by_subscr(gsmnet, id, UINT_MAX);
 		if (!sms)
 			break;
 
@@ -167,16 +171,15 @@
 	sms = sms_from_text(receiver, str);
 	sms->protocol_id = tp_pid;
 
-	if(!receiver->lac){
-		/* subscriber currently not attached, store in database */
-		if (db_sms_store(sms) != 0) {
-			LOGP(DSMS, LOGL_ERROR, "Failed to store SMS in Database\n");
-			return CMD_WARNING;
-		}
-	} else {
-		gsm411_send_sms_subscr(receiver, sms);
+	/* store in database for the queue */
+	if (db_sms_store(sms) != 0) {
+		LOGP(DSMS, LOGL_ERROR, "Failed to store SMS in Database\n");
+		sms_free(sms);
+		return CMD_WARNING;
 	}
 
+	sms_free(sms);
+	sms_queue_trigger(receiver->net->sms_queue);
 	return CMD_SUCCESS;
 }
 
@@ -630,6 +633,60 @@
 	return CMD_SUCCESS;
 }
 
+DEFUN(show_smsqueue,
+      show_smsqueue_cmd,
+      "show sms-queue",
+      SHOW_STR "Display SMSqueue statistics\n")
+{
+	struct gsm_network *net = gsmnet_from_vty(vty);
+
+	sms_queue_stats(net->sms_queue, vty);
+	return CMD_SUCCESS;
+}
+
+DEFUN(smsqueue_trigger,
+      smsqueue_trigger_cmd,
+      "sms-queue trigger",
+      "SMS Queue\n" "Trigger sending messages\n")
+{
+	struct gsm_network *net = gsmnet_from_vty(vty);
+
+	sms_queue_trigger(net->sms_queue);
+	return CMD_SUCCESS;
+}
+
+DEFUN(smsqueue_max,
+      smsqueue_max_cmd,
+      "sms-queue max-pending <1-500>",
+      "SMS Queue\n" "SMS to attempt to deliver at the same time\n")
+{
+	struct gsm_network *net = gsmnet_from_vty(vty);
+
+	sms_queue_set_max_pending(net->sms_queue, atoi(argv[0]));
+	return CMD_SUCCESS;
+}
+
+DEFUN(smsqueue_clear,
+      smsqueue_clear_cmd,
+      "sms-queue clear",
+      "SMS Queue\n" "Clear the queue of pending SMS\n")
+{
+	struct gsm_network *net = gsmnet_from_vty(vty);
+
+	sms_queue_clear(net->sms_queue);
+	return CMD_SUCCESS;
+}
+
+DEFUN(smsqueue_fail,
+      smsqueue_fail_cmd,
+      "sms-queue max-failure <1-500>",
+      "SMS Queue\n" "Set maximum amount of failures\n")
+{
+	struct gsm_network *net = gsmnet_from_vty(vty);
+
+	sms_queue_set_max_failure(net->sms_queue, atoi(argv[0]));
+	return CMD_SUCCESS;
+}
 
 int bsc_vty_init_extra(void)
 {
@@ -647,12 +704,17 @@
 	install_element_ve(&subscriber_ussd_notify_cmd);
 	install_element_ve(&subscriber_update_cmd);
 	install_element_ve(&show_stats_cmd);
+	install_element_ve(&show_smsqueue_cmd);
 
 	install_element(ENABLE_NODE, &ena_subscr_name_cmd);
 	install_element(ENABLE_NODE, &ena_subscr_extension_cmd);
 	install_element(ENABLE_NODE, &ena_subscr_authorized_cmd);
 	install_element(ENABLE_NODE, &ena_subscr_a3a8_cmd);
 	install_element(ENABLE_NODE, &subscriber_purge_cmd);
+	install_element(ENABLE_NODE, &smsqueue_trigger_cmd);
+	install_element(ENABLE_NODE, &smsqueue_max_cmd);
+	install_element(ENABLE_NODE, &smsqueue_clear_cmd);
+	install_element(ENABLE_NODE, &smsqueue_fail_cmd);
 
 	return 0;
 }