ipa_proto: Add notion of 'codecs'

A codec for a given Stream Identifier can be registered with encode
and decode functions.  This codec transcodes from the binary payload
of messages within that stream identifier and some abstract
representation.  Any received messages will be passed through decode,
while any to-be-transmitted messages will be passed through encode.

Change-Id: I8eaf888402545a1a871df9ae3dfbce690729dd03
diff --git a/src/ipa_proto.erl b/src/ipa_proto.erl
index c191a74..9a3f3d0 100644
--- a/src/ipa_proto.erl
+++ b/src/ipa_proto.erl
@@ -37,11 +37,16 @@
 
 -export([register_socket/1, register_stream/3, unregister_stream/2,
 	 send/3, connect/3, connect/4, listen_accept_handle/2,
-	 start_listen/3, controlling_process/3]).
+	 start_listen/3, controlling_process/3, register_codec/3]).
+
+-type stream_id() :: integer() | {osmo, integer()}.
 
 -record(ipa_socket, {socket, ipaPid, streamTbl, listenType}).
 
-
+-record(ipa_codec, {streamId :: stream_id(),
+		    encodeFn :: fun(),
+		    decodeFn :: fun()
+	}).
 
 % register a TCP socket with this IPA protocol implementation
 register_socket(Socket) ->
@@ -124,15 +129,43 @@
 
 % deliver an incoming message to the process that is registered for the socket/stream_id
 deliver_rx_ipa_msg(Socket, StreamID, StreamMap, DataBin) ->
+	DataDec = try_decode(StreamID, DataBin),
 	case ets:lookup(StreamMap, {Socket, StreamID}) of
 		[{_,{process_id, Pid}}] ->
-			Pid ! {ipa, Socket, StreamID, DataBin};
+			Pid ! {ipa, Socket, StreamID, DataDec};
 		[{_,{callback_fn, Fn, Args}}] ->
-			Fn(Socket, StreamID, DataBin, Args);
+			Fn(Socket, StreamID, DataDec, Args);
 		[] ->
 			io:format("No Pid registered for Socket ~p Stream ~p~n", [Socket, StreamID])
 	end.
 
+% register a Codec with this IPA protocol implementation
+-spec register_codec(stream_id(), fun(), fun()) -> boolean().
+register_codec(StreamID, EncodeFn, DecodeFn) ->
+	ets:insert(ipa_codecs, #ipa_codec{streamId=StreamID, encodeFn=EncodeFn, decodeFn=DecodeFn}).
+
+-spec try_decode(stream_id(), binary()) -> any().
+try_decode(StreamID, Data) ->
+	case ets:lookup(ipa_codecs, StreamID) of
+		[IpaCodec] ->
+			Fun = IpaCodec#ipa_codec.decodeFn,
+			Fun(Data);
+		[] ->
+			Data
+	end.
+
+-spec try_encode(stream_id(), any()) -> binary().
+try_encode(_StreamID, Data) when is_binary(Data) ->
+	Data;
+try_encode(StreamID, Data) ->
+	case ets:lookup(ipa_codecs, StreamID) of
+		[IpaCodec] ->
+			Fun = IpaCodec#ipa_codec.encodeFn,
+			Fun(Data);
+		[] ->
+			Data
+	end.
+
 % process (split + deliver) an incoming IPA message
 process_rx_ipa_msg(_S, _StreamMap, <<>>) ->
 	ok;
@@ -173,10 +206,12 @@
 
 % send a binary message through a given Socket / StreamID
 send(Socket, {osmo, StreamIdExt}, DataBin) ->
-	send(Socket, ?IPAC_PROTO_OSMO, [StreamIdExt, DataBin]);
+	DataEnc = try_encode({osmo, StreamIdExt}, DataBin),
+	send(Socket, ?IPAC_PROTO_OSMO, [StreamIdExt, DataEnc]);
 send(Socket, StreamID, DataBin) ->
-	Size = iolist_size(DataBin),
-	gen_tcp:send(Socket, iolist_to_binary([<<Size:2/big-unsigned-integer-unit:8>>, StreamID, DataBin])).
+	DataEnc = try_encode(StreamID, DataBin),
+	Size = iolist_size(DataEnc),
+	gen_tcp:send(Socket, iolist_to_binary([<<Size:2/big-unsigned-integer-unit:8>>, StreamID, DataEnc])).
 
 
 call_sync(Pid, Request) ->
@@ -194,12 +229,8 @@
 
 % global module initialization
 init() ->
-	case ets:new(ipa_sockets, [named_table, set, public, {keypos, #ipa_socket.socket}]) of
-		ipa_sockets ->
-			ok;
-		_ ->
-			{error, ets_new_ipa_sockets}
-	end.
+	ipa_sockets = ets:new(ipa_sockets, [named_table, set, public, {keypos, #ipa_socket.socket}]),
+	ipa_codecs = ets:new(ipa_codecs, [named_table, set, public, {keypos, #ipa_codec.streamId}]).
 
 % initialize a signle socket, create its handle process
 init_sock(Socket, CallingPid) ->