From 1114a70b0a790102f507bdd2c54eedf4f4b4936a Mon Sep 17 00:00:00 2001 From: Micah Warren Date: Mon, 29 Sep 2014 15:13:26 -0500 Subject: [PATCH 1/4] Implement soft_exit, primarily for aae_fullsyn. Problem: transient failures of aae, such as trees not yet built or locks not being aquired, would cause an aae fullsync process to exit abnormally. This could happen several times in a row, creating log spam. Resolution: the concept of soft_exit. A soft_exit is a message sent from a soon to be exiting process to a soft_linked process. The exiting process would then exit normally, while any soft_linked processes could handle the soft_exit message in a similar fashion as an exit message. This would indicate an exit reason that should be handled, but not bad enough to have the system logger know about it. The soft_exit message sent from the aae worker to the fscoordinator is as simple as `{soft_exit, pid(), term()}'. The current implementation is not generic. There can only one soft_link to the aae, and there's no general mechanism to use soft_link's or soft_exits elsewhere in the code base. Sorry. Another change rolled into this is consistent use of a #partition_info record in the fscoordinator, and error tracking the fscoordinator's state. By swapping to useing a single data structure in the partition queue, whereis waiting list, and purgatory queues it makes it easier to understand the fscordinator (as there is less code modify structures). This is a forward port of the fix done for 1.4. Conflicts favor existing code where it does not directly effect the fix. Conflicts: Makefile rebar.config src/riak_repl2_fssource.erl src/riak_repl2_rtq_proxy.erl src/riak_repl_aae_source.erl test/riak_core_cluster_mgr_tests.erl --- include/riak_repl.hrl | 3 + src/riak_repl2_fscoordinator.erl | 393 +++++++++++++++++++------------ src/riak_repl2_fssource.erl | 84 ++++++- src/riak_repl_aae_source.erl | 9 +- src/riak_repl_wm_stats.erl | 1 - 5 files changed, 319 insertions(+), 171 deletions(-) diff --git a/include/riak_repl.hrl b/include/riak_repl.hrl index 2dd83a58..fae6a6e7 100644 --- a/include/riak_repl.hrl +++ b/include/riak_repl.hrl @@ -28,6 +28,9 @@ -define(DEFAULT_SOURCE_RETRIES, infinity). %% How many times we should retry when failing a reservation -define(DEFAULT_RESERVE_RETRIES, 0). +%% How many times during a fullsync we should retry a partion that has sent +%% a 'soft_exit' message to the coordinator +-define(DEFAULT_SOURCE_SOFT_RETRIES, infinity). %% 20 seconds. sources should claim within 5 seconds, but give them a little more time -define(RESERVATION_TIMEOUT, (20 * 1000)). -define(DEFAULT_MAX_FS_BUSIES_TOLERATED, 10). diff --git a/src/riak_repl2_fscoordinator.erl b/src/riak_repl2_fscoordinator.erl index a2b60f75..acf4fddb 100644 --- a/src/riak_repl2_fscoordinator.erl +++ b/src/riak_repl2_fscoordinator.erl @@ -61,12 +61,16 @@ partition_queue = queue:new(), retries = dict:new(), reserve_retries = dict:new(), + soft_retries = dict:new(), whereis_waiting = [], busy_nodes = sets:new(), running_sources = [], + purgatory = queue:new(), + dropped = [], successful_exits = 0, error_exits = 0, retry_exits = 0, + soft_retry_exits = 0, pending_fullsync = false, dirty_nodes = ordsets:new(), % these nodes should run fullsync dirty_nodes_during_fs = ordsets:new(), % these nodes reported realtime errors @@ -77,6 +81,13 @@ stat_cache = #stat_cache{} }). +-record(partition_info, { + index, + node, + running_source, + whereis_tref +}). + %% ------------------------------------------------------------------ %% API Function Exports %% ------------------------------------------------------------------ @@ -253,10 +264,12 @@ handle_call(status, _From, State = #state{socket=Socket}) -> {cluster, State#state.other_cluster}, {queued, queue:len(State#state.partition_queue)}, {in_progress, length(State#state.running_sources)}, + {waiting_for_retry, queue:len(State#state.purgatory)}, {starting, length(State#state.whereis_waiting)}, {successful_exits, State#state.successful_exits}, {error_exits, State#state.error_exits}, {retry_exits, State#state.retry_exits}, + {soft_retry_exits, State#state.soft_retry_exits}, {busy_nodes, sets:size(State#state.busy_nodes)}, {last_running_refresh, StatFreshness}, {running_stats, SourceStats}, @@ -321,7 +334,7 @@ handle_cast({connected, Socket, Transport, _Endpoint, _Proto}, State) -> handle_cast({connect_failed, _From, Why}, State) -> lager:warning("Fullsync remote connection to ~p failed due to ~p, retrying", [State#state.other_cluster, Why]), - {stop, normal, State}; + {stop, connect_failed, State}; handle_cast(start_fullsync, #state{socket=undefined} = State) -> %% not connected yet... @@ -345,6 +358,9 @@ handle_cast(start_fullsync, State) -> partition_queue = queue:from_list(Partitions), retries = dict:new(), reserve_retries = dict:new(), + purgatory = queue:new(), + soft_retries = dict:new(), + dropped = [], successful_exits = 0, error_exits = 0, retry_exits = 0, @@ -356,18 +372,20 @@ handle_cast(start_fullsync, State) -> handle_cast(stop_fullsync, State) -> % exit all running, cancel all timers, and reset the state. - _ = [erlang:cancel_timer(Tref) || {_, {_, Tref}} <- State#state.whereis_waiting], - _ = [begin + [erlang:cancel_timer(Tref) || #partition_info{whereis_tref = Tref} <- State#state.whereis_waiting], + [begin unlink(Pid), riak_repl2_fssource:stop_fullsync(Pid), riak_repl2_fssource_sup:disable(node(Pid), Part) - end || {Pid, {Part, _PartN}} <- State#state.running_sources], + end || #partition_info{index = Part, running_source = Pid} <- State#state.running_sources], State2 = State#state{ largest_n = undefined, owners = [], partition_queue = queue:new(), retries = dict:new(), reserve_retries = dict:new(), + purgatory = queue:new(), + dropped = [], whereis_waiting = [], running_sources = [] }, @@ -380,17 +398,16 @@ handle_cast(_Msg, State) -> %% @hidden handle_info({'EXIT', Pid, Cause}, #state{socket=Socket, transport=Transport}=State) when Cause =:= normal; Cause =:= shutdown -> - lager:debug("Fssource ~p exited normally", [Pid]), - PartitionEntry = lists:keytake(Pid, 1, State#state.running_sources), + lager:debug("fssource ~p exited normally", [Pid]), + PartitionEntry = lists:keytake(Pid, #partition_info.running_source, State#state.running_sources), case PartitionEntry of false -> % late exit or otherwise non-existant {noreply, State}; - {value, {Pid, {Index, _, _}=Partition}, Running} -> - + {value, Partition, Running} -> + #partition_info{index = Index, node = Node} = Partition, % likely a slot on the remote node opened up, so re-enable that % remote node for whereis requests. - {_, _, Node} = Partition, NewBusies = sets:del_element(Node, State#state.busy_nodes), % ensure we unreserve the partition on the remote node @@ -406,68 +423,22 @@ handle_info({'EXIT', Pid, Cause}, maybe_complete_fullsync(Running, State2) end; -handle_info({'EXIT', Pid, Cause}, - #state{socket=Socket, transport=Transport}=State) -> - lager:warning("Fssource ~p exited abnormally: ~p", [Pid, Cause]), - PartitionEntry = lists:keytake(Pid, 1, State#state.running_sources), - case PartitionEntry of - false -> - % late exit - {noreply, State}; - {value, {Pid, {Index, _, _}=Partition}, Running} -> +handle_info({soft_exit, Pid, Cause}, State) -> + lager:info("fssource ~p soft exit with reason ~p", [Pid, Cause]), + handle_abnormal_exit(soft_exit, Pid, Cause, State); - % even a bad exit opens a slot on the remote node - {_, _, Node} = Partition, - NewBusies = sets:del_element(Node, State#state.busy_nodes), - - % ensure we unreserve the partition on the remote node - % instead of waiting for a timeout. - Transport:send(Socket, term_to_binary({unreserve, Index})), - - % stats - #state{partition_queue = PQueue, retries = Retries0} = State, - - RetryLimit = app_helper:get_env(riak_repl, max_fssource_retries, - ?DEFAULT_SOURCE_RETRIES), - Retries = dict:update_counter(Partition, 1, Retries0), - - case dict:fetch(Partition, Retries) of - N when N > RetryLimit, is_integer(RetryLimit) -> - lager:warning("Fullsync dropping partition: ~p, ~p" - " failed retries", - [Partition, RetryLimit]), - ErrorExits = State#state.error_exits + 1, - State2 = State#state{busy_nodes = NewBusies, - retries = Retries, - running_sources = Running, - error_exits = ErrorExits}, - maybe_complete_fullsync(Running, State2); - _ -> %% have not run out of retries yet - % reset for retry later - lager:debug("Fssource rescheduling partition: ~p", - [Partition]), - PQueue2 = queue:in(Partition, PQueue), - RetryExits = State#state.retry_exits + 1, - State2 = State#state{partition_queue = PQueue2, - retries = Retries, - busy_nodes = NewBusies, - running_sources = Running, - retry_exits = RetryExits}, - State3 = start_up_reqs(State2), - {noreply, State3} - end - end; +handle_info({'EXIT', Pid, Cause}, State) -> + lager:info("fssource ~p exited abnormally: ~p", [Pid, Cause]), + handle_abnormal_exit('EXIT', Pid, Cause, State); handle_info({Partition, whereis_timeout}, State) -> #state{whereis_waiting = Waiting} = State, - case proplists:get_value(Partition, Waiting) of - undefined -> + case lists:keytake(Partition, #partition_info.index, Waiting) of + false -> % late timeout. {noreply, State}; - {N, NodeData, _Tref} -> - Waiting2 = proplists:delete(Partition, Waiting), - Partition1 = {Partition, N, NodeData}, - Q = queue:in(Partition1, State#state.partition_queue), + {value, PartitionInfo, Waiting2} -> + Q = queue:in(PartitionInfo#partition_info{whereis_tref = undefined}, State#state.partition_queue), State2 = State#state{whereis_waiting = Waiting2, partition_queue = Q}, State3 = start_up_reqs(State2), {noreply, State3} @@ -523,6 +494,80 @@ handle_info({'DOWN', Mon, process, Pid, Why}, #state{stat_cache = #stat_cache{wo handle_info(_Info, State) -> {noreply, State}. +handle_abnormal_exit(ExitType, Pid, Cause, State) -> + PartitionEntry = lists:keytake(Pid, #partition_info.running_source, State#state.running_sources), + handle_abnormal_exit(ExitType, Pid, Cause, PartitionEntry, State). + +handle_abnormal_exit(_ExtiType, _Pid, _Cause, false, State) -> + % late exit + {noreply, State}; + +handle_abnormal_exit(ExitType, Pid, _Cause, {value, PartitionWithSource, Running}, State) -> + + Partition = PartitionWithSource#partition_info{running_source = undefined}, + + #partition_info{index = Index, node = Node} = Partition, + #state{socket = Socket, transport = Transport} = State, + % even a bad exit opens a slot on the remote node + NewBusies = sets:del_element(Node, State#state.busy_nodes), + + % ensure we unreserve the partition on the remote node + % instead of waiting for a timeout. + Transport:send(Socket, term_to_binary({unreserve, Index})), + + % stats + #state{partition_queue = PQueue} = State, + + State2 = State#state{busy_nodes = NewBusies, running_sources = Running}, + {ErrorCount, State3} = increment_error_dict(Partition, ExitType, State2), + + case ExitType of + soft_exit -> + lager:debug("putting partition ~p in purgatory due to soft exit of ~p", [Index, Pid]), + _ = flush_exit_message(Pid), + State4 = start_up_reqs(State3), + SoftRetryLimit = app_helper:get_env(riak_repl, max_fssource_soft_retries, ?DEFAULT_SOURCE_SOFT_RETRIES), + SoftRetryCount = State4#state.soft_retry_exits + 1, + if + SoftRetryLimit =:= infinity -> + Purgatory = queue:in(Partition, State4#state.purgatory), + {noreply, State4#state{purgatory = Purgatory, soft_retry_exits = SoftRetryCount}}; + + SoftRetryLimit < ErrorCount -> + lager:info("Discaring partition ~p since it has reached the soft exit retry limit of ~p", [Partition#partition_info.index, SoftRetryLimit]), + ErrorExits1 = State4#state.error_exits + 1, + Dropped = [Partition#partition_info.index | State4#state.dropped], + {noreply, State4#state{error_exits = ErrorExits1, dropped = Dropped}}; + true -> + Purgatory = queue:in(Partition, State4#state.purgatory), + {noreply, State4#state{purgatory = Purgatory, soft_retry_exits = SoftRetryCount}} + end; + + 'EXIT' -> + lager:debug("Incrementing retries for partition ~p due to error exit of ~p", [Index, Pid]), + RetryLimit = app_helper:get_env(riak_repl, max_fssource_retries, + ?DEFAULT_SOURCE_RETRIES), + + if + ErrorCount > RetryLimit -> + lager:warning("fssource dropping partition: ~p, ~p failed" + "retries", [Partition, RetryLimit]), + ErrorExits = State#state.error_exits + 1, + State4 = State3#state{ error_exits = ErrorExits}, + Dropped = [Partition#partition_info.index | State4#state.dropped], + maybe_complete_fullsync(Running, State4#state{dropped = Dropped}); + true -> %% have not run out of retries yet + % reset for retry later + lager:info("fssource rescheduling partition: ~p", + [Partition]), + PQueue2 = queue:in(Partition, PQueue), + RetryExits = State3#state.retry_exits + 1, + State4 = State3#state{partition_queue = PQueue2, + retry_exits = RetryExits}, + State5 = start_up_reqs(State4), + {noreply, State5} + end + end. %% @hidden terminate(_Reason, _State) -> @@ -541,109 +586,91 @@ code_change(_OldVsn, State, _Extra) -> % we stash on our side what nodes gave a busy reply so we don't send too many % pointless whereis requests. handle_socket_msg({location, Partition, {Node, Ip, Port}}, #state{whereis_waiting = Waiting} = State) -> - case proplists:get_value(Partition, Waiting) of - undefined -> + case lists:keytake(Partition, #partition_info.index, Waiting) of + false -> State; - {N, _OldNode, Tref} -> - _ = erlang:cancel_timer(Tref), - Waiting2 = proplists:delete(Partition, Waiting), + {value, PartitionInfo, Waiting2} -> + Tref = PartitionInfo#partition_info.whereis_tref, + erlang:cancel_timer(Tref), % we don't know for sure it's no longer busy until we get a busy reply NewBusies = sets:del_element(Node, State#state.busy_nodes), State2 = State#state{whereis_waiting = Waiting2, busy_nodes = NewBusies}, - Partition2 = {Partition, N, Node}, + Partition2 = PartitionInfo#partition_info{node = Node, whereis_tref = undefined}, State3 = start_fssource(Partition2, Ip, Port, State2), start_up_reqs(State3) end; handle_socket_msg({location_busy, Partition}, #state{whereis_waiting = Waiting} = State) -> - lager:debug("Location_busy, partition = ~p", [Partition]), - case proplists:get_value(Partition, Waiting) of - undefined -> + lager:debug("anya location_busy, partition = ~p", [Partition]), + case lists:keytake(Partition, #partition_info.index, Waiting) of + false -> State; - {N, OldNode, Tref} -> - lager:info("Partition ~p is too busy on cluster ~p at node ~p", - [Partition, State#state.other_cluster, OldNode]), - _ = erlang:cancel_timer(Tref), - Waiting2 = proplists:delete(Partition, Waiting), + {value, PartitionInfo, Waiting2} -> + lager:info("anya Partition ~p is too busy on cluster ~p at node ~p", + [Partition, State#state.other_cluster, PartitionInfo#partition_info.node]), + Tref = PartitionInfo#partition_info.whereis_tref, + erlang:cancel_timer(Tref), State2 = State#state{whereis_waiting = Waiting2}, - Partition2 = {Partition, N, OldNode}, + Partition2 = PartitionInfo#partition_info{whereis_tref = undefined}, PQueue = State2#state.partition_queue, PQueue2 = queue:in(Partition2, PQueue), - NewBusies = sets:add_element(OldNode, State#state.busy_nodes), + NewBusies = sets:add_element(Partition2#partition_info.node, State#state.busy_nodes), State3 = State2#state{partition_queue = PQueue2, busy_nodes = NewBusies}, start_up_reqs(State3) end; handle_socket_msg({location_busy, Partition, Node}, #state{whereis_waiting = Waiting} = State) -> - case proplists:get_value(Partition, Waiting) of - undefined -> + case lists:keytake(Partition, #partition_info.index, Waiting) of + false -> State; - {N, _OldNode, Tref} -> + {value, PartitionInfo, Waiting2} -> lager:info("Partition ~p is too busy on cluster ~p at node ~p", [Partition, State#state.other_cluster, Node]), - _ = erlang:cancel_timer(Tref), + Tref = PartitionInfo#partition_info.whereis_tref, + erlang:cancel_timer(Tref), - Waiting2 = proplists:delete(Partition, Waiting), State2 = State#state{whereis_waiting = Waiting2}, - Partition2 = {Partition, N, Node}, + Partition2 = PartitionInfo#partition_info{node = Node}, PQueue = State2#state.partition_queue, PQueue2 = queue:in(Partition2, PQueue), NewBusies = sets:add_element(Node, State#state.busy_nodes), State3 = State2#state{partition_queue = PQueue2, busy_nodes = NewBusies}, start_up_reqs(State3) end; -handle_socket_msg({location_down, Partition}, - #state{whereis_waiting=Waiting0} = State) -> - case proplists:get_value(Partition, Waiting0) of - undefined -> + +handle_socket_msg({location_down, Partition}, #state{whereis_waiting=Waiting} = State) -> + lager:warning("anya location_down, partition = ~p", [Partition]), + case lists:keytake(Partition, #partition_info.index, Waiting) of + false -> State; - {N, OldNode, Tref} -> - handle_location_down({Partition, N, OldNode, Tref}, State) + {value, PartitionInfo, Waiting2} -> + lager:info("Partition ~p is unavailable on cluster ~p", + [Partition, State#state.other_cluster]), + Tref = PartitionInfo#partition_info.whereis_tref, + erlang:cancel_timer(Tref), + Dropped = [Partition | State#state.dropped], + State2 = State#state{whereis_waiting = Waiting2, dropped = Dropped}, + start_up_reqs(State2) end; -handle_socket_msg({location_down, Partition, Node}, - #state{whereis_waiting=Waiting0} = State) -> - case proplists:get_value(Partition, Waiting0) of - undefined -> +handle_socket_msg({location_down, Partition, _Node}, #state{whereis_waiting=Waiting} = State) -> + case lists:keytake(Partition, #partition_info.index, Waiting) of + false -> State; - {N, _OldNode, Tref} -> - handle_location_down({Partition, N, Node, Tref}, State) - end. - -handle_location_down({Partition, N, Node, Tref}, - #state{reserve_retries=Retries0, - partition_queue=PQueue0, - whereis_waiting=Waiting0} = State) -> - lager:info("Partition ~p is unavailable on cluster ~p", - [Partition, State#state.other_cluster]), - - RetryLimit = app_helper:get_env(riak_repl, - max_reserve_retries, - ?DEFAULT_RESERVE_RETRIES), - - Retries = dict:update_counter(Partition, 1, Retries0), - - _ = erlang:cancel_timer(Tref), - - case dict:fetch(Partition, Retries) of - X when X > RetryLimit, is_integer(RetryLimit) -> - lager:warning("Fullsync dropping partition: ~p, ~p location_down failed retries", - [Partition, RetryLimit]), - Waiting = proplists:delete(Partition, Waiting0), - ErrorExits = State#state.error_exits + 1, - State2 = State#state{whereis_waiting = Waiting, - error_exits = ErrorExits, - reserve_retries = Retries}, - start_up_reqs(State2); - _ -> - lager:warning("Fssource rescheduling partition after location_down: ~p ~p < ~p", - [Partition, N, RetryLimit]), - Waiting = proplists:delete(Partition, Waiting0), - Partition2 = {Partition, N, Node}, - PQueue = queue:in(Partition2, PQueue0), - RetryExits = State#state.retry_exits + 1, - State2 = State#state{whereis_waiting=Waiting, - partition_queue = PQueue, - reserve_retries = Retries, - retry_exits = RetryExits}, - start_up_reqs(State2) + {value, PartitionInfo, Waiting2} -> + Tref = PartitionInfo#partition_info.whereis_tref, + erlang:cancel_timer(Tref), + RetryLimit = app_helper:get_env(riak_repl, max_reserve_retries, ?DEFAULT_RESERVE_RETRIES), + lager:info("Partition ~p is unavailable on cluster ~p", [Partition, State#state.other_cluster]), + State2 = State#state{whereis_waiting = Waiting2}, + {RetriedCount, State3} = increment_error_dict(PartitionInfo, State#state.reserve_retries, State2), + State4 = case RetriedCount of + N when N > RetryLimit, is_integer(N) -> + lager:warning("Fullsync dropping partition ~p, ~p location_down failed retries", [PartitionInfo#partition_info.index, RetryLimit]), + Dropped = [Partition | State#state.dropped], + State3#state{dropped = Dropped}; + _ -> + PQueue = queue:in(PartitionInfo, State3#state.partition_queue), + State3#state{partition_queue = PQueue} + end, + start_up_reqs(State4) end. % try our best to reach maximum capacity by sending as many whereis requests @@ -654,7 +681,17 @@ start_up_reqs(State) -> Running = length(State#state.running_sources), Waiting = length(State#state.whereis_waiting), StartupCount = Max - Running - Waiting, - start_up_reqs(State, StartupCount). + State2 = maybe_pop_from_purgatory(State), + start_up_reqs(State2, StartupCount). + +maybe_pop_from_purgatory(State) -> + case queue:out(State#state.purgatory) of + {empty, _} -> + State; + {{value, Partition}, NewPurgatory} -> + PartitionQ2 = queue:in(Partition, State#state.partition_queue), + State#state{purgatory = NewPurgatory, partition_queue = PartitionQ2} + end. start_up_reqs(State, N) when N < 1 -> State; @@ -693,14 +730,16 @@ send_next_whereis_req(State) -> % one of those to finish and try again {defer, State#state{partition_queue = Queue}}; - {Pval, N, RemoteNode} = P -> + P when is_record(P, partition_info) -> + #partition_info{index = Pval} = P, #state{transport = Transport, socket = Socket, whereis_waiting = Waiting} = State, Tref = erlang:send_after(?WAITING_TIMEOUT, self(), {Pval, whereis_timeout}), - Waiting2 = [{Pval, {N, RemoteNode, Tref}} | Waiting], + PartitionInfo2 = P#partition_info{whereis_tref = Tref}, + Waiting2 = [PartitionInfo2 | Waiting], {ok, {PeerIP, PeerPort}} = Transport:peername(Socket), lager:debug("Sending whereis request for partition ~p", [P]), Transport:send(Socket, - term_to_binary({whereis, element(1, P), PeerIP, PeerPort})), + term_to_binary({whereis, Pval, PeerIP, PeerPort})), {ok, State#state{partition_queue = Queue, whereis_waiting = Waiting2}} end @@ -740,13 +779,14 @@ below_max_sources(State) -> Max = app_helper:get_env(riak_repl, max_fssource_cluster, ?DEFAULT_SOURCE_PER_CLUSTER), ( length(State#state.running_sources) + length(State#state.whereis_waiting) ) < Max. -node_available({Partition, _, _}, Owners, Waiting) -> +node_available(PartitionInfo, Owners, Waiting) -> + Partition = PartitionInfo#partition_info.index, LocalNode = proplists:get_value(Partition, Owners), Max = app_helper:get_env(riak_repl, max_fssource_node, ?DEFAULT_SOURCE_PER_NODE), try riak_repl2_fssource_sup:enabled(LocalNode) of RunningList -> PartsSameNode = [Part || {Part, PNode} <- Owners, PNode =:= LocalNode], - PartsWaiting = [Part || {Part, _} <- Waiting, lists:member(Part, PartsSameNode)], + PartsWaiting = [Part || #partition_info{index = Part} <- Waiting, lists:member(Part, PartsSameNode)], if ( length(PartsWaiting) + length(RunningList) ) < Max -> case proplists:get_value(Partition, RunningList) of @@ -765,12 +805,13 @@ node_available({Partition, _, _}, Owners, Waiting) -> skip end. -remote_node_available({_Partition, _, undefined}, _Busies) -> +remote_node_available(Partition, _Busies) when Partition#partition_info.node =:= undefined -> true; -remote_node_available({_Partition, _, RemoteNode}, Busies) -> - not sets:is_element(RemoteNode, Busies). +remote_node_available(Partition, Busies) -> + not sets:is_element(Partition#partition_info.node, Busies). -start_fssource(Partition2={Partition,_,_} = PartitionVal, Ip, Port, State) -> +start_fssource(PartitionVal, Ip, Port, State) -> + Partition = PartitionVal#partition_info.index, #state{owners = Owners} = State, LocalNode = proplists:get_value(Partition, Owners), lager:info("Starting fssource for ~p on ~p to ~p", [Partition, LocalNode, @@ -778,7 +819,8 @@ start_fssource(Partition2={Partition,_,_} = PartitionVal, Ip, Port, State) -> case riak_repl2_fssource_sup:enable(LocalNode, Partition, {Ip, Port}) of {ok, Pid} -> link(Pid), - Running = orddict:store(Pid, PartitionVal, State#state.running_sources), + _ = riak_repl2_fssource:soft_link(Pid), + Running = lists:keystore(Pid, #partition_info.running_source, State#state.running_sources, PartitionVal#partition_info{running_source = Pid}), State#state{running_sources = Running}; {error, Reason} -> case Reason of @@ -796,7 +838,7 @@ start_fssource(Partition2={Partition,_,_} = PartitionVal, Ip, Port, State) -> end, #state{transport = Transport, socket = Socket} = State, Transport:send(Socket, term_to_binary({unreserve, Partition})), - PQueue = queue:in(Partition2, State#state.partition_queue), + PQueue = queue:in(PartitionVal, State#state.partition_queue), State#state{partition_queue=PQueue} end. @@ -822,7 +864,7 @@ sort_partitions(Ring) -> sort_partitions(OffsetPartitions, BigN, []). sort_partitions([], _, Acc) -> - [{P,N,undefined} || {P,N} <- lists:reverse(Acc)]; + [#partition_info{index = P} || {P,_N} <- lists:reverse(Acc)]; sort_partitions(In, N, Acc) -> Split = min(length(In), N) - 1, {A, [P|B]} = lists:split(Split, In), @@ -880,7 +922,8 @@ gather_source_stats(PDict) -> gather_source_stats([], Acc) -> lists:reverse(Acc); -gather_source_stats([{Pid, _} | Tail], Acc) -> +gather_source_stats([PartitionInfo | Tail], Acc) -> + Pid = PartitionInfo#partition_info.running_source, try riak_repl2_fssource:legacy_status(Pid, infinity) of Stats -> gather_source_stats(Tail, [{riak_repl_util:safe_pid_to_list(Pid), Stats} | Acc]) @@ -892,10 +935,11 @@ gather_source_stats([{Pid, _} | Tail], Acc) -> is_fullsync_in_progress(State) -> QEmpty = queue:is_empty(State#state.partition_queue), + PurgatoryEmpty = queue:is_empty(State#state.purgatory), Waiting = State#state.whereis_waiting, Running = State#state.running_sources, - case {QEmpty, Waiting, Running} of - {true, [], []} -> + case {QEmpty, PurgatoryEmpty, Waiting, Running} of + {true, true, [], []} -> false; _ -> true @@ -904,9 +948,10 @@ is_fullsync_in_progress(State) -> maybe_complete_fullsync(Running, State) -> EmptyRunning = Running == [], QEmpty = queue:is_empty(State#state.partition_queue), + PurgatoryEmpty = queue:is_empty(State#state.purgatory), Waiting = State#state.whereis_waiting, - case {EmptyRunning, QEmpty, Waiting} of - {true, true, []} -> + case {EmptyRunning, QEmpty, PurgatoryEmpty, Waiting} of + {true, true, true, []} -> MyClusterName = riak_core_connection:symbolic_clustername(), lager:info("Fullsync complete from ~s to ~s", [MyClusterName, State#state.other_cluster]), @@ -971,3 +1016,39 @@ notify_rt_dirty_nodes(State = #state{dirty_nodes = DirtyNodes, nodeset_to_string_list(Set) -> string:join([erlang:atom_to_list(V) || V <- ordsets:to_list(Set)],","). + +increment_error_dict(PartitionInfo, ExitType, State) when is_record(PartitionInfo, partition_info) -> + increment_error_dict(PartitionInfo#partition_info.index, ExitType, State); + +increment_error_dict(PartitionIndex, soft_exit, State) -> + increment_error_dict(PartitionIndex, #state.soft_retries, State); + +increment_error_dict(PartitionIndex, 'EXIT', State) -> + increment_error_dict(PartitionIndex, #state.retries, State); + +increment_error_dict(PartitionIndex, ElementN, State) when is_integer(ElementN) -> + Dict = element(ElementN, State), + Dict2 = dict:update_counter(PartitionIndex, 1, Dict), + State2 = setelement(ElementN, State, Dict2), + {dict:fetch(PartitionIndex, Dict2), State2}. + +% If we are linked to a remote pid, it is possible for the disterl to +% reconnect at a bad time and lose the exit message, thus we cannot +% rely solely on the exit message to flush it. What we can do is monitor +% the process. 'DOWN' messages always come in after the exit message, so +% if we get the 'DOWN' message first, we know the exit message is never +% going to arrive, and is effectively flushed. If we get the exit first, +% we can flush the down message next, since that will arrive as a noproc +% in any case. +flush_exit_message(Pid) -> + Mon = erlang:monitor(process, Pid), + receive + {'EXIT', Pid, _} -> + receive + {'DOWN', Mon, process, Pid, _} -> + ok + end; + {'DOWN', Mon, process, Pid, _} -> + ok + end. + diff --git a/src/riak_repl2_fssource.erl b/src/riak_repl2_fssource.erl index 013cf1ee..9e80c42b 100644 --- a/src/riak_repl2_fssource.erl +++ b/src/riak_repl2_fssource.erl @@ -3,8 +3,9 @@ -behaviour(gen_server). %% API --export([start_link/2, connected/6, connect_failed/3, start_fullsync/1, - stop_fullsync/1, cluster_name/1, legacy_status/2, fullsync_complete/1]). +-export([start_link/2, start_link/3, connected/6, connect_failed/3, + start_fullsync/1, stop_fullsync/1, fullsync_complete/1, + cluster_name/1, legacy_status/2, soft_link/1]). %% gen_server callbacks -export([init/1, handle_call/3, handle_cast/2, handle_info/2, @@ -19,11 +20,15 @@ connection_ref, fullsync_worker, work_dir, - strategy + strategy, + owner }). start_link(Partition, IP) -> - gen_server:start_link(?MODULE, [Partition, IP], []). + start_link(Partition, IP, undefined). + +start_link(Partition, IP, Owner) -> + gen_server:start_link(?MODULE, [Partition, IP, Owner], []). %% connection manager callbacks connected(Socket, Transport, Endpoint, Proto, Pid, Props) -> @@ -51,9 +56,33 @@ cluster_name(Pid) -> legacy_status(Pid, Timeout) -> gen_server:call(Pid, legacy_status, Timeout). +%% @doc Create a 'soft link' between the calling process and the fssource. +%% A soft-link allows for a soft_exit message to be sent before a normal +%% exit to any process that has created a soft link. Only one link is +%% held at a time, and alink is in only one direction (the fssource +%% reports to calling process). +soft_link(Pid) -> + % not using default long timeout because this is primarily used by the + % fscoordinator, and we don't want to potentially block that for up to + % 2 minutes. 15 seconds is bad enough in a worst case scenario. + try gen_server:call(Pid, {soft_link, self()}, timer:seconds(15)) of + ok -> % older versions returned 'ok' for the catchall + false; + true -> + true + catch + _What:Reason -> + lager:debug("Could not create soft link to ~p from ~p due to ~p", [Pid, self(), Reason]), + {error, Reason} + end. + %% gen server init([Partition, IP]) -> + init([Partition, IP, undefined]); + +init([Partition, IP, Owner]) -> + RequestedStrategy = app_helper:get_env(riak_repl, fullsync_strategy, ?DEFAULT_FULLSYNC_STRATEGY), @@ -77,7 +106,9 @@ init([Partition, IP]) -> end; {error, Reason} -> %% the vnode is probably busy. Try again later. - {stop, Reason} + {stop, Reason}; + {ok, State}-> + {ok, State#state{owner = Owner}} end. handle_call({connected, Socket, Transport, _Endpoint, Proto, Props}, @@ -149,9 +180,22 @@ handle_call(stop_fullsync, _From, State=#state{fullsync_worker=FSW, handle_call(legacy_status, _From, State=#state{fullsync_worker=FSW, socket=Socket, strategy=Strategy}) -> - Res = case is_pid(FSW) andalso is_process_alive(FSW) of - true -> gen_fsm:sync_send_all_state_event(FSW, status, infinity); - false -> [] + lager:debug("Sending status to ~p", [FSW]), + Res = case is_pid(FSW) of + true -> + % try/catch because there may be a message in the pid's + % mailbox that will cause it to exit before it gets to our + % status request message. + try gen_fsm:sync_send_all_state_event(FSW, status, infinity) of + SyncSendRes -> + SyncSendRes + catch + What:Why -> + lager:notice("Error getting fullsync worker ~p status: ~p:~p", [FSW, What, Why]), + [] + end; + false -> + [] end, SocketStats = riak_core_tcp_mon:format_socket_stats( riak_core_tcp_mon:socket_status(Socket), []), @@ -172,6 +216,10 @@ handle_call(cluster_name, _From, State) -> ClusterName end, {reply, Name, State}; +handle_call({soft_link, NewOwner}, _From, State) -> + lager:debug("Changing soft_link from ~p to ~p", [State#state.owner, NewOwner]), + State2 = State#state{owner = NewOwner}, + {reply, true, State2}; handle_call(_Msg, _From, State) -> {reply, ok, State}. @@ -221,16 +269,20 @@ handle_info({Proto, Socket, Data}, gen_fsm:send_event(State#state.fullsync_worker, Msg), {noreply, State} end; +handle_info({soft_exit, Pid, Reason}, State = #state{fullsync_worker = Pid}) -> + lager:debug("Fullsync worker exited normally, but really wanted it to be ~p", [Reason]), + maybe_soft_exit(Reason, State); handle_info(_Msg, State) -> {noreply, State}. terminate(_Reason, #state{fullsync_worker=FSW, work_dir=WorkDir}) -> - %% check if process alive only if it's defined - case is_pid(FSW) andalso is_process_alive(FSW) of + %% try to exit the fullsync worker; if we're dying because it did, + %% don't worry about the error (cause it's already dead). + case is_pid(FSW) of false -> ok; true -> - gen_fsm:sync_send_all_state_event(FSW, stop) + catch gen_fsm:sync_send_all_state_event(FSW, stop) end, case WorkDir of undefined -> ok; @@ -306,3 +358,13 @@ connect(IP, Strategy, Partition) -> lager:warning("Error connecting to remote ~p for partition ~p", [IP, Partition]), {error, Reason} end. + +maybe_soft_exit(Reason, State) -> + case State#state.owner of + undefined -> + {stop, Reason, State}; + Owner -> + Owner ! {soft_exit, self(), Reason}, + {stop, normal, State} + end. + diff --git a/src/riak_repl_aae_source.erl b/src/riak_repl_aae_source.erl index cae1fa7e..125b7381 100644 --- a/src/riak_repl_aae_source.erl +++ b/src/riak_repl_aae_source.erl @@ -120,7 +120,8 @@ handle_info({'DOWN', TreeMref, process, Pid, Why}, _StateName, State=#state{tree %% Local hashtree process went down. Stop exchange. lager:info("Monitored pid ~p, AAE Hashtree process went down because: ~p", [Pid, Why]), send_complete(State), - {stop, {aae_hashtree_went_down, Why}, State}; + State#state.owner ! {soft_exit, self(), {aae_hastree_went_down, Why}}, + {stop, normal, State}; handle_info(Error={'DOWN', _, _, _, _}, _StateName, State) -> %% Something else exited. Stop exchange. lager:info("Something went down ~p", [Error]), @@ -195,7 +196,8 @@ prepare_exchange(start_exchange, State0=#state{transport=Transport, Error -> lager:info("AAE source failed get_lock for partition ~p, got ~p", [Partition, Error]), - {stop, Error, State} + State#state.owner ! {soft_exit, self(), Error}, + {stop, normal, State} end; {error, wrong_node} -> {stop, wrong_node, State0} @@ -209,7 +211,8 @@ prepare_exchange(start_exchange, State=#state{index=Partition}) -> lager:info("Remote lock tree for partition ~p failed, got ~p", [Partition, Error]), send_complete(State), - {stop, {remote, Error}, State} + State#state.owner ! {soft_exit, self(), {remote, Error}}, + {stop, normal, State} end. %% @doc Now that locks have been acquired, ask both the local and remote diff --git a/src/riak_repl_wm_stats.erl b/src/riak_repl_wm_stats.erl index 4302080e..02e9a43c 100644 --- a/src/riak_repl_wm_stats.erl +++ b/src/riak_repl_wm_stats.erl @@ -125,7 +125,6 @@ format_pid_stat(Pair) -> jsonify_stats([], Acc) -> - %?debugFmt("Got []: Acc: ~w", [Acc]), lists:flatten(lists:reverse(Acc)); jsonify_stats([{fullsync, Num, _Left}|T], Acc) -> From a1166e1bff09ca3f3f89383632cba450c1815f34 Mon Sep 17 00:00:00 2001 From: Micah Warren Date: Fri, 21 Nov 2014 14:56:24 -0600 Subject: [PATCH 2/4] Fixed invalid call to update error count track. Increment_error_dict expects the partition, elementN of error dict, and the state. It pulls the dict out of the state so it put it back in place, thus just returning the state. So this call that passed the dict in was wrong. --- src/riak_repl2_fscoordinator.erl | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/riak_repl2_fscoordinator.erl b/src/riak_repl2_fscoordinator.erl index acf4fddb..bffc68dc 100644 --- a/src/riak_repl2_fscoordinator.erl +++ b/src/riak_repl2_fscoordinator.erl @@ -660,7 +660,7 @@ handle_socket_msg({location_down, Partition, _Node}, #state{whereis_waiting=Wait RetryLimit = app_helper:get_env(riak_repl, max_reserve_retries, ?DEFAULT_RESERVE_RETRIES), lager:info("Partition ~p is unavailable on cluster ~p", [Partition, State#state.other_cluster]), State2 = State#state{whereis_waiting = Waiting2}, - {RetriedCount, State3} = increment_error_dict(PartitionInfo, State#state.reserve_retries, State2), + {RetriedCount, State3} = increment_error_dict(PartitionInfo, #state.reserve_retries, State2), State4 = case RetriedCount of N when N > RetryLimit, is_integer(N) -> lager:warning("Fullsync dropping partition ~p, ~p location_down failed retries", [PartitionInfo#partition_info.index, RetryLimit]), From c36c3ca876cabac73826fe29f6e94b5acdf55f8e Mon Sep 17 00:00:00 2001 From: "Engel A. Sanchez" Date: Wed, 26 Nov 2014 14:15:08 -0500 Subject: [PATCH 3/4] Fix error/retry exit counts on location down msgs When a partition is not available, perhaps after a number of retries, the error exits stat should be incremented. Also, the retry exits stat should be incremented on each retry. This was discovered when backporting the repl_location_failures riak_test. --- src/riak_repl2_fscoordinator.erl | 15 ++++++++++++--- 1 file changed, 12 insertions(+), 3 deletions(-) diff --git a/src/riak_repl2_fscoordinator.erl b/src/riak_repl2_fscoordinator.erl index bffc68dc..ae434920 100644 --- a/src/riak_repl2_fscoordinator.erl +++ b/src/riak_repl2_fscoordinator.erl @@ -647,7 +647,10 @@ handle_socket_msg({location_down, Partition}, #state{whereis_waiting=Waiting} = Tref = PartitionInfo#partition_info.whereis_tref, erlang:cancel_timer(Tref), Dropped = [Partition | State#state.dropped], - State2 = State#state{whereis_waiting = Waiting2, dropped = Dropped}, + #state{retry_exits = RetryExits, error_exits = ErrorExits} = State, + State2 = State#state{whereis_waiting = Waiting2, dropped = Dropped, + retry_exits = RetryExits + 1, + error_exits = ErrorExits + 1}, start_up_reqs(State2) end; handle_socket_msg({location_down, Partition, _Node}, #state{whereis_waiting=Waiting} = State) -> @@ -665,10 +668,16 @@ handle_socket_msg({location_down, Partition, _Node}, #state{whereis_waiting=Wait N when N > RetryLimit, is_integer(N) -> lager:warning("Fullsync dropping partition ~p, ~p location_down failed retries", [PartitionInfo#partition_info.index, RetryLimit]), Dropped = [Partition | State#state.dropped], - State3#state{dropped = Dropped}; + #state{retry_exits = RetryExits, + error_exits = ErrorExits} = State, + State3#state{dropped = Dropped, + error_exits = ErrorExits + 1, + retry_exits = RetryExits + 1}; _ -> PQueue = queue:in(PartitionInfo, State3#state.partition_queue), - State3#state{partition_queue = PQueue} + #state{retry_exits = RetryExits} = State, + State3#state{partition_queue = PQueue, + retry_exits = RetryExits + 1} end, start_up_reqs(State4) end. From 5479089f99605f99dfc6a8665561e072c5fc707a Mon Sep 17 00:00:00 2001 From: "Engel A. Sanchez" Date: Fri, 5 Dec 2014 11:54:57 -0500 Subject: [PATCH 4/4] Fix dialyzer warnings The one in riak_repl2_fssource is a legit bug in the code --- src/riak_repl2_fscoordinator.erl | 24 +++++++++++++----------- src/riak_repl2_fssource.erl | 8 +++----- 2 files changed, 16 insertions(+), 16 deletions(-) diff --git a/src/riak_repl2_fscoordinator.erl b/src/riak_repl2_fscoordinator.erl index ae434920..fba95217 100644 --- a/src/riak_repl2_fscoordinator.erl +++ b/src/riak_repl2_fscoordinator.erl @@ -372,12 +372,14 @@ handle_cast(start_fullsync, State) -> handle_cast(stop_fullsync, State) -> % exit all running, cancel all timers, and reset the state. - [erlang:cancel_timer(Tref) || #partition_info{whereis_tref = Tref} <- State#state.whereis_waiting], - [begin - unlink(Pid), - riak_repl2_fssource:stop_fullsync(Pid), - riak_repl2_fssource_sup:disable(node(Pid), Part) - end || #partition_info{index = Part, running_source = Pid} <- State#state.running_sources], + _ = [erlang:cancel_timer(Tref) || #partition_info{whereis_tref = Tref} + <- State#state.whereis_waiting], + _ = [begin + unlink(Pid), + riak_repl2_fssource:stop_fullsync(Pid), + riak_repl2_fssource_sup:disable(node(Pid), Part) + end || #partition_info{index = Part, running_source = Pid} + <- State#state.running_sources], State2 = State#state{ largest_n = undefined, owners = [], @@ -591,7 +593,7 @@ handle_socket_msg({location, Partition, {Node, Ip, Port}}, #state{whereis_waitin State; {value, PartitionInfo, Waiting2} -> Tref = PartitionInfo#partition_info.whereis_tref, - erlang:cancel_timer(Tref), + _ = erlang:cancel_timer(Tref), % we don't know for sure it's no longer busy until we get a busy reply NewBusies = sets:del_element(Node, State#state.busy_nodes), State2 = State#state{whereis_waiting = Waiting2, busy_nodes = NewBusies}, @@ -608,7 +610,7 @@ handle_socket_msg({location_busy, Partition}, #state{whereis_waiting = Waiting} lager:info("anya Partition ~p is too busy on cluster ~p at node ~p", [Partition, State#state.other_cluster, PartitionInfo#partition_info.node]), Tref = PartitionInfo#partition_info.whereis_tref, - erlang:cancel_timer(Tref), + _ = erlang:cancel_timer(Tref), State2 = State#state{whereis_waiting = Waiting2}, Partition2 = PartitionInfo#partition_info{whereis_tref = undefined}, PQueue = State2#state.partition_queue, @@ -624,7 +626,7 @@ handle_socket_msg({location_busy, Partition, Node}, #state{whereis_waiting = Wai {value, PartitionInfo, Waiting2} -> lager:info("Partition ~p is too busy on cluster ~p at node ~p", [Partition, State#state.other_cluster, Node]), Tref = PartitionInfo#partition_info.whereis_tref, - erlang:cancel_timer(Tref), + _ = erlang:cancel_timer(Tref), State2 = State#state{whereis_waiting = Waiting2}, @@ -645,7 +647,7 @@ handle_socket_msg({location_down, Partition}, #state{whereis_waiting=Waiting} = lager:info("Partition ~p is unavailable on cluster ~p", [Partition, State#state.other_cluster]), Tref = PartitionInfo#partition_info.whereis_tref, - erlang:cancel_timer(Tref), + _ = erlang:cancel_timer(Tref), Dropped = [Partition | State#state.dropped], #state{retry_exits = RetryExits, error_exits = ErrorExits} = State, State2 = State#state{whereis_waiting = Waiting2, dropped = Dropped, @@ -659,7 +661,7 @@ handle_socket_msg({location_down, Partition, _Node}, #state{whereis_waiting=Wait State; {value, PartitionInfo, Waiting2} -> Tref = PartitionInfo#partition_info.whereis_tref, - erlang:cancel_timer(Tref), + _ = erlang:cancel_timer(Tref), RetryLimit = app_helper:get_env(riak_repl, max_reserve_retries, ?DEFAULT_RESERVE_RETRIES), lager:info("Partition ~p is unavailable on cluster ~p", [Partition, State#state.other_cluster]), State2 = State#state{whereis_waiting = Waiting2}, diff --git a/src/riak_repl2_fssource.erl b/src/riak_repl2_fssource.erl index 9e80c42b..d193f8af 100644 --- a/src/riak_repl2_fssource.erl +++ b/src/riak_repl2_fssource.erl @@ -101,14 +101,12 @@ init([Partition, IP, Owner]) -> case connect(IP, SupportedStrategy, Partition) of {error, Reason} -> {stop, Reason}; - Result -> - Result + {ok, State}-> + {ok, State#state{owner = Owner}} end; {error, Reason} -> %% the vnode is probably busy. Try again later. - {stop, Reason}; - {ok, State}-> - {ok, State#state{owner = Owner}} + {stop, Reason} end. handle_call({connected, Socket, Transport, _Endpoint, Proto, Props},