blob: d4e9d9650ffec391d257e16896187c56d511e1bc [file] [log] [blame]
Harald Welte176e28c2010-12-19 22:56:58 +01001% ip.access IPA multiplex protocol
2
3% (C) 2010 by Harald Welte <laforge@gnumonks.org>
4% (C) 2010 by On-Waves
5%
6% All Rights Reserved
7%
8% This program is free software; you can redistribute it and/or modify
9% it under the terms of the GNU General Public License as published by
10% the Free Software Foundation; either version 2 of the License, or
11% (at your option) any later version.
12%
13% This program is distributed in the hope that it will be useful,
14% but WITHOUT ANY WARRANTY; without even the implied warranty of
15% MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
16% GNU General Public License for more details.
17%
18% You should have received a copy of the GNU General Public License along
19% with this program; if not, write to the Free Software Foundation, Inc.,
20% 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.
21
22-module(ipa_proto).
23-author('Harald Welte <laforge@gnumonks.org>').
24-compile(export_all).
25
26-define(TIMEOUT, 1000).
27-define(IPA_SOCKOPTS, [binary, {packet, 0}, {reuseaddr, true}, {active, false}]).
28
29-define(IPAC_MSGT_PING, 0).
30-define(IPAC_MSGT_PONG, 1).
31-define(IPAC_MSGT_ID_GET, 4).
32-define(IPAC_MSGT_ID_RESP, 5).
33-define(IPAC_MSGT_ID_ACK, 6).
34
35-export([register_socket/1, register_stream/3, unregister_stream/2,
36 send/3, connect/3, connect/4, listen_accept_handle/2,
37 start_listen/3, controlling_process/3]).
38
39-record(ipa_socket, {socket, ipaPid, streamTbl, listenType}).
40
41
42
% Register a TCP socket with this IPA protocol implementation.
%
% Spawns a handler process running init_sock/2, waits up to ?TIMEOUT ms for
% its {ipa_init_sock_done, Socket} confirmation and then hands ownership of
% the socket over to that handler.
%
% Returns {ok, IpaPid} on success, {error, timeout} if the handler did not
% confirm in time, or {error, Reason} if the ownership transfer failed.
register_socket(Socket) ->
    IpaPid = spawn(?MODULE, init_sock, [Socket, self()]),
    % synchronously wait for init_sock to be done
    receive
        {ipa_init_sock_done, Socket} ->
            % assign ownership of the socket to the new IPA handler process
            case gen_tcp:controlling_process(Socket, IpaPid) of
                ok ->
                    {ok, IpaPid};
                {error, Reason} ->
                    % A handler that does not own the socket never receives
                    % its {tcp, ...} messages, so tear it down again instead
                    % of reporting success. Its private stream table dies
                    % with it; the ipa_sockets entry has to go explicitly.
                    exit(IpaPid, kill),
                    ets:delete(ipa_sockets, Socket),
                    {error, Reason}
            end
    after
        ?TIMEOUT ->
            {error, timeout}
    end.
56
% call_sync() preceded by a Socket -> Pid lookup.
%
% Resolves the handler process recorded for Socket in the ipa_sockets table
% and forwards Request to it. Returns whatever call_sync/2 returns, or
% {error, no_sock_for_pid} if the lookup does not yield exactly one entry.
call_sync_sock(Socket, Request) ->
    Entries = ets:lookup(ipa_sockets, Socket),
    case Entries of
        [Entry] ->
            HandlerPid = Entry#ipa_socket.ipaPid,
            call_sync(HandlerPid, Request);
        _ ->
            io:format("No Process for Socket ~p~n", [Socket]),
            {error, no_sock_for_pid}
    end.
67
% Register Pid as the handler for the given Socket/StreamID pair.
% The request is executed synchronously by the socket's handler process.
register_stream(Socket, StreamID, Pid) ->
    Request = {ipa_reg_stream, Socket, StreamID, Pid},
    call_sync_sock(Socket, Request).
71
% Register every {StreamID, Pid} pair of the list on socket S, in list order.
% The individual register_stream/3 results are ignored; always returns ok.
register_streams(S, StreamList) ->
    lists:foreach(
        fun({StreamID, Pid}) ->
            ipa_proto:register_stream(S, StreamID, Pid)
        end,
        StreamList
    ),
    ok.
77
% Remove the handler registration for the given Socket/StreamID pair.
% The request is executed synchronously by the socket's handler process.
unregister_stream(Socket, StreamID) ->
    Request = {ipa_unreg_stream, Socket, StreamID},
    call_sync_sock(Socket, Request).
81
% Replace the handler registered for {Socket, StreamID} with NewPid.
% The request is executed synchronously by the socket's handler process.
controlling_process(Socket, StreamID, NewPid) ->
    Request = {ipa_ctrl_proc, Socket, StreamID, NewPid},
    call_sync_sock(Socket, Request).
85
% Unblock the socket for further processing.
% First sends a CCM ID_GET to the peer, then asks the socket's handler
% process to execute the ipa_unblock request.
unblock(Socket) ->
    send_ccm_id_get(Socket),
    Request = {ipa_unblock, Socket},
    call_sync_sock(Socket, Request).
90
91
% Server-side request handlers. loop/2 calls request/1 for every
% {request, From, Request} message and sends the return value back to the
% caller.

% server-side handler for register_stream()
% Returns true if the handler was stored, false if {Socket, StreamID}
% already had one (ets:insert_new/2 semantics).
request({ipa_reg_stream, Socket, StreamID, Pid}) ->
    io:format("Registering handler ~p for socket ~p Stream ~p~n", [Pid, Socket, StreamID]),
    [IpaSock] = ets:lookup(ipa_sockets, Socket),
    ets:insert_new(IpaSock#ipa_socket.streamTbl, {{Socket, StreamID}, Pid});
% server-side handler for unregister_stream()
request({ipa_unreg_stream, Socket, StreamID}) ->
    io:format("Unregistering handler for Socket ~p Stream ~p~n", [Socket, StreamID]),
    [IpaSock] = ets:lookup(ipa_sockets, Socket),
    ets:delete(IpaSock#ipa_socket.streamTbl, {Socket, StreamID});
% server-side handler for controlling_process()
request({ipa_ctrl_proc, Socket, StreamID, NewPid}) ->
    io:format("Changing handler for socket ~p Stream ~p~n", [Socket, StreamID]),
    [IpaSock] = ets:lookup(ipa_sockets, Socket),
    ets:delete(IpaSock#ipa_socket.streamTbl, {Socket, StreamID}),
    ets:insert_new(IpaSock#ipa_socket.streamTbl, {{Socket, StreamID}, NewPid});
% server-side handler for unblock()
% Switches the socket to {active, once} and returns the inet:setopts/2
% result, so that a failure to unblock reaches the caller instead of being
% masked by the return value of the log call.
request({ipa_unblock, Socket}) ->
    io:format("Unblocking socket ~p~n", [Socket]),
    Ret = inet:setopts(Socket, [{active, once}]),
    io:format("Unblocking socket ~p:~p~n", [Socket, Ret]),
    Ret.
114
% Split the first IPA message off the front of DataBin.
%
% Layout: 16-bit big-endian payload length, 8-bit stream ID, payload.
% Returns {StreamID, Payload, Trailer}, where Trailer is everything that
% follows the first message.
% FIXME: the match raises badmatch if DataBin doesn't contain all payload.
split_ipa_msg(DataBin) ->
    <<Len:16, Stream, Body:Len/binary, Rest/binary>> = DataBin,
    io:format("Stream ~p, ~p bytes~n", [Stream, Len]),
    {Stream, Body, Rest}.
121
% Deliver an incoming message to the handler that is registered for the
% Socket/StreamID pair in StreamMap.
%
% Handler forms:
%   {process_id, Pid}        - Pid is sent {ipa, Socket, StreamID, DataBin}
%   {callback_fn, Fn, Args}  - Fn(Socket, StreamID, DataBin, Args) is called
%   Pid (a bare pid)         - treated like {process_id, Pid}; the
%                              registration code stores whatever term it is
%                              given, so a bare pid can end up in the table
% If nothing is registered the message is logged and dropped.
deliver_rx_ipa_msg(Socket, StreamID, StreamMap, DataBin) ->
    case ets:lookup(StreamMap, {Socket, StreamID}) of
        [{_, {process_id, Pid}}] ->
            Pid ! {ipa, Socket, StreamID, DataBin};
        [{_, {callback_fn, Fn, Args}}] ->
            Fn(Socket, StreamID, DataBin, Args);
        [{_, Pid}] when is_pid(Pid) ->
            Pid ! {ipa, Socket, StreamID, DataBin};
        [] ->
            io:format("No Pid registered for Socket ~p Stream ~p~n", [Socket, StreamID])
    end.
132
% Process (split + deliver) all IPA messages contained in a received chunk.
% Messages on stream 254 go to the CCM handler, everything else goes to the
% handler registered in the stream map. Returns ok once the chunk is
% used up.
process_rx_ipa_msg(_Sock, _Streams, <<>>) ->
    ok;
process_rx_ipa_msg(Sock, Streams, Chunk) ->
    {Stream, Payload, Rest} = split_ipa_msg(Chunk),
    if
        Stream =:= 254 ->
            process_rx_ccm_msg(Sock, Stream, Payload);
        true ->
            deliver_rx_ipa_msg(Sock, Stream, Streams, Payload)
    end,
    process_rx_ipa_msg(Sock, Streams, Rest).
145
% Tell the registered stream handlers that their socket has been closed.
%
% The argument is the result of ets:match(StreamMap, '$1'): a list in which
% every element is a one-element list holding one table row
% {{Socket, StreamID}, Handler}.
%
% Handlers that identify a process ({process_id, Pid} or a bare pid) are
% sent {ipa_closed, {Socket, StreamID}}. Any other handler (e.g. a
% callback_fn registration) has no process to signal and is only logged.
% Always returns ok.
send_close_signal([]) ->
    ok;
send_close_signal([[{{Socket, StreamID}, Handler}] | Tail]) ->
    case Handler of
        {process_id, Pid} ->
            Pid ! {ipa_closed, {Socket, StreamID}};
        Pid when is_pid(Pid) ->
            Pid ! {ipa_closed, {Socket, StreamID}};
        _ ->
            io:format("No process to signal close of Socket ~p Stream ~p to~n",
                      [Socket, StreamID])
    end,
    send_close_signal(Tail).
153
% Clean up after the TCP socket S has been closed: signal the registered
% stream handlers, drop the per-socket stream table and remove the entry
% for S from ipa_sockets. Returns ok.
process_tcp_closed(S, StreamMap) ->
    % every row of the stream map, each wrapped in its own list
    send_close_signal(ets:match(StreamMap, '$1')),
    % the stream map itself is no longer needed
    ets:delete(StreamMap),
    % forget the socket
    ets:delete(ipa_sockets, S),
    ok.
163
% Send a binary message through a given Socket / StreamID.
% The frame consists of the payload size as a 2-byte big-endian integer,
% the stream ID and the payload itself. Returns the gen_tcp:send/2 result.
send(Socket, StreamID, DataBin) ->
    Len = byte_size(DataBin),
    Header = <<Len:16>>,
    Frame = iolist_to_binary([Header, StreamID, DataBin]),
    gen_tcp:send(Socket, Frame).
168
169
% Synchronous request to a handler process.
% Sends {request, {self(), Ref}, Request} to Pid and waits up to ?TIMEOUT ms
% for the matching {reply, Ref, Reply}. Returns Reply, or {error, timeout}.
call_sync(Pid, Request) ->
    Tag = make_ref(),
    Caller = {self(), Tag},
    Pid ! {request, Caller, Request},
    receive
        {reply, Tag, Answer} ->
            Answer
    after ?TIMEOUT ->
        {error, timeout}
    end.
178
% Counterpart of call_sync/2: send Reply back to the caller identified by
% the {From, Ref} pair taken from the request message.
reply({From, Ref}, Reply) ->
    erlang:send(From, {reply, Ref, Reply}).
181
182
% Global module initialization: create the public named table ipa_sockets,
% keyed on the socket field of the ipa_socket record.
%
% Returns ok, or {error, ets_new_ipa_sockets} if the table could not be
% created. ets:new/2 reports failure (e.g. the name is already taken) by
% raising badarg rather than by its return value, so the error is caught
% here and turned into the error tuple.
init() ->
    try ets:new(ipa_sockets, [named_table, set, public, {keypos, #ipa_socket.socket}]) of
        ipa_sockets ->
            ok
    catch
        error:badarg ->
            {error, ets_new_ipa_sockets}
    end.
191
% Initialize a single socket; this is the entry point of its handler
% process. Creates the stream table, records the socket together with this
% process and the table in ipa_sockets, confirms to CallingPid and then
% enters the receive loop.
init_sock(Socket, CallingPid) ->
    Streams = ets:new(stream_map, [set]),
    Entry = #ipa_socket{socket = Socket, ipaPid = self(), streamTbl = Streams},
    ets:insert(ipa_sockets, Entry),
    CallingPid ! {ipa_init_sock_done, Socket},
    loop(Socket, Streams).
198
% Receive loop of the handler process for socket S.
%
% Handles synchronous requests, outgoing {ipa_send, ...} messages, incoming
% TCP data and the close of the socket. The loop continues after everything
% except {tcp_closed, S}, which cleans up and ends the process with ok.
% The recursion uses fully qualified calls.
loop(S, StreamMap) ->
    Next =
        receive
            {request, From, Request} ->
                Reply = ipa_proto:request(Request),
                ipa_proto:reply(From, Reply),
                continue;
            {ipa_send, S, StreamId, Data} ->
                send(S, StreamId, Data),
                continue;
            {tcp, S, Data} ->
                % process incoming IPA message and mark socket active once more
                ipa_proto:process_rx_ipa_msg(S, StreamMap, Data),
                inet:setopts(S, [{active, once}]),
                continue;
            {tcp_closed, S} ->
                io:format("Socket ~w closed [~w]~n", [S,self()]),
                ipa_proto:process_tcp_closed(S, StreamMap),
                stop
        end,
    case Next of
        continue ->
            ipa_proto:loop(S, StreamMap);
        stop ->
            % terminate the process by not looping further
            ok
    end.
219
% Handle one CCM message, dispatching on its message type.

% PING is answered with PONG
process_ccm_msg(Sock, Stream, ?IPAC_MSGT_PING, _Opts) ->
    io:format("Socket ~p Stream ~p: PING -> PONG~n", [Sock, Stream]),
    Pong = <<?IPAC_MSGT_PONG>>,
    send(Sock, Stream, Pong);
% ID_ACK is answered with ID_ACK
process_ccm_msg(Sock, Stream, ?IPAC_MSGT_ID_ACK, _Opts) ->
    io:format("Socket ~p Stream ~p: ID_ACK -> ID_ACK~n", [Sock, Stream]),
    Ack = <<?IPAC_MSGT_ID_ACK>>,
    send(Sock, Stream, Ack);
% ID_RESP is answered with ID_ACK
process_ccm_msg(Sock, Stream, ?IPAC_MSGT_ID_RESP, _Opts) ->
    io:format("Socket ~p Stream ~p: ID_RESP -> ID_ACK~n", [Sock, Stream]),
    Ack = <<?IPAC_MSGT_ID_ACK>>,
    send(Sock, Stream, Ack);
% any other message type is only logged
process_ccm_msg(Sock, Stream, Type, Opts) ->
    io:format("Socket ~p Stream ~p: Unknown CCM message type ~p Opts ~p~n",
              [Sock, Stream, Type, Opts]).
236
% Process an incoming CCM message (Stream ID 254).
%
% The first payload byte is the message type; the remaining bytes are
% passed on to process_ccm_msg/4 as a list of integers.
% The payload comes straight from the network, so an empty one is logged
% and ignored instead of crashing the socket's handler process.
process_rx_ccm_msg(Socket, StreamID, <<MsgType, OptsBin/binary>>) ->
    Opts = binary:bin_to_list(OptsBin),
    process_ccm_msg(Socket, StreamID, MsgType, Opts);
process_rx_ccm_msg(Socket, StreamID, <<>>) ->
    io:format("Socket ~p Stream ~p: ignoring empty CCM message~n", [Socket, StreamID]).
241
% Send a CCM ID_GET message on stream 254 of Socket.
send_ccm_id_get(Socket) ->
    IdGet = <<?IPAC_MSGT_ID_GET>>,
    send(Socket, 254, IdGet).
244
% Convenience wrapper for interactive use / debugging from the shell.
%
% Listens on LPort, accepts a single connection, registers the accepted
% socket with the IPA implementation and registers the calling process as
% the handler for streams 0 and 255.
% Returns {ok, Port} with the listening port number, or {error, Reason} if
% listening failed.
listen_accept_handle(LPort, Opts) ->
    case gen_tcp:listen(LPort, ?IPA_SOCKOPTS ++ Opts) of
        {ok, ListenSock} ->
            {ok, Port} = inet:port(ListenSock),
            {ok, Sock} = gen_tcp:accept(ListenSock),
            % register_socket/1 already hands the socket over to the handler
            % process, so there is no ownership left to transfer afterwards.
            {ok, _IpaPid} = ipa_proto:register_socket(Sock),
            % register in the {process_id, Pid} form that message delivery
            % matches on
            ipa_proto:register_stream(Sock, 0, {process_id, self()}),
            ipa_proto:register_stream(Sock, 255, {process_id, self()}),
            {ok, Port};
        {error, Reason} ->
            {error, Reason}
    end.
259
% gen_tcp:connect() convenience wrapper without a connect timeout:
% same as connect/4 with Timeout = infinity.
connect(Address, Port, Options) ->
    Timeout = infinity,
    connect(Address, Port, Options, Timeout).
263
% gen_tcp:connect() convenience wrapper: connect with the IPA socket
% options plus Options and register the new socket with the IPA
% implementation.
% Returns {ok, {Socket, IpaPid}}, or {error, Reason} if connecting or
% registering failed; in the latter case the socket is closed again.
connect(Address, Port, Options, Timeout) ->
    SockOpts = ?IPA_SOCKOPTS ++ Options,
    case gen_tcp:connect(Address, Port, SockOpts, Timeout) of
        {error, _} = ConnectError ->
            ConnectError;
        {ok, Socket} ->
            case ipa_proto:register_socket(Socket) of
                {error, _} = RegisterError ->
                    gen_tcp:close(Socket),
                    RegisterError;
                {ok, IpaPid} ->
                    {ok, {Socket, IpaPid}}
            end
    end.
277
% Utility function to continuously serve incoming IPA connections on a
% listening TCP socket.
% Starts NumServers acceptor processes that report to the calling process.
% Returns the listening port number, or {error, Reason} if listening failed.
start_listen(LPort, NumServers, Opts) ->
    SockOpts = ?IPA_SOCKOPTS ++ Opts,
    case gen_tcp:listen(LPort, SockOpts) of
        {error, _} = ListenError ->
            ListenError;
        {ok, ListenSock} ->
            start_servers(NumServers, ListenSock, self()),
            {ok, Port} = inet:port(ListenSock),
            Port
    end.
289
% Spawn Num processes running listen_server/2 on the listen socket LS,
% all reporting to CtrlPid. Returns ok.
start_servers(0, _LS, _CtrlPid) ->
    ok;
start_servers(Num, LS, CtrlPid) ->
    Args = [LS, CtrlPid],
    spawn(?MODULE, listen_server, Args),
    Remaining = Num - 1,
    start_servers(Remaining, LS, CtrlPid).
295
% Acceptor loop: accept connections on the listen socket LS, hand each
% accepted socket over to CtrlPid and announce it there with an
% {ipa_tcp_accept, Socket} message. Ends with ok as soon as accept returns
% anything but {ok, Socket}.
listen_server(LS, CtrlPid) ->
    Accepted = gen_tcp:accept(LS),
    case Accepted of
        {ok, S} ->
            Peer = inet:peername(S),
            io:format("Accepted TCP connection from ~p~n", [Peer]),
            % assign the socket to the Controlling process
            gen_tcp:controlling_process(S, CtrlPid),
            CtrlPid ! {ipa_tcp_accept, S},
            listen_server(LS, CtrlPid);
        Other ->
            io:format("accept returned ~w - goodbye!~n", [Other]),
            ok
    end.