diff --git a/src/riak_repl_aae_sink.erl b/src/riak_repl_aae_sink.erl index a573e07f..ede387d8 100644 --- a/src/riak_repl_aae_sink.erl +++ b/src/riak_repl_aae_sink.erl @@ -15,6 +15,7 @@ %% API -export([start_link/4, init_sync/1]). +-export([sender_init/2, sender_loop/1]). -record(state, { clustername, @@ -22,6 +23,7 @@ transport, tree_pid :: pid(), %% pid of the AAE tree partition, + sender :: pid(), owner :: pid() %% our fssource owner }). @@ -51,7 +53,8 @@ handle_call(init_sync, _From, State=#state{transport=Transport, socket=Socket}) {nodelay, true}, {header, 1}], ok = Transport:setopts(Socket, TcpOptions), - {reply, ok, State}; + Sender = spawn_sender(Transport, Socket), + {reply, ok, State#state{sender=Sender}}; handle_call(status, _From, State) -> Reply = [{partition_syncing, State#state.partition}], @@ -73,7 +76,7 @@ handle_cast(_Msg, State) -> handle_info({Proto, _Socket, Data}, State=#state{transport=Transport, socket=Socket}) when Proto==tcp; Proto==ssl -> TcpOptions = [{active, once}], %% reset to receive next tcp message - ok = Transport:setopts(Socket, TcpOptions), + Transport:setopts(Socket, TcpOptions), case Data of [MsgType] -> process_msg(MsgType, State); @@ -164,8 +167,41 @@ process_msg(?MSG_COMPLETE, State=#state{owner=Owner}) -> %% Send a response back to the aae_source worker -send_reply(Msg, State=#state{socket=Socket, transport=Transport}) -> +send_reply(Msg, State=#state{sender=Sender}) -> Data = term_to_binary(Msg), - ok = Transport:send(Socket, <>), + Sender ! <>, {noreply, State}. +%%%=================================================================== +%%% TCP send helper +%%%=================================================================== + +%% Sending is performed in a separate process to ensure that the sink +%% never blocks and can therefore always receive messages + reset the +%% active once flag. If a separate process is not used, it is possible +%% for both the source and sink to deadlock on a TCP send that each +%% stall waiting for the remote side to receive. +%% +%% There is currently no backpressure between the sink and this helper +%% process. Adding backpressure runs the risk of once again hitting +%% the aforementioned deadlock scenario. +%% +%% The correct approach is to add backpressure to the actual fullsync +%% protocol itself. This remains as future work. +%% +%% NOTE: the SSL transport wraps the underlying socket in a gen_server, +%% and all socket functions route through this server. As such, splitting +%% sending and receiving between two processes does not help. The SSL +%% transport is thus known to *not* be safe and *can* deadlock. + +spawn_sender(Transport, Socket) -> + spawn_link(?MODULE, sender_init, [Transport, Socket]). + +sender_init(Transport, Socket) -> + sender_loop({Transport, Socket}). + +sender_loop(State={Transport, Socket}) -> + receive Msg -> + ok = Transport:send(Socket, Msg) + end, + ?MODULE:sender_loop(State). diff --git a/src/riak_repl_aae_source.erl b/src/riak_repl_aae_source.erl index ae2f79b5..b2c280f3 100644 --- a/src/riak_repl_aae_source.erl +++ b/src/riak_repl_aae_source.erl @@ -30,6 +30,15 @@ -type index() :: non_neg_integer(). -type index_n() :: {index(), pos_integer()}. +-record(exchange, {mode :: inline | buffered, + buffer :: ets:tid(), + bloom :: reference(), %% ebloom + limit :: non_neg_integer(), + count :: non_neg_integer() + }). + +-type exchange() :: #exchange{}. + -record(state, {cluster, client, %% riak:local_client() transport, @@ -44,12 +53,17 @@ diff_batch_size = 1000 :: non_neg_integer(), local_lock = false :: boolean(), owner :: pid(), - proto :: term() + proto :: term(), + exchange :: exchange() }). %% Per state transition timeout used by certain transitions -define(DEFAULT_ACTION_TIMEOUT, 300000). %% 5 minutes +%% the first this many differences are not put in the bloom +%% filter, but simply sent to the remote site directly. +-define(GET_OBJECT_LIMIT, 1000). + %%%=================================================================== %%% API %%%=================================================================== @@ -190,7 +204,7 @@ prepare_exchange(start_exchange, State=#state{index=Partition}) -> %% try to get the remote lock case send_synchronous_msg(?MSG_LOCK_TREE, State) of ok -> - update_trees(start_exchange, State); + update_trees(init, State); Error -> lager:info("Remote lock tree for partition ~p failed, got ~p", [Partition, Error]), @@ -203,16 +217,44 @@ prepare_exchange(start_exchange, State=#state{index=Partition}) -> %% a timely manner, the exchange will timeout. Since the trees will %% continue to finish the update even after the exchange times out, %% a future exchange should eventually make progress. +update_trees(init, State) -> + NumKeys = 10000000, + {ok, Bloom} = ebloom:new(NumKeys, 0.01, random:uniform(1000)), + Limit = app_helper:get_env(riak_repl, fullsync_direct, ?GET_OBJECT_LIMIT), + Mode = app_helper:get_env(riak_repl, fullsync_direct_mode, inline), + Buffer = case Mode of + inline -> + undefined; + buffered -> + ets:new(?MODULE, [public, set]) + end, + Exchange = #exchange{mode=Mode, + buffer=Buffer, + bloom=Bloom, + limit=Limit, + count=0}, + State2 = State#state{exchange=Exchange}, + update_trees(start_exchange, State2); update_trees(cancel_fullsync, State) -> lager:info("AAE fullsync source cancelled for partition ~p", [State#state.index]), send_complete(State), {stop, normal, State}; -update_trees(start_exchange, State=#state{indexns=IndexN, owner=Owner}) when IndexN == [] -> +update_trees(finish_fullsync, State=#state{owner=Owner}) -> send_complete(State), lager:info("AAE fullsync source completed partition ~p", [State#state.index]), riak_repl2_fssource:fullsync_complete(Owner), + %% TODO: Why stay in update_trees? Should we stop instead? {next_state, update_trees, State}; +update_trees(continue, State=#state{indexns=IndexNs}) -> + case IndexNs of + [_] -> + send_diffs(init, State); + [_|RestNs] -> + State2 = State#state{built=0, indexns=RestNs}, + gen_fsm:send_event(self(), start_exchange), + {next_state, update_trees, State2} + end; update_trees(start_exchange, State=#state{tree_pid=TreePid, index=Partition, indexns=[IndexN|_IndexNs]}) -> @@ -251,6 +293,7 @@ key_exchange(start_key_exchange, State=#state{cluster=Cluster, socket=Socket, index=Partition, tree_pid=TreePid, + exchange=Exchange, indexns=[IndexN|_IndexNs]}) -> lager:debug("Starting fullsync key exchange with ~p for ~p/~p", [Cluster, Partition, IndexN]), @@ -274,9 +317,15 @@ key_exchange(start_key_exchange, State=#state{cluster=Cluster, ok end; (get_bucket, {L, B}) -> - send_synchronous_msg(?MSG_GET_AAE_BUCKET, {L,B,IndexN}, State); + wait_get_bucket(L, B, IndexN, State); (key_hashes, Segment) -> - send_synchronous_msg(?MSG_GET_AAE_SEGMENT, {Segment,IndexN}, State); + wait_get_segment(Segment, IndexN, State); + (start_exchange_level, {Level, Buckets}) -> + _ = [async_get_bucket(Level, B, IndexN, State) || B <- Buckets], + ok; + (start_exchange_segments, Segments) -> + _ = [async_get_segment(Segment, IndexN, State) || Segment <- Segments], + ok; (final, _) -> %% give ourself control of the socket again ok = Transport:controlling_process(Socket, SourcePid) @@ -291,13 +340,12 @@ key_exchange(start_key_exchange, State=#state{cluster=Cluster, %% accumulates a list of one element that is the count of %% keys that differed. We can't prime the accumulator. It %% always starts as the empty list. KeyDiffs is a list of hashtree::keydiff() - NumKeys = 10000000, - {ok, Bloom} = ebloom:new(NumKeys, 0.01, random:uniform(1000)), - AccFun = fun(KeyDiffs, Acc0) -> + AccFun = fun(KeyDiffs, Exchange0) -> %% Gather diff keys into a bloom filter - lists:foldl(fun(KeyDiff, AccIn) -> - accumulate_diff(KeyDiff, Bloom, AccIn, State) end, - Acc0, + lists:foldl(fun(KeyDiff, ExchangeIn) -> + accumulate_diff(KeyDiff, ExchangeIn, State) + end, + Exchange0, KeyDiffs) end, @@ -305,29 +353,60 @@ key_exchange(start_key_exchange, State=#state{cluster=Cluster, lager:debug("Starting compare for partition ~p", [Partition]), spawn_link(fun() -> StageStart=os:timestamp(), - riak_kv_index_hashtree:compare(IndexN, Remote, AccFun, TreePid), + Exchange2 = riak_kv_index_hashtree:compare(IndexN, Remote, AccFun, Exchange, TreePid), lager:info("Full-sync with site ~p; fullsync difference generator for ~p complete (completed in ~p secs)", [State#state.cluster, Partition, riak_repl_util:elapsed_secs(StageStart)]), - gen_fsm:send_event(SourcePid, {'$aae_src', done, Bloom}) + gen_fsm:send_event(SourcePid, {'$aae_src', done, Exchange2}) end), %% wait for differences from bloom_folder or to be done {next_state, compute_differences, State}. +%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%% + +async_get_bucket(Level, Bucket, IndexN, State) -> + send_asynchronous_msg(?MSG_GET_AAE_BUCKET, {Level,Bucket,IndexN}, State). + +wait_get_bucket(_Level, _Bucket, _IndexN, State) -> + Reply = get_reply(State), + Reply. + +async_get_segment(Segment, IndexN, State) -> + send_asynchronous_msg(?MSG_GET_AAE_SEGMENT, {Segment,IndexN}, State). + +wait_get_segment(_Segment, _IndexN, State) -> + Reply = get_reply(State), + Reply. + +%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%% + compute_differences({'$aae_src', worker_pid, WorkerPid}, #state{transport=Transport, socket=Socket} = State) -> ok = Transport:controlling_process(Socket, WorkerPid), WorkerPid ! {'$aae_src', ready, self()}, {next_state, compute_differences, State}; -compute_differences({'$aae_src', done, Bloom}, State) -> - %% if we have anything in our bloom filter, start sending them now. - %% this will start a worker process, which will tell us it's done with - %% diffs_done once all differences are sent. - _ = finish_sending_differences(Bloom, State), - - %% wait for differences from bloom_folder or to be done - {next_state, send_diffs, State}. +compute_differences({'$aae_src', done, Exchange}, State) -> + %% Just move on to the next tree exchange since we accumulate + %% differences across all trees. + State2 = State#state{exchange=Exchange}, + update_trees(continue, State2). + +maybe_send_direct(#exchange{mode=inline, count=Count, limit=Limit}, + #state{index=Partition}) -> + %% Inline sends already occured inline + Sent = erlang:min(Count, Limit), + lager:info("Directly sent ~b differences inline for partition ~p", + [Sent, Partition]), + ok; +maybe_send_direct(#exchange{buffer=Buffer}, State=#state{index=Partition}) -> + Keys = [{Bucket, Key} || {_, {Bucket, Key}} <- ets:tab2list(Buffer)], + true = ets:delete(Buffer), + Sorted = lists:sort(Keys), + Count = length(Sorted), + lager:info("Directly sending ~p differences for partition ~p", [Count, Partition]), + _ = [send_missing(Bucket, Key, State) || {Bucket, Key} <- Sorted], + ok. %% state send_diffs is where we wait for diff_obj messages from the bloom folder %% and send them to the sink for each diff_obj. We eventually finish upon receipt @@ -337,16 +416,20 @@ send_diffs({diff_obj, RObj}, _From, State) -> send_missing(RObj, State), {reply, ok, send_diffs, State}. +send_diffs(init, State=#state{exchange=Exchange}) -> + %% if we have anything in our bloom filter, start sending them now. + %% this will start a worker process, which will tell us it's done with + %% diffs_done once all differences are sent. + _ = maybe_send_direct(Exchange, State), + _ = finish_sending_differences(Exchange#exchange.bloom, State), + + %% wait for differences from bloom_folder or to be done + {next_state, send_diffs, State}; + %% All indexes in this Partition are done. %% Note: recv'd from an async send event -send_diffs(diff_done, State=#state{indexns=[]}) -> - gen_fsm:send_event(self(), start_exchange), - {next_state, update_trees, State#state{built=0, indexns=[]}}; -%% IndexN is done, restart for remaining -send_diffs(diff_done, State=#state{indexns=[_IndexN|IndexNs]}) -> - %% re-start for next indexN - gen_fsm:send_event(self(), start_exchange), - {next_state, update_trees, State#state{built=0, indexns=IndexNs}}. +send_diffs(diff_done, State) -> + update_trees(finish_fullsync, State). %%%=================================================================== %%% Internal functions @@ -443,36 +526,56 @@ replicate_diff(KeyDiff, Acc, State=#state{index=Partition}) -> Acc end. -accumulate_diff(KeyDiff, Bloom, [], State) -> - accumulate_diff(KeyDiff, Bloom, [0], State); -accumulate_diff(KeyDiff, Bloom, [Count], #state{index=Partition}) -> - NumObjects = - case KeyDiff of - {remote_missing, Bin} -> - %% send object and related objects to remote - {Bucket,Key} = binary_to_term(Bin), - lager:debug("Keydiff: remote partition ~p remote missing: ~p:~p", - [Partition, Bucket, Key]), - ebloom:insert(Bloom, <>), - 1; - {different, Bin} -> - %% send object and related objects to remote - {Bucket,Key} = binary_to_term(Bin), - lager:debug("Keydiff: remote partition ~p different: ~p:~p", - [Partition, Bucket, Key]), - ebloom:insert(Bloom, <>), - 1; - {missing, Bin} -> - %% remote has a key we don't have. Ignore it. - {Bucket,Key} = binary_to_term(Bin), - lager:debug("Keydiff: remote partition ~p local missing: ~p:~p (ignored)", - [Partition, Bucket, Key]), - 0; - Other -> - lager:warning("Unexpected error keydiff: ~p (ignored)", [Other]), - 0 - end, - [Count+NumObjects]. +accumulate_diff(KeyDiff, Exchange, State=#state{index=Partition}) -> + case KeyDiff of + {remote_missing, Bin} -> + %% send object and related objects to remote + {Bucket,Key} = binary_to_term(Bin), + lager:debug("Keydiff: remote partition ~p remote missing: ~p:~p", + [Partition, Bucket, Key]), + maybe_accumulate_key(Bucket, Key, Exchange, State); + {different, Bin} -> + %% send object and related objects to remote + {Bucket,Key} = binary_to_term(Bin), + lager:debug("Keydiff: remote partition ~p different: ~p:~p", + [Partition, Bucket, Key]), + maybe_accumulate_key(Bucket, Key, Exchange, State); + {missing, Bin} -> + %% remote has a key we don't have. Ignore it. + {Bucket,Key} = binary_to_term(Bin), + lager:debug("Keydiff: remote partition ~p local missing: ~p:~p (ignored)", + [Partition, Bucket, Key]), + Exchange; + Other -> + lager:warning("Unexpected error keydiff: ~p (ignored)", [Other]), + Exchange + end. + +maybe_accumulate_key(Bucket, Key, + Exchange=#exchange{mode=Mode, + bloom=Bloom, + count=Count, + limit=Limit}, + State) -> + if Count < Limit -> + %% Below threshold, handle directly + Exchange2 = handle_direct(Mode, Bucket, Key, Exchange, State), + Exchange2#exchange{count=Count+1}; + true -> + %% Past threshold, add to bloom filter for future bloom fold + ebloom:insert(Bloom, <>), + Exchange#exchange{count=Count+1} + end. + +handle_direct(inline, Bucket, Key, Exchange, State) -> + %% Send key inline + _ = send_missing(Bucket, Key, State), + Exchange; +handle_direct(buffered, Bucket, Key, Exchange, _State) -> + %% Enqueue in "direct send" buffer that will be sorted/sent later + #exchange{buffer=Buffer, count=Count} = Exchange, + ets:insert(Buffer, {Count, {Bucket, Key}}), + Exchange. send_missing(RObj, State=#state{client=Client, wire_ver=Ver, proto=Proto}) -> %% we don't actually have the vclock to compare, so just send the @@ -493,8 +596,33 @@ send_missing(RObj, State=#state{client=Client, wire_ver=Ver, proto=Proto}) -> 1 + length(Objects) end. +%% Copied from riak_kv_vnode:local_get to extend with timeout, +%% might port back to riak_kv_vnode.erl in the future. +%% +%% Note: responses that timeout can result in future late +%% messages arriving from the vnode. This is currently safe +%% because of the catch-all handle_info that will ignore these +%% messages. But, something to keep in mind in the future. +kv_local_get(Index, BKey, Timeout) -> + Ref = make_ref(), + ReqId = erlang:phash2(erlang:now()), + Sender = {raw, Ref, self()}, + riak_kv_vnode:get({Index,node()}, BKey, ReqId, Sender), + receive + {Ref, {r, Result, Index, ReqId}} -> + Result; + {Ref, Reply} -> + {error, Reply} + after Timeout -> + {error, timeout} + end. + +%% Get the K/V directly from the local vnode +local_get(Bucket, Key, #state{index=Index}) -> + kv_local_get(Index, {Bucket, Key}, ?REPL_FSM_TIMEOUT). + send_missing(Bucket, Key, State=#state{client=Client, wire_ver=Ver, proto=Proto}) -> - case Client:get(Bucket, Key, 1, ?REPL_FSM_TIMEOUT) of + case local_get(Bucket, Key, State) of {ok, RObj} -> %% we don't actually have the vclock to compare, so just send the %% key and let the other side sort things out. @@ -517,7 +645,8 @@ send_missing(Bucket, Key, State=#state{client=Client, wire_ver=Ver, proto=Proto} %% can't find the key! lager:warning("not_found returned for fullsync client get on Bucket: ~p Key:~p", [Bucket,Key]), 0; - _ -> + {error, timeout} -> + lager:warning("timeout during fullsync client get on Bucket: ~p Key:~p", [Bucket,Key]), 0 end. @@ -569,7 +698,10 @@ send_synchronous_msg(MsgType, State=#state{transport=Transport, %% Async message send with tag and (binary or term data). send_asynchronous_msg(MsgType, Data, #state{transport=Transport, socket=Socket}) when is_binary(Data) -> - ok = Transport:send(Socket, <>). + ok = Transport:send(Socket, <>); +send_asynchronous_msg(MsgType, Msg, State) -> + Data = term_to_binary(Msg), + send_asynchronous_msg(MsgType, Data, State). %% send a message with type tag only, no data send_asynchronous_msg(MsgType, #state{transport=Transport,