bankd: terminate bankd connection once map is deleted

This implements a signal-based mechanism by which the main thread can
inform a worker thread that its mapping has just been removed, so that
the worker terminates its connection and returns itself to the
pool.

Change-Id: Id932810d59e9e5d8994629d57aaf180bc96f90f5
diff --git a/src/bankd_main.c b/src/bankd_main.c
index 36f9625..f20d3b0 100644
--- a/src/bankd_main.c
+++ b/src/bankd_main.c
@@ -25,6 +25,7 @@
 #include <stdio.h>
 #include <stdlib.h>
 #include <stdint.h>
+#include <signal.h>
 #include <unistd.h>
 #include <errno.h>
 
@@ -53,6 +54,10 @@
 #include "debug.h"
 #include "rspro_util.h"
 
+/* signal sent by the main thread to a worker thread whose map has been deleted */
+#define SIGMAPDEL	(SIGRTMIN+1)
+static void handle_sig_mapdel(int sig);
+
 __thread void *talloc_asn1_ctx;
 struct bankd *g_bankd;
 
@@ -184,9 +189,20 @@
 			if (!map)
 				resp = rspro_gen_RemoveMappingRes(ResultCode_unknownSlotmap);
 			else {
-				/* FIXME: kill/reset the respective worker, if any! */
 				slotmap_del(g_bankd->slotmaps, map);
 				resp = rspro_gen_RemoveMappingRes(ResultCode_ok);
+
+				/* kill/reset the respective worker, if any! */
+				struct bankd_worker *worker;
+				pthread_mutex_lock(&g_bankd->workers_mutex);
+				llist_for_each_entry(worker, &g_bankd->workers, list) {
+					if (bs.bank_id == worker->slot.bank_id &&
+					    bs.slot_nr == worker->slot.slot_nr) {
+						pthread_kill(worker->thread, SIGMAPDEL);
+						break;
+					}
+				}
+				pthread_mutex_unlock(&g_bankd->workers_mutex);
 			}
 		}
 		server_conn_send_rspro(srvc, resp);
@@ -199,6 +215,7 @@
 	return 0;
 }
 
+
 void handle_options(int argc, char **argv)
 {
 }
@@ -224,6 +241,8 @@
 
 	handle_options(argc, argv);
 
+	signal(SIGMAPDEL, handle_sig_mapdel);
+
 	/* Connection towards remsim-server */
 	rc = server_conn_fsm_alloc(g_bankd, srvc);
 	if (rc < 0) {
@@ -248,10 +267,6 @@
 	while (1) {
 		if (terminate)
 			break;
-		/* FIXME: Connect to remsim-server from the main thread, register with
-		 * it and await + process any slot mapping or other configuration commands.
-		 * Ensure to re-connect as needed. */
-		sleep(1);
 		osmo_select_main(0);
 	}
 
@@ -265,6 +280,8 @@
  * bankd worker thread
  ***********************************************************************/
 
+static __thread struct bankd_worker *g_worker;
+
 struct value_string worker_state_names[] = {
 	{ BW_ST_INIT, 			"INIT" },
 	{ BW_ST_ACCEPTING,		"ACCEPTING" },
@@ -273,6 +290,7 @@
 	{ BW_ST_CONN_CLIENT_WAIT_MAP,	"CONN_CLIENT_WAIT_MAP" },
 	{ BW_ST_CONN_CLIENT_MAPPED,	"CONN_CLIENT_MAPPED" },
 	{ BW_ST_CONN_CLIENT_MAPPED_CARD,"CONN_CLIENT_MAPPED_CARD" },
+	{ BW_ST_CONN_CLIENT_UNMAPPED,	"CONN_CLIENT_UNMAPPED" },
 	{ 0, NULL }
 };
 
@@ -304,6 +322,14 @@
 	worker->timeout = timeout_secs;
 }
 
+/* signal handler for SIGMAPDEL from main thread: flags this thread's worker (g_worker) as unmapped */
+static void handle_sig_mapdel(int sig)
+{	/* NOTE(review): LOGW/OSMO_ASSERT are presumably not async-signal-safe -- confirm */
+	LOGW(g_worker, "SIGMAPDEL received: Main thread informs us our map is gone\n");
+	OSMO_ASSERT(sig == SIGMAPDEL);
+	worker_set_state(g_worker, BW_ST_CONN_CLIENT_UNMAPPED); /* state is re-checked by callers after EINTR */
+}
+
 static void worker_cleanup(void *arg)
 {
 	struct bankd_worker *worker = (struct bankd_worker *) arg;
@@ -359,7 +385,7 @@
 }
 
 
-static int blocking_ipa_read(int fd, uint8_t *buf, unsigned int buf_size)
+static int blocking_ipa_read(struct bankd_worker *worker, uint8_t *buf, unsigned int buf_size)
 {
 	struct ipaccess_head *hh;
 	uint16_t len;
@@ -370,17 +396,34 @@
 
 	hh = (struct ipaccess_head *) buf;
 
-	/* 1) blocking read from the socket (IPA header) */
-	rc = read(fd, buf, sizeof(*hh));
-	if (rc < sizeof(*hh))
+	/* we use 'recv' and not 'read' below, as 'recv' will always fail with -EINTR
+	 * in case of a signal being received */
+
+restart_hdr:
+	/* 1) blocking recv from the socket (IPA header) */
+	rc = recv(worker->client.fd, buf, sizeof(*hh), 0);
+	if (rc == -1 && errno == EINTR) {
+		if (worker->state == BW_ST_CONN_CLIENT_UNMAPPED)
+			return -23;
+		goto restart_hdr;
+	} else if (rc < 0)
+		return rc;
+	else if (rc < sizeof(*hh))
 		return -2;
 
 	len = ntohs(hh->len);
 	needed = len; //- sizeof(*hh);
 
-	/* 2) blocking read from the socket (payload) */
-	rc = read(fd, buf+sizeof(*hh), needed);
-	if (rc < needed)
+restart_body:
+	/* 2) blocking recv from the socket (payload) */
+	rc = recv(worker->client.fd, buf+sizeof(*hh), needed, 0);
+	if (rc == -1 && errno == EINTR) {
+		if (worker->state == BW_ST_CONN_CLIENT_UNMAPPED)
+			return -23;
+		goto restart_body;
+	} else if (rc < 0)
+		return rc;
+	else if (rc < needed)
 		return -3;
 
 	return len;
@@ -560,8 +603,15 @@
 	int data_len, rc;
 	RsproPDU_t *pdu = NULL;
 
+restart_wait:
 	rc = wait_for_fd_or_timeout(worker->client.fd, worker->timeout);
-	if (rc == 0) {
+	if (rc == -1 && errno == EINTR) {
+		if (worker->state == BW_ST_CONN_CLIENT_UNMAPPED)
+			return -23;
+		goto restart_wait;
+	} else if (rc < 0)
+		return rc;
+	else if (rc == 0) {
 		/* TIMEOUT case */
 		switch (worker->state) {
 		case BW_ST_CONN_CLIENT_WAIT_MAP:
@@ -580,7 +630,7 @@
 	};
 
 	/* 1) blocking read of entire IPA message from the socket */
-	rc = blocking_ipa_read(worker->client.fd, buf, sizeof(buf));
+	rc = blocking_ipa_read(worker, buf, sizeof(buf));
 	if (rc < 0)
 		return rc;
 	data_len = rc;
@@ -646,16 +696,17 @@
 /* worker thread main function */
 static void *worker_main(void *arg)
 {
-	struct bankd_worker *worker = (struct bankd_worker *) arg;
 	void *top_ctx;
 	int rc;
 	char worker_name[32];
 
+	g_worker = (struct bankd_worker *) arg;
+
 	/* set the thread name */
-	snprintf(worker_name, sizeof(worker_name), "bankd-worker(%u)", worker->num);
+	snprintf(worker_name, sizeof(worker_name), "bankd-worker(%u)", g_worker->num);
 	pthread_setname_np(pthread_self(), worker_name);
 
-	worker_set_state(worker, BW_ST_INIT);
+	worker_set_state(g_worker, BW_ST_INIT);
 
 	/* not permitted in multithreaded environment */
 	talloc_disable_null_tracking();
@@ -663,52 +714,52 @@
 	talloc_asn1_ctx = talloc_named_const(top_ctx, 0, "asn1");
 
 	/* push cleanup helper */
-	pthread_cleanup_push(&worker_cleanup, worker);
+	pthread_cleanup_push(&worker_cleanup, g_worker);
 
 	/* we continuously perform the same loop here, recycling the worker thread
 	 * once the client connection is gone or we have some trouble with the card/reader */
 	while (1) {
 		char buf[128];
 
-		worker->client.peer_addr_len = sizeof(worker->client.peer_addr);
+		g_worker->client.peer_addr_len = sizeof(g_worker->client.peer_addr);
 
-		worker_set_state(worker, BW_ST_ACCEPTING);
+		worker_set_state(g_worker, BW_ST_ACCEPTING);
 		/* first wait for an incoming TCP connection */
-		rc = accept(worker->bankd->accept_fd, (struct sockaddr *) &worker->client.peer_addr,
-			    &worker->client.peer_addr_len);
+		rc = accept(g_worker->bankd->accept_fd, (struct sockaddr *) &g_worker->client.peer_addr,
+			    &g_worker->client.peer_addr_len);
 		if (rc < 0) {
 			continue;
 		}
-		worker->client.fd = rc;
-		worker_client_addrstr(buf, sizeof(buf), worker);
-		LOGW(worker, "Accepted connection from %s\n", buf);
-		worker_set_state(worker, BW_ST_CONN_WAIT_ID);
+		g_worker->client.fd = rc;
+		worker_client_addrstr(buf, sizeof(buf), g_worker);
+		LOGW(g_worker, "Accepted connection from %s\n", buf);
+		worker_set_state(g_worker, BW_ST_CONN_WAIT_ID);
 
 		/* run the main worker transceive loop body until there was some error */
 		while (1) {
-			rc = worker_transceive_loop(worker);
+			rc = worker_transceive_loop(g_worker);
 			if (rc < 0)
 				break;
 		}
 
-		LOGW(worker, "Error %d occurred: Cleaning up state\n", rc);
+		LOGW(g_worker, "Error %d occurred: Cleaning up state\n", rc);
 
 		/* clean-up: reset to sane state */
-		if (worker->reader.pcsc.hCard) {
-			SCardDisconnect(worker->reader.pcsc.hCard, SCARD_UNPOWER_CARD);
-			worker->reader.pcsc.hCard = 0;
+		if (g_worker->reader.pcsc.hCard) {
+			SCardDisconnect(g_worker->reader.pcsc.hCard, SCARD_UNPOWER_CARD);
+			g_worker->reader.pcsc.hCard = 0;
 		}
-		if (worker->reader.pcsc.hContext) {
-			SCardReleaseContext(worker->reader.pcsc.hContext);
-			worker->reader.pcsc.hContext = 0;
+		if (g_worker->reader.pcsc.hContext) {
+			SCardReleaseContext(g_worker->reader.pcsc.hContext);
+			g_worker->reader.pcsc.hContext = 0;
 		}
-		if (worker->reader.name)
-			worker->reader.name = NULL;
-		if (worker->client.fd >= 0)
-			close(worker->client.fd);
-		memset(&worker->client.peer_addr, 0, sizeof(worker->client.peer_addr));
-		worker->client.fd = -1;
-		worker->client.clslot.client_id = worker->client.clslot.slot_nr = 0;
+		if (g_worker->reader.name)
+			g_worker->reader.name = NULL;
+		if (g_worker->client.fd >= 0)
+			close(g_worker->client.fd);
+		memset(&g_worker->client.peer_addr, 0, sizeof(g_worker->client.peer_addr));
+		g_worker->client.fd = -1;
+		g_worker->client.clslot.client_id = g_worker->client.clslot.slot_nr = 0;
 	}
 
 	pthread_cleanup_pop(1);