Skip to content
44 changes: 40 additions & 4 deletions src/riak_repl_aae_sink.erl
Original file line number Diff line number Diff line change
Expand Up @@ -15,13 +15,15 @@

%% API
-export([start_link/4, init_sync/1]).
-export([sender_init/2, sender_loop/1]).

-record(state, {
clustername,
socket,
transport,
tree_pid :: pid(), %% pid of the AAE tree
partition,
sender :: pid(),
owner :: pid() %% our fssource owner
}).

Expand Down Expand Up @@ -51,7 +53,8 @@ handle_call(init_sync, _From, State=#state{transport=Transport, socket=Socket})
{nodelay, true},
{header, 1}],
ok = Transport:setopts(Socket, TcpOptions),
{reply, ok, State};
Sender = spawn_sender(Transport, Socket),
{reply, ok, State#state{sender=Sender}};

handle_call(status, _From, State) ->
Reply = [{partition_syncing, State#state.partition}],
Expand All @@ -73,7 +76,7 @@ handle_cast(_Msg, State) ->
handle_info({Proto, _Socket, Data}, State=#state{transport=Transport,
socket=Socket}) when Proto==tcp; Proto==ssl ->
TcpOptions = [{active, once}], %% reset to receive next tcp message
ok = Transport:setopts(Socket, TcpOptions),
Transport:setopts(Socket, TcpOptions),
case Data of
[MsgType] ->
process_msg(MsgType, State);
Expand Down Expand Up @@ -164,8 +167,41 @@ process_msg(?MSG_COMPLETE, State=#state{owner=Owner}) ->

%% Send a response back to the aae_source worker

send_reply(Msg, State=#state{socket=Socket, transport=Transport}) ->
send_reply(Msg, State=#state{sender=Sender}) ->
Data = term_to_binary(Msg),
ok = Transport:send(Socket, <<?MSG_REPLY:8, Data/binary>>),
Sender ! <<?MSG_REPLY:8, Data/binary>>,
{noreply, State}.

%%%===================================================================
%%% TCP send helper
%%%===================================================================

%% Sending is performed in a separate process to ensure that the sink
%% never blocks and can therefore always receive messages + reset the
%% active once flag. If a separate process is not used, it is possible
%% for both the source and sink to deadlock on a TCP send that each
%% stall waiting for the remote side to receive.
%%
%% There is currently no backpressure between the sink and this helper
%% process. Adding backpressure runs the risk of once again hitting
%% the aforementioned deadlock scenario.
%%
%% The correct approach is to add backpressure to the actual fullsync
%% protocol itself. This remains as future work.
%%
%% NOTE: the SSL transport wraps the underlying socket in a gen_server,
%% and all socket functions route through this server. As such, splitting
%% sending and receiving between two processes does not help. The SSL
%% transport is thus known to *not* be safe and *can* deadlock.

spawn_sender(Transport, Socket) ->
spawn_link(?MODULE, sender_init, [Transport, Socket]).

sender_init(Transport, Socket) ->
sender_loop({Transport, Socket}).
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Be kind to your fellow debuggers and set something in the process dictionary we can postmortem what this process is and why it existed.

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I suppose this means: use proc_lib.

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

That's what I used to do until the inestimable @Vagabond pointed out that anything started with proclib should comply with OTP Design principles and handle ``sys` messages etc.
Maybe a poor mans version would set the the procdict entries so that etop etc showed them nicely.


sender_loop(State={Transport, Socket}) ->
receive Msg ->
ok = Transport:send(Socket, Msg)
end,
?MODULE:sender_loop(State).
Loading