From df2eb4626486ed02e57117620f476e53a2fb789b Mon Sep 17 00:00:00 2001 From: Joseph Blomstedt Date: Tue, 2 Sep 2014 16:14:17 -0700 Subject: [PATCH 01/15] Refactor AAE FS source state transitions for clarity This commit slightly refactors the riak_repl_aae_source code to make state transitions more obvious. This change also eases the porting of commits from a separate feature branch that implemented a slightly different approach to improved AAE syncing. --- src/riak_repl_aae_source.erl | 63 +++++++++++++++++++----------------- 1 file changed, 34 insertions(+), 29 deletions(-) diff --git a/src/riak_repl_aae_source.erl b/src/riak_repl_aae_source.erl index 99fc5173..a4129499 100644 --- a/src/riak_repl_aae_source.erl +++ b/src/riak_repl_aae_source.erl @@ -80,9 +80,6 @@ init([Cluster, Client, Transport, Socket, Partition, OwnerPid, Proto]) -> lager:info("AAE fullsync source worker started for partition ~p", [Partition]), - NumKeys = 1000000, - {ok, Bloom} = ebloom:new(NumKeys, 0.01, random:uniform(1000)), - State = #state{cluster=Cluster, client=Client, transport=Transport, @@ -91,8 +88,7 @@ init([Cluster, Client, Transport, Socket, Partition, OwnerPid, Proto]) -> built=0, owner=OwnerPid, wire_ver=w1, - proto=Proto, - bloom=Bloom }, + proto=Proto}, {ok, prepare_exchange, State}. handle_event(_Event, StateName, State) -> @@ -200,7 +196,7 @@ prepare_exchange(start_exchange, State=#state{index=Partition}) -> %% try to get the remote lock case send_synchronous_msg(?MSG_LOCK_TREE, State) of ok -> - update_trees(start_exchange, State); + update_trees(init, State); Error -> lager:info("Remote lock tree for partition ~p failed, got ~p", [Partition, Error]), @@ -213,20 +209,35 @@ prepare_exchange(start_exchange, State=#state{index=Partition}) -> %% a timely manner, the exchange will timeout. Since the trees will %% continue to finish the update even after the exchange times out, %% a future exchange should eventually make progress. +update_trees(init, State) -> + NumKeys = 10000000, + {ok, Bloom} = ebloom:new(NumKeys, 0.01, random:uniform(1000)), + State2 = State#state{bloom=Bloom}, + update_trees(start_exchange, State2); update_trees(cancel_fullsync, State) -> lager:info("AAE fullsync source cancelled for partition ~p", [State#state.index]), send_complete(State), {stop, normal, State}; -update_trees(start_exchange, State=#state{indexns=IndexN, owner=Owner}) when IndexN == [] -> +update_trees(finish_fullsync, State=#state{owner=Owner}) -> send_complete(State), lager:info("AAE fullsync source completed partition ~p", [State#state.index]), riak_repl2_fssource:fullsync_complete(Owner), + %% TODO: Why stay in update_trees? Should we stop instead? {next_state, update_trees, State}; +update_trees(continue, State=#state{indexns=IndexNs}) -> + case IndexNs of + [_] -> + send_diffs(init, State); + [_|RestNs] -> + State2 = State#state{built=0, indexns=RestNs}, + gen_fsm:send_event(self(), start_exchange), + {next_state, update_trees, State2} + end; update_trees(start_exchange, State=#state{tree_pid=TreePid, index=Partition, indexns=[IndexN|_IndexNs]}) -> - lager:info("Start exchange for partition,IndexN ~p,~p", [Partition, IndexN]), + lager:debug("Start exchange for partition,IndexN ~p,~p", [Partition, IndexN]), update_request(TreePid, {Partition, undefined}, IndexN), case send_synchronous_msg(?MSG_UPDATE_TREE, IndexN, State) of ok -> @@ -374,7 +385,7 @@ key_exchange(start_key_exchange, State=#state{cluster=Cluster, end, lager:info("Full-sync with site ~p; fullsync difference generator for ~p/~p complete (~p differences, completed in ~p secs)", [State#state.cluster, Partition, IndexN, Count, riak_repl_util:elapsed_secs(StageStart)]), - gen_fsm:send_event(SourcePid, {'$aae_src', done, Bloom, Count}) + gen_fsm:send_event(SourcePid, {'$aae_src', done, Count}) end), %% wait for differences from bloom_folder or to be done @@ -404,25 +415,11 @@ compute_differences({'$aae_src', worker_pid, WorkerPid}, WorkerPid ! {'$aae_src', ready, self()}, {next_state, compute_differences, State}; -compute_differences({'$aae_src', done, Bloom, Count}, State=#state{ indexns=IndexNs, bloom=Bloom, diff_cnt=Count0 }) - when length(IndexNs) =< 1 -> - %% we just finished diffing the *last* IndexN, so we go to the vnode fold / bloom state - +compute_differences({'$aae_src', done, Count}, State=#state{diff_cnt=Count0}) -> + %% Just move on to the next tree exchange since we now roll all + %% differences into a single bloom filter. State2 = State#state{ diff_cnt=Count0+Count }, - - %% if we have anything in our bloom filter, start sending them now. - %% this will start a worker process, which will tell us it's done with - %% diffs_done once all differences are sent. - _ = finish_sending_differences(Bloom, State2), - - %% wait for differences from bloom_folder or to be done - {next_state, send_diffs, State2}; - -compute_differences({'$aae_src', done, _, Count}, State=#state{ indexns=[_|IndexNs], diff_cnt=Count0 }) -> - %% re-start for next indexN - - gen_fsm:send_event(self(), start_exchange), - {next_state, update_trees, State#state{built=0, indexns=IndexNs, diff_cnt=Count0+Count }}. + update_trees(continue, State2). %% state send_diffs is where we wait for diff_obj messages from the bloom folder %% and send them to the sink for each diff_obj. We eventually finish upon receipt @@ -432,11 +429,19 @@ send_diffs({diff_obj, RObj}, _From, State) -> send_missing(RObj, State), {reply, ok, send_diffs, State}. +send_diffs(init, State=#state{bloom=Bloom}) -> + %% if we have anything in our bloom filter, start sending them now. + %% this will start a worker process, which will tell us it's done with + %% diffs_done once all differences are sent. + _ = finish_sending_differences(Bloom, State), + + %% wait for differences from bloom_folder or to be done + {next_state, send_diffs, State}; + %% All indexes in this Partition are done. %% Note: recv'd from an async send event send_diffs(diff_done, State) -> - gen_fsm:send_event(self(), start_exchange), - {next_state, update_trees, State#state{built=0, indexns=[]}}. + update_trees(finish_fullsync, State). %%%=================================================================== %%% Internal functions From 1ba47e872e9ec69214f4be7e508c38a64154239d Mon Sep 17 00:00:00 2001 From: Joseph Blomstedt Date: Tue, 2 Sep 2014 16:14:22 -0700 Subject: [PATCH 02/15] Add buffer-then-send mode for AAE fullsync repair This commit merges in the separately prototyped buffer-then-send approach to directly sending key differences. The mode used is determined by the `fullsync_direct_mode` application variable, which defaults to `inline`. * inline differences selected for direct repair are sent as they are encountered during an AAE exchange. * buffered differences selected for direct repair are buffered in memory during an exchange. After the exchange, the buffer is sorted and keys are then sent in order. --- src/riak_repl_aae_source.erl | 174 ++++++++++++++++++++++------------- 1 file changed, 109 insertions(+), 65 deletions(-) diff --git a/src/riak_repl_aae_source.erl b/src/riak_repl_aae_source.erl index a4129499..5c77ebcf 100644 --- a/src/riak_repl_aae_source.erl +++ b/src/riak_repl_aae_source.erl @@ -30,6 +30,15 @@ -type index() :: non_neg_integer(). -type index_n() :: {index(), pos_integer()}. +-record(exchange, {mode :: inline | buffered, + buffer :: ets:tid(), + bloom :: reference(), %% ebloom + limit :: non_neg_integer(), + count :: non_neg_integer() + }). + +-type exchange() :: #exchange{}. + -record(state, {cluster, client, %% riak:local_client() transport, @@ -45,8 +54,7 @@ local_lock = false :: boolean(), owner :: pid(), proto :: term(), - bloom :: reference(), %% ebloom - diff_cnt=0 :: non_neg_integer() + exchange :: exchange() }). %% Per state transition timeout used by certain transitions @@ -212,7 +220,20 @@ prepare_exchange(start_exchange, State=#state{index=Partition}) -> update_trees(init, State) -> NumKeys = 10000000, {ok, Bloom} = ebloom:new(NumKeys, 0.01, random:uniform(1000)), - State2 = State#state{bloom=Bloom}, + Limit = app_helper:get_env(riak_repl, fullsync_direct, ?GET_OBJECT_LIMIT), + Mode = app_helper:get_env(riak_repl, fullsync_direct_mode, inline), + Buffer = case Mode of + inline -> + undefined; + buffered -> + ets:new(?MODULE, [public, set]) + end, + Exchange = #exchange{mode=Mode, + buffer=Buffer, + bloom=Bloom, + limit=Limit, + count=0}, + State2 = State#state{exchange=Exchange}, update_trees(start_exchange, State2); update_trees(cancel_fullsync, State) -> lager:info("AAE fullsync source cancelled for partition ~p", [State#state.index]), @@ -272,6 +293,7 @@ key_exchange(start_key_exchange, State=#state{cluster=Cluster, socket=Socket, index=Partition, tree_pid=TreePid, + exchange=Exchange, indexns=[IndexN|_IndexNs]}) -> lager:debug("Starting fullsync key exchange with ~p for ~p/~p", [Cluster, Partition, IndexN]), @@ -354,38 +376,23 @@ key_exchange(start_key_exchange, State=#state{cluster=Cluster, %% accumulates a list of one element that is the count of %% keys that differed. We can't prime the accumulator. It %% always starts as the empty list. KeyDiffs is a list of hashtree::keydiff() - Bloom = State#state.bloom, - Count0 = State#state.diff_cnt, - Limit = app_helper:get_env(riak_repl, fullsync_direct, ?GET_OBJECT_LIMIT), - AccFun = fun(KeyDiffs, Acc0) -> - Count = case Acc0 of [C] when is_integer(C) -> C; _ -> 0 end, - FoldFun = - case (Count0+Count) > Limit of - %% Gather diff keys into a bloom filter - true -> fun(KeyDiff, AccIn) -> - accumulate_diff(KeyDiff, Bloom, AccIn, State) - end; - - %% Replicate diffs directly - false -> fun(KeyDiff, AccIn) -> - replicate_diff(KeyDiff, AccIn, State) - end - end, - - lists:foldl(FoldFun, Acc0, KeyDiffs) + AccFun = fun(KeyDiffs, Exchange0) -> + %% Gather diff keys into a bloom filter + lists:foldl(fun(KeyDiff, ExchangeIn) -> + accumulate_diff(KeyDiff, ExchangeIn, State) + end, + Exchange0, + KeyDiffs) end, %% TODO: Add stats for AAE lager:debug("Starting compare for partition ~p", [Partition]), spawn_link(fun() -> StageStart=os:timestamp(), - case riak_kv_index_hashtree:compare(IndexN, Remote, AccFun, TreePid) of - [N] when is_integer(N) -> Count=N; - _ -> Count=0 - end, - lager:info("Full-sync with site ~p; fullsync difference generator for ~p/~p complete (~p differences, completed in ~p secs)", - [State#state.cluster, Partition, IndexN, Count, riak_repl_util:elapsed_secs(StageStart)]), - gen_fsm:send_event(SourcePid, {'$aae_src', done, Count}) + Exchange2 = riak_kv_index_hashtree:compare(IndexN, Remote, AccFun, Exchange, TreePid), + lager:info("Full-sync with site ~p; fullsync difference generator for ~p complete (completed in ~p secs)", + [State#state.cluster, Partition, riak_repl_util:elapsed_secs(StageStart)]), + gen_fsm:send_event(SourcePid, {'$aae_src', Exchange2, done}) end), %% wait for differences from bloom_folder or to be done @@ -415,12 +422,28 @@ compute_differences({'$aae_src', worker_pid, WorkerPid}, WorkerPid ! {'$aae_src', ready, self()}, {next_state, compute_differences, State}; -compute_differences({'$aae_src', done, Count}, State=#state{diff_cnt=Count0}) -> - %% Just move on to the next tree exchange since we now roll all - %% differences into a single bloom filter. - State2 = State#state{ diff_cnt=Count0+Count }, +compute_differences({'$aae_src', Exchange, done}, State) -> + %% Just move on to the next tree exchange since we accumulate + %% differences across all trees. + State2 = State#state{exchange=Exchange}, update_trees(continue, State2). +maybe_send_direct(#exchange{mode=inline, count=Count, limit=Limit}, + #state{index=Partition}) -> + %% Inline sends already occured inline + Sent = erlang:min(Count, Limit), + lager:info("Directly sent ~b differences inline for partition ~p", + [Sent, Partition]), + ok; +maybe_send_direct(#exchange{buffer=Buffer}, State=#state{index=Partition}) -> + Keys = [{Bucket, Key} || {_, {Bucket, Key}} <- ets:tab2list(Buffer)], + true = ets:delete(Buffer), + Sorted = lists:sort(Keys), + Count = length(Sorted), + lager:info("Directly sending ~p differences for partition ~p", [Count, Partition]), + _ = [send_missing(Bucket, Key, State) || {Bucket, Key} <- Sorted], + ok. + %% state send_diffs is where we wait for diff_obj messages from the bloom folder %% and send them to the sink for each diff_obj. We eventually finish upon receipt %% of the diff_done event. Note: recv'd from a sync send event. @@ -429,11 +452,12 @@ send_diffs({diff_obj, RObj}, _From, State) -> send_missing(RObj, State), {reply, ok, send_diffs, State}. -send_diffs(init, State=#state{bloom=Bloom}) -> +send_diffs(init, State=#state{exchange=Exchange}) -> %% if we have anything in our bloom filter, start sending them now. %% this will start a worker process, which will tell us it's done with %% diffs_done once all differences are sent. - _ = finish_sending_differences(Bloom, State), + _ = maybe_send_direct(Exchange, State), + _ = finish_sending_differences(Exchange#exchange.bloom, State), %% wait for differences from bloom_folder or to be done {next_state, send_diffs, State}; @@ -538,36 +562,56 @@ replicate_diff(KeyDiff, Acc, State=#state{index=Partition}) -> Acc end. -accumulate_diff(KeyDiff, Bloom, [], State) -> - accumulate_diff(KeyDiff, Bloom, [0], State); -accumulate_diff(KeyDiff, Bloom, [Count], #state{index=Partition}) -> - NumObjects = - case KeyDiff of - {remote_missing, Bin} -> - %% send object and related objects to remote - {Bucket,Key} = binary_to_term(Bin), - lager:debug("Keydiff: remote partition ~p remote missing: ~p:~p", - [Partition, Bucket, Key]), - ebloom:insert(Bloom, <>), - 1; - {different, Bin} -> - %% send object and related objects to remote - {Bucket,Key} = binary_to_term(Bin), - lager:debug("Keydiff: remote partition ~p different: ~p:~p", - [Partition, Bucket, Key]), - ebloom:insert(Bloom, <>), - 1; - {missing, Bin} -> - %% remote has a key we don't have. Ignore it. - {Bucket,Key} = binary_to_term(Bin), - lager:debug("Keydiff: remote partition ~p local missing: ~p:~p (ignored)", - [Partition, Bucket, Key]), - 0; - Other -> - lager:warning("Unexpected error keydiff: ~p (ignored)", [Other]), - 0 - end, - [Count+NumObjects]. +accumulate_diff(KeyDiff, Exchange, State=#state{index=Partition}) -> + case KeyDiff of + {remote_missing, Bin} -> + %% send object and related objects to remote + {Bucket,Key} = binary_to_term(Bin), + lager:debug("Keydiff: remote partition ~p remote missing: ~p:~p", + [Partition, Bucket, Key]), + maybe_accumulate_key(Bucket, Key, Exchange, State); + {different, Bin} -> + %% send object and related objects to remote + {Bucket,Key} = binary_to_term(Bin), + lager:debug("Keydiff: remote partition ~p different: ~p:~p", + [Partition, Bucket, Key]), + maybe_accumulate_key(Bucket, Key, Exchange, State); + {missing, Bin} -> + %% remote has a key we don't have. Ignore it. + {Bucket,Key} = binary_to_term(Bin), + lager:debug("Keydiff: remote partition ~p local missing: ~p:~p (ignored)", + [Partition, Bucket, Key]), + Exchange; + Other -> + lager:warning("Unexpected error keydiff: ~p (ignored)", [Other]), + Exchange + end. + +maybe_accumulate_key(Bucket, Key, + Exchange=#exchange{mode=Mode, + bloom=Bloom, + count=Count, + limit=Limit}, + State) -> + if Count < Limit -> + %% Below threshold, handle directly + Exchange2 = handle_direct(Mode, Bucket, Key, Exchange, State), + Exchange2#exchange{count=Count+1}; + true -> + %% Past threshold, add to bloom filter for future bloom fold + ebloom:insert(Bloom, <>), + Exchange#exchange{count=Count+1} + end. + +handle_direct(inline, Bucket, Key, Exchange, State) -> + %% Send key inline + _ = send_missing(Bucket, Key, State), + Exchange; +handle_direct(buffered, Bucket, Key, Exchange, _State) -> + %% Enqueue in "direct send" buffer that will be sorted/sent later + #exchange{buffer=Buffer, count=Count} = Exchange, + ets:insert(Buffer, {Count, {Bucket, Key}}), + Exchange. send_missing(RObj, State=#state{client=Client, wire_ver=Ver, proto=Proto}) -> %% we don't actually have the vclock to compare, so just send the From 6b1d4f67bbf7256ea7a9e1b1563902794f293e7c Mon Sep 17 00:00:00 2001 From: Joseph Blomstedt Date: Wed, 17 Sep 2014 16:31:27 -0700 Subject: [PATCH 03/15] Fix deadlock between AAE fullsync sink/source A deadlock in AAE fullsync was identified during testing. The deadlock occurs when both the AAE fullsync source and sink processes block on TCP send due to TCP backpressure. Since both processes are blocked, neither can perform a receive to unblock the other. This commit addresses the issue by making the AAE sink use a helper process for sending, therefore ensuring the sink never blocks on a send. The sink will therefore always get around to receiving data at some point, unblocking the source if needed. NOTE: the SSL transport wraps the underlying socket in a gen_server, and all socket functions route through this server. As such, the SSL transport is still vulnerable to this deadlock. Fixing this requires either changing the AAE protocol or patching the OTP SSL implementation. One of the two is planned future work. --- src/riak_repl_aae_sink.erl | 44 ++++++++++++++++++++++++++++++++---- src/riak_repl_aae_source.erl | 4 ++-- 2 files changed, 42 insertions(+), 6 deletions(-) diff --git a/src/riak_repl_aae_sink.erl b/src/riak_repl_aae_sink.erl index a573e07f..ede387d8 100644 --- a/src/riak_repl_aae_sink.erl +++ b/src/riak_repl_aae_sink.erl @@ -15,6 +15,7 @@ %% API -export([start_link/4, init_sync/1]). +-export([sender_init/2, sender_loop/1]). -record(state, { clustername, @@ -22,6 +23,7 @@ transport, tree_pid :: pid(), %% pid of the AAE tree partition, + sender :: pid(), owner :: pid() %% our fssource owner }). @@ -51,7 +53,8 @@ handle_call(init_sync, _From, State=#state{transport=Transport, socket=Socket}) {nodelay, true}, {header, 1}], ok = Transport:setopts(Socket, TcpOptions), - {reply, ok, State}; + Sender = spawn_sender(Transport, Socket), + {reply, ok, State#state{sender=Sender}}; handle_call(status, _From, State) -> Reply = [{partition_syncing, State#state.partition}], @@ -73,7 +76,7 @@ handle_cast(_Msg, State) -> handle_info({Proto, _Socket, Data}, State=#state{transport=Transport, socket=Socket}) when Proto==tcp; Proto==ssl -> TcpOptions = [{active, once}], %% reset to receive next tcp message - ok = Transport:setopts(Socket, TcpOptions), + Transport:setopts(Socket, TcpOptions), case Data of [MsgType] -> process_msg(MsgType, State); @@ -164,8 +167,41 @@ process_msg(?MSG_COMPLETE, State=#state{owner=Owner}) -> %% Send a response back to the aae_source worker -send_reply(Msg, State=#state{socket=Socket, transport=Transport}) -> +send_reply(Msg, State=#state{sender=Sender}) -> Data = term_to_binary(Msg), - ok = Transport:send(Socket, <>), + Sender ! <>, {noreply, State}. +%%%=================================================================== +%%% TCP send helper +%%%=================================================================== + +%% Sending is performed in a separate process to ensure that the sink +%% never blocks and can therefore always receive messages + reset the +%% active once flag. If a separate process is not used, it is possible +%% for both the source and sink to deadlock on a TCP send that each +%% stall waiting for the remote side to receive. +%% +%% There is currently no backpressure between the sink and this helper +%% process. Adding backpressure runs the risk of once again hitting +%% the aforementioned deadlock scenario. +%% +%% The correct approach is to add backpressure to the actual fullsync +%% protocol itself. This remains as future work. +%% +%% NOTE: the SSL transport wraps the underlying socket in a gen_server, +%% and all socket functions route through this server. As such, splitting +%% sending and receiving between two processes does not help. The SSL +%% transport is thus known to *not* be safe and *can* deadlock. + +spawn_sender(Transport, Socket) -> + spawn_link(?MODULE, sender_init, [Transport, Socket]). + +sender_init(Transport, Socket) -> + sender_loop({Transport, Socket}). + +sender_loop(State={Transport, Socket}) -> + receive Msg -> + ok = Transport:send(Socket, Msg) + end, + ?MODULE:sender_loop(State). diff --git a/src/riak_repl_aae_source.erl b/src/riak_repl_aae_source.erl index 5c77ebcf..aeedfca2 100644 --- a/src/riak_repl_aae_source.erl +++ b/src/riak_repl_aae_source.erl @@ -392,7 +392,7 @@ key_exchange(start_key_exchange, State=#state{cluster=Cluster, Exchange2 = riak_kv_index_hashtree:compare(IndexN, Remote, AccFun, Exchange, TreePid), lager:info("Full-sync with site ~p; fullsync difference generator for ~p complete (completed in ~p secs)", [State#state.cluster, Partition, riak_repl_util:elapsed_secs(StageStart)]), - gen_fsm:send_event(SourcePid, {'$aae_src', Exchange2, done}) + gen_fsm:send_event(SourcePid, {'$aae_src', done, Exchange2}) end), %% wait for differences from bloom_folder or to be done @@ -422,7 +422,7 @@ compute_differences({'$aae_src', worker_pid, WorkerPid}, WorkerPid ! {'$aae_src', ready, self()}, {next_state, compute_differences, State}; -compute_differences({'$aae_src', Exchange, done}, State) -> +compute_differences({'$aae_src', done, Exchange}, State) -> %% Just move on to the next tree exchange since we accumulate %% differences across all trees. State2 = State#state{exchange=Exchange}, From e444a03dcbc800512b846c5c7c8481dd0c4f3ba2 Mon Sep 17 00:00:00 2001 From: Joseph Blomstedt Date: Wed, 17 Sep 2014 22:49:17 -0700 Subject: [PATCH 04/15] Remove non-pipelined AAE fullsync mode --- src/riak_repl_aae_source.erl | 38 +----------------------------------- 1 file changed, 1 insertion(+), 37 deletions(-) diff --git a/src/riak_repl_aae_source.erl b/src/riak_repl_aae_source.erl index aeedfca2..edeb3fb0 100644 --- a/src/riak_repl_aae_source.erl +++ b/src/riak_repl_aae_source.erl @@ -305,36 +305,7 @@ key_exchange(start_key_exchange, State=#state{cluster=Cluster, %% allow us to pass control of the TCP socket around. This is needed so %% that the process needing to send/receive on that socket has ownership %% of it. - %% - %% Old, non-pipelined verison - TestR1 = fun(init, _) -> - %% cause control of the socket to be given to AAE so that - %% the get_bucket and key_hashes can send messages via the - %% socket (with correct ownership). We'll send a 'ready' - %% back here once the socket ownership is transfered and - %% we are ready to proceed with the compare. - gen_fsm:send_event(SourcePid, {'$aae_src', worker_pid, self()}), - receive - {'$aae_src', ready, SourcePid} -> - ok - end; - (get_bucket, {L, B}) -> - async_get_bucket(L, B, IndexN, State), - wait_get_bucket(L, B, IndexN, State); - (key_hashes, Segment) -> - async_get_segment(Segment, IndexN, State), - wait_get_segment(Segment, IndexN, State); - (start_exchange_level, {_Level, _Buckets}) -> - ok; - (start_exchange_segments, _Segments) -> - ok; - (final, _) -> - %% give ourself control of the socket again - ok = Transport:controlling_process(Socket, SourcePid) - end, - - %% Pipelined verison - TestR2 = fun(init, _) -> + Remote = fun(init, _) -> %% cause control of the socket to be given to AAE so that %% the get_bucket and key_hashes can send messages via the %% socket (with correct ownership). We'll send a 'ready' @@ -360,13 +331,6 @@ key_exchange(start_key_exchange, State=#state{cluster=Cluster, ok = Transport:controlling_process(Socket, SourcePid) end, - Remote = case app_helper:get_env(riak_repl, fullsync_pipeline, false) of - false -> - TestR1; - true -> - TestR2 - end, - %% Unclear if we should allow exchange to run indefinitely or enforce %% a timeout. The problem is that depending on the number of keys and %% key differences, exchange can take arbitrarily long. For now, go with From abdd5139b8ce097b82ac3eeb5c6a40ecdcccfd79 Mon Sep 17 00:00:00 2001 From: Joseph Blomstedt Date: Fri, 19 Sep 2014 17:06:50 -0700 Subject: [PATCH 05/15] Change AAE FS source to use direct vnode gets When performing direct sends, the AAE fullsync source retrieves individual objects in order to send them. Previously, this was accomplished using normal riak_client gets using the n_val=1 option. However, there are no guarantees about which replica the object would come from using such an approach. This commit changes the AAE fullsync source to instead retrieve objects directly from the vnode performing the fullsync. This ensures that the object sent aligns with the AAE hash used to determine differences. This approach may also have better performance. --- src/riak_repl_aae_source.erl | 30 ++++++++++++++++++++++++++++-- 1 file changed, 28 insertions(+), 2 deletions(-) diff --git a/src/riak_repl_aae_source.erl b/src/riak_repl_aae_source.erl index edeb3fb0..b2c280f3 100644 --- a/src/riak_repl_aae_source.erl +++ b/src/riak_repl_aae_source.erl @@ -596,8 +596,33 @@ send_missing(RObj, State=#state{client=Client, wire_ver=Ver, proto=Proto}) -> 1 + length(Objects) end. +%% Copied from riak_kv_vnode:local_get to extend with timeout, +%% might port back to riak_kv_vnode.erl in the future. +%% +%% Note: responses that timeout can result in future late +%% messages arriving from the vnode. This is currently safe +%% because of the catch-all handle_info that will ignore these +%% messages. But, something to keep in mind in the future. +kv_local_get(Index, BKey, Timeout) -> + Ref = make_ref(), + ReqId = erlang:phash2(erlang:now()), + Sender = {raw, Ref, self()}, + riak_kv_vnode:get({Index,node()}, BKey, ReqId, Sender), + receive + {Ref, {r, Result, Index, ReqId}} -> + Result; + {Ref, Reply} -> + {error, Reply} + after Timeout -> + {error, timeout} + end. + +%% Get the K/V directly from the local vnode +local_get(Bucket, Key, #state{index=Index}) -> + kv_local_get(Index, {Bucket, Key}, ?REPL_FSM_TIMEOUT). + send_missing(Bucket, Key, State=#state{client=Client, wire_ver=Ver, proto=Proto}) -> - case Client:get(Bucket, Key, [{r, 1}, {timeout, ?REPL_FSM_TIMEOUT}, {n_val, 1}]) of + case local_get(Bucket, Key, State) of {ok, RObj} -> %% we don't actually have the vclock to compare, so just send the %% key and let the other side sort things out. @@ -620,7 +645,8 @@ send_missing(Bucket, Key, State=#state{client=Client, wire_ver=Ver, proto=Proto} %% can't find the key! lager:warning("not_found returned for fullsync client get on Bucket: ~p Key:~p", [Bucket,Key]), 0; - _ -> + {error, timeout} -> + lager:warning("timeout during fullsync client get on Bucket: ~p Key:~p", [Bucket,Key]), 0 end. From a2b5941b021942f511a04ec889905423495f9dce Mon Sep 17 00:00:00 2001 From: Jon Meredith Date: Wed, 17 Sep 2014 09:59:12 -0600 Subject: [PATCH 06/15] Added a worker pool for fullsync sinks. Although keylisting has a pool for issuing gets, both the AAE and the Keylist strategies serialize writing objects from the sink process. This adds a pool with a default size of 100 shared across all fullsync sinks that make the writes async. No difference in safety properties, neither version checks the response from the put FSM. --- src/riak_repl2_fssink_pool.erl | 28 ++++++++++++++++++++++++++++ src/riak_repl_aae_sink.erl | 6 +++--- src/riak_repl_fullsync_worker.erl | 12 +++++++++++- src/riak_repl_sup.erl | 3 +++ 4 files changed, 45 insertions(+), 4 deletions(-) create mode 100644 src/riak_repl2_fssink_pool.erl diff --git a/src/riak_repl2_fssink_pool.erl b/src/riak_repl2_fssink_pool.erl new file mode 100644 index 00000000..896e34d1 --- /dev/null +++ b/src/riak_repl2_fssink_pool.erl @@ -0,0 +1,28 @@ +%% Fullsync pool to be shared by all sinks. Globally bounded in size in case multiple +%% fullsyncs are running. + +-module(riak_repl2_fssink_pool). +-export([start_link/0, status/0, bin_put/1]). + +start_link() -> + MinPool = app_helper:get_env(riak_repl, fssink_min_workers, 5), + MaxPool = app_helper:get_env(riak_repl, fssink_max_workers, 100), + PoolArgs = [{name, {local, ?MODULE}}, + {worker_module, riak_repl_fullsync_worker}, + {worker_args, []}, + {size, MinPool}, {max_overflow, MaxPool}], + poolboy:start_link(PoolArgs). + +%% @doc Return the poolboy status +status() -> + {StateName, WorkerQueueLen, Overflow, NumMonitors} = poolboy:status(?MODULE), + [{statename, StateName}, + {worker_queue_len, WorkerQueueLen}, + {overflow, Overflow}, + {num_monitors, NumMonitors}]. + +%% @doc Send a replication wire-encoded binary to the worker pool +%% for running a put against. No guarantees of completion. +bin_put(BinObj) -> + Pid = poolboy:checkout(?MODULE, true, infinity), + riak_repl_fullsync_worker:do_binput(Pid, BinObj, ?MODULE). diff --git a/src/riak_repl_aae_sink.erl b/src/riak_repl_aae_sink.erl index a573e07f..d49b11f8 100644 --- a/src/riak_repl_aae_sink.erl +++ b/src/riak_repl_aae_sink.erl @@ -141,9 +141,9 @@ process_msg(?MSG_GET_AAE_SEGMENT, {SegmentNum,IndexN}, State=#state{tree_pid=Tre %% no reply process_msg(?MSG_PUT_OBJ, {fs_diff_obj, BObj}, State) -> - RObj = riak_repl_util:from_wire(BObj), - %% do the put - riak_repl_util:do_repl_put(RObj), + %% may block on worker pool, ok return means work was submitted + %% to pool, not that put FSM completed successfully. + ok = riak_repl2_fssink_pool:bin_put(BObj), {noreply, State}; %% replies: ok | not_responsible diff --git a/src/riak_repl_fullsync_worker.erl b/src/riak_repl_fullsync_worker.erl index 1bcc52c6..ac69b1a5 100644 --- a/src/riak_repl_fullsync_worker.erl +++ b/src/riak_repl_fullsync_worker.erl @@ -7,7 +7,7 @@ -export([init/1, handle_call/3, handle_cast/2, handle_info/2, terminate/2, code_change/3]). --export([start_link/1, do_put/3, do_binputs/5, do_get/7, do_get/8]). +-export([start_link/1, do_put/3, do_binput/3, do_binputs/5, do_get/7, do_get/8]). -export([do_binputs_internal/4]). %% Used for unit/integration testing, not public interface @@ -19,6 +19,11 @@ start_link(_Args) -> do_put(Pid, Obj, Pool) -> gen_server:cast(Pid, {put, Obj, Pool}). +%% Put a single object, encoded as replication binary from wire format +do_binput(Pid, BinObj, Pool) -> + %% safe to cast as the pool size will add backpressure on the sink + gen_server:cast(Pid, {bin_put, BinObj, Pool}). + do_binputs(Pid, BinObjs, DoneFun, Pool, Ver) -> %% safe to cast as the pool size will add backpressure on the sink gen_server:cast(Pid, {puts, BinObjs, DoneFun, Pool, Ver}). @@ -134,6 +139,11 @@ handle_cast({put, RObj, Pool}, State) -> %% unblock this worker for more work (or death) poolboy:checkin(Pool, self()), {noreply, State}; +handle_cast({bin_put, BinObj, Pool}, State) -> + RObj = riak_repl_util:from_wire(BinObj), + riak_repl_util:do_repl_put(RObj), + poolboy:checkin(Pool, self()), % resume work + {noreply, State}; handle_cast({puts, BinObjs, DoneFun, Pool, Ver}, State) -> ?MODULE:do_binputs_internal(BinObjs, DoneFun, Pool, Ver), % so it can be mecked {noreply, State}; diff --git a/src/riak_repl_sup.erl b/src/riak_repl_sup.erl index 23627e95..7aadf378 100644 --- a/src/riak_repl_sup.erl +++ b/src/riak_repl_sup.erl @@ -48,6 +48,9 @@ init([]) -> {riak_repl2_fssource_sup, {riak_repl2_fssource_sup, start_link, []}, permanent, infinity, supervisor, [riak_repl2_fssource_sup]}, + {riak_repl2_fssink_pool, {riak_repl2_fssink_pool, start_link, []}, + permanent, 5000, worker, [riak_repl2_fssink_pool, poolboy]}, + {riak_repl2_fssink_sup, {riak_repl2_fssink_sup, start_link, []}, permanent, infinity, supervisor, [riak_repl2_fssink_sup]}, From 4e2e31b28c0530b5a54dd6b815ab35375cbe79f0 Mon Sep 17 00:00:00 2001 From: Jon Meredith Date: Sun, 21 Sep 2014 10:58:54 -0600 Subject: [PATCH 07/15] Changed fullsync_direct to fullsync_direct_limit, stopped "Gathering source data" msg. Minor changes for user experience, clarified with _limit to permit a future _percentage version of fullsync_direct. Lowered the "Gathering source data" message displayed every minute to a debug message. --- src/riak_repl2_fscoordinator.erl | 2 +- src/riak_repl_aae_source.erl | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/src/riak_repl2_fscoordinator.erl b/src/riak_repl2_fscoordinator.erl index d2e2c968..d0722a24 100644 --- a/src/riak_repl2_fscoordinator.erl +++ b/src/riak_repl2_fscoordinator.erl @@ -863,7 +863,7 @@ schedule_stat_refresh(StatCache) -> %% @private Exported just to be able to spawn with arguments more nicely. refresh_stats_worker(ReportTo, Sources) -> - lager:info("Gathering source data for ~p", [Sources]), + lager:debug("Gathering source data for ~p", [Sources]), SourceStats = gather_source_stats(Sources), Time = riak_core_util:moment(), Self = self(), diff --git a/src/riak_repl_aae_source.erl b/src/riak_repl_aae_source.erl index b2c280f3..cae1fa7e 100644 --- a/src/riak_repl_aae_source.erl +++ b/src/riak_repl_aae_source.erl @@ -220,7 +220,7 @@ prepare_exchange(start_exchange, State=#state{index=Partition}) -> update_trees(init, State) -> NumKeys = 10000000, {ok, Bloom} = ebloom:new(NumKeys, 0.01, random:uniform(1000)), - Limit = app_helper:get_env(riak_repl, fullsync_direct, ?GET_OBJECT_LIMIT), + Limit = app_helper:get_env(riak_repl, fullsync_direct_limit, ?GET_OBJECT_LIMIT), Mode = app_helper:get_env(riak_repl, fullsync_direct_mode, inline), Buffer = case Mode of inline -> From 779c1ecd942d91beb644824a1b6d6eecea0a53f1 Mon Sep 17 00:00:00 2001 From: Joseph Blomstedt Date: Mon, 6 Oct 2014 15:22:01 -0700 Subject: [PATCH 08/15] Remove unused riak_repl_aae_source:replicate_diff/3 --- src/riak_repl_aae_source.erl | 30 ------------------------------ 1 file changed, 30 deletions(-) diff --git a/src/riak_repl_aae_source.erl b/src/riak_repl_aae_source.erl index 7dd9e004..b6804282 100644 --- a/src/riak_repl_aae_source.erl +++ b/src/riak_repl_aae_source.erl @@ -25,8 +25,6 @@ -export([init/1, handle_event/3, handle_sync_event/4, handle_info/3, terminate/3, code_change/4]). --export([replicate_diff/3]). - -type index() :: non_neg_integer(). -type index_n() :: {index(), pos_integer()}. @@ -517,34 +515,6 @@ bloom_fold({B, K}, V, {MPid, Bloom}) -> end, {MPid, Bloom}. -%% @private -%% Returns accumulator as a list of one element that is the count of -%% keys that differed. Initial value of Acc is always []. -replicate_diff(KeyDiff, {DiffCount, Bloom} = Acc, State=#state{index=Partition}) -> - case KeyDiff of - {remote_missing, Bin} -> - %% send object and related objects to remote - {Bucket,Key} = binary_to_term(Bin), - lager:debug("Keydiff: remote partition ~p remote missing: ~p:~p", - [Partition, Bucket, Key]), - {DiffCount + send_missing(Bucket, Key, State), Bloom}; - {different, Bin} -> - %% send object and related objects to remote - {Bucket,Key} = binary_to_term(Bin), - lager:debug("Keydiff: remote partition ~p different: ~p:~p", - [Partition, Bucket, Key]), - {DiffCount + send_missing(Bucket, Key, State), Bloom}; - {missing, Bin} -> - %% remote has a key we don't have. Ignore it. - {Bucket,Key} = binary_to_term(Bin), - lager:debug("Keydiff: remote partition ~p local missing: ~p:~p (ignored)", - [Partition, Bucket, Key]), - Acc; - Other -> - lager:warning("Unexpected error keydiff: ~p (ignored)", [Other]), - Acc - end. - accumulate_diff(KeyDiff, Exchange, State=#state{index=Partition}) -> case KeyDiff of {remote_missing, Bin} -> From 83dd1929aff4697fecee109627f891de64398e20 Mon Sep 17 00:00:00 2001 From: Mikael Lixenstrand Date: Thu, 16 Oct 2014 11:48:22 +0100 Subject: [PATCH 09/15] Remove redundant and unnecessary logging --- src/riak_repl2_fscoordinator.erl | 2 -- src/riak_repl2_fssource.erl | 5 ++--- src/riak_repl_aae_source.erl | 15 ++++----------- 3 files changed, 6 insertions(+), 16 deletions(-) diff --git a/src/riak_repl2_fscoordinator.erl b/src/riak_repl2_fscoordinator.erl index d0722a24..b23cf38f 100644 --- a/src/riak_repl2_fscoordinator.erl +++ b/src/riak_repl2_fscoordinator.erl @@ -768,8 +768,6 @@ remote_node_available({_Partition, _, RemoteNode}, Busies) -> start_fssource(Partition2={Partition,_,_} = PartitionVal, Ip, Port, State) -> #state{owners = Owners} = State, LocalNode = proplists:get_value(Partition, Owners), - lager:info("Starting fssource for ~p on ~p to ~p", [Partition, LocalNode, - Ip]), case riak_repl2_fssource_sup:enable(LocalNode, Partition, {Ip, Port}) of {ok, Pid} -> link(Pid), diff --git a/src/riak_repl2_fssource.erl b/src/riak_repl2_fssource.erl index 1ccb232f..b01cb82d 100644 --- a/src/riak_repl2_fssource.erl +++ b/src/riak_repl2_fssource.erl @@ -81,9 +81,8 @@ init([Partition, IP]) -> end. handle_call({connected, Socket, Transport, _Endpoint, Proto, Props}, - _From, State=#state{ip=IP, partition=Partition, strategy=RequestedStrategy}) -> + _From, State=#state{partition=Partition, strategy=RequestedStrategy}) -> Cluster = proplists:get_value(clustername, Props), - lager:info("Fullsync connection to ~p for ~p",[IP, Partition]), SocketTag = riak_repl_util:generate_socket_tag("fs_source", Transport, Socket), lager:debug("Keeping stats for " ++ SocketTag), @@ -287,7 +286,7 @@ maybe_exchange_caps(_, Caps, Socket, Transport) -> %% Start a connection to the remote sink node at IP, using the given fullsync strategy, %% for the given partition. The protocol version will be determined from the strategy. connect(IP, Strategy, Partition) -> - lager:info("Connecting to remote ~p for partition ~p", [IP, Partition]), + lager:debug("Connecting to remote ~p for partition ~p", [IP, Partition]), TcpOptions = [{keepalive, true}, {nodelay, true}, {packet, 4}, diff --git a/src/riak_repl_aae_source.erl b/src/riak_repl_aae_source.erl index b6804282..43fb9bb0 100644 --- a/src/riak_repl_aae_source.erl +++ b/src/riak_repl_aae_source.erl @@ -87,8 +87,7 @@ cancel_fullsync(Pid) -> %%%=================================================================== init([Cluster, Client, Transport, Socket, Partition, OwnerPid, Proto]) -> - lager:info("AAE fullsync source worker started for partition ~p", - [Partition]), + Ver = riak_repl_util:deduce_wire_version_from_proto(Proto), {_, ClientVer, _} = Proto, @@ -230,7 +229,6 @@ update_trees(cancel_fullsync, State) -> update_trees(start_exchange, State=#state{tree_pid=TreePid, index=Partition, indexns=IndexNs}) -> - lager:info("Start update for partition,IndexN ~p,~p", [Partition, IndexNs]), lists:foreach(fun(IndexN) -> update_request(TreePid, {Partition, undefined}, IndexN), case send_synchronous_msg(?MSG_UPDATE_TREE, IndexN, State) of @@ -253,7 +251,6 @@ update_trees({tree_built, _, _}, State = #state{indexns=IndexNs}) -> NeededBuilts -> %% Trees built now we can estimate how many keys {ok, EstimatedNrKeys} = riak_kv_index_hashtree:estimate_keys(State#state.tree_pid), - lager:info("EstimatedNrKeys ~p for partition ~p", [EstimatedNrKeys, State#state.index]), lager:debug("Moving to key exchange state"), key_exchange(init, State#state{built=Built, estimated_nr_keys = EstimatedNrKeys}); @@ -288,8 +285,6 @@ key_exchange(cancel_fullsync, State) -> {stop, normal, State}; key_exchange(finish_fullsync, State=#state{owner=Owner}) -> send_complete(State), - lager:info("AAE fullsync source completed partition ~p", - [State#state.index]), riak_repl2_fssource:fullsync_complete(Owner), %% TODO: Why stay in key_exchange? Should we stop instead? {next_state, key_exchange, State}; @@ -368,8 +363,8 @@ key_exchange(start_key_exchange, State=#state{cluster=Cluster, spawn_link(fun() -> StageStart=os:timestamp(), Exchange2 = riak_kv_index_hashtree:compare(IndexN, Remote, AccFun, Exchange, TreePid), - lager:info("Full-sync with site ~p; fullsync difference generator for ~p complete (completed in ~p secs)", - [State#state.cluster, Partition, riak_repl_util:elapsed_secs(StageStart)]), + lager:debug("Full-sync with site ~p; fullsync difference generator for ~p complete (completed in ~p secs)", + [State#state.cluster, Partition, riak_repl_util:elapsed_secs(StageStart)]), gen_fsm:send_event(SourcePid, {'$aae_src', done, Exchange2}) end), @@ -413,12 +408,10 @@ maybe_send_direct(#exchange{mode=inline, count=Count, limit=Limit}, lager:info("Directly sent ~b differences inline for partition ~p", [Sent, Partition]), ok; -maybe_send_direct(#exchange{buffer=Buffer}, State=#state{index=Partition}) -> +maybe_send_direct(#exchange{buffer=Buffer}, State) -> Keys = [{Bucket, Key} || {_, {Bucket, Key}} <- ets:tab2list(Buffer)], true = ets:delete(Buffer), Sorted = lists:sort(Keys), - Count = length(Sorted), - lager:info("Directly sending ~p differences for partition ~p", [Count, Partition]), _ = [send_missing(Bucket, Key, State) || {Bucket, Key} <- Sorted], ok. From e43257a45b1bf6f25f0134c2b9bb39140ecf2c40 Mon Sep 17 00:00:00 2001 From: Mikael Lixenstrand Date: Thu, 16 Oct 2014 17:18:56 +0100 Subject: [PATCH 10/15] Revert some of "Remove redundant and unnecessary logging" Keep needed for testing as debug. --- src/riak_repl2_fssource.erl | 3 ++- src/riak_repl_aae_source.erl | 10 ++++++++-- 2 files changed, 10 insertions(+), 3 deletions(-) diff --git a/src/riak_repl2_fssource.erl b/src/riak_repl2_fssource.erl index b01cb82d..fffb69dc 100644 --- a/src/riak_repl2_fssource.erl +++ b/src/riak_repl2_fssource.erl @@ -81,8 +81,9 @@ init([Partition, IP]) -> end. handle_call({connected, Socket, Transport, _Endpoint, Proto, Props}, - _From, State=#state{partition=Partition, strategy=RequestedStrategy}) -> + _From, State=#state{ip=IP, partition=Partition, strategy=RequestedStrategy}) -> Cluster = proplists:get_value(clustername, Props), + lager:info("Fullsync connection to ~p for ~p",[IP, Partition]), SocketTag = riak_repl_util:generate_socket_tag("fs_source", Transport, Socket), lager:debug("Keeping stats for " ++ SocketTag), diff --git a/src/riak_repl_aae_source.erl b/src/riak_repl_aae_source.erl index 43fb9bb0..23126c48 100644 --- a/src/riak_repl_aae_source.erl +++ b/src/riak_repl_aae_source.erl @@ -87,7 +87,8 @@ cancel_fullsync(Pid) -> %%%=================================================================== init([Cluster, Client, Transport, Socket, Partition, OwnerPid, Proto]) -> - + lager:debug("AAE fullsync source worker started for partition ~p", + [Partition]), Ver = riak_repl_util:deduce_wire_version_from_proto(Proto), {_, ClientVer, _} = Proto, @@ -251,6 +252,7 @@ update_trees({tree_built, _, _}, State = #state{indexns=IndexNs}) -> NeededBuilts -> %% Trees built now we can estimate how many keys {ok, EstimatedNrKeys} = riak_kv_index_hashtree:estimate_keys(State#state.tree_pid), + lager:debug("EstimatedNrKeys ~p for partition ~p", [EstimatedNrKeys, State#state.index]), lager:debug("Moving to key exchange state"), key_exchange(init, State#state{built=Built, estimated_nr_keys = EstimatedNrKeys}); @@ -285,6 +287,8 @@ key_exchange(cancel_fullsync, State) -> {stop, normal, State}; key_exchange(finish_fullsync, State=#state{owner=Owner}) -> send_complete(State), + lager:debug("AAE fullsync source completed partition ~p", + [State#state.index]), riak_repl2_fssource:fullsync_complete(Owner), %% TODO: Why stay in key_exchange? Should we stop instead? {next_state, key_exchange, State}; @@ -408,10 +412,12 @@ maybe_send_direct(#exchange{mode=inline, count=Count, limit=Limit}, lager:info("Directly sent ~b differences inline for partition ~p", [Sent, Partition]), ok; -maybe_send_direct(#exchange{buffer=Buffer}, State) -> +maybe_send_direct(#exchange{buffer=Buffer}, State=#state{index=Partition}) -> Keys = [{Bucket, Key} || {_, {Bucket, Key}} <- ets:tab2list(Buffer)], true = ets:delete(Buffer), Sorted = lists:sort(Keys), + Count = length(Sorted), + lager:debug("Directly sending ~p differences for partition ~p", [Count, Partition]), _ = [send_missing(Bucket, Key, State) || {Bucket, Key} <- Sorted], ok. From 67b95356e04a8ab2a03f16c3d0b85ad4928334eb Mon Sep 17 00:00:00 2001 From: Mikael Lixenstrand Date: Tue, 21 Oct 2014 10:17:58 +0100 Subject: [PATCH 11/15] Fix xref and dialyzer --- rebar.config | 2 +- src/riak_repl_aae_source.erl | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/rebar.config b/rebar.config index 7a076b92..9c9ebdb9 100644 --- a/rebar.config +++ b/rebar.config @@ -10,6 +10,6 @@ {lager, "2.0.3", {git, "git://github.com/basho/lager.git", {tag, "2.0.3"}}}, {ranch, "0.4.0-p1", {git, "git://github.com/basho/ranch.git", {tag, "0.4.0-p1"}}}, {ebloom, ".*", {git, "git://github.com/basho/ebloom.git", {tag, "2.0.0"}}}, - {riak_kv, ".*", {git, "git://github.com/basho/riak_kv.git", {branch, "2.0"}}}, + {riak_kv, ".*", {git, "git://github.com/basho/riak_kv.git", {branch, "feature/aae-estimate-keys"}}}, {riak_repl_pb_api, ".*", {git, "git@github.com:basho/riak_repl_pb_api.git", {branch, "2.0"}}} ]}. diff --git a/src/riak_repl_aae_source.erl b/src/riak_repl_aae_source.erl index 23126c48..9a4fa7cb 100644 --- a/src/riak_repl_aae_source.erl +++ b/src/riak_repl_aae_source.erl @@ -236,7 +236,7 @@ update_trees(start_exchange, State=#state{tree_pid=TreePid, ok -> gen_fsm:send_event(self(), {tree_built, Partition, IndexN}); not_responsible -> - gen_fsm:send_event(self(), {not_responsible, Partition, IndexN}, State) + gen_fsm:send_event(self(), {not_responsible, Partition, IndexN}) end end, IndexNs), {next_state, update_trees, State}; From 38b7b4b1e357e4abae3ed827ad72ea5da559db24 Mon Sep 17 00:00:00 2001 From: Mikael Lixenstrand Date: Tue, 9 Dec 2014 10:43:43 +0000 Subject: [PATCH 12/15] Revert riak_kv deps changes --- rebar.config | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/rebar.config b/rebar.config index 9c9ebdb9..7a076b92 100644 --- a/rebar.config +++ b/rebar.config @@ -10,6 +10,6 @@ {lager, "2.0.3", {git, "git://github.com/basho/lager.git", {tag, "2.0.3"}}}, {ranch, "0.4.0-p1", {git, "git://github.com/basho/ranch.git", {tag, "0.4.0-p1"}}}, {ebloom, ".*", {git, "git://github.com/basho/ebloom.git", {tag, "2.0.0"}}}, - {riak_kv, ".*", {git, "git://github.com/basho/riak_kv.git", {branch, "feature/aae-estimate-keys"}}}, + {riak_kv, ".*", {git, "git://github.com/basho/riak_kv.git", {branch, "2.0"}}}, {riak_repl_pb_api, ".*", {git, "git@github.com:basho/riak_repl_pb_api.git", {branch, "2.0"}}} ]}. From ce0c1b46740cbbf932b4326ba94981682a992f83 Mon Sep 17 00:00:00 2001 From: Mikael Lixenstrand Date: Wed, 10 Dec 2014 16:53:02 +0000 Subject: [PATCH 13/15] Update logging after review --- src/riak_repl_aae_source.erl | 7 ++++--- 1 file changed, 4 insertions(+), 3 deletions(-) diff --git a/src/riak_repl_aae_source.erl b/src/riak_repl_aae_source.erl index 9a4fa7cb..c5de2b5e 100644 --- a/src/riak_repl_aae_source.erl +++ b/src/riak_repl_aae_source.erl @@ -242,7 +242,8 @@ update_trees(start_exchange, State=#state{tree_pid=TreePid, {next_state, update_trees, State}; update_trees({not_responsible, Partition, IndexN}, State = #state{owner=Owner}) -> - lager:debug("VNode ~p does not cover preflist ~p", [Partition, IndexN]), + lager:debug("Skipping AAE fullsync tree update for vnode ~p because" + " it is not responsible for the preflist ~p", [Partition, IndexN]), gen_server:cast(Owner, not_responsible), {stop, normal, State}; update_trees({tree_built, _, _}, State = #state{indexns=IndexNs}) -> @@ -449,7 +450,7 @@ send_diffs(diff_done, State) -> %%%=================================================================== finish_sending_differences(#exchange{bloom=undefined, count=DiffCnt}, #state{index=Partition, estimated_nr_keys=EstimatedNrKeys}) -> - lager:info("No Bloom folding over ~p/~p differences for partition ~p with EstimatedNrKeys ~p", + lager:info("Syncing without bloom ~p/~p differences for partition ~p with EstimatedNrKeys ~p", [0, DiffCnt, Partition, EstimatedNrKeys]), gen_fsm:send_event(self(), diff_done); @@ -457,7 +458,7 @@ finish_sending_differences(#exchange{bloom=Bloom, count=DiffCnt}, #state{index=Partition, estimated_nr_keys=EstimatedNrKeys}) -> case ebloom:elements(Bloom) of Count = 0 -> - lager:info("No Bloom folding over ~p/~p differences for partition ~p with EstimatedNrKeys ~p", + lager:info("Syncing without bloom ~p/~p differences for partition ~p with EstimatedNrKeys ~p", [Count, DiffCnt, Partition, EstimatedNrKeys]), gen_fsm:send_event(self(), diff_done); Count -> From da47f0a18a53847ea8f220eca0cecc84b1d637b0 Mon Sep 17 00:00:00 2001 From: Mikael Lixenstrand Date: Mon, 15 Dec 2014 10:43:36 +0000 Subject: [PATCH 14/15] Cancel directly on not_responsible from remote cluster Remove loop so we can receive cancel_fullsync during update of remote trees. --- src/riak_repl_aae_source.erl | 41 +++++++++++++++++------------------- 1 file changed, 19 insertions(+), 22 deletions(-) diff --git a/src/riak_repl_aae_source.erl b/src/riak_repl_aae_source.erl index c5de2b5e..70f4a4e2 100644 --- a/src/riak_repl_aae_source.erl +++ b/src/riak_repl_aae_source.erl @@ -221,32 +221,28 @@ prepare_exchange(start_exchange, State=#state{index=Partition}) -> %% a timely manner, the exchange will timeout. Since the trees will %% continue to finish the update even after the exchange times out, %% a future exchange should eventually make progress. -update_trees(init, State) -> - update_trees(start_exchange, State); +update_trees(init, State=#state{indexns=IndexNs}) -> + update_trees({start_exchange, IndexNs}, State); update_trees(cancel_fullsync, State) -> lager:info("AAE fullsync source cancelled for partition ~p", [State#state.index]), send_complete(State), {stop, normal, State}; -update_trees(start_exchange, State=#state{tree_pid=TreePid, - index=Partition, - indexns=IndexNs}) -> - lists:foreach(fun(IndexN) -> - update_request(TreePid, {Partition, undefined}, IndexN), - case send_synchronous_msg(?MSG_UPDATE_TREE, IndexN, State) of - ok -> - gen_fsm:send_event(self(), {tree_built, Partition, IndexN}); - not_responsible -> - gen_fsm:send_event(self(), {not_responsible, Partition, IndexN}) - end - end, IndexNs), - {next_state, update_trees, State}; - -update_trees({not_responsible, Partition, IndexN}, State = #state{owner=Owner}) -> - lager:debug("Skipping AAE fullsync tree update for vnode ~p because" - " it is not responsible for the preflist ~p", [Partition, IndexN]), - gen_server:cast(Owner, not_responsible), - {stop, normal, State}; -update_trees({tree_built, _, _}, State = #state{indexns=IndexNs}) -> +update_trees({start_exchange, [IndexHead|IndexTail]}, State=#state{tree_pid=TreePid, + index=Partition, + owner=Owner}) -> + update_request(TreePid, {Partition, undefined}, IndexHead), + case send_synchronous_msg(?MSG_UPDATE_TREE, IndexHead, State) of + ok -> + gen_fsm:send_event(self(), {tree_built, Partition, IndexHead, IndexTail}), + {next_state, update_trees, State}; + not_responsible -> + lager:debug("Skipping AAE fullsync tree update for vnode ~p because" + " it is not responsible for the preflist ~p", [Partition, IndexHead]), + gen_server:cast(Owner, not_responsible), + {stop, normal, State} + end; + +update_trees({tree_built, _, _, IndexTail}, State = #state{indexns=IndexNs}) -> Built = State#state.built + 1, NeededBuilts = length(IndexNs) * 2, %% All local and remote case Built of @@ -258,6 +254,7 @@ update_trees({tree_built, _, _}, State = #state{indexns=IndexNs}) -> lager:debug("Moving to key exchange state"), key_exchange(init, State#state{built=Built, estimated_nr_keys = EstimatedNrKeys}); _ -> + gen_fsm:send_event(self(), {start_exchange, IndexTail}), {next_state, update_trees, State#state{built=Built}} end. From 6bdc099c9f09ebb3b0bc2f039585001ebd9a8b01 Mon Sep 17 00:00:00 2001 From: Mikael Lixenstrand Date: Tue, 16 Dec 2014 10:19:55 +0000 Subject: [PATCH 15/15] handle not_responsible for local partitions --- src/riak_repl_aae_source.erl | 18 +++++++++++++----- 1 file changed, 13 insertions(+), 5 deletions(-) diff --git a/src/riak_repl_aae_source.erl b/src/riak_repl_aae_source.erl index 70f4a4e2..a405d1e8 100644 --- a/src/riak_repl_aae_source.erl +++ b/src/riak_repl_aae_source.erl @@ -233,16 +233,25 @@ update_trees({start_exchange, [IndexHead|IndexTail]}, State=#state{tree_pid=Tree update_request(TreePid, {Partition, undefined}, IndexHead), case send_synchronous_msg(?MSG_UPDATE_TREE, IndexHead, State) of ok -> - gen_fsm:send_event(self(), {tree_built, Partition, IndexHead, IndexTail}), + gen_fsm:send_event(self(), tree_built), + case IndexTail of + [] -> ok; + _ -> gen_fsm:send_event(self(), {start_exchange, IndexTail}) + end, {next_state, update_trees, State}; not_responsible -> lager:debug("Skipping AAE fullsync tree update for vnode ~p because" " it is not responsible for the preflist ~p", [Partition, IndexHead]), gen_server:cast(Owner, not_responsible), {stop, normal, State} - end; + end; + +update_trees({not_responsible, Partition, IndexN}, State=#state{owner=Owner}) -> + lager:debug("VNode ~p does not cover preflist ~p", [Partition, IndexN]), + gen_server:cast(Owner, not_responsible), + {stop, normal, State}; -update_trees({tree_built, _, _, IndexTail}, State = #state{indexns=IndexNs}) -> +update_trees(tree_built, State = #state{indexns=IndexNs}) -> Built = State#state.built + 1, NeededBuilts = length(IndexNs) * 2, %% All local and remote case Built of @@ -254,7 +263,6 @@ update_trees({tree_built, _, _, IndexTail}, State = #state{indexns=IndexNs}) -> lager:debug("Moving to key exchange state"), key_exchange(init, State#state{built=Built, estimated_nr_keys = EstimatedNrKeys}); _ -> - gen_fsm:send_event(self(), {start_exchange, IndexTail}), {next_state, update_trees, State#state{built=Built}} end. @@ -641,7 +649,7 @@ update_request(Tree, {Index, _}, IndexN) -> as_event(fun() -> case riak_kv_index_hashtree:update(IndexN, Tree) of ok -> - {tree_built, Index, IndexN}; + tree_built; not_responsible -> {not_responsible, Index, IndexN} end