Import (heavily modified) ipa_proto implementation from osmo-map-masq

The local changes here need to be synchronized back to the osmo-map-masq
diff --git a/src/ipa_proto.erl b/src/ipa_proto.erl
new file mode 100644
index 0000000..ebefe15
--- /dev/null
+++ b/src/ipa_proto.erl
@@ -0,0 +1,307 @@
+% ip.access IPA multiplex protocol 
+
+% (C) 2010 by Harald Welte <laforge@gnumonks.org>
+% (C) 2010 by On-Waves
+%
+% All Rights Reserved
+%
+% This program is free software; you can redistribute it and/or modify
+% it under the terms of the GNU General Public License as published by
+% the Free Software Foundation; either version 2 of the License, or
+% (at your option) any later version.
+%
+% This program is distributed in the hope that it will be useful,
+% but WITHOUT ANY WARRANTY; without even the implied warranty of
+% MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
+% GNU General Public License for more details.
+%
+% You should have received a copy of the GNU General Public License along
+% with this program; if not, write to the Free Software Foundation, Inc.,
+% 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.
+
+-module(ipa_proto).
+-author('Harald Welte <laforge@gnumonks.org>').
+-compile(export_all).
+
+-define(TIMEOUT, 1000).
+-define(IPA_SOCKOPTS, [binary, {packet, 0}, {reuseaddr, true}, {active, false}]).
+
+-define(IPAC_MSGT_PING,		0).
+-define(IPAC_MSGT_PONG, 	1).
+-define(IPAC_MSGT_ID_GET, 	4).
+-define(IPAC_MSGT_ID_RESP, 	5).
+-define(IPAC_MSGT_ID_ACK, 	6).
+
+-export([register_socket/1, register_stream/3, unregister_stream/2,
+	 send/3, connect/3, connect/4, listen_accept_handle/2,
+	 start_listen/3, controlling_process/3]).
+
+-record(ipa_socket, {socket, ipaPid, streamTbl, listenType}).
+
+
+
+% register a TCP socket with this IPA protocol implementation
+register_socket(Socket) ->
+	IpaPid = spawn(?MODULE, init_sock, [Socket, self()]),
+	% synchronously wait for init_sock to be done
+	receive 
+		{ipa_init_sock_done, Socket} ->
+			% assign ownership of the socket to the new IPA handler process
+			gen_tcp:controlling_process(Socket, IpaPid),
+			{ok, IpaPid}
+	after 
+		?TIMEOUT ->
+			{error, timeout}
+	end.
+
+% call_sync() preceeded by a Socket -> Pid lookup
+call_sync_sock(Socket, Request) ->
+	% resolve PID responsible for this socket
+	case ets:lookup(ipa_sockets, Socket) of
+		[IpaSock] ->
+			call_sync(IpaSock#ipa_socket.ipaPid, Request);
+		_ ->
+			io:format("No Process for Socket ~p~n", [Socket]),
+			{error, no_sock_for_pid}
+	end.
+
+% a user process wants to register itself for a given Socket/StreamID tuple
+register_stream(Socket, StreamID, Pid) ->
+	call_sync_sock(Socket, {ipa_reg_stream, Socket, StreamID, Pid}).
+
+register_streams(_S, []) ->
+	ok;
+register_streams(S, [{StreamID, Pid}|SList]) ->
+	ipa_proto:register_stream(S, StreamID, Pid),
+	register_streams(S, SList).
+
+% unregister for a given stream
+unregister_stream(Socket, StreamID) ->
+	call_sync_sock(Socket, {ipa_unreg_stream, Socket, StreamID}).
+
+% change the controlling process for a given {Socket, StreamID}
+controlling_process(Socket, StreamID, NewPid) ->
+	call_sync_sock(Socket, {ipa_ctrl_proc, Socket, StreamID, NewPid}).
+
+% unblock the socket from further processing
+unblock(Socket) ->
+	send_ccm_id_get(Socket),
+	call_sync_sock(Socket, {ipa_unblock, Socket}).
+
+
+% server-side handler for unregister_stream()
+request({ipa_reg_stream, Socket, StreamID, Pid}) ->
+	io:format("Registering handler ~p for socket ~p Stream ~p~n", [Pid, Socket, StreamID]),
+	[IpaSock] = ets:lookup(ipa_sockets, Socket),
+	ets:insert_new(IpaSock#ipa_socket.streamTbl, {{Socket, StreamID}, Pid});
+% server-side handler for unregister_stream()
+request({ipa_unreg_stream, Socket, StreamID}) ->
+	io:format("Unregistering handler for Socket ~p Stream ~p~n", [Socket, StreamID]),
+	[IpaSock] = ets:lookup(ipa_sockets, Socket),
+	ets:delete(IpaSock#ipa_socket.streamTbl, {Socket, StreamID});
+% server-side handler for controlling_process()
+request({ipa_ctrl_proc, Socket, StreamID, NewPid}) ->
+	io:format("Changing handler for socket ~p Stream ~p~n", [Socket, StreamID]),
+	[IpaSock] = ets:lookup(ipa_sockets, Socket),
+	ets:delete(IpaSock#ipa_socket.streamTbl, {Socket, StreamID}),
+	ets:insert_new(IpaSock#ipa_socket.streamTbl, {{Socket, StreamID}, NewPid});
+% server-side handler for unblock()
+request({ipa_unblock, Socket}) ->
+	io:format("Unblocking socket ~p~n", [Socket]),
+	%[IpaSock] = ets:lookup(ipa_sockets, Socket),
+	Ret = inet:setopts(Socket, [{active, once}]),
+	io:format("Unblocking socket ~p:~p~n", [Socket, Ret]).
+
+% split an incoming IPA message and split it into Length/StreamID/Payload
+split_ipa_msg(DataBin) ->
+	% FIXME: This will throw an exception if DataBin doesn't contain all payload
+	<<Length:16/big-unsigned-integer, StreamID:8, Payload:Length/binary, Trailer/binary>> = DataBin,
+	io:format("Stream ~p, ~p bytes~n", [StreamID, Length]),
+	{StreamID, Payload, Trailer}.
+
+% deliver an incoming message to the process that is registered for the socket/stream_id
+deliver_rx_ipa_msg(Socket, StreamID, StreamMap, DataBin) ->
+	case ets:lookup(StreamMap, {Socket, StreamID}) of
+		[{_,{process_id, Pid}}] ->
+			Pid ! {ipa, Socket, StreamID, DataBin};
+		[{_,{callback_fn, Fn, Args}}] ->
+			Fn(Socket, StreamID, DataBin, Args);
+		[] ->
+			io:format("No Pid registered for Socket ~p Stream ~p~n", [Socket, StreamID])
+	end.
+
+% process (split + deliver) an incoming IPA message
+process_rx_ipa_msg(_S, _StreamMap, <<>>) ->
+	ok;
+process_rx_ipa_msg(S, StreamMap, Data) ->
+	{StreamID, PayloadBin, Trailer} = split_ipa_msg(Data),
+	case StreamID of
+		254 ->
+			process_rx_ccm_msg(S, StreamID, PayloadBin);
+		_ ->
+			deliver_rx_ipa_msg(S, StreamID, StreamMap, PayloadBin)
+	end,
+	process_rx_ipa_msg(S, StreamMap, Trailer).
+
+send_close_signal([]) ->
+	ok;
+send_close_signal([StreamSpec|Tail]) ->
+	io:format("FIXME: send_close_signal ~p ~p~n", [StreamSpec, Tail]),
+	%[{{Socket, StreamID, Pid}}] = StreamSpec,
+	%Pid ! {ipa_closed, {Socket, StreamID}},
+	send_close_signal(Tail).
+	
+process_tcp_closed(S, StreamMap) ->
+	% signal the closed socket to the user
+	StreamList = ets:match(StreamMap, '$1'),
+	send_close_signal(StreamList),
+	% remove the stream map for this socket
+	ets:delete(StreamMap),
+	% remove any entry regarding 'S' from ipa_sockets
+	ets:delete(ipa_sockets, S),
+	ok.
+
+% send a binary message through a given Socket / StreamID
+send(Socket, StreamID, DataBin) ->
+	Size = byte_size(DataBin),
+	gen_tcp:send(Socket, iolist_to_binary([<<Size:2/big-unsigned-integer-unit:8>>, StreamID, DataBin])).
+
+
+call_sync(Pid, Request) ->
+	Ref = make_ref(),
+	Pid ! {request, {self(), Ref}, Request},
+	receive
+		{reply, Ref, Reply} -> Reply
+	after
+		?TIMEOUT -> {error, timeout}
+	end.
+
+reply({From, Ref}, Reply) ->
+	From ! {reply, Ref, Reply}.
+
+
+% global module initialization
+init() ->
+	case ets:new(ipa_sockets, [named_table, set, public, {keypos, #ipa_socket.socket}]) of
+		ipa_sockets ->
+			ok;
+		_ ->
+			{error, ets_new_ipa_sockets}
+	end.
+
+% initialize a signle socket, create its handle process
+init_sock(Socket, CallingPid) ->
+	StreamMap = ets:new(stream_map, [set]),
+	ets:insert(ipa_sockets, #ipa_socket{socket=Socket, ipaPid=self(), streamTbl=StreamMap}),
+	CallingPid ! {ipa_init_sock_done, Socket},
+	loop(Socket, StreamMap).
+
+loop(S, StreamMap) ->
+	receive
+		{request, From, Request} ->
+			Reply = ipa_proto:request(Request),
+			ipa_proto:reply(From, Reply),
+			ipa_proto:loop(S, StreamMap);
+		{ipa_send, S, StreamId, Data} ->
+			send(S, StreamId, Data),
+			ipa_proto:loop(S, StreamMap);
+		{tcp, S, Data} ->
+			% process incoming IPA message and mark socket active once more
+			ipa_proto:process_rx_ipa_msg(S, StreamMap, Data),
+			inet:setopts(S, [{active, once}]),
+			ipa_proto:loop(S, StreamMap);
+		{tcp_closed, S} ->
+			io:format("Socket ~w closed [~w]~n", [S,self()]),
+			ipa_proto:process_tcp_closed(S, StreamMap),
+			% terminate the process by not looping further
+			ok
+	end.
+
+% Respond with PONG to PING
+process_ccm_msg(Socket, StreamID, ?IPAC_MSGT_PING, _) ->
+	io:format("Socket ~p Stream ~p: PING -> PONG~n", [Socket, StreamID]),
+	send(Socket, StreamID, <<?IPAC_MSGT_PONG>>);
+% Simply respond to ID_ACK with ID_ACK
+process_ccm_msg(_Socket, _StreamID, ?IPAC_MSGT_ID_ACK, _) ->
+%	send(Socket, StreamID, <<?IPAC_MSGT_ID_ACK>>);
+	ok;
+% Simply respond to ID_RESP with ID_ACK
+process_ccm_msg(Socket, StreamID, ?IPAC_MSGT_ID_RESP, _) ->
+	io:format("Socket ~p Stream ~p: ID_RESP -> ID_ACK~n", [Socket, StreamID]),
+	send(Socket, StreamID, <<?IPAC_MSGT_ID_ACK>>);
+% Default message handler for unknown messages
+process_ccm_msg(Socket, StreamID, MsgType, Opts) ->
+	io:format("Socket ~p Stream ~p: Unknown CCM message type ~p Opts ~p~n",
+		  [Socket, StreamID, MsgType, Opts]).
+
+% process an incoming CCM message (Stream ID 254)
+process_rx_ccm_msg(Socket, StreamID, PayloadBin) ->
+	[MsgType|Opts] = binary:bin_to_list(PayloadBin),
+	process_ccm_msg(Socket, StreamID, MsgType, Opts).
+
+send_ccm_id_get(Socket) ->
+	send(Socket, 254, <<?IPAC_MSGT_ID_GET>>).
+
+% convenience wrapper for interactive use / debugging from the shell
+listen_accept_handle(LPort, Opts) ->
+	case gen_tcp:listen(LPort, ?IPA_SOCKOPTS ++ Opts) of
+		{ok, ListenSock} ->
+			{ok, Port} = inet:port(ListenSock),
+			{ok, Sock} = gen_tcp:accept(ListenSock),
+			{ok, IpaPid} = ipa_proto:register_socket(Sock),
+			ipa_proto:register_stream(Sock, 0, self()),
+			ipa_proto:register_stream(Sock, 255, self()),
+			gen_tcp:controlling_process(Sock, IpaPid),
+			{ok, Port};
+		{error, Reason} ->
+			{error, Reason}
+	end.
+
+% gen_tcp:connect() convenience wrappers
+connect(Address, Port, Options) ->
+	connect(Address, Port, Options, infinity).
+
+connect(Address, Port, Options, Timeout) ->
+	case gen_tcp:connect(Address, Port, ?IPA_SOCKOPTS ++ Options, Timeout) of
+		{ok, Socket} ->
+			case ipa_proto:register_socket(Socket) of
+				{ok, _} ->
+					{ok, Socket};
+				{error, Reason} ->
+					gen_tcp:close(Socket),
+					{error, Reason}
+			end;
+		{error, Reason} ->
+			{error, Reason}
+	end.
+
+% Utility function to continuously server incomming IPA connections on a
+% listening TCP socket
+start_listen(LPort, NumServers, Opts) ->
+	case gen_tcp:listen(LPort, ?IPA_SOCKOPTS ++ Opts) of
+		{ok, ListenSock} ->
+			start_servers(NumServers, ListenSock, self()),
+			{ok, Port} = inet:port(ListenSock),
+			Port;
+		{error, Reason} ->
+			{error, Reason}
+	end.
+
+start_servers(0, _, _) ->
+	ok;
+start_servers(Num, LS, CtrlPid) ->
+	spawn(?MODULE, listen_server, [LS, CtrlPid]),
+	start_servers(Num-1, LS, CtrlPid).
+
+listen_server(LS, CtrlPid) ->
+	case gen_tcp:accept(LS) of
+		{ok, S} ->
+			io:format("Accepted TCP connection from ~p~n", [inet:peername(S)]),
+			% assign the socket to the Controlling process
+			gen_tcp:controlling_process(S, CtrlPid),
+			CtrlPid ! {ipa_tcp_accept, S},
+			listen_server(LS, CtrlPid);
+		Other ->
+			io:format("accept returned ~w - goodbye!~n", [Other]),
+			ok
+	end.