From 57b0bb51bb696514a148b1d3027e0dcbc20853d1 Mon Sep 17 00:00:00 2001 From: Joseph Blomstedt Date: Thu, 28 Aug 2014 16:47:20 -0700 Subject: [PATCH 01/10] Extend AAE fullsync to use a pipelined exchange The current AAE fullsync exchange was based on the exchange FSM used by Riak for local cluster AAE. However, this was a bad design decision as local AAE was never designed to tolerate high latency links. In the worst case, this design can require millions of sequential roundtrip messages. This is not an issue for a fast LAN, but is impractical for WANs with 10-200ms RTTs. This commit changes the AAE fullsync exchange to use a pipelined approach. Rather than performing a synchronous request for each bucket, the exchange determines all differences for a given level of the AAE tree and then requests the data for all differing buckets upfront. Ideally, this would be supported by a new streaming protocol. However, to retain protocol compatibility the pipelined approach sends all the known bucket requests upfront and relies upon TCP ordering to deliver the responses in the order requested. Note: this commit relies upon changes made to riak_core and riak_kv to extend the AAE subsystem to support the new level-by-level exchange that is utilized by this new fullsync approach. --- src/riak_repl_aae_source.erl | 71 ++++++++++++++++++++++++++++++++++-- 1 file changed, 67 insertions(+), 4 deletions(-) diff --git a/src/riak_repl_aae_source.erl b/src/riak_repl_aae_source.erl index ae2f79b5..5136dedf 100644 --- a/src/riak_repl_aae_source.erl +++ b/src/riak_repl_aae_source.erl @@ -262,7 +262,9 @@ key_exchange(start_key_exchange, State=#state{cluster=Cluster, %% allow us to pass control of the TCP socket around. This is needed so %% that the process needing to send/receive on that socket has ownership %% of it. 
- Remote = fun(init, _) -> + %% + %% Old, non-pipelined verison + TestR1 = fun(init, _) -> %% cause control of the socket to be given to AAE so that %% the get_bucket and key_hashes can send messages via the %% socket (with correct ownership). We'll send a 'ready' @@ -274,14 +276,54 @@ key_exchange(start_key_exchange, State=#state{cluster=Cluster, ok end; (get_bucket, {L, B}) -> - send_synchronous_msg(?MSG_GET_AAE_BUCKET, {L,B,IndexN}, State); + async_get_bucket(L, B, IndexN, State), + wait_get_bucket(L, B, IndexN, State); (key_hashes, Segment) -> - send_synchronous_msg(?MSG_GET_AAE_SEGMENT, {Segment,IndexN}, State); + async_get_segment(Segment, IndexN, State), + wait_get_segment(Segment, IndexN, State); + (start_exchange_level, {_Level, _Buckets}) -> + ok; + (start_exchange_segments, _Segments) -> + ok; (final, _) -> %% give ourself control of the socket again ok = Transport:controlling_process(Socket, SourcePid) end, + %% Pipelined verison + TestR2 = fun(init, _) -> + %% cause control of the socket to be given to AAE so that + %% the get_bucket and key_hashes can send messages via the + %% socket (with correct ownership). We'll send a 'ready' + %% back here once the socket ownership is transfered and + %% we are ready to proceed with the compare. 
+ gen_fsm:send_event(SourcePid, {'$aae_src', worker_pid, self()}), + receive + {'$aae_src', ready, SourcePid} -> + ok + end; + (get_bucket, {L, B}) -> + wait_get_bucket(L, B, IndexN, State); + (key_hashes, Segment) -> + wait_get_segment(Segment, IndexN, State); + (start_exchange_level, {Level, Buckets}) -> + _ = [async_get_bucket(Level, B, IndexN, State) || B <- Buckets], + ok; + (start_exchange_segments, Segments) -> + _ = [async_get_segment(Segment, IndexN, State) || Segment <- Segments], + ok; + (final, _) -> + %% give ourself control of the socket again + ok = Transport:controlling_process(Socket, SourcePid) + end, + + Remote = case app_helper:get_env(riak_repl, fullsync_pipeline, false) of + false -> + TestR1; + true -> + TestR2 + end, + %% Unclear if we should allow exchange to run indefinitely or enforce %% a timeout. The problem is that depending on the number of keys and %% key differences, exchange can take arbitrarily long. For now, go with @@ -314,6 +356,24 @@ key_exchange(start_key_exchange, State=#state{cluster=Cluster, %% wait for differences from bloom_folder or to be done {next_state, compute_differences, State}. +%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%% + +async_get_bucket(Level, Bucket, IndexN, State) -> + send_asynchronous_msg(?MSG_GET_AAE_BUCKET, {Level,Bucket,IndexN}, State). + +wait_get_bucket(_Level, _Bucket, _IndexN, State) -> + Reply = get_reply(State), + Reply. + +async_get_segment(Segment, IndexN, State) -> + send_asynchronous_msg(?MSG_GET_AAE_SEGMENT, {Segment,IndexN}, State). + +wait_get_segment(_Segment, _IndexN, State) -> + Reply = get_reply(State), + Reply. 
+ +%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%% + compute_differences({'$aae_src', worker_pid, WorkerPid}, #state{transport=Transport, socket=Socket} = State) -> ok = Transport:controlling_process(Socket, WorkerPid), @@ -569,7 +629,10 @@ send_synchronous_msg(MsgType, State=#state{transport=Transport, %% Async message send with tag and (binary or term data). send_asynchronous_msg(MsgType, Data, #state{transport=Transport, socket=Socket}) when is_binary(Data) -> - ok = Transport:send(Socket, <>). + ok = Transport:send(Socket, <>); +send_asynchronous_msg(MsgType, Msg, State) -> + Data = term_to_binary(Msg), + send_asynchronous_msg(MsgType, Data, State). %% send a message with type tag only, no data send_asynchronous_msg(MsgType, #state{transport=Transport, From 32331ddf0f0d90b33422867cda66d23df4a7956f Mon Sep 17 00:00:00 2001 From: Kresten Krab Thorup Date: Wed, 3 Sep 2014 13:31:06 +0200 Subject: [PATCH 02/10] Only do one backend fold for each vnode, not N Previous code created a bloom filter for each IndexN, populating it with diffs, and then did a backend fold for that vnode sending objects along the way. The code now 1. creates the bloom filter in init() 2. runs key exchange for each IndexN 3. then does a vnode fold to find objects in the bloom filter. --- src/riak_repl_aae_source.erl | 33 ++++++++++++++++++++------------- 1 file changed, 20 insertions(+), 13 deletions(-) diff --git a/src/riak_repl_aae_source.erl b/src/riak_repl_aae_source.erl index 5136dedf..7c550bab 100644 --- a/src/riak_repl_aae_source.erl +++ b/src/riak_repl_aae_source.erl @@ -44,7 +44,8 @@ diff_batch_size = 1000 :: non_neg_integer(), local_lock = false :: boolean(), owner :: pid(), - proto :: term() + proto :: term(), + bloom :: reference() %% ebloom }). 
%% Per state transition timeout used by certain transitions @@ -74,6 +75,9 @@ init([Cluster, Client, Transport, Socket, Partition, OwnerPid, Proto]) -> lager:info("AAE fullsync source worker started for partition ~p", [Partition]), + NumKeys = 1000000, + {ok, Bloom} = ebloom:new(NumKeys, 0.01, random:uniform(1000)), + State = #state{cluster=Cluster, client=Client, transport=Transport, @@ -82,7 +86,8 @@ init([Cluster, Client, Transport, Socket, Partition, OwnerPid, Proto]) -> built=0, owner=OwnerPid, wire_ver=w1, - proto=Proto}, + proto=Proto, + bloom=Bloom }, {ok, prepare_exchange, State}. handle_event(_Event, StateName, State) -> @@ -333,8 +338,7 @@ key_exchange(start_key_exchange, State=#state{cluster=Cluster, %% accumulates a list of one element that is the count of %% keys that differed. We can't prime the accumulator. It %% always starts as the empty list. KeyDiffs is a list of hashtree::keydiff() - NumKeys = 10000000, - {ok, Bloom} = ebloom:new(NumKeys, 0.01, random:uniform(1000)), + Bloom = State#state.bloom, AccFun = fun(KeyDiffs, Acc0) -> %% Gather diff keys into a bloom filter lists:foldl(fun(KeyDiff, AccIn) -> @@ -379,7 +383,10 @@ compute_differences({'$aae_src', worker_pid, WorkerPid}, ok = Transport:controlling_process(Socket, WorkerPid), WorkerPid ! {'$aae_src', ready, self()}, {next_state, compute_differences, State}; -compute_differences({'$aae_src', done, Bloom}, State) -> + +compute_differences({'$aae_src', done, Bloom}, State=#state{ indexns=IndexNs, bloom=Bloom }) + when length(IndexNs) =< 1 -> + %% we just finished diffing the *last* IndexN, so we go to the vnode fold / bloom state %% if we have anything in our bloom filter, start sending them now. %% this will start a worker process, which will tell us it's done with @@ -387,7 +394,12 @@ compute_differences({'$aae_src', done, Bloom}, State) -> _ = finish_sending_differences(Bloom, State), %% wait for differences from bloom_folder or to be done - {next_state, send_diffs, State}. 
+ {next_state, send_diffs, State}; + +compute_differences({'$aae_src', done, _}, State=#state{ indexns=[_|IndexNs] }) -> + %% re-start for next indexN + gen_fsm:send_event(self(), start_exchange), + {next_state, update_trees, State#state{built=0, indexns=IndexNs}}. %% state send_diffs is where we wait for diff_obj messages from the bloom folder %% and send them to the sink for each diff_obj. We eventually finish upon receipt @@ -399,14 +411,9 @@ send_diffs({diff_obj, RObj}, _From, State) -> %% All indexes in this Partition are done. %% Note: recv'd from an async send event -send_diffs(diff_done, State=#state{indexns=[]}) -> +send_diffs(diff_done, State) -> gen_fsm:send_event(self(), start_exchange), - {next_state, update_trees, State#state{built=0, indexns=[]}}; -%% IndexN is done, restart for remaining -send_diffs(diff_done, State=#state{indexns=[_IndexN|IndexNs]}) -> - %% re-start for next indexN - gen_fsm:send_event(self(), start_exchange), - {next_state, update_trees, State#state{built=0, indexns=IndexNs}}. + {next_state, update_trees, State#state{built=0, indexns=[]}}. %%%=================================================================== %%% Internal functions From 5bf71c28a079dadf7571743959b41705b6fdc914 Mon Sep 17 00:00:00 2001 From: Kresten Krab Thorup Date: Wed, 3 Sep 2014 15:14:08 +0200 Subject: [PATCH 03/10] Use N=1 for reading values for replication --- src/riak_repl_aae_source.erl | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/riak_repl_aae_source.erl b/src/riak_repl_aae_source.erl index 7c550bab..3918964a 100644 --- a/src/riak_repl_aae_source.erl +++ b/src/riak_repl_aae_source.erl @@ -561,7 +561,7 @@ send_missing(RObj, State=#state{client=Client, wire_ver=Ver, proto=Proto}) -> end. 
send_missing(Bucket, Key, State=#state{client=Client, wire_ver=Ver, proto=Proto}) -> - case Client:get(Bucket, Key, 1, ?REPL_FSM_TIMEOUT) of + case Client:get(Bucket, Key, [{r, 1}, {timeout, ?REPL_FSM_TIMEOUT}, {n_val, 1}]) of {ok, RObj} -> %% we don't actually have the vclock to compare, so just send the %% key and let the other side sort things out. From 88edced288926dbc2165a43bade7e906916e4eeb Mon Sep 17 00:00:00 2001 From: Kresten Krab Thorup Date: Wed, 3 Sep 2014 15:31:04 +0200 Subject: [PATCH 04/10] Adaptable sync strategy: get or fold w/threshold This change introduces a threshold for how to process fullsync diffs. A small number of changed KVs (currently fixed at 1000 per vnode) are read using GET; above that limit, objects are read using a backend fold. For a small number of changed KVs, this avoids doing a full scan of the backend data. --- src/riak_repl_aae_source.erl | 54 +++++++++++++++++++++++++----------- 1 file changed, 38 insertions(+), 16 deletions(-) diff --git a/src/riak_repl_aae_source.erl b/src/riak_repl_aae_source.erl index 3918964a..59556b09 100644 --- a/src/riak_repl_aae_source.erl +++ b/src/riak_repl_aae_source.erl @@ -45,12 +45,17 @@ local_lock = false :: boolean(), owner :: pid(), proto :: term(), - bloom :: reference() %% ebloom + bloom :: reference(), %% ebloom + diff_cnt=0 :: non_neg_integer() }). %% Per state transition timeout used by certain transitions -define(DEFAULT_ACTION_TIMEOUT, 300000). %% 5 minutes +%% the first this many differences are not put in the bloom +%% filter, but simply sent to the remote site directly. +-define(GET_OBJECT_LIMIT, 1000). 
+ %%%=================================================================== %%% API %%%=================================================================== @@ -221,7 +226,7 @@ update_trees(start_exchange, State=#state{indexns=IndexN, owner=Owner}) when Ind update_trees(start_exchange, State=#state{tree_pid=TreePid, index=Partition, indexns=[IndexN|_IndexNs]}) -> - lager:debug("Start exchange for partition,IndexN ~p,~p", [Partition, IndexN]), + lager:info("Start exchange for partition,IndexN ~p,~p", [Partition, IndexN]), update_request(TreePid, {Partition, undefined}, IndexN), case send_synchronous_msg(?MSG_UPDATE_TREE, IndexN, State) of ok -> @@ -339,22 +344,36 @@ key_exchange(start_key_exchange, State=#state{cluster=Cluster, %% keys that differed. We can't prime the accumulator. It %% always starts as the empty list. KeyDiffs is a list of hashtree::keydiff() Bloom = State#state.bloom, + Count0 = State#state.diff_cnt, AccFun = fun(KeyDiffs, Acc0) -> - %% Gather diff keys into a bloom filter - lists:foldl(fun(KeyDiff, AccIn) -> - accumulate_diff(KeyDiff, Bloom, AccIn, State) end, - Acc0, - KeyDiffs) + Count = case Acc0 of [C] when is_integer(C) -> C; _ -> 0 end, + FoldFun = + case (Count0+Count) > ?GET_OBJECT_LIMIT of + %% Gather diff keys into a bloom filter + true -> fun(KeyDiff, AccIn) -> + accumulate_diff(KeyDiff, Bloom, AccIn, State) + end; + + %% Replicate diffs directly + false -> fun(KeyDiff, AccIn) -> + replicate_diff(KeyDiff, AccIn, State) + end + end, + + lists:foldl(FoldFun, Acc0, KeyDiffs) end, %% TODO: Add stats for AAE lager:debug("Starting compare for partition ~p", [Partition]), spawn_link(fun() -> StageStart=os:timestamp(), - riak_kv_index_hashtree:compare(IndexN, Remote, AccFun, TreePid), - lager:info("Full-sync with site ~p; fullsync difference generator for ~p complete (completed in ~p secs)", - [State#state.cluster, Partition, riak_repl_util:elapsed_secs(StageStart)]), - gen_fsm:send_event(SourcePid, {'$aae_src', done, Bloom}) + case 
riak_kv_index_hashtree:compare(IndexN, Remote, AccFun, TreePid) of + [N] when is_integer(N) -> Count=N; + _ -> Count=0 + end, + lager:info("Full-sync with site ~p; fullsync difference generator for ~p/~p complete (~p differences, completed in ~p secs)", + [State#state.cluster, Partition, IndexN, Count, riak_repl_util:elapsed_secs(StageStart)]), + gen_fsm:send_event(SourcePid, {'$aae_src', done, Bloom, Count}) end), %% wait for differences from bloom_folder or to be done @@ -384,22 +403,25 @@ compute_differences({'$aae_src', worker_pid, WorkerPid}, WorkerPid ! {'$aae_src', ready, self()}, {next_state, compute_differences, State}; -compute_differences({'$aae_src', done, Bloom}, State=#state{ indexns=IndexNs, bloom=Bloom }) +compute_differences({'$aae_src', done, Bloom, Count}, State=#state{ indexns=IndexNs, bloom=Bloom, diff_cnt=Count0 }) when length(IndexNs) =< 1 -> %% we just finished diffing the *last* IndexN, so we go to the vnode fold / bloom state + State2 = State#state{ diff_cnt=Count0+Count }, + %% if we have anything in our bloom filter, start sending them now. %% this will start a worker process, which will tell us it's done with %% diffs_done once all differences are sent. - _ = finish_sending_differences(Bloom, State), + _ = finish_sending_differences(Bloom, State2), %% wait for differences from bloom_folder or to be done - {next_state, send_diffs, State}; + {next_state, send_diffs, State2}; -compute_differences({'$aae_src', done, _}, State=#state{ indexns=[_|IndexNs] }) -> +compute_differences({'$aae_src', done, _, Count}, State=#state{ indexns=[_|IndexNs], diff_cnt=Count0 }) -> %% re-start for next indexN + gen_fsm:send_event(self(), start_exchange), - {next_state, update_trees, State#state{built=0, indexns=IndexNs}}. + {next_state, update_trees, State#state{built=0, indexns=IndexNs, diff_cnt=Count0+Count }}. %% state send_diffs is where we wait for diff_obj messages from the bloom folder %% and send them to the sink for each diff_obj. 
We eventually finish upon receipt From 32316fa611300d62f1623ddece69461559c9287b Mon Sep 17 00:00:00 2001 From: Joseph Blomstedt Date: Thu, 4 Sep 2014 03:40:06 -0700 Subject: [PATCH 05/10] Make fullsync direct send threshold configurable --- src/riak_repl_aae_source.erl | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/src/riak_repl_aae_source.erl b/src/riak_repl_aae_source.erl index 59556b09..99fc5173 100644 --- a/src/riak_repl_aae_source.erl +++ b/src/riak_repl_aae_source.erl @@ -345,10 +345,11 @@ key_exchange(start_key_exchange, State=#state{cluster=Cluster, %% always starts as the empty list. KeyDiffs is a list of hashtree::keydiff() Bloom = State#state.bloom, Count0 = State#state.diff_cnt, + Limit = app_helper:get_env(riak_repl, fullsync_direct, ?GET_OBJECT_LIMIT), AccFun = fun(KeyDiffs, Acc0) -> Count = case Acc0 of [C] when is_integer(C) -> C; _ -> 0 end, FoldFun = - case (Count0+Count) > ?GET_OBJECT_LIMIT of + case (Count0+Count) > Limit of %% Gather diff keys into a bloom filter true -> fun(KeyDiff, AccIn) -> accumulate_diff(KeyDiff, Bloom, AccIn, State) From e22a9ebeccb037ffd7526957d56e9d35d3d085a1 Mon Sep 17 00:00:00 2001 From: Joseph Blomstedt Date: Tue, 2 Sep 2014 16:14:17 -0700 Subject: [PATCH 06/10] Refactor AAE FS source state transitions for clarity This commit slightly refactors the riak_repl_aae_source code to make state transitions more obvious. This change also eases the porting of commits from a separate feature branch that implemented a slightly different approach to improved AAE syncing. 
--- src/riak_repl_aae_source.erl | 63 +++++++++++++++++++----------------- 1 file changed, 34 insertions(+), 29 deletions(-) diff --git a/src/riak_repl_aae_source.erl b/src/riak_repl_aae_source.erl index 99fc5173..a4129499 100644 --- a/src/riak_repl_aae_source.erl +++ b/src/riak_repl_aae_source.erl @@ -80,9 +80,6 @@ init([Cluster, Client, Transport, Socket, Partition, OwnerPid, Proto]) -> lager:info("AAE fullsync source worker started for partition ~p", [Partition]), - NumKeys = 1000000, - {ok, Bloom} = ebloom:new(NumKeys, 0.01, random:uniform(1000)), - State = #state{cluster=Cluster, client=Client, transport=Transport, @@ -91,8 +88,7 @@ init([Cluster, Client, Transport, Socket, Partition, OwnerPid, Proto]) -> built=0, owner=OwnerPid, wire_ver=w1, - proto=Proto, - bloom=Bloom }, + proto=Proto}, {ok, prepare_exchange, State}. handle_event(_Event, StateName, State) -> @@ -200,7 +196,7 @@ prepare_exchange(start_exchange, State=#state{index=Partition}) -> %% try to get the remote lock case send_synchronous_msg(?MSG_LOCK_TREE, State) of ok -> - update_trees(start_exchange, State); + update_trees(init, State); Error -> lager:info("Remote lock tree for partition ~p failed, got ~p", [Partition, Error]), @@ -213,20 +209,35 @@ prepare_exchange(start_exchange, State=#state{index=Partition}) -> %% a timely manner, the exchange will timeout. Since the trees will %% continue to finish the update even after the exchange times out, %% a future exchange should eventually make progress. 
+update_trees(init, State) -> + NumKeys = 10000000, + {ok, Bloom} = ebloom:new(NumKeys, 0.01, random:uniform(1000)), + State2 = State#state{bloom=Bloom}, + update_trees(start_exchange, State2); update_trees(cancel_fullsync, State) -> lager:info("AAE fullsync source cancelled for partition ~p", [State#state.index]), send_complete(State), {stop, normal, State}; -update_trees(start_exchange, State=#state{indexns=IndexN, owner=Owner}) when IndexN == [] -> +update_trees(finish_fullsync, State=#state{owner=Owner}) -> send_complete(State), lager:info("AAE fullsync source completed partition ~p", [State#state.index]), riak_repl2_fssource:fullsync_complete(Owner), + %% TODO: Why stay in update_trees? Should we stop instead? {next_state, update_trees, State}; +update_trees(continue, State=#state{indexns=IndexNs}) -> + case IndexNs of + [_] -> + send_diffs(init, State); + [_|RestNs] -> + State2 = State#state{built=0, indexns=RestNs}, + gen_fsm:send_event(self(), start_exchange), + {next_state, update_trees, State2} + end; update_trees(start_exchange, State=#state{tree_pid=TreePid, index=Partition, indexns=[IndexN|_IndexNs]}) -> - lager:info("Start exchange for partition,IndexN ~p,~p", [Partition, IndexN]), + lager:debug("Start exchange for partition,IndexN ~p,~p", [Partition, IndexN]), update_request(TreePid, {Partition, undefined}, IndexN), case send_synchronous_msg(?MSG_UPDATE_TREE, IndexN, State) of ok -> @@ -374,7 +385,7 @@ key_exchange(start_key_exchange, State=#state{cluster=Cluster, end, lager:info("Full-sync with site ~p; fullsync difference generator for ~p/~p complete (~p differences, completed in ~p secs)", [State#state.cluster, Partition, IndexN, Count, riak_repl_util:elapsed_secs(StageStart)]), - gen_fsm:send_event(SourcePid, {'$aae_src', done, Bloom, Count}) + gen_fsm:send_event(SourcePid, {'$aae_src', done, Count}) end), %% wait for differences from bloom_folder or to be done @@ -404,25 +415,11 @@ compute_differences({'$aae_src', worker_pid, WorkerPid}, 
WorkerPid ! {'$aae_src', ready, self()}, {next_state, compute_differences, State}; -compute_differences({'$aae_src', done, Bloom, Count}, State=#state{ indexns=IndexNs, bloom=Bloom, diff_cnt=Count0 }) - when length(IndexNs) =< 1 -> - %% we just finished diffing the *last* IndexN, so we go to the vnode fold / bloom state - +compute_differences({'$aae_src', done, Count}, State=#state{diff_cnt=Count0}) -> + %% Just move on to the next tree exchange since we now roll all + %% differences into a single bloom filter. State2 = State#state{ diff_cnt=Count0+Count }, - - %% if we have anything in our bloom filter, start sending them now. - %% this will start a worker process, which will tell us it's done with - %% diffs_done once all differences are sent. - _ = finish_sending_differences(Bloom, State2), - - %% wait for differences from bloom_folder or to be done - {next_state, send_diffs, State2}; - -compute_differences({'$aae_src', done, _, Count}, State=#state{ indexns=[_|IndexNs], diff_cnt=Count0 }) -> - %% re-start for next indexN - - gen_fsm:send_event(self(), start_exchange), - {next_state, update_trees, State#state{built=0, indexns=IndexNs, diff_cnt=Count0+Count }}. + update_trees(continue, State2). %% state send_diffs is where we wait for diff_obj messages from the bloom folder %% and send them to the sink for each diff_obj. We eventually finish upon receipt @@ -432,11 +429,19 @@ send_diffs({diff_obj, RObj}, _From, State) -> send_missing(RObj, State), {reply, ok, send_diffs, State}. +send_diffs(init, State=#state{bloom=Bloom}) -> + %% if we have anything in our bloom filter, start sending them now. + %% this will start a worker process, which will tell us it's done with + %% diffs_done once all differences are sent. + _ = finish_sending_differences(Bloom, State), + + %% wait for differences from bloom_folder or to be done + {next_state, send_diffs, State}; + %% All indexes in this Partition are done. 
%% Note: recv'd from an async send event send_diffs(diff_done, State) -> - gen_fsm:send_event(self(), start_exchange), - {next_state, update_trees, State#state{built=0, indexns=[]}}. + update_trees(finish_fullsync, State). %%%=================================================================== %%% Internal functions From d1252bc694603cd9ade74d8dfba4718955e25b08 Mon Sep 17 00:00:00 2001 From: Joseph Blomstedt Date: Tue, 2 Sep 2014 16:14:22 -0700 Subject: [PATCH 07/10] Add buffer-then-send mode for AAE fullsync repair This commit merges in the separately prototyped buffer-then-send approach to directly sending key differences. The mode used is determined by the `fullsync_direct_mode` application variable, which defaults to `inline`. * inline differences selected for direct repair are sent as they are encountered during an AAE exchange. * buffered differences selected for direct repair are buffered in memory during an exchange. After the exchange, the buffer is sorted and keys are then sent in order. --- src/riak_repl_aae_source.erl | 174 ++++++++++++++++++++++------------- 1 file changed, 109 insertions(+), 65 deletions(-) diff --git a/src/riak_repl_aae_source.erl b/src/riak_repl_aae_source.erl index a4129499..5c77ebcf 100644 --- a/src/riak_repl_aae_source.erl +++ b/src/riak_repl_aae_source.erl @@ -30,6 +30,15 @@ -type index() :: non_neg_integer(). -type index_n() :: {index(), pos_integer()}. +-record(exchange, {mode :: inline | buffered, + buffer :: ets:tid(), + bloom :: reference(), %% ebloom + limit :: non_neg_integer(), + count :: non_neg_integer() + }). + +-type exchange() :: #exchange{}. + -record(state, {cluster, client, %% riak:local_client() transport, @@ -45,8 +54,7 @@ local_lock = false :: boolean(), owner :: pid(), proto :: term(), - bloom :: reference(), %% ebloom - diff_cnt=0 :: non_neg_integer() + exchange :: exchange() }). 
%% Per state transition timeout used by certain transitions @@ -212,7 +220,20 @@ prepare_exchange(start_exchange, State=#state{index=Partition}) -> update_trees(init, State) -> NumKeys = 10000000, {ok, Bloom} = ebloom:new(NumKeys, 0.01, random:uniform(1000)), - State2 = State#state{bloom=Bloom}, + Limit = app_helper:get_env(riak_repl, fullsync_direct, ?GET_OBJECT_LIMIT), + Mode = app_helper:get_env(riak_repl, fullsync_direct_mode, inline), + Buffer = case Mode of + inline -> + undefined; + buffered -> + ets:new(?MODULE, [public, set]) + end, + Exchange = #exchange{mode=Mode, + buffer=Buffer, + bloom=Bloom, + limit=Limit, + count=0}, + State2 = State#state{exchange=Exchange}, update_trees(start_exchange, State2); update_trees(cancel_fullsync, State) -> lager:info("AAE fullsync source cancelled for partition ~p", [State#state.index]), @@ -272,6 +293,7 @@ key_exchange(start_key_exchange, State=#state{cluster=Cluster, socket=Socket, index=Partition, tree_pid=TreePid, + exchange=Exchange, indexns=[IndexN|_IndexNs]}) -> lager:debug("Starting fullsync key exchange with ~p for ~p/~p", [Cluster, Partition, IndexN]), @@ -354,38 +376,23 @@ key_exchange(start_key_exchange, State=#state{cluster=Cluster, %% accumulates a list of one element that is the count of %% keys that differed. We can't prime the accumulator. It %% always starts as the empty list. 
KeyDiffs is a list of hashtree::keydiff() - Bloom = State#state.bloom, - Count0 = State#state.diff_cnt, - Limit = app_helper:get_env(riak_repl, fullsync_direct, ?GET_OBJECT_LIMIT), - AccFun = fun(KeyDiffs, Acc0) -> - Count = case Acc0 of [C] when is_integer(C) -> C; _ -> 0 end, - FoldFun = - case (Count0+Count) > Limit of - %% Gather diff keys into a bloom filter - true -> fun(KeyDiff, AccIn) -> - accumulate_diff(KeyDiff, Bloom, AccIn, State) - end; - - %% Replicate diffs directly - false -> fun(KeyDiff, AccIn) -> - replicate_diff(KeyDiff, AccIn, State) - end - end, - - lists:foldl(FoldFun, Acc0, KeyDiffs) + AccFun = fun(KeyDiffs, Exchange0) -> + %% Gather diff keys into a bloom filter + lists:foldl(fun(KeyDiff, ExchangeIn) -> + accumulate_diff(KeyDiff, ExchangeIn, State) + end, + Exchange0, + KeyDiffs) end, %% TODO: Add stats for AAE lager:debug("Starting compare for partition ~p", [Partition]), spawn_link(fun() -> StageStart=os:timestamp(), - case riak_kv_index_hashtree:compare(IndexN, Remote, AccFun, TreePid) of - [N] when is_integer(N) -> Count=N; - _ -> Count=0 - end, - lager:info("Full-sync with site ~p; fullsync difference generator for ~p/~p complete (~p differences, completed in ~p secs)", - [State#state.cluster, Partition, IndexN, Count, riak_repl_util:elapsed_secs(StageStart)]), - gen_fsm:send_event(SourcePid, {'$aae_src', done, Count}) + Exchange2 = riak_kv_index_hashtree:compare(IndexN, Remote, AccFun, Exchange, TreePid), + lager:info("Full-sync with site ~p; fullsync difference generator for ~p complete (completed in ~p secs)", + [State#state.cluster, Partition, riak_repl_util:elapsed_secs(StageStart)]), + gen_fsm:send_event(SourcePid, {'$aae_src', Exchange2, done}) end), %% wait for differences from bloom_folder or to be done @@ -415,12 +422,28 @@ compute_differences({'$aae_src', worker_pid, WorkerPid}, WorkerPid ! 
{'$aae_src', ready, self()}, {next_state, compute_differences, State}; -compute_differences({'$aae_src', done, Count}, State=#state{diff_cnt=Count0}) -> - %% Just move on to the next tree exchange since we now roll all - %% differences into a single bloom filter. - State2 = State#state{ diff_cnt=Count0+Count }, +compute_differences({'$aae_src', Exchange, done}, State) -> + %% Just move on to the next tree exchange since we accumulate + %% differences across all trees. + State2 = State#state{exchange=Exchange}, update_trees(continue, State2). +maybe_send_direct(#exchange{mode=inline, count=Count, limit=Limit}, + #state{index=Partition}) -> + %% Inline sends already occured inline + Sent = erlang:min(Count, Limit), + lager:info("Directly sent ~b differences inline for partition ~p", + [Sent, Partition]), + ok; +maybe_send_direct(#exchange{buffer=Buffer}, State=#state{index=Partition}) -> + Keys = [{Bucket, Key} || {_, {Bucket, Key}} <- ets:tab2list(Buffer)], + true = ets:delete(Buffer), + Sorted = lists:sort(Keys), + Count = length(Sorted), + lager:info("Directly sending ~p differences for partition ~p", [Count, Partition]), + _ = [send_missing(Bucket, Key, State) || {Bucket, Key} <- Sorted], + ok. + %% state send_diffs is where we wait for diff_obj messages from the bloom folder %% and send them to the sink for each diff_obj. We eventually finish upon receipt %% of the diff_done event. Note: recv'd from a sync send event. @@ -429,11 +452,12 @@ send_diffs({diff_obj, RObj}, _From, State) -> send_missing(RObj, State), {reply, ok, send_diffs, State}. -send_diffs(init, State=#state{bloom=Bloom}) -> +send_diffs(init, State=#state{exchange=Exchange}) -> %% if we have anything in our bloom filter, start sending them now. %% this will start a worker process, which will tell us it's done with %% diffs_done once all differences are sent. 
- _ = finish_sending_differences(Bloom, State), + _ = maybe_send_direct(Exchange, State), + _ = finish_sending_differences(Exchange#exchange.bloom, State), %% wait for differences from bloom_folder or to be done {next_state, send_diffs, State}; @@ -538,36 +562,56 @@ replicate_diff(KeyDiff, Acc, State=#state{index=Partition}) -> Acc end. -accumulate_diff(KeyDiff, Bloom, [], State) -> - accumulate_diff(KeyDiff, Bloom, [0], State); -accumulate_diff(KeyDiff, Bloom, [Count], #state{index=Partition}) -> - NumObjects = - case KeyDiff of - {remote_missing, Bin} -> - %% send object and related objects to remote - {Bucket,Key} = binary_to_term(Bin), - lager:debug("Keydiff: remote partition ~p remote missing: ~p:~p", - [Partition, Bucket, Key]), - ebloom:insert(Bloom, <>), - 1; - {different, Bin} -> - %% send object and related objects to remote - {Bucket,Key} = binary_to_term(Bin), - lager:debug("Keydiff: remote partition ~p different: ~p:~p", - [Partition, Bucket, Key]), - ebloom:insert(Bloom, <>), - 1; - {missing, Bin} -> - %% remote has a key we don't have. Ignore it. - {Bucket,Key} = binary_to_term(Bin), - lager:debug("Keydiff: remote partition ~p local missing: ~p:~p (ignored)", - [Partition, Bucket, Key]), - 0; - Other -> - lager:warning("Unexpected error keydiff: ~p (ignored)", [Other]), - 0 - end, - [Count+NumObjects]. +accumulate_diff(KeyDiff, Exchange, State=#state{index=Partition}) -> + case KeyDiff of + {remote_missing, Bin} -> + %% send object and related objects to remote + {Bucket,Key} = binary_to_term(Bin), + lager:debug("Keydiff: remote partition ~p remote missing: ~p:~p", + [Partition, Bucket, Key]), + maybe_accumulate_key(Bucket, Key, Exchange, State); + {different, Bin} -> + %% send object and related objects to remote + {Bucket,Key} = binary_to_term(Bin), + lager:debug("Keydiff: remote partition ~p different: ~p:~p", + [Partition, Bucket, Key]), + maybe_accumulate_key(Bucket, Key, Exchange, State); + {missing, Bin} -> + %% remote has a key we don't have. 
Ignore it. + {Bucket,Key} = binary_to_term(Bin), + lager:debug("Keydiff: remote partition ~p local missing: ~p:~p (ignored)", + [Partition, Bucket, Key]), + Exchange; + Other -> + lager:warning("Unexpected error keydiff: ~p (ignored)", [Other]), + Exchange + end. + +maybe_accumulate_key(Bucket, Key, + Exchange=#exchange{mode=Mode, + bloom=Bloom, + count=Count, + limit=Limit}, + State) -> + if Count < Limit -> + %% Below threshold, handle directly + Exchange2 = handle_direct(Mode, Bucket, Key, Exchange, State), + Exchange2#exchange{count=Count+1}; + true -> + %% Past threshold, add to bloom filter for future bloom fold + ebloom:insert(Bloom, <>), + Exchange#exchange{count=Count+1} + end. + +handle_direct(inline, Bucket, Key, Exchange, State) -> + %% Send key inline + _ = send_missing(Bucket, Key, State), + Exchange; +handle_direct(buffered, Bucket, Key, Exchange, _State) -> + %% Enqueue in "direct send" buffer that will be sorted/sent later + #exchange{buffer=Buffer, count=Count} = Exchange, + ets:insert(Buffer, {Count, {Bucket, Key}}), + Exchange. send_missing(RObj, State=#state{client=Client, wire_ver=Ver, proto=Proto}) -> %% we don't actually have the vclock to compare, so just send the From ff4039b37f5732904e4bd840449bc5850a1fe0ea Mon Sep 17 00:00:00 2001 From: Joseph Blomstedt Date: Wed, 17 Sep 2014 16:31:27 -0700 Subject: [PATCH 08/10] Fix deadlock between AAE fullsync sink/source A deadlock in AAE fullsync was identified during testing. The deadlock occurs when both the AAE fullsync source and sink processes block on TCP send due to TCP backpressure. Since both processes are blocked, neither can perform a receive to unblock the other. This commit addresses the issue by making the AAE sink use a helper process for sending, therefore ensuring the sink never blocks on a send. The sink will therefore always get around to receiving data at some point, unblocking the source if needed. 
NOTE: the SSL transport wraps the underlying socket in a gen_server, and all socket functions route through this server. As such, the SSL transport is still vulnerable to this deadlock. Fixing this requires either changing the AAE protocol or patching the OTP SSL implementation. One of the two is planned future work. --- src/riak_repl_aae_sink.erl | 44 ++++++++++++++++++++++++++++++++---- src/riak_repl_aae_source.erl | 4 ++-- 2 files changed, 42 insertions(+), 6 deletions(-) diff --git a/src/riak_repl_aae_sink.erl b/src/riak_repl_aae_sink.erl index a573e07f..ede387d8 100644 --- a/src/riak_repl_aae_sink.erl +++ b/src/riak_repl_aae_sink.erl @@ -15,6 +15,7 @@ %% API -export([start_link/4, init_sync/1]). +-export([sender_init/2, sender_loop/1]). -record(state, { clustername, @@ -22,6 +23,7 @@ transport, tree_pid :: pid(), %% pid of the AAE tree partition, + sender :: pid(), owner :: pid() %% our fssource owner }). @@ -51,7 +53,8 @@ handle_call(init_sync, _From, State=#state{transport=Transport, socket=Socket}) {nodelay, true}, {header, 1}], ok = Transport:setopts(Socket, TcpOptions), - {reply, ok, State}; + Sender = spawn_sender(Transport, Socket), + {reply, ok, State#state{sender=Sender}}; handle_call(status, _From, State) -> Reply = [{partition_syncing, State#state.partition}], @@ -73,7 +76,7 @@ handle_cast(_Msg, State) -> handle_info({Proto, _Socket, Data}, State=#state{transport=Transport, socket=Socket}) when Proto==tcp; Proto==ssl -> TcpOptions = [{active, once}], %% reset to receive next tcp message - ok = Transport:setopts(Socket, TcpOptions), + Transport:setopts(Socket, TcpOptions), case Data of [MsgType] -> process_msg(MsgType, State); @@ -164,8 +167,41 @@ process_msg(?MSG_COMPLETE, State=#state{owner=Owner}) -> %% Send a response back to the aae_source worker -send_reply(Msg, State=#state{socket=Socket, transport=Transport}) -> +send_reply(Msg, State=#state{sender=Sender}) -> Data = term_to_binary(Msg), - ok = Transport:send(Socket, <<?MSG_REPLY:8, Data/binary>>), + Sender !
<<?MSG_REPLY:8, Data/binary>>, {noreply, State}. +%%%=================================================================== +%%% TCP send helper +%%%=================================================================== + +%% Sending is performed in a separate process to ensure that the sink +%% never blocks and can therefore always receive messages + reset the +%% active once flag. If a separate process is not used, it is possible +%% for both the source and sink to deadlock on a TCP send that each +%% stall waiting for the remote side to receive. +%% +%% There is currently no backpressure between the sink and this helper +%% process. Adding backpressure runs the risk of once again hitting +%% the aforementioned deadlock scenario. +%% +%% The correct approach is to add backpressure to the actual fullsync +%% protocol itself. This remains as future work. +%% +%% NOTE: the SSL transport wraps the underlying socket in a gen_server, +%% and all socket functions route through this server. As such, splitting +%% sending and receiving between two processes does not help. The SSL +%% transport is thus known to *not* be safe and *can* deadlock. + +spawn_sender(Transport, Socket) -> + spawn_link(?MODULE, sender_init, [Transport, Socket]). + +sender_init(Transport, Socket) -> + sender_loop({Transport, Socket}). + +sender_loop(State={Transport, Socket}) -> + receive Msg -> + ok = Transport:send(Socket, Msg) + end, + ?MODULE:sender_loop(State).
diff --git a/src/riak_repl_aae_source.erl b/src/riak_repl_aae_source.erl index 5c77ebcf..aeedfca2 100644 --- a/src/riak_repl_aae_source.erl +++ b/src/riak_repl_aae_source.erl @@ -392,7 +392,7 @@ key_exchange(start_key_exchange, State=#state{cluster=Cluster, Exchange2 = riak_kv_index_hashtree:compare(IndexN, Remote, AccFun, Exchange, TreePid), lager:info("Full-sync with site ~p; fullsync difference generator for ~p complete (completed in ~p secs)", [State#state.cluster, Partition, riak_repl_util:elapsed_secs(StageStart)]), - gen_fsm:send_event(SourcePid, {'$aae_src', Exchange2, done}) + gen_fsm:send_event(SourcePid, {'$aae_src', done, Exchange2}) end), %% wait for differences from bloom_folder or to be done @@ -422,7 +422,7 @@ compute_differences({'$aae_src', worker_pid, WorkerPid}, WorkerPid ! {'$aae_src', ready, self()}, {next_state, compute_differences, State}; -compute_differences({'$aae_src', Exchange, done}, State) -> +compute_differences({'$aae_src', done, Exchange}, State) -> %% Just move on to the next tree exchange since we accumulate %% differences across all trees. State2 = State#state{exchange=Exchange}, From 4109e758af42f1b30863e22be06d4c39c1dbbcdd Mon Sep 17 00:00:00 2001 From: Joseph Blomstedt Date: Wed, 17 Sep 2014 22:49:17 -0700 Subject: [PATCH 09/10] Remove non-pipelined AAE fullsync mode --- src/riak_repl_aae_source.erl | 38 +----------------------------------- 1 file changed, 1 insertion(+), 37 deletions(-) diff --git a/src/riak_repl_aae_source.erl b/src/riak_repl_aae_source.erl index aeedfca2..edeb3fb0 100644 --- a/src/riak_repl_aae_source.erl +++ b/src/riak_repl_aae_source.erl @@ -305,36 +305,7 @@ key_exchange(start_key_exchange, State=#state{cluster=Cluster, %% allow us to pass control of the TCP socket around. This is needed so %% that the process needing to send/receive on that socket has ownership %% of it. 
- %% - %% Old, non-pipelined verison - TestR1 = fun(init, _) -> - %% cause control of the socket to be given to AAE so that - %% the get_bucket and key_hashes can send messages via the - %% socket (with correct ownership). We'll send a 'ready' - %% back here once the socket ownership is transfered and - %% we are ready to proceed with the compare. - gen_fsm:send_event(SourcePid, {'$aae_src', worker_pid, self()}), - receive - {'$aae_src', ready, SourcePid} -> - ok - end; - (get_bucket, {L, B}) -> - async_get_bucket(L, B, IndexN, State), - wait_get_bucket(L, B, IndexN, State); - (key_hashes, Segment) -> - async_get_segment(Segment, IndexN, State), - wait_get_segment(Segment, IndexN, State); - (start_exchange_level, {_Level, _Buckets}) -> - ok; - (start_exchange_segments, _Segments) -> - ok; - (final, _) -> - %% give ourself control of the socket again - ok = Transport:controlling_process(Socket, SourcePid) - end, - - %% Pipelined verison - TestR2 = fun(init, _) -> + Remote = fun(init, _) -> %% cause control of the socket to be given to AAE so that %% the get_bucket and key_hashes can send messages via the %% socket (with correct ownership). We'll send a 'ready' @@ -360,13 +331,6 @@ key_exchange(start_key_exchange, State=#state{cluster=Cluster, ok = Transport:controlling_process(Socket, SourcePid) end, - Remote = case app_helper:get_env(riak_repl, fullsync_pipeline, false) of - false -> - TestR1; - true -> - TestR2 - end, - %% Unclear if we should allow exchange to run indefinitely or enforce %% a timeout. The problem is that depending on the number of keys and %% key differences, exchange can take arbitrarily long. For now, go with From 6032c432ee15306164f3f32767c1b2fa2b134463 Mon Sep 17 00:00:00 2001 From: Joseph Blomstedt Date: Fri, 19 Sep 2014 17:06:50 -0700 Subject: [PATCH 10/10] Change AAE FS source to use direct vnode gets When performing direct sends, the AAE fullsync source retrieves individual objects in order to send them. 
Previously, this was accomplished using normal riak_client gets using the n_val=1 option. However, there are no guarantees about which replica the object would come from using such an approach. This commit changes the AAE fullsync source to instead retrieve objects directly from the vnode performing the fullsync. This ensures that the object sent aligns with the AAE hash used to determine differences. This approach may also have better performance. --- src/riak_repl_aae_source.erl | 30 ++++++++++++++++++++++++++++-- 1 file changed, 28 insertions(+), 2 deletions(-) diff --git a/src/riak_repl_aae_source.erl b/src/riak_repl_aae_source.erl index edeb3fb0..b2c280f3 100644 --- a/src/riak_repl_aae_source.erl +++ b/src/riak_repl_aae_source.erl @@ -596,8 +596,33 @@ send_missing(RObj, State=#state{client=Client, wire_ver=Ver, proto=Proto}) -> 1 + length(Objects) end. +%% Copied from riak_kv_vnode:local_get to extend with timeout, +%% might port back to riak_kv_vnode.erl in the future. +%% +%% Note: responses that timeout can result in future late +%% messages arriving from the vnode. This is currently safe +%% because of the catch-all handle_info that will ignore these +%% messages. But, something to keep in mind in the future. +kv_local_get(Index, BKey, Timeout) -> + Ref = make_ref(), + ReqId = erlang:phash2(erlang:now()), + Sender = {raw, Ref, self()}, + riak_kv_vnode:get({Index,node()}, BKey, ReqId, Sender), + receive + {Ref, {r, Result, Index, ReqId}} -> + Result; + {Ref, Reply} -> + {error, Reply} + after Timeout -> + {error, timeout} + end. + +%% Get the K/V directly from the local vnode +local_get(Bucket, Key, #state{index=Index}) -> + kv_local_get(Index, {Bucket, Key}, ?REPL_FSM_TIMEOUT). 
+ send_missing(Bucket, Key, State=#state{client=Client, wire_ver=Ver, proto=Proto}) -> - case Client:get(Bucket, Key, [{r, 1}, {timeout, ?REPL_FSM_TIMEOUT}, {n_val, 1}]) of + case local_get(Bucket, Key, State) of {ok, RObj} -> %% we don't actually have the vclock to compare, so just send the %% key and let the other side sort things out. @@ -620,7 +645,8 @@ send_missing(Bucket, Key, State=#state{client=Client, wire_ver=Ver, proto=Proto} %% can't find the key! lager:warning("not_found returned for fullsync client get on Bucket: ~p Key:~p", [Bucket,Key]), 0; - _ -> + {error, timeout} -> + lager:warning("timeout during fullsync client get on Bucket: ~p Key:~p", [Bucket,Key]), 0 end.