ipa_proto.erl: Implement TCP/IPA reassembly

Related: OS#6377
Change-Id: I322e6ab9368ad66be66a9e8d113575f51f9c91c3
diff --git a/src/ipa_proto.erl b/src/ipa_proto.erl
index c96d6bb..d88af7e 100644
--- a/src/ipa_proto.erl
+++ b/src/ipa_proto.erl
@@ -129,12 +129,18 @@
 	Ret = inet:setopts(Socket, [{active, once}]),
 	io:format("Unblocking socket ~p:~p~n", [Socket, Ret]).
 
-% split an incoming IPA message and split it into Length/StreamID/Payload
+% split an incoming IPA message and split it into StreamID/Payload/Trailer
+split_ipa_msg(IPALen, _StreamID, DataRemainBin) when byte_size(DataRemainBin) < IPALen ->
+	need_more_data;
+split_ipa_msg(IPALen, StreamID, DataRemainBin) ->
+	<<Payload:IPALen/binary, Trailer/binary>> = DataRemainBin,
+	io:format("Stream ~p, ~p bytes~n", [StreamID, IPALen]),
+	{ok, StreamID, Payload, Trailer}.
+split_ipa_msg(DataBin) when byte_size(DataBin) < 3 ->
+	need_more_data;
 split_ipa_msg(DataBin) ->
-	% FIXME: This will throw an exception if DataBin doesn't contain all payload
-	<<Length:16/big-unsigned-integer, StreamID:8, Payload:Length/binary, Trailer/binary>> = DataBin,
-	io:format("Stream ~p, ~p bytes~n", [StreamID, Length]),
-	{StreamID, Payload, Trailer}.
+	<<IPALen:16/big-unsigned-integer, StreamID:8, DataRemainBin/binary>> = DataBin,
+	split_ipa_msg(IPALen, StreamID, DataRemainBin).
 
 % deliver an incoming message to the process that is registered for the socket/stream_id
 deliver_rx_ipa_msg(Socket, StreamID, StreamMap, DataBin) ->
@@ -177,19 +183,22 @@
 
 % process (split + deliver) an incoming IPA message
 process_rx_ipa_msg(_S, _StreamMap, _, <<>>) ->
-	ok;
+	{ok, <<>>};
 process_rx_ipa_msg(S, StreamMap, CcmOptions, Data) ->
-	{StreamID, PayloadBin, Trailer} = split_ipa_msg(Data),
-	case StreamID of
-		?IPAC_PROTO_CCM ->
-			process_rx_ccm_msg(S, StreamID, CcmOptions, PayloadBin);
-		?IPAC_PROTO_OSMO ->
-			<<ExtStreamID:8, PayloadExt/binary>> = PayloadBin,
-			deliver_rx_ipa_msg(S, {osmo, ExtStreamID}, StreamMap, PayloadExt);
-		_ ->
-			deliver_rx_ipa_msg(S, StreamID, StreamMap, PayloadBin)
-	end,
-	process_rx_ipa_msg(S, StreamMap, CcmOptions, Trailer).
+	case split_ipa_msg(Data) of
+		need_more_data -> {ok, Data};
+		{ok, StreamID, PayloadBin, Trailer} ->
+			case StreamID of
+				?IPAC_PROTO_CCM ->
+					process_rx_ccm_msg(S, StreamID, CcmOptions, PayloadBin);
+				?IPAC_PROTO_OSMO ->
+					<<ExtStreamID:8, PayloadExt/binary>> = PayloadBin,
+					deliver_rx_ipa_msg(S, {osmo, ExtStreamID}, StreamMap, PayloadExt);
+				_ ->
+					deliver_rx_ipa_msg(S, StreamID, StreamMap, PayloadBin)
+			end,
+			process_rx_ipa_msg(S, StreamMap, CcmOptions, Trailer)
+	end.
 
 send_close_signal([]) ->
 	ok;
@@ -246,9 +255,9 @@
 	StreamMap = ets:new(stream_map, [set]),
 	ets:insert(ipa_sockets, #ipa_socket{socket=Socket, ipaPid=self(), streamTbl=StreamMap}),
 	CallingPid ! {ipa_init_sock_done, Socket},
-	loop(Socket, StreamMap, #ipa_ccm_options{}).
+	loop(Socket, StreamMap, #ipa_ccm_options{}, <<>>).
 
-loop(S, StreamMap, CcmOptions) ->
+loop(S, StreamMap, CcmOptions, RxPendingData) ->
 	receive
 		{request, From, Request} ->
 			case ipa_proto:request(Request, CcmOptions) of
@@ -260,15 +269,15 @@
 					Reply = EmbeddedReply
 			end,
 			ipa_proto:reply(From, Reply),
-			ipa_proto:loop(S, StreamMap, NextCcmOptions);
+			ipa_proto:loop(S, StreamMap, NextCcmOptions, RxPendingData);
 		{ipa_send, S, StreamId, Data} ->
 			send(S, StreamId, Data),
-			ipa_proto:loop(S, StreamMap, CcmOptions);
+			ipa_proto:loop(S, StreamMap, CcmOptions, RxPendingData);
 		{tcp, S, Data} ->
 			% process incoming IPA message and mark socket active once more
-			ipa_proto:process_rx_ipa_msg(S, StreamMap, CcmOptions, Data),
+			{ok, NewRxPendingData} = ipa_proto:process_rx_ipa_msg(S, StreamMap, CcmOptions, <<RxPendingData/binary, Data/binary>>),
 			inet:setopts(S, [{active, once}]),
-			ipa_proto:loop(S, StreamMap, CcmOptions);
+			ipa_proto:loop(S, StreamMap, CcmOptions, NewRxPendingData);
 		{tcp_closed, S} ->
 			io:format("Socket ~w closed [~w]~n", [S,self()]),
 			ipa_proto:process_tcp_closed(S, StreamMap),