ipa: add ipa_server_peer infrastructure

This patch adds the ipa_server_peer abstraction which provide
helpers for the accept path of ipa_server_link.
diff --git a/include/osmocom/abis/ipa.h b/include/osmocom/abis/ipa.h
index 1ed4871..91afd01 100644
--- a/include/osmocom/abis/ipa.h
+++ b/include/osmocom/abis/ipa.h
@@ -8,7 +8,6 @@
 struct ipa_server_link {
 	struct e1inp_line		*line;
 	struct osmo_fd			ofd;
-	struct llist_head		tx_queue;
 	const char			*addr;
 	uint16_t			port;
 	int (*accept_cb)(struct ipa_server_link *link, int fd);
@@ -21,6 +20,17 @@
 int ipa_server_link_open(struct ipa_server_link *link);
 void ipa_server_link_close(struct ipa_server_link *link);
 
+struct ipa_server_peer {
+	struct ipa_server_link		*server;
+	struct osmo_fd			ofd;
+	struct llist_head		tx_queue;
+	int (*cb)(struct ipa_server_peer *peer, struct msgb *msg);
+	void				*data;
+};
+
+struct ipa_server_peer *ipa_server_peer_create(void *ctx, struct ipa_server_link *link, int fd, int (*cb)(struct ipa_server_peer *peer, struct msgb *msg), void *data);
+void ipa_server_peer_destroy(struct ipa_server_peer *peer);
+
 enum ipa_client_link_state {
 	IPA_CLIENT_LINK_STATE_NONE         = 0,
 	IPA_CLIENT_LINK_STATE_CONNECTING   = 1,
diff --git a/src/input/ipa.c b/src/input/ipa.c
index bb32fe1..6a2022e 100644
--- a/src/input/ipa.c
+++ b/src/input/ipa.c
@@ -316,3 +316,102 @@
 	osmo_fd_unregister(&link->ofd);
 	close(link->ofd.fd);
 }
+
+static void ipa_server_peer_read(struct ipa_server_peer *peer)
+{
+	struct osmo_fd *ofd = &peer->ofd;
+	struct msgb *msg;
+	int ret;
+
+	LOGP(DINP, LOGL_NOTICE, "message received\n");
+
+	ret = ipa_msg_recv(ofd->fd, &msg);
+	if (ret < 0) {
+		if (errno == EPIPE || errno == ECONNRESET) {
+			LOGP(DINP, LOGL_ERROR, "lost connection with server\n");
+		} else {
+			LOGP(DINP, LOGL_ERROR, "unknown error\n");
+		}
+		return;
+	} else if (ret == 0) {
+		LOGP(DINP, LOGL_ERROR, "connection closed with server\n");
+		ipa_server_peer_destroy(peer);
+		return;
+	}
+	if (peer->cb)
+		peer->cb(peer, msg);
+
+	return;
+}
+
+static void ipa_server_peer_write(struct ipa_server_peer *peer)
+{
+	struct osmo_fd *ofd = &peer->ofd;
+	struct msgb *msg;
+	struct llist_head *lh;
+	int ret;
+
+	LOGP(DINP, LOGL_NOTICE, "sending data\n");
+
+	if (llist_empty(&peer->tx_queue)) {
+		ofd->when &= ~BSC_FD_WRITE;
+		return;
+	}
+	lh = peer->tx_queue.next;
+	llist_del(lh);
+	msg = llist_entry(lh, struct msgb, list);
+
+	ret = send(peer->ofd.fd, msg->data, msg->len, 0);
+	if (ret < 0) {
+		LOGP(DINP, LOGL_ERROR, "error to send\n");
+	}
+	msgb_free(msg);
+}
+
+static int ipa_server_peer_cb(struct osmo_fd *ofd, unsigned int what)
+{
+	struct ipa_server_peer *peer = ofd->data;
+
+	LOGP(DINP, LOGL_NOTICE, "connected read/write\n");
+	if (what & BSC_FD_READ)
+		ipa_server_peer_read(peer);
+	if (what & BSC_FD_WRITE)
+		ipa_server_peer_write(peer);
+
+	return 0;
+}
+
+struct ipa_server_peer *
+ipa_server_peer_create(void *ctx, struct ipa_server_link *link, int fd,
+		int (*cb)(struct ipa_server_peer *peer, struct msgb *msg),
+		void *data)
+{
+	struct ipa_server_peer *peer;
+
+	peer = talloc_zero(ctx, struct ipa_server_peer);
+	if (peer == NULL) {
+		LOGP(DINP, LOGL_ERROR, "cannot allocate new peer in server, "
+			"reason=`%s'\n", strerror(errno));
+		return NULL;
+	}
+	peer->server = link;
+	peer->ofd.fd = fd;
+	peer->ofd.data = peer;
+	peer->ofd.cb = ipa_server_peer_cb;
+	peer->ofd.when = BSC_FD_READ;
+	peer->cb = cb;
+	peer->data = data;
+	if (osmo_fd_register(&peer->ofd) < 0) {
+		LOGP(DINP, LOGL_ERROR, "could not register FD\n");
+		talloc_free(peer);
+		return NULL;
+	}
+	return peer;
+}
+
+void ipa_server_peer_destroy(struct ipa_server_peer *peer)
+{
+	close(peer->ofd.fd);
+	osmo_fd_unregister(&peer->ofd);
+	talloc_free(peer);
+}