blob: 2d851e3d3db1990729a184a6f1fe41267f03f067 [file] [log] [blame]
Harald Welte176e28c2010-12-19 22:56:58 +01001% ip.access IPA multiplex protocol
2
3% (C) 2010 by Harald Welte <laforge@gnumonks.org>
4% (C) 2010 by On-Waves
5%
6% All Rights Reserved
7%
8% This program is free software; you can redistribute it and/or modify
9% it under the terms of the GNU General Public License as published by
10% the Free Software Foundation; either version 2 of the License, or
11% (at your option) any later version.
12%
13% This program is distributed in the hope that it will be useful,
14% but WITHOUT ANY WARRANTY; without even the implied warranty of
15% MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
16% GNU General Public License for more details.
17%
18% You should have received a copy of the GNU General Public License along
19% with this program; if not, write to the Free Software Foundation, Inc.,
20% 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.
21
22-module(ipa_proto).
23-author('Harald Welte <laforge@gnumonks.org>').
24-compile(export_all).
25
26-define(TIMEOUT, 1000).
27-define(IPA_SOCKOPTS, [binary, {packet, 0}, {reuseaddr, true}, {active, false}]).
28
29-define(IPAC_MSGT_PING, 0).
30-define(IPAC_MSGT_PONG, 1).
31-define(IPAC_MSGT_ID_GET, 4).
32-define(IPAC_MSGT_ID_RESP, 5).
33-define(IPAC_MSGT_ID_ACK, 6).
34
35-export([register_socket/1, register_stream/3, unregister_stream/2,
36 send/3, connect/3, connect/4, listen_accept_handle/2,
37 start_listen/3, controlling_process/3]).
38
39-record(ipa_socket, {socket, ipaPid, streamTbl, listenType}).
40
41
42
43% register a TCP socket with this IPA protocol implementation
44register_socket(Socket) ->
45 IpaPid = spawn(?MODULE, init_sock, [Socket, self()]),
46 % synchronously wait for init_sock to be done
47 receive
48 {ipa_init_sock_done, Socket} ->
49 % assign ownership of the socket to the new IPA handler process
50 gen_tcp:controlling_process(Socket, IpaPid),
51 {ok, IpaPid}
52 after
53 ?TIMEOUT ->
54 {error, timeout}
55 end.
56
57% call_sync() preceeded by a Socket -> Pid lookup
58call_sync_sock(Socket, Request) ->
59 % resolve PID responsible for this socket
60 case ets:lookup(ipa_sockets, Socket) of
61 [IpaSock] ->
62 call_sync(IpaSock#ipa_socket.ipaPid, Request);
63 _ ->
64 io:format("No Process for Socket ~p~n", [Socket]),
65 {error, no_sock_for_pid}
66 end.
67
68% a user process wants to register itself for a given Socket/StreamID tuple
69register_stream(Socket, StreamID, Pid) ->
70 call_sync_sock(Socket, {ipa_reg_stream, Socket, StreamID, Pid}).
71
72register_streams(_S, []) ->
73 ok;
74register_streams(S, [{StreamID, Pid}|SList]) ->
75 ipa_proto:register_stream(S, StreamID, Pid),
76 register_streams(S, SList).
77
78% unregister for a given stream
79unregister_stream(Socket, StreamID) ->
80 call_sync_sock(Socket, {ipa_unreg_stream, Socket, StreamID}).
81
82% change the controlling process for a given {Socket, StreamID}
83controlling_process(Socket, StreamID, NewPid) ->
84 call_sync_sock(Socket, {ipa_ctrl_proc, Socket, StreamID, NewPid}).
85
86% unblock the socket from further processing
87unblock(Socket) ->
88 send_ccm_id_get(Socket),
89 call_sync_sock(Socket, {ipa_unblock, Socket}).
90
91
92% server-side handler for unregister_stream()
93request({ipa_reg_stream, Socket, StreamID, Pid}) ->
94 io:format("Registering handler ~p for socket ~p Stream ~p~n", [Pid, Socket, StreamID]),
95 [IpaSock] = ets:lookup(ipa_sockets, Socket),
96 ets:insert_new(IpaSock#ipa_socket.streamTbl, {{Socket, StreamID}, Pid});
97% server-side handler for unregister_stream()
98request({ipa_unreg_stream, Socket, StreamID}) ->
99 io:format("Unregistering handler for Socket ~p Stream ~p~n", [Socket, StreamID]),
100 [IpaSock] = ets:lookup(ipa_sockets, Socket),
101 ets:delete(IpaSock#ipa_socket.streamTbl, {Socket, StreamID});
102% server-side handler for controlling_process()
103request({ipa_ctrl_proc, Socket, StreamID, NewPid}) ->
104 io:format("Changing handler for socket ~p Stream ~p~n", [Socket, StreamID]),
105 [IpaSock] = ets:lookup(ipa_sockets, Socket),
106 ets:delete(IpaSock#ipa_socket.streamTbl, {Socket, StreamID}),
107 ets:insert_new(IpaSock#ipa_socket.streamTbl, {{Socket, StreamID}, NewPid});
108% server-side handler for unblock()
109request({ipa_unblock, Socket}) ->
110 io:format("Unblocking socket ~p~n", [Socket]),
111 %[IpaSock] = ets:lookup(ipa_sockets, Socket),
112 Ret = inet:setopts(Socket, [{active, once}]),
113 io:format("Unblocking socket ~p:~p~n", [Socket, Ret]).
114
115% split an incoming IPA message and split it into Length/StreamID/Payload
116split_ipa_msg(DataBin) ->
117 % FIXME: This will throw an exception if DataBin doesn't contain all payload
118 <<Length:16/big-unsigned-integer, StreamID:8, Payload:Length/binary, Trailer/binary>> = DataBin,
119 io:format("Stream ~p, ~p bytes~n", [StreamID, Length]),
120 {StreamID, Payload, Trailer}.
121
122% deliver an incoming message to the process that is registered for the socket/stream_id
123deliver_rx_ipa_msg(Socket, StreamID, StreamMap, DataBin) ->
124 case ets:lookup(StreamMap, {Socket, StreamID}) of
125 [{_,{process_id, Pid}}] ->
126 Pid ! {ipa, Socket, StreamID, DataBin};
127 [{_,{callback_fn, Fn, Args}}] ->
128 Fn(Socket, StreamID, DataBin, Args);
129 [] ->
130 io:format("No Pid registered for Socket ~p Stream ~p~n", [Socket, StreamID])
131 end.
132
133% process (split + deliver) an incoming IPA message
134process_rx_ipa_msg(_S, _StreamMap, <<>>) ->
135 ok;
136process_rx_ipa_msg(S, StreamMap, Data) ->
137 {StreamID, PayloadBin, Trailer} = split_ipa_msg(Data),
138 case StreamID of
139 254 ->
140 process_rx_ccm_msg(S, StreamID, PayloadBin);
141 _ ->
142 deliver_rx_ipa_msg(S, StreamID, StreamMap, PayloadBin)
143 end,
144 process_rx_ipa_msg(S, StreamMap, Trailer).
145
146send_close_signal([]) ->
147 ok;
148send_close_signal([StreamSpec|Tail]) ->
Harald Weltef66bbfa2012-01-28 14:08:52 +0100149 io:format("send_close_signal ~p ~p~n", [StreamSpec, Tail]),
150 case StreamSpec of
151 [{{Socket, StreamID}, {process_id, Pid}}] ->
152 Pid ! {ipa_closed, {Socket, StreamID}};
153 [{{Socket, StreamID}, {callback_fn, Fn, Args}}] ->
154 Fn(Socket, StreamID, ipa_closed, Args)
155 end,
Harald Welte176e28c2010-12-19 22:56:58 +0100156 send_close_signal(Tail).
Harald Weltef66bbfa2012-01-28 14:08:52 +0100157
Harald Welte176e28c2010-12-19 22:56:58 +0100158process_tcp_closed(S, StreamMap) ->
159 % signal the closed socket to the user
160 StreamList = ets:match(StreamMap, '$1'),
161 send_close_signal(StreamList),
162 % remove the stream map for this socket
163 ets:delete(StreamMap),
164 % remove any entry regarding 'S' from ipa_sockets
165 ets:delete(ipa_sockets, S),
166 ok.
167
168% send a binary message through a given Socket / StreamID
169send(Socket, StreamID, DataBin) ->
170 Size = byte_size(DataBin),
171 gen_tcp:send(Socket, iolist_to_binary([<<Size:2/big-unsigned-integer-unit:8>>, StreamID, DataBin])).
172
173
174call_sync(Pid, Request) ->
175 Ref = make_ref(),
176 Pid ! {request, {self(), Ref}, Request},
177 receive
178 {reply, Ref, Reply} -> Reply
179 after
180 ?TIMEOUT -> {error, timeout}
181 end.
182
183reply({From, Ref}, Reply) ->
184 From ! {reply, Ref, Reply}.
185
186
187% global module initialization
188init() ->
189 case ets:new(ipa_sockets, [named_table, set, public, {keypos, #ipa_socket.socket}]) of
190 ipa_sockets ->
191 ok;
192 _ ->
193 {error, ets_new_ipa_sockets}
194 end.
195
196% initialize a signle socket, create its handle process
197init_sock(Socket, CallingPid) ->
198 StreamMap = ets:new(stream_map, [set]),
199 ets:insert(ipa_sockets, #ipa_socket{socket=Socket, ipaPid=self(), streamTbl=StreamMap}),
200 CallingPid ! {ipa_init_sock_done, Socket},
201 loop(Socket, StreamMap).
202
203loop(S, StreamMap) ->
204 receive
205 {request, From, Request} ->
206 Reply = ipa_proto:request(Request),
207 ipa_proto:reply(From, Reply),
208 ipa_proto:loop(S, StreamMap);
209 {ipa_send, S, StreamId, Data} ->
210 send(S, StreamId, Data),
211 ipa_proto:loop(S, StreamMap);
212 {tcp, S, Data} ->
213 % process incoming IPA message and mark socket active once more
214 ipa_proto:process_rx_ipa_msg(S, StreamMap, Data),
215 inet:setopts(S, [{active, once}]),
216 ipa_proto:loop(S, StreamMap);
217 {tcp_closed, S} ->
218 io:format("Socket ~w closed [~w]~n", [S,self()]),
219 ipa_proto:process_tcp_closed(S, StreamMap),
220 % terminate the process by not looping further
221 ok
222 end.
223
224% Respond with PONG to PING
225process_ccm_msg(Socket, StreamID, ?IPAC_MSGT_PING, _) ->
226 io:format("Socket ~p Stream ~p: PING -> PONG~n", [Socket, StreamID]),
227 send(Socket, StreamID, <<?IPAC_MSGT_PONG>>);
228% Simply respond to ID_ACK with ID_ACK
Harald Welte5cde7622012-01-23 23:18:28 +0100229process_ccm_msg(Socket, StreamID, ?IPAC_MSGT_ID_ACK, _) ->
230 io:format("Socket ~p Stream ~p: ID_ACK -> ID_ACK~n", [Socket, StreamID]),
231 send(Socket, StreamID, <<?IPAC_MSGT_ID_ACK>>);
Harald Welte176e28c2010-12-19 22:56:58 +0100232% Simply respond to ID_RESP with ID_ACK
233process_ccm_msg(Socket, StreamID, ?IPAC_MSGT_ID_RESP, _) ->
234 io:format("Socket ~p Stream ~p: ID_RESP -> ID_ACK~n", [Socket, StreamID]),
235 send(Socket, StreamID, <<?IPAC_MSGT_ID_ACK>>);
236% Default message handler for unknown messages
237process_ccm_msg(Socket, StreamID, MsgType, Opts) ->
238 io:format("Socket ~p Stream ~p: Unknown CCM message type ~p Opts ~p~n",
239 [Socket, StreamID, MsgType, Opts]).
240
241% process an incoming CCM message (Stream ID 254)
242process_rx_ccm_msg(Socket, StreamID, PayloadBin) ->
243 [MsgType|Opts] = binary:bin_to_list(PayloadBin),
244 process_ccm_msg(Socket, StreamID, MsgType, Opts).
245
246send_ccm_id_get(Socket) ->
247 send(Socket, 254, <<?IPAC_MSGT_ID_GET>>).
248
249% convenience wrapper for interactive use / debugging from the shell
250listen_accept_handle(LPort, Opts) ->
251 case gen_tcp:listen(LPort, ?IPA_SOCKOPTS ++ Opts) of
252 {ok, ListenSock} ->
253 {ok, Port} = inet:port(ListenSock),
254 {ok, Sock} = gen_tcp:accept(ListenSock),
255 {ok, IpaPid} = ipa_proto:register_socket(Sock),
256 ipa_proto:register_stream(Sock, 0, self()),
257 ipa_proto:register_stream(Sock, 255, self()),
258 gen_tcp:controlling_process(Sock, IpaPid),
259 {ok, Port};
260 {error, Reason} ->
261 {error, Reason}
262 end.
263
264% gen_tcp:connect() convenience wrappers
265connect(Address, Port, Options) ->
266 connect(Address, Port, Options, infinity).
267
268connect(Address, Port, Options, Timeout) ->
269 case gen_tcp:connect(Address, Port, ?IPA_SOCKOPTS ++ Options, Timeout) of
270 {ok, Socket} ->
271 case ipa_proto:register_socket(Socket) of
Harald Welte5cde7622012-01-23 23:18:28 +0100272 {ok, IpaPid} ->
273 {ok, {Socket, IpaPid}};
Harald Welte176e28c2010-12-19 22:56:58 +0100274 {error, Reason} ->
275 gen_tcp:close(Socket),
276 {error, Reason}
277 end;
278 {error, Reason} ->
279 {error, Reason}
280 end.
281
282% Utility function to continuously server incomming IPA connections on a
283% listening TCP socket
284start_listen(LPort, NumServers, Opts) ->
285 case gen_tcp:listen(LPort, ?IPA_SOCKOPTS ++ Opts) of
286 {ok, ListenSock} ->
287 start_servers(NumServers, ListenSock, self()),
288 {ok, Port} = inet:port(ListenSock),
289 Port;
290 {error, Reason} ->
291 {error, Reason}
292 end.
293
294start_servers(0, _, _) ->
295 ok;
296start_servers(Num, LS, CtrlPid) ->
297 spawn(?MODULE, listen_server, [LS, CtrlPid]),
298 start_servers(Num-1, LS, CtrlPid).
299
300listen_server(LS, CtrlPid) ->
301 case gen_tcp:accept(LS) of
302 {ok, S} ->
303 io:format("Accepted TCP connection from ~p~n", [inet:peername(S)]),
304 % assign the socket to the Controlling process
305 gen_tcp:controlling_process(S, CtrlPid),
306 CtrlPid ! {ipa_tcp_accept, S},
307 listen_server(LS, CtrlPid);
308 Other ->
309 io:format("accept returned ~w - goodbye!~n", [Other]),
310 ok
311 end.