Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 1 addition & 3 deletions src/riak_repl2_fscoordinator.erl
Original file line number Diff line number Diff line change
Expand Up @@ -768,8 +768,6 @@ remote_node_available({_Partition, _, RemoteNode}, Busies) ->
start_fssource(Partition2={Partition,_,_} = PartitionVal, Ip, Port, State) ->
#state{owners = Owners} = State,
LocalNode = proplists:get_value(Partition, Owners),
lager:info("Starting fssource for ~p on ~p to ~p", [Partition, LocalNode,
Ip]),
case riak_repl2_fssource_sup:enable(LocalNode, Partition, {Ip, Port}) of
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Could this be switched to lager:debug like the other messages instead of being removed outright?

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

OK to just go with this?
2014-10-16 11:26:53.304 [info] <0.1531.0>@riak_repl2_fssource_sup:enable:15 Starting replication fullsync source for 1278813932664540053428224228626747642198940975104 from 'dev1@127.0.0.1' to {{127,0,0,1},10026}

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Going with the log entry from riak_repl2_fssource_sup sounds good to me.

{ok, Pid} ->
link(Pid),
Expand Down Expand Up @@ -863,7 +861,7 @@ schedule_stat_refresh(StatCache) ->

%% @private Exported just to be able to spawn with arguments more nicely.
refresh_stats_worker(ReportTo, Sources) ->
lager:info("Gathering source data for ~p", [Sources]),
lager:debug("Gathering source data for ~p", [Sources]),
SourceStats = gather_source_stats(Sources),
Time = riak_core_util:moment(),
Self = self(),
Expand Down
28 changes: 28 additions & 0 deletions src/riak_repl2_fssink_pool.erl
Original file line number Diff line number Diff line change
@@ -0,0 +1,28 @@
%% Fullsync pool to be shared by all sinks. Globally bounded in size in case multiple
%% fullsyncs are running.

-module(riak_repl2_fssink_pool).
-export([start_link/0, status/0, bin_put/1]).

start_link() ->
MinPool = app_helper:get_env(riak_repl, fssink_min_workers, 5),
MaxPool = app_helper:get_env(riak_repl, fssink_max_workers, 100),
PoolArgs = [{name, {local, ?MODULE}},
{worker_module, riak_repl_fullsync_worker},
{worker_args, []},
{size, MinPool}, {max_overflow, MaxPool}],
poolboy:start_link(PoolArgs).

%% @doc Return the poolboy status
status() ->
{StateName, WorkerQueueLen, Overflow, NumMonitors} = poolboy:status(?MODULE),
[{statename, StateName},
{worker_queue_len, WorkerQueueLen},
{overflow, Overflow},
{num_monitors, NumMonitors}].

%% @doc Send a replication wire-encoded binary to the worker pool
%% for running a put against. No guarantees of completion.
bin_put(BinObj) ->
Pid = poolboy:checkout(?MODULE, true, infinity),
riak_repl_fullsync_worker:do_binput(Pid, BinObj, ?MODULE).
2 changes: 1 addition & 1 deletion src/riak_repl2_fssource.erl
Original file line number Diff line number Diff line change
Expand Up @@ -287,7 +287,7 @@ maybe_exchange_caps(_, Caps, Socket, Transport) ->
%% Start a connection to the remote sink node at IP, using the given fullsync strategy,
%% for the given partition. The protocol version will be determined from the strategy.
connect(IP, Strategy, Partition) ->
lager:info("Connecting to remote ~p for partition ~p", [IP, Partition]),
lager:debug("Connecting to remote ~p for partition ~p", [IP, Partition]),
TcpOptions = [{keepalive, true},
{nodelay, true},
{packet, 4},
Expand Down
50 changes: 43 additions & 7 deletions src/riak_repl_aae_sink.erl
Original file line number Diff line number Diff line change
Expand Up @@ -15,13 +15,15 @@

%% API
-export([start_link/4, init_sync/1]).
-export([sender_init/2, sender_loop/1]).

-record(state, {
clustername,
socket,
transport,
tree_pid :: pid(), %% pid of the AAE tree
partition,
sender :: pid(),
owner :: pid() %% our fssource owner
}).

Expand Down Expand Up @@ -51,7 +53,8 @@ handle_call(init_sync, _From, State=#state{transport=Transport, socket=Socket})
{nodelay, true},
{header, 1}],
ok = Transport:setopts(Socket, TcpOptions),
{reply, ok, State};
Sender = spawn_sender(Transport, Socket),
{reply, ok, State#state{sender=Sender}};

handle_call(status, _From, State) ->
Reply = [{partition_syncing, State#state.partition}],
Expand All @@ -73,7 +76,7 @@ handle_cast(_Msg, State) ->
handle_info({Proto, _Socket, Data}, State=#state{transport=Transport,
socket=Socket}) when Proto==tcp; Proto==ssl ->
TcpOptions = [{active, once}], %% reset to receive next tcp message
ok = Transport:setopts(Socket, TcpOptions),
Transport:setopts(Socket, TcpOptions),
case Data of
[MsgType] ->
process_msg(MsgType, State);
Expand Down Expand Up @@ -141,9 +144,9 @@ process_msg(?MSG_GET_AAE_SEGMENT, {SegmentNum,IndexN}, State=#state{tree_pid=Tre

%% no reply
process_msg(?MSG_PUT_OBJ, {fs_diff_obj, BObj}, State) ->
RObj = riak_repl_util:from_wire(BObj),
%% do the put
riak_repl_util:do_repl_put(RObj),
%% may block on worker pool, ok return means work was submitted
%% to pool, not that put FSM completed successfully.
ok = riak_repl2_fssink_pool:bin_put(BObj),
{noreply, State};

%% replies: ok | not_responsible
Expand All @@ -164,8 +167,41 @@ process_msg(?MSG_COMPLETE, State=#state{owner=Owner}) ->

%% Send a response back to the aae_source worker

send_reply(Msg, State=#state{socket=Socket, transport=Transport}) ->
send_reply(Msg, State=#state{sender=Sender}) ->
Data = term_to_binary(Msg),
ok = Transport:send(Socket, <<?MSG_REPLY:8, Data/binary>>),
Sender ! <<?MSG_REPLY:8, Data/binary>>,
{noreply, State}.

%%%===================================================================
%%% TCP send helper
%%%===================================================================

%% Sending is performed in a separate process to ensure that the sink
%% never blocks and can therefore always receive messages + reset the
%% active once flag. If a separate process is not used, it is possible
%% for both the source and sink to deadlock on a TCP send that each
%% stall waiting for the remote side to receive.
%%
%% There is currently no backpressure between the sink and this helper
%% process. Adding backpressure runs the risk of once again hitting
%% the aforementioned deadlock scenario.
%%
%% The correct approach is to add backpressure to the actual fullsync
%% protocol itself. This remains as future work.
%%
%% NOTE: the SSL transport wraps the underlying socket in a gen_server,
%% and all socket functions route through this server. As such, splitting
%% sending and receiving between two processes does not help. The SSL
%% transport is thus known to *not* be safe and *can* deadlock.

spawn_sender(Transport, Socket) ->
spawn_link(?MODULE, sender_init, [Transport, Socket]).

sender_init(Transport, Socket) ->
sender_loop({Transport, Socket}).

sender_loop(State={Transport, Socket}) ->
receive Msg ->
ok = Transport:send(Socket, Msg)
end,
?MODULE:sender_loop(State).
Loading