diff --git a/dialyzer.ignore-warnings b/dialyzer.ignore-warnings
index 3487cc7d..33aa0b34 100644
--- a/dialyzer.ignore-warnings
+++ b/dialyzer.ignore-warnings
@@ -4,9 +4,7 @@ riak_repl_keylist_client.erl:267: The call application:unset_env('riak_repl',{'p
riak_repl_keylist_client.erl:120: The call application:unset_env('riak_repl',{'progress',_}) breaks the contract (Application,Par) -> 'ok' when is_subtype(Application,atom()), is_subtype(Par,atom())
riak_repl_keylist_client.erl:132: The call application:set_env('riak_repl',{'progress',_},nonempty_maybe_improper_list()) breaks the contract (Application,Par,Val) -> 'ok' when is_subtype(Application,atom()), is_subtype(Par,atom()), is_subtype(Val,term())
riak_core_connection.erl:108: Function exchange_handshakes_with/4 has no local return
-riak_core_connection.erl:171: The call ranch_tcp:send(Socket::port(),Hello::binary()) breaks the contract (inet:socket(),iolist()) -> 'ok' | {'error',atom()}
-riak_core_connection.erl:189: Function try_ssl/4 will never be called
-riak_core_connection.erl:225: Function negotiate_proto_with_server/3 will never be called
+riak_core_connection.erl:172: The call ranch_tcp:send(Socket::port(),Hello::binary()) breaks the contract (inet:socket(),iolist()) -> 'ok' | {'error',atom()}
riak_repl_keylist_client.erl:106: The call application:unset_env('riak_repl',{'progress',_}) breaks the contract (Application,Par) -> 'ok' when is_subtype(Application,atom()), is_subtype(Par,atom())
riak_repl_keylist_client.erl:216: The call application:unset_env('riak_repl',{'progress',_}) breaks the contract (Application,Par) -> 'ok' when is_subtype(Application,atom()), is_subtype(Par,atom())
riak_repl_keylist_client.erl:265: The call application:unset_env('riak_repl',{'progress',_}) breaks the contract (Application,Par) -> 'ok' when is_subtype(Application,atom()), is_subtype(Par,atom())
diff --git a/include/riak_core_connection.hrl b/include/riak_core_connection.hrl
index 2ce0c5c8..1006efb3 100644
--- a/include/riak_core_connection.hrl
+++ b/include/riak_core_connection.hrl
@@ -22,12 +22,13 @@
%% handshake messages to safely initiate a connection. Let's not accept
%% a connection to a telnet session by accident!
--define(CTRL_REV, {1,0}).
+-define(CTRL_REV, {1,1}).
-define(CTRL_HELLO, <<"riak-ctrl:hello">>).
-define(CTRL_TELL_IP_ADDR, <<"riak-ctrl:ip_addr">>).
-define(CTRL_ACK, <<"riak-ctrl:ack">>).
-define(CTRL_ASK_NAME, <<"riak-ctrl:ask_name">>).
-define(CTRL_ASK_MEMBERS, <<"riak-ctrl:ask_members">>).
+-define(CTRL_ALL_MEMBERS, <<"riak-ctrl:all_members">>).
-define(CONNECTION_SETUP_TIMEOUT, 10000).
diff --git a/include/riak_repl.hrl b/include/riak_repl.hrl
index 2dd83a58..fae6a6e7 100644
--- a/include/riak_repl.hrl
+++ b/include/riak_repl.hrl
@@ -28,6 +28,9 @@
-define(DEFAULT_SOURCE_RETRIES, infinity).
%% How many times we should retry when failing a reservation
-define(DEFAULT_RESERVE_RETRIES, 0).
+%% How many times during a fullsync we should retry a partition that has sent
+%% a 'soft_exit' message to the coordinator
+-define(DEFAULT_SOURCE_SOFT_RETRIES, infinity).
%% 20 seconds. sources should claim within 5 seconds, but give them a little more time
-define(RESERVATION_TIMEOUT, (20 * 1000)).
-define(DEFAULT_MAX_FS_BUSIES_TOLERATED, 10).
diff --git a/src/riak_core_cluster_conn.erl b/src/riak_core_cluster_conn.erl
index 51c08b26..295d2e72 100644
--- a/src/riak_core_cluster_conn.erl
+++ b/src/riak_core_cluster_conn.erl
@@ -75,19 +75,21 @@
-type remote() :: {cluster_by_name, clustername()} | {cluster_by_addr, ip_addr()}.
-type peer_address() :: {string(), pos_integer()}.
+-type node_address() :: {atom(), peer_address()}.
-type ranch_transport_messages() :: {atom(), atom(), atom()}.
-record(state, {mode :: atom(),
remote :: remote(),
socket :: port(),
name :: clustername(),
previous_name="undefined" :: clustername(),
- members=[] :: [peer_address()],
+ members=[] :: [peer_address() | node_address()],
connection_ref :: reference(),
connection_timeout :: timeout(),
transport :: atom(),
address :: peer_address(),
connection_props :: proplists:proplist(),
- transport_msgs :: ranch_transport_messages()}).
+ transport_msgs :: ranch_transport_messages(),
+ proto_version :: {non_neg_integer(), non_neg_integer()} }).
-type state() :: #state{}.
%%%===================================================================
@@ -121,14 +123,17 @@ status(Ref, Timeout) ->
connected(Socket,
Transport,
Addr,
- {?REMOTE_CLUSTER_PROTO_ID, _MyVer, _RemoteVer},
+ {?REMOTE_CLUSTER_PROTO_ID,
+ _MyVer ={CommonMajor,LocalMinor},
+ _RemoteVer={CommonMajor,RemoteMinor}},
{_Remote, Client},
Props) ->
%% give control over the socket to the `Client' process.
%% tell client we're connected and to whom
Transport:controlling_process(Socket, Client),
gen_fsm:send_event(Client,
- {connected_to_remote, Socket, Transport, Addr, Props}).
+ {connected_to_remote, Socket, Transport, Addr, Props,
+ {CommonMajor, min(LocalMinor,RemoteMinor)}}).
-spec connect_failed({term(), term()}, {error, term()}, {_, atom() | pid() | port() | {atom(), _} | {via, _, _}}) -> ok.
connect_failed({_Proto, _Vers}, {error, _}=Error, {_Remote, Client}) ->
@@ -189,7 +194,7 @@ connecting({connect_failed, Error}, State=#state{remote=Remote}) ->
%% This is fatal! We are being supervised by conn_sup and if we
%% die, it will restart us.
{stop, Error, State};
-connecting({connected_to_remote, Socket, Transport, Addr, Props}, State) ->
+connecting({connected_to_remote, Socket, Transport, Addr, Props, ProtoVersion}, State) ->
RemoteName = proplists:get_value(clustername, Props),
_ = lager:debug("Cluster Manager control channel client connected to"
" remote ~p at ~p named ~p",
@@ -202,7 +207,9 @@ connecting({connected_to_remote, Socket, Transport, Addr, Props}, State) ->
transport=Transport,
address=Addr,
connection_props=Props,
- transport_msgs = TransportMsgs},
+ transport_msgs = TransportMsgs,
+ proto_version=ProtoVersion
+ },
_ = request_cluster_name(UpdState),
{next_state, waiting_for_cluster_name, UpdState, ?CONNECTION_SETUP_TIMEOUT};
connecting(poll_cluster, State) ->
@@ -240,13 +247,21 @@ waiting_for_cluster_name(_, _From, _State) ->
{reply, ok, waiting_for_cluster_name, _State}.
%% Async message handling for the `waiting_for_cluster_members' state
-waiting_for_cluster_members({cluster_members, Members}, State) ->
+waiting_for_cluster_members({cluster_members, NewMembers}, State = #state{ proto_version={1,0} }) ->
#state{address=Addr,
name=Name,
previous_name=PreviousName,
+ members=OldMembers,
remote=Remote} = State,
- %% This is the first time we're updating the cluster manager
- %% with the name of this cluster, so it's old name is undefined.
+ %% this is 1.0 code. NewMembers is list of {IP,Port}
+
+ SortedNew = ordsets:from_list(NewMembers),
+ Members =
+ NewMembers ++ lists:filter(fun(Mem) ->
+ not ordsets:is_element(Mem, SortedNew)
+ end,
+ OldMembers),
+
ClusterUpdatedMsg = {cluster_updated,
PreviousName,
Name,
@@ -255,14 +270,48 @@ waiting_for_cluster_members({cluster_members, Members}, State) ->
Remote},
gen_server:cast(?CLUSTER_MANAGER_SERVER, ClusterUpdatedMsg),
{next_state, connected, State#state{members=Members}};
+waiting_for_cluster_members({all_cluster_members, NewMembers}, State) ->
+ #state{address=Addr,
+ name=Name,
+ previous_name=PreviousName,
+ members=OldMembers,
+ remote=Remote} = State,
+
+ %% this is 1.1+ code. NewMembers is a list of {Node,{IP,Port}} or {Node,unreachable}
+
+ Members =
+ lists:foldl(fun(Elm={_Node,{_Ip,Port}}, Acc) when is_integer(Port) ->
+ [Elm|Acc];
+ ({Node,_}, Acc) ->
+ case lists:keyfind(Node, 1, OldMembers) of
+ Elm={Node,{_IP,Port}} when is_integer(Port) ->
+ [Elm|Acc];
+ _ ->
+ Acc
+ end
+ end,
+ [],
+ NewMembers ),
+
+ ClusterUpdatedMsg = {cluster_updated,
+ PreviousName,
+ Name,
+ [Member || {_Node,Member} <- Members],
+ Addr,
+ Remote},
+ gen_server:cast(?CLUSTER_MANAGER_SERVER, ClusterUpdatedMsg),
+ {next_state, connected, State#state{members=Members}};
waiting_for_cluster_members(_, _State) ->
{next_state, waiting_for_cluster_members, _State}.
%% Sync message handling for the `waiting_for_cluster_members' state
waiting_for_cluster_members(status, _From, State) ->
{reply, {waiting_for_cluster_members, State#state.name}, waiting_for_cluster_members, State};
-waiting_for_cluster_members(_, _From, _State) ->
- {reply, ok, waiting_for_cluster_members, _State}.
+waiting_for_cluster_members(Other, _From, State) ->
+ _ = lager:error("cluster_conn: client got unexpected "
+ "msg from remote: ~p, ~p",
+ [State#state.remote, Other]),
+ {reply, ok, waiting_for_cluster_members, State}.
%% Async message handling for the `connected' state
connected(poll_cluster, State) ->
@@ -307,11 +356,18 @@ handle_info({TransOK, Socket, Name},
{next_state, waiting_for_cluster_name, State};
handle_info({TransOK, Socket, Members},
waiting_for_cluster_members,
- State=#state{socket=Socket, transport_msgs = {TransOK, _, _}}) ->
+ State=#state{socket=Socket, transport_msgs = {TransOK, _, _}, proto_version={1,0}}) ->
Transport = State#state.transport,
gen_fsm:send_event(self(), {cluster_members, binary_to_term(Members)}),
_ = Transport:setopts(Socket, [{active, once}]),
{next_state, waiting_for_cluster_members, State};
+handle_info({TransOK, Socket, Members},
+ waiting_for_cluster_members,
+ State=#state{socket=Socket, transport_msgs = {TransOK, _, _}}) ->
+ Transport = State#state.transport,
+ gen_fsm:send_event(self(), {all_cluster_members, binary_to_term(Members)}),
+ _ = Transport:setopts(Socket, [{active, once}]),
+ {next_state, waiting_for_cluster_members, State};
handle_info({TransOK, Socket, Data},
StateName,
State=#state{address=Addr,
@@ -362,18 +418,25 @@ code_change(_OldVsn, StateName, State, _Extra) ->
request_cluster_name(#state{mode=test}) ->
ok;
request_cluster_name(#state{socket=Socket, transport=Transport}) ->
- _ = inet:setopts(Socket, [{active, once}]),
+ _ = Transport:setopts(Socket, [{active, once}]),
Transport:send(Socket, ?CTRL_ASK_NAME).
-spec request_member_ips(state()) -> ok | {error, term()}.
request_member_ips(#state{mode=test}) ->
ok;
-request_member_ips(#state{socket=Socket, transport=Transport}) ->
+request_member_ips(#state{socket=Socket, transport=Transport, proto_version={1,0}}) ->
Transport:send(Socket, ?CTRL_ASK_MEMBERS),
%% get the IP we think we've connected to
{ok, {PeerIP, PeerPort}} = Transport:peername(Socket),
%% make it a string
PeerIPStr = inet_parse:ntoa(PeerIP),
+ Transport:send(Socket, term_to_binary({PeerIPStr, PeerPort}));
+request_member_ips(#state{socket=Socket, transport=Transport, proto_version={1,1}}) ->
+ Transport:send(Socket, ?CTRL_ALL_MEMBERS),
+ %% get the IP we think we've connected to
+ {ok, {PeerIP, PeerPort}} = Transport:peername(Socket),
+ %% make it a string
+ PeerIPStr = inet_parse:ntoa(PeerIP),
Transport:send(Socket, term_to_binary({PeerIPStr, PeerPort})).
initiate_connection(State=#state{mode=test}) ->
@@ -383,7 +446,7 @@ initiate_connection(State=#state{remote=Remote}) ->
%% `riak_core_connection_mgr::connect/4' is incorrect.
{ok, Ref} = riak_core_connection_mgr:connect(
Remote,
- {{?REMOTE_CLUSTER_PROTO_ID, [{1,0}]},
+ {{?REMOTE_CLUSTER_PROTO_ID, [{1,1},{1,0}]},
{?CTRL_OPTIONS, ?MODULE, {Remote, self()}}},
default),
State#state{connection_ref=Ref}.
diff --git a/src/riak_core_cluster_mgr.erl b/src/riak_core_cluster_mgr.erl
index 8a70241c..afba88f4 100644
--- a/src/riak_core_cluster_mgr.erl
+++ b/src/riak_core_cluster_mgr.erl
@@ -70,6 +70,7 @@
leader_node = undefined :: undefined | node(),
gc_interval = infinity,
member_fun = fun(_Addr) -> [] end, % return members of local cluster
+ all_member_fun = fun(_Addr) -> [] end, % return members of local cluster
restore_targets_fun = fun() -> [] end, % returns persisted cluster targets
save_members_fun = fun(_C,_M) -> ok end, % persists remote cluster members
balancer_fun = fun(Addrs) -> Addrs end, % registered balancer function
@@ -77,11 +78,12 @@
}).
-export([start_link/0,
- start_link/3,
+ start_link/4,
set_leader/2,
get_leader/0,
get_is_leader/0,
register_member_fun/1,
+ register_all_member_fun/1,
register_restore_cluster_targets_fun/1,
register_save_cluster_members_fun/1,
add_remote_cluster/1, remove_remote_cluster/1,
@@ -90,7 +92,8 @@
get_ipaddrs_of_cluster/1,
set_gc_interval/1,
stop/0,
- connect_to_clusters/0
+ connect_to_clusters/0,
+ shuffle_remote_ipaddrs/1
]).
%% gen_server callbacks
@@ -100,7 +103,7 @@
%% internal functions
-export([%ctrlService/5, ctrlServiceProcess/5,
round_robin_balancer/1, cluster_mgr_sites_fun/0,
- get_my_members/1]).
+ get_my_members/1, get_all_members/1]).
-export([ensure_valid_ip_addresses/1]).
@@ -111,8 +114,8 @@
start_link() ->
gen_server:start_link({local, ?SERVER}, ?MODULE, [], []).
-start_link(DefaultLocator, DefaultSave, DefaultRestore) ->
- Args = [DefaultLocator, DefaultSave, DefaultRestore],
+start_link(DefaultLocator, DefaultAllLocator, DefaultSave, DefaultRestore) ->
+ Args = [DefaultLocator, DefaultAllLocator, DefaultSave, DefaultRestore],
Options = [],
gen_server:start_link({local, ?SERVER}, ?MODULE, Args, Options).
@@ -139,6 +142,12 @@ get_is_leader() ->
register_member_fun(MemberFun) ->
gen_server:cast(?SERVER, {register_member_fun, MemberFun}).
+%% @doc Register a function that will get called to get our local riak node
+%% members' IP addrs. MemberFun(ip_addr()) -> [{node(),{IP,Port}}] where IP is a string
+-spec register_all_member_fun(MemberFun :: fun((ip_addr()) -> [{atom(),{string(),pos_integer()}}])) -> 'ok'.
+register_all_member_fun(MemberFun) ->
+ gen_server:cast(?SERVER, {register_all_member_fun, MemberFun}).
+
register_restore_cluster_targets_fun(ReadClusterFun) ->
gen_server:cast(?SERVER, {register_restore_cluster_targets_fun, ReadClusterFun}).
@@ -168,9 +177,17 @@ get_connections() ->
get_my_members(MyAddr) ->
gen_server:call(?SERVER, {get_my_members, MyAddr}, infinity).
+get_all_members(MyAddr) ->
+ gen_server:call(?SERVER, {get_all_members, MyAddr}, infinity). % replies [{Node, {IPString, Port}} | {Node, unreachable}]
+
%% @doc Return a list of the known IP addresses of all nodes in the remote cluster.
get_ipaddrs_of_cluster(ClusterName) ->
- gen_server:call(?SERVER, {get_known_ipaddrs_of_cluster, {name,ClusterName}}, infinity).
+ case gen_server:call(?SERVER, {get_known_ipaddrs_of_cluster, {name,ClusterName}}, infinity) of
+ {ok, Reply} ->
+ shuffle_remote_ipaddrs(Reply);
+ Reply ->
+ Reply
+ end.
%% @doc stops the local server.
-spec stop() -> 'ok'.
@@ -190,7 +207,7 @@ init(Defaults) ->
%% start our cluster_mgr service if not already started.
case riak_core_service_mgr:is_registered(?CLUSTER_PROTO_ID) of
false ->
- ServiceProto = {?CLUSTER_PROTO_ID, [{1,0}]},
+ ServiceProto = {?CLUSTER_PROTO_ID, [{1,1}, {1,0}]},
%ServiceSpec = {ServiceProto, {?CTRL_OPTIONS, ?MODULE, ctrlService, []}},
ServiceSpec = {ServiceProto, {?CTRL_OPTIONS, riak_core_cluster_serv, start_link, []}},
riak_core_service_mgr:sync_register_service(ServiceSpec, {round_robin,?MAX_CONS});
@@ -226,7 +243,18 @@ handle_call(get_is_leader, _From, State) ->
handle_call({get_my_members, MyAddr}, _From, State) ->
%% This doesn't need to call the leader.
MemberFun = State#state.member_fun,
- MyMembers = [{string_of_ip(IP),Port} || {IP,Port} <- MemberFun(MyAddr)],
+ MyMembers = [{string_of_ip(IP),Port} || {IP,Port} <- MemberFun(MyAddr), is_integer(Port)],
+ {reply, MyMembers, State};
+
+handle_call({get_all_members, MyAddr}, _From, State) ->
+ %% This doesn't need to call the leader.
+ AllMemberFun = State#state.all_member_fun,
+ MyMembers = lists:map(fun({Node,{IP,Port}}) when is_integer(Port) ->
+ {Node,{string_of_ip(IP),Port}};
+ ({Node,_}) ->
+ {Node, unreachable}
+ end,
+ AllMemberFun(MyAddr)),
{reply, MyMembers, State};
handle_call(leader_node, _From, State) ->
@@ -309,6 +337,9 @@ handle_cast({set_gc_interval, Interval}, State) ->
handle_cast({register_member_fun, Fun}, State) ->
{noreply, State#state{member_fun=Fun}};
+handle_cast({register_all_member_fun, Fun}, State) ->
+ {noreply, State#state{all_member_fun=Fun}};
+
handle_cast({register_save_cluster_members_fun, Fun}, State) ->
{noreply, State#state{save_members_fun=Fun}};
@@ -347,9 +378,6 @@ handle_cast({remove_remote_cluster, Cluster}, State) ->
{noreply, State2};
%% The client connection recived (or polled for) an update from the remote cluster.
-%% Note that here, the Members are sorted in least connected order. Preserve that.
-%% TODO: we really want to keep all nodes we have every seen, except remove nodes
-%% that explicitly leave the cluster or show up in other clusters.
handle_cast({cluster_updated, "undefined", NewName, Members, Addr,
{cluster_by_addr, _CAddr}=Remote}, State) ->
%% replace connection by address with connection by clustername if that would be safe.
@@ -422,9 +450,10 @@ register_defaults(Defaults, State) ->
case Defaults of
[] ->
State;
- [MembersFun, SaveFun, RestoreFun] ->
+ [MembersFun, AllMembersFun, SaveFun, RestoreFun] ->
lager:debug("Registering default cluster manager functions."),
State#state{member_fun=MembersFun,
+ all_member_fun=AllMembersFun,
save_members_fun=SaveFun,
restore_targets_fun=RestoreFun}
end.
@@ -521,8 +550,8 @@ save_cluster(NewName, OldMembers, ReturnedMembers, State) ->
[NewName, Members]);
_ ->
persist_members_to_ring(State, NewName, Members),
- lager:info("Cluster Manager: updated ~p with members: ~p",
- [NewName, Members])
+ lager:info("Cluster Manager: updated ~p with members: ~p OldMembers ~p",
+ [NewName, Members, OldMembers])
end
end,
%% clear out these IPs from other clusters
@@ -727,3 +756,34 @@ connect_to_persisted_clusters(State) ->
_ ->
ok
end.
+
+shuffle_with_seed(List, Seed={_,_,_}) -> % shuffle List; the same Seed always gives the same order
+ _ = random:seed(Seed), % NOTE: this reseeds `random' for the calling process
+ [E || {E, _} <- lists:keysort(2, [{Elm, random:uniform()} || Elm <- List])];
+shuffle_with_seed(List, Seed) -> % any other term: hash it into a 3-integer seed first
+ <<_:10,S1:50,S2:50,S3:50>> = crypto:hash(sha, term_to_binary(Seed)), % SHA-1 digest is 160 bits = 10 + 3*50
+ shuffle_with_seed(List, {S1,S2,S3}).
+
+%% @doc Reorder the given remote addresses for this node; always returns {ok, Addrs}.
+shuffle_remote_ipaddrs([]) ->
+ {ok, []};
+shuffle_remote_ipaddrs(RemoteUnsorted) ->
+ {ok, MyRing} = riak_core_ring_manager:get_my_ring(),
+ SortedNodes = lists:sort(riak_core_ring:all_members(MyRing)),
+ NodesTagged = lists:zip(lists:seq(1, length(SortedNodes)), SortedNodes),
+ case lists:keyfind(node(), 2, NodesTagged) of
+ {MyPos, _} ->
+ OurClusterName = riak_core_connection:symbolic_clustername(),
+ RemoteAddrs = shuffle_with_seed(lists:sort(RemoteUnsorted), [OurClusterName]),
+
+ %% MyPos is the position of *this* node in the sorted list of
+ %% all nodes in my ring. Now choose the node at the corresponding
+ %% index in RemoteAddrs as our "buddy"
+ SplitPos = ((MyPos-1) rem length(RemoteAddrs)),
+ case lists:split(SplitPos,RemoteAddrs) of
+ {BeforeBuddy,[Buddy|AfterBuddy]} ->
+ {ok, [Buddy | shuffle_with_seed(AfterBuddy ++ BeforeBuddy, node())]} % buddy first, the rest shuffled with node() as seed
+ end;
+ false -> % node() was not found among the ring members
+ {ok, shuffle_with_seed(lists:sort(RemoteUnsorted), node())}
+ end.
diff --git a/src/riak_core_cluster_mgr_sup.erl b/src/riak_core_cluster_mgr_sup.erl
index 0b476c6b..454b31a0 100644
--- a/src/riak_core_cluster_mgr_sup.erl
+++ b/src/riak_core_cluster_mgr_sup.erl
@@ -19,6 +19,7 @@ start_link() ->
%% @doc supervisor callback.
init([]) ->
ClusterMgrDefaults = [fun riak_repl_app:cluster_mgr_member_fun/1,
+ fun riak_repl_app:cluster_mgr_all_member_fun/1,
fun riak_repl_app:cluster_mgr_write_cluster_members_to_ring/2,
fun riak_repl_app:cluster_mgr_read_cluster_targets_from_ring/0],
Processes =
diff --git a/src/riak_core_cluster_serv.erl b/src/riak_core_cluster_serv.erl
index 915142dd..5df5d93d 100644
--- a/src/riak_core_cluster_serv.erl
+++ b/src/riak_core_cluster_serv.erl
@@ -115,6 +115,17 @@ handle_socket_info(?CTRL_ASK_MEMBERS, Transport, Socket, State) ->
{stop, Else, State}
end;
+handle_socket_info(?CTRL_ALL_MEMBERS, Transport, Socket, State) ->
+ case read_ip_address(Socket, Transport, State#state.remote_addr) of
+ {ok, RemoteConnectedToIp} ->
+ Members = gen_server:call(?CLUSTER_MANAGER_SERVER, {get_all_members, RemoteConnectedToIp}, infinity),
+ ok = Transport:send(Socket, term_to_binary(Members)),
+ ok = Transport:setopts(Socket, [{active, once}]),
+ {noreply, State};
+ Else ->
+ {stop, Else, State}
+ end;
+
handle_socket_info(OtherData, _Transport, _Socket, State) ->
ok = lager:warning("Some other data from the socket: ~p", [OtherData]),
{stop, {error, unrecognized_request}, State}.
diff --git a/src/riak_core_connection.erl b/src/riak_core_connection.erl
index df2f4749..0f00bc46 100644
--- a/src/riak_core_connection.erl
+++ b/src/riak_core_connection.erl
@@ -29,7 +29,8 @@
%%
The server sends `term_to_binary({?CTRL_ACK, ?CTRL_REV, TheirCaps}).'
%% If the server and client agree on SSL, the session is upgraded.
%% The client sends `term_to_binary({Protocol, Version}).'
-%% The server sends `term_to_binary({ok, {ProtoName, {CommonMajor, RemoteMinor, LocalMinor}}})', after which we call ?MODULE:connect/6 and exit.
+%% The server sends `term_to_binary({ok, {ProtoName, {CommonMajor, RemoteMinor, LocalMinor}}})',
+%% after which we call ?MODULE:connect/6 and exit.
-module(riak_core_connection).
-behavior(gen_fsm).
@@ -170,7 +171,17 @@ init({IP, Port, Protocol, ProtoVers, SocketOptions, Mod, ModArgs}) ->
Hello = term_to_binary({?CTRL_HELLO, ?CTRL_REV, MyCaps}),
ok = ranch_tcp:send(Socket, Hello),
ranch_tcp:setopts(Socket, [{active, once}]),
- State = #state{transport = ranch_tcp, socket = Socket, protocol = Protocol, protovers = ProtoVers, socket_opts = SocketOptions, mod = Mod, mod_args = ModArgs, cluster_name = MyName, local_capabilities = MyCaps, ip = IP, port = Port},
+ State = #state{transport = ranch_tcp,
+ socket = Socket,
+ protocol = Protocol,
+ protovers = ProtoVers,
+ socket_opts = SocketOptions,
+ mod = Mod,
+ mod_args = ModArgs,
+ cluster_name = MyName,
+ local_capabilities = MyCaps,
+ ip = IP,
+ port = Port},
{ok, wait_for_capabilities, State};
Else ->
lager:warning("Could not connect ~p:~p due to ~p", [IP, Port, Else]),
@@ -207,7 +218,9 @@ handle_info({_Transport, Socket, Data}, wait_for_capabilities, State = #state{so
FullProto = {State#state.protocol, State#state.protovers},
NewTransport:send(Socket, erlang:term_to_binary(FullProto)),
NewTransport:setopts(Socket, [{active, once}]),
- State2 = State#state{transport = NewTransport, socket = NewSocket, remote_capabilities = TheirCaps},
+ State2 = State#state{transport = NewTransport,
+ socket = NewSocket,
+ remote_capabilities = TheirCaps},
{next_state, wait_for_protocol, State2}
end;
Else ->
@@ -222,7 +235,12 @@ handle_info({_TransTag, Socket, Data}, wait_for_protocol, State = #state{socket
IpPort = {State#state.ip, State#state.port},
NegotiatedProto = {ProtoName, {CommonMajor, LocalMinor}, {CommonMajor, RemoteMinor}},
_ = Transport:setopts(Socket, State#state.socket_opts),
- _ModStarted = Module:connected(Socket, Transport, IpPort, NegotiatedProto, ModArgs, State#state.remote_capabilities),
+ _ModStarted = Module:connected(Socket,
+ Transport,
+ IpPort,
+ NegotiatedProto,
+ ModArgs,
+ State#state.remote_capabilities),
{stop, normal, State};
Else ->
lager:warning("Invalid version returned: ~p", [Else]),
@@ -260,7 +278,6 @@ try_ssl(Socket, Transport, MyCaps, TheirCaps) ->
{Transport, Socket};
{true, true} ->
lager:info("~p and ~p agreed to use SSL", [MyName, TheirName]),
- ok = ssl:start(),
case riak_core_ssl_util:upgrade_client_to_ssl(Socket, riak_core) of
{ok, SSLSocket} ->
{ranch_ssl, SSLSocket};
@@ -270,4 +287,3 @@ try_ssl(Socket, Transport, MyCaps, TheirCaps) ->
{error, Reason}
end
end.
-
diff --git a/src/riak_core_connection_mgr.erl b/src/riak_core_connection_mgr.erl
index 7abc1a41..831e8254 100644
--- a/src/riak_core_connection_mgr.erl
+++ b/src/riak_core_connection_mgr.erl
@@ -113,6 +113,7 @@
reset_backoff/0,
get_request_states/0,
get_connection_errors/1,
+ filter_blacklisted_ipaddrs/1,
stop/0
]).
@@ -204,6 +205,10 @@ get_request_states() ->
get_connection_errors(Addr) ->
gen_server:call(?SERVER, {get_connection_errors, Addr}).
+%% @doc Return Addrs without the blacklisted ones; addresses with no known endpoint are kept.
+filter_blacklisted_ipaddrs(Addrs) ->
+ gen_server:call(?SERVER, {filter_blacklisted_ipaddrs, Addrs}).
+
%% doc Stop the server and sever all connections.
stop() ->
gen_server:call(?SERVER, stop).
@@ -287,6 +292,10 @@ handle_call({get_connection_errors, Addr}, _From, State = #state{endpoints=Endpo
{reply, [], State}
end;
+handle_call({filter_blacklisted_ipaddrs, Addrs}, _From, State=#state{ endpoints=Eps }) ->
+ Answer = filter_blacklisted_endpoints(Addrs, Eps),
+ {reply, Answer, State};
+
handle_call(_Unhandled, _From, State) ->
lager:debug("Unhandled gen_server call: ~p", [_Unhandled]),
{reply, {error, unhandled}, State}.
@@ -326,7 +335,7 @@ handle_info({backoff_timer, Addr}, State = #state{endpoints = EPs}) ->
{noreply, State#state{endpoints = orddict:store(Addr,EP2,EPs)}};
error ->
%% TODO: Should never happen because the Addr came from the EP list.
- {norepy, State}
+ {noreply, State}
end;
handle_info({retry_req, Ref}, State = #state{pending = Pending}) ->
case lists:keyfind(Ref, #req.ref, Pending) of
@@ -649,7 +658,8 @@ filter_blacklisted_endpoints(EpAddrs, AllEps) ->
{ok, EP} ->
EP#ep.is_black_listed == false;
error ->
- false
+ %% If we don't know this endpoint, it is not blacklisted.
+ true
end
end),
lists:filter(PredicateFun, EpAddrs).
diff --git a/src/riak_core_connection_mgr_stats.erl b/src/riak_core_connection_mgr_stats.erl
index a20fc983..163814c4 100644
--- a/src/riak_core_connection_mgr_stats.erl
+++ b/src/riak_core_connection_mgr_stats.erl
@@ -59,11 +59,8 @@ register_stats() ->
%% When the cache needs to get the latest values, it will call our
%% `produce_stats()' function.
get_stats() ->
- case riak_core_stat_cache:get_stats(?APP) of
- {ok, Stats, _TS} ->
- Stats;
- Error -> Error
- end.
+ {ok, Stats, _TS} = riak_core_stat_cache:get_stats(?APP), % assertive match: any other reply raises badmatch here
+ Stats.
get_consolidated_stats() ->
Strings = [format_stat(Stat) || Stat <- get_stats()],
diff --git a/src/riak_core_service_conn.erl b/src/riak_core_service_conn.erl
index 838a1c3b..be3caafe 100644
--- a/src/riak_core_service_conn.erl
+++ b/src/riak_core_service_conn.erl
@@ -217,7 +217,7 @@ choose_version({ClientProto,ClientVersions}=_CProtocol, HostProtocols) ->
case [H || {{HostProto,_Versions},_Rest}=H <- HostProtocols, ClientProto == HostProto] of
[] ->
%% oops! The host does not support this sub protocol type
- lager:error("Failed to find host support for protocol: ~p", [ClientProto]),
+ lager:error("Failed to find host support for protocol: ~p, HostProtocols = ~p", [ClientProto, HostProtocols]),
lager:debug("choose_version: no common protocols"),
{error,protocol_not_supported};
[{{_HostProto,HostVersions},Rest}=_Matched | _DuplicatesIgnored] ->
diff --git a/src/riak_core_service_mgr.erl b/src/riak_core_service_mgr.erl
index 42d308c6..af492cf3 100644
--- a/src/riak_core_service_mgr.erl
+++ b/src/riak_core_service_mgr.erl
@@ -144,7 +144,7 @@ is_registered(ProtocolId) ->
%% @doc Register a callback function that will get called periodically or
%% when the connection status of services changes. The function will
-%% receive a list of tuples: `{, }' where stats
+%% receive a list of tuples: `{<protocol-id>, <stats>}' where stats
%% holds the number of open connections that have been accepted for that
%% protocol type. This can be used to report load, in the form of
%% connected-ness, for each protocol type, to remote clusters, e.g.,
diff --git a/src/riak_repl2_fscoordinator.erl b/src/riak_repl2_fscoordinator.erl
index a2b60f75..aa6f391f 100644
--- a/src/riak_repl2_fscoordinator.erl
+++ b/src/riak_repl2_fscoordinator.erl
@@ -61,12 +61,16 @@
partition_queue = queue:new(),
retries = dict:new(),
reserve_retries = dict:new(),
+ soft_retries = dict:new(),
whereis_waiting = [],
busy_nodes = sets:new(),
running_sources = [],
+ purgatory = queue:new(),
+ dropped = [],
successful_exits = 0,
error_exits = 0,
retry_exits = 0,
+ soft_retry_exits = 0,
pending_fullsync = false,
dirty_nodes = ordsets:new(), % these nodes should run fullsync
dirty_nodes_during_fs = ordsets:new(), % these nodes reported realtime errors
@@ -74,9 +78,17 @@
fullsyncs_completed = 0,
fullsync_start_time = undefined,
last_fullsync_duration = undefined,
+ last_fullsync_completed = undefined,
stat_cache = #stat_cache{}
}).
+-record(partition_info, {
+ index, % partition index
+ node, % node removed from busy_nodes when the source exits
+ running_source, % pid of the fssource process; key used to look up 'EXIT' messages
+ whereis_tref % timer reference; cancelled with erlang:cancel_timer/1 on stop_fullsync
+}).
+
%% ------------------------------------------------------------------
%% API Function Exports
%% ------------------------------------------------------------------
@@ -249,14 +261,21 @@ handle_call(status, _From, State = #state{socket=Socket}) ->
undefined -> undefined;
_N -> calendar:gregorian_seconds_to_datetime(State#state.fullsync_start_time)
end,
+ FinishTime =
+ case State#state.last_fullsync_completed of
+ undefined -> undefined;
+ LastFSCompleted -> calendar:gregorian_seconds_to_datetime(LastFSCompleted)
+ end,
SelfStats = [
{cluster, State#state.other_cluster},
{queued, queue:len(State#state.partition_queue)},
{in_progress, length(State#state.running_sources)},
+ {waiting_for_retry, queue:len(State#state.purgatory)},
{starting, length(State#state.whereis_waiting)},
{successful_exits, State#state.successful_exits},
{error_exits, State#state.error_exits},
{retry_exits, State#state.retry_exits},
+ {soft_retry_exits, State#state.soft_retry_exits},
{busy_nodes, sets:size(State#state.busy_nodes)},
{last_running_refresh, StatFreshness},
{running_stats, SourceStats},
@@ -264,6 +283,7 @@ handle_call(status, _From, State = #state{socket=Socket}) ->
{fullsyncs_completed, State#state.fullsyncs_completed},
{last_fullsync_started, StartTime},
{last_fullsync_duration, State#state.last_fullsync_duration},
+ {last_fullsync_completed, FinishTime},
{fullsync_suggested,
nodeset_to_string_list(State#state.dirty_nodes)},
{fullsync_suggested_during_fs,
@@ -321,7 +341,7 @@ handle_cast({connected, Socket, Transport, _Endpoint, _Proto}, State) ->
handle_cast({connect_failed, _From, Why}, State) ->
lager:warning("Fullsync remote connection to ~p failed due to ~p, retrying",
[State#state.other_cluster, Why]),
- {stop, normal, State};
+ {stop, connect_failed, State};
handle_cast(start_fullsync, #state{socket=undefined} = State) ->
%% not connected yet...
@@ -345,6 +365,9 @@ handle_cast(start_fullsync, State) ->
partition_queue = queue:from_list(Partitions),
retries = dict:new(),
reserve_retries = dict:new(),
+ purgatory = queue:new(),
+ soft_retries = dict:new(),
+ dropped = [],
successful_exits = 0,
error_exits = 0,
retry_exits = 0,
@@ -356,18 +379,22 @@ handle_cast(start_fullsync, State) ->
handle_cast(stop_fullsync, State) ->
% exit all running, cancel all timers, and reset the state.
- _ = [erlang:cancel_timer(Tref) || {_, {_, Tref}} <- State#state.whereis_waiting],
+ _ = [erlang:cancel_timer(Tref) || #partition_info{whereis_tref = Tref}
+ <- State#state.whereis_waiting],
_ = [begin
- unlink(Pid),
- riak_repl2_fssource:stop_fullsync(Pid),
- riak_repl2_fssource_sup:disable(node(Pid), Part)
- end || {Pid, {Part, _PartN}} <- State#state.running_sources],
+ unlink(Pid),
+ riak_repl2_fssource:stop_fullsync(Pid),
+ riak_repl2_fssource_sup:disable(node(Pid), Part)
+ end || #partition_info{index = Part, running_source = Pid}
+ <- State#state.running_sources],
State2 = State#state{
largest_n = undefined,
owners = [],
partition_queue = queue:new(),
retries = dict:new(),
reserve_retries = dict:new(),
+ purgatory = queue:new(),
+ dropped = [],
whereis_waiting = [],
running_sources = []
},
@@ -380,17 +407,16 @@ handle_cast(_Msg, State) ->
%% @hidden
handle_info({'EXIT', Pid, Cause},
#state{socket=Socket, transport=Transport}=State) when Cause =:= normal; Cause =:= shutdown ->
- lager:debug("Fssource ~p exited normally", [Pid]),
- PartitionEntry = lists:keytake(Pid, 1, State#state.running_sources),
+ lager:debug("fssource ~p exited normally", [Pid]),
+ PartitionEntry = lists:keytake(Pid, #partition_info.running_source, State#state.running_sources),
case PartitionEntry of
false ->
% late exit or otherwise non-existant
{noreply, State};
- {value, {Pid, {Index, _, _}=Partition}, Running} ->
-
+ {value, Partition, Running} ->
+ #partition_info{index = Index, node = Node} = Partition,
% likely a slot on the remote node opened up, so re-enable that
% remote node for whereis requests.
- {_, _, Node} = Partition,
NewBusies = sets:del_element(Node, State#state.busy_nodes),
% ensure we unreserve the partition on the remote node
@@ -406,68 +432,22 @@ handle_info({'EXIT', Pid, Cause},
maybe_complete_fullsync(Running, State2)
end;
-handle_info({'EXIT', Pid, Cause},
- #state{socket=Socket, transport=Transport}=State) ->
- lager:warning("Fssource ~p exited abnormally: ~p", [Pid, Cause]),
- PartitionEntry = lists:keytake(Pid, 1, State#state.running_sources),
- case PartitionEntry of
- false ->
- % late exit
- {noreply, State};
- {value, {Pid, {Index, _, _}=Partition}, Running} ->
-
- % even a bad exit opens a slot on the remote node
- {_, _, Node} = Partition,
- NewBusies = sets:del_element(Node, State#state.busy_nodes),
-
- % ensure we unreserve the partition on the remote node
- % instead of waiting for a timeout.
- Transport:send(Socket, term_to_binary({unreserve, Index})),
+handle_info({soft_exit, Pid, Cause}, State) ->
+ lager:info("fssource ~p soft exit with reason ~p", [Pid, Cause]),
+ handle_abnormal_exit(soft_exit, Pid, Cause, State);
- % stats
- #state{partition_queue = PQueue, retries = Retries0} = State,
-
- RetryLimit = app_helper:get_env(riak_repl, max_fssource_retries,
- ?DEFAULT_SOURCE_RETRIES),
- Retries = dict:update_counter(Partition, 1, Retries0),
-
- case dict:fetch(Partition, Retries) of
- N when N > RetryLimit, is_integer(RetryLimit) ->
- lager:warning("Fullsync dropping partition: ~p, ~p"
- " failed retries",
- [Partition, RetryLimit]),
- ErrorExits = State#state.error_exits + 1,
- State2 = State#state{busy_nodes = NewBusies,
- retries = Retries,
- running_sources = Running,
- error_exits = ErrorExits},
- maybe_complete_fullsync(Running, State2);
- _ -> %% have not run out of retries yet
- % reset for retry later
- lager:debug("Fssource rescheduling partition: ~p",
- [Partition]),
- PQueue2 = queue:in(Partition, PQueue),
- RetryExits = State#state.retry_exits + 1,
- State2 = State#state{partition_queue = PQueue2,
- retries = Retries,
- busy_nodes = NewBusies,
- running_sources = Running,
- retry_exits = RetryExits},
- State3 = start_up_reqs(State2),
- {noreply, State3}
- end
- end;
+handle_info({'EXIT', Pid, Cause}, State) ->
+ lager:info("fssource ~p exited abnormally: ~p", [Pid, Cause]),
+ handle_abnormal_exit('EXIT', Pid, Cause, State);
handle_info({Partition, whereis_timeout}, State) ->
#state{whereis_waiting = Waiting} = State,
- case proplists:get_value(Partition, Waiting) of
- undefined ->
+ case lists:keytake(Partition, #partition_info.index, Waiting) of
+ false ->
% late timeout.
{noreply, State};
- {N, NodeData, _Tref} ->
- Waiting2 = proplists:delete(Partition, Waiting),
- Partition1 = {Partition, N, NodeData},
- Q = queue:in(Partition1, State#state.partition_queue),
+ {value, PartitionInfo, Waiting2} ->
+ Q = queue:in(PartitionInfo#partition_info{whereis_tref = undefined}, State#state.partition_queue),
State2 = State#state{whereis_waiting = Waiting2, partition_queue = Q},
State3 = start_up_reqs(State2),
{noreply, State3}
@@ -523,6 +503,84 @@ handle_info({'DOWN', Mon, process, Pid, Why}, #state{stat_cache = #stat_cache{wo
handle_info(_Info, State) ->
{noreply, State}.
+%% Shared handling for an fssource that ended badly, reported either as a
+%% real 'EXIT' or as a {soft_exit, Pid, Cause} message. The pid is looked up
+%% in running_sources; a pid that is not tracked there is a late exit.
+handle_abnormal_exit(ExitType, Pid, Cause, State) ->
+    PartitionEntry = lists:keytake(Pid, #partition_info.running_source, State#state.running_sources),
+    handle_abnormal_exit(ExitType, Pid, Cause, PartitionEntry, State).
+
+handle_abnormal_exit(_ExitType, _Pid, _Cause, false, State) ->
+    % late exit
+    {noreply, State};
+
+handle_abnormal_exit(ExitType, Pid, _Cause, {value, PartitionWithSource, Running}, State) ->
+    Partition = PartitionWithSource#partition_info{running_source = undefined},
+    #partition_info{index = Index, node = Node} = Partition,
+    #state{socket = Socket, transport = Transport} = State,
+    % even a bad exit opens a slot on the remote node
+    NewBusies = sets:del_element(Node, State#state.busy_nodes),
+
+    % ensure we unreserve the partition on the remote node
+    % instead of waiting for a timeout.
+    Transport:send(Socket, term_to_binary({unreserve, Index})),
+
+    % stats
+    #state{partition_queue = PQueue} = State,
+
+    State2 = State#state{busy_nodes = NewBusies, running_sources = Running},
+    {ErrorCount, State3} = increment_error_dict(Partition, ExitType, State2),
+
+    case ExitType of
+        soft_exit ->
+            lager:debug("putting partition ~p in purgatory due to soft exit of ~p", [Index, Pid]),
+            _ = flush_exit_message(Pid),
+            % start_up_reqs/1 runs before this partition is parked, so that
+            % call cannot pop it straight back out of purgatory.
+            State4 = start_up_reqs(State3),
+            SoftRetryLimit = app_helper:get_env(riak_repl, max_fssource_soft_retries, ?DEFAULT_SOURCE_SOFT_RETRIES),
+            SoftRetryCount = State4#state.soft_retry_exits + 1,
+            if
+                SoftRetryLimit =:= infinity ->
+                    Purgatory = queue:in(Partition, State4#state.purgatory),
+                    {noreply, State4#state{purgatory = Purgatory, soft_retry_exits = SoftRetryCount}};
+
+                SoftRetryLimit < ErrorCount ->
+                    lager:info("Discarding partition ~p since it has reached the soft exit retry limit of ~p", [Partition#partition_info.index, SoftRetryLimit]),
+                    ErrorExits1 = State4#state.error_exits + 1,
+                    Dropped = [Partition#partition_info.index | State4#state.dropped],
+                    Purgatory = queue:filter(fun(P) -> P =/= Partition end, State4#state.purgatory),
+                    % dropping may have removed the last outstanding work, so
+                    % check for completion like the 'EXIT' drop path does.
+                    State5 = State4#state{error_exits = ErrorExits1, purgatory = Purgatory, dropped = Dropped},
+                    maybe_complete_fullsync(State5#state.running_sources, State5);
+                true ->
+                    Purgatory = queue:in(Partition, State4#state.purgatory),
+                    {noreply, State4#state{purgatory = Purgatory, soft_retry_exits = SoftRetryCount}}
+            end;
+
+        'EXIT' ->
+            lager:debug("Incrementing retries for partition ~p due to error exit of ~p", [Index, Pid]),
+            RetryLimit = app_helper:get_env(riak_repl, max_fssource_retries, ?DEFAULT_SOURCE_RETRIES),
+
+            if
+                ErrorCount > RetryLimit ->
+                    lager:warning("fssource dropping partition: ~p, ~p failed"
+                                  " retries", [Partition, RetryLimit]),
+                    ErrorExits = State3#state.error_exits + 1,
+                    State4 = State3#state{error_exits = ErrorExits},
+                    Dropped = [Partition#partition_info.index | State4#state.dropped],
+                    maybe_complete_fullsync(Running, State4#state{dropped = Dropped});
+                true -> %% have not run out of retries yet
+                    % reset for retry later
+                    lager:info("fssource rescheduling partition: ~p", [Partition]),
+                    PQueue2 = queue:in(Partition, PQueue),
+                    RetryExits = State3#state.retry_exits + 1,
+                    State4 = State3#state{partition_queue = PQueue2, retry_exits = RetryExits},
+                    State5 = start_up_reqs(State4),
+                    {noreply, State5}
+            end
+    end.
%% @hidden
terminate(_Reason, _State) ->
@@ -541,109 +599,100 @@ code_change(_OldVsn, State, _Extra) ->
% we stash on our side what nodes gave a busy reply so we don't send too many
% pointless whereis requests.
handle_socket_msg({location, Partition, {Node, Ip, Port}}, #state{whereis_waiting = Waiting} = State) ->
- case proplists:get_value(Partition, Waiting) of
- undefined ->
+ case lists:keytake(Partition, #partition_info.index, Waiting) of
+ false ->
State;
- {N, _OldNode, Tref} ->
+ {value, PartitionInfo, Waiting2} ->
+ Tref = PartitionInfo#partition_info.whereis_tref,
_ = erlang:cancel_timer(Tref),
- Waiting2 = proplists:delete(Partition, Waiting),
% we don't know for sure it's no longer busy until we get a busy reply
NewBusies = sets:del_element(Node, State#state.busy_nodes),
State2 = State#state{whereis_waiting = Waiting2, busy_nodes = NewBusies},
- Partition2 = {Partition, N, Node},
+ Partition2 = PartitionInfo#partition_info{node = Node, whereis_tref = undefined},
State3 = start_fssource(Partition2, Ip, Port, State2),
start_up_reqs(State3)
end;
handle_socket_msg({location_busy, Partition}, #state{whereis_waiting = Waiting} = State) ->
- lager:debug("Location_busy, partition = ~p", [Partition]),
- case proplists:get_value(Partition, Waiting) of
- undefined ->
+ lager:debug("anya location_busy, partition = ~p", [Partition]),
+ case lists:keytake(Partition, #partition_info.index, Waiting) of
+ false ->
State;
- {N, OldNode, Tref} ->
- lager:info("Partition ~p is too busy on cluster ~p at node ~p",
- [Partition, State#state.other_cluster, OldNode]),
+ {value, PartitionInfo, Waiting2} ->
+ lager:info("anya Partition ~p is too busy on cluster ~p at node ~p",
+ [Partition, State#state.other_cluster, PartitionInfo#partition_info.node]),
+ Tref = PartitionInfo#partition_info.whereis_tref,
_ = erlang:cancel_timer(Tref),
- Waiting2 = proplists:delete(Partition, Waiting),
State2 = State#state{whereis_waiting = Waiting2},
- Partition2 = {Partition, N, OldNode},
+ Partition2 = PartitionInfo#partition_info{whereis_tref = undefined},
PQueue = State2#state.partition_queue,
PQueue2 = queue:in(Partition2, PQueue),
- NewBusies = sets:add_element(OldNode, State#state.busy_nodes),
+ NewBusies = sets:add_element(Partition2#partition_info.node, State#state.busy_nodes),
State3 = State2#state{partition_queue = PQueue2, busy_nodes = NewBusies},
start_up_reqs(State3)
end;
handle_socket_msg({location_busy, Partition, Node}, #state{whereis_waiting = Waiting} = State) ->
- case proplists:get_value(Partition, Waiting) of
- undefined ->
+ case lists:keytake(Partition, #partition_info.index, Waiting) of
+ false ->
State;
- {N, _OldNode, Tref} ->
+ {value, PartitionInfo, Waiting2} ->
lager:info("Partition ~p is too busy on cluster ~p at node ~p", [Partition, State#state.other_cluster, Node]),
+ Tref = PartitionInfo#partition_info.whereis_tref,
_ = erlang:cancel_timer(Tref),
- Waiting2 = proplists:delete(Partition, Waiting),
State2 = State#state{whereis_waiting = Waiting2},
- Partition2 = {Partition, N, Node},
+ Partition2 = PartitionInfo#partition_info{node = Node},
PQueue = State2#state.partition_queue,
PQueue2 = queue:in(Partition2, PQueue),
NewBusies = sets:add_element(Node, State#state.busy_nodes),
State3 = State2#state{partition_queue = PQueue2, busy_nodes = NewBusies},
start_up_reqs(State3)
end;
-handle_socket_msg({location_down, Partition},
- #state{whereis_waiting=Waiting0} = State) ->
- case proplists:get_value(Partition, Waiting0) of
- undefined ->
+
+handle_socket_msg({location_down, Partition}, #state{whereis_waiting=Waiting} = State) ->
+ lager:warning("anya location_down, partition = ~p", [Partition]),
+ case lists:keytake(Partition, #partition_info.index, Waiting) of
+ false ->
State;
- {N, OldNode, Tref} ->
- handle_location_down({Partition, N, OldNode, Tref}, State)
+ {value, PartitionInfo, Waiting2} ->
+ lager:info("Partition ~p is unavailable on cluster ~p",
+ [Partition, State#state.other_cluster]),
+ Tref = PartitionInfo#partition_info.whereis_tref,
+ _ = erlang:cancel_timer(Tref),
+ Dropped = [Partition | State#state.dropped],
+ #state{retry_exits = RetryExits, error_exits = ErrorExits} = State,
+ State2 = State#state{whereis_waiting = Waiting2, dropped = Dropped,
+ retry_exits = RetryExits + 1,
+ error_exits = ErrorExits + 1},
+ start_up_reqs(State2)
end;
-handle_socket_msg({location_down, Partition, Node},
- #state{whereis_waiting=Waiting0} = State) ->
- case proplists:get_value(Partition, Waiting0) of
- undefined ->
+handle_socket_msg({location_down, Partition, _Node}, #state{whereis_waiting=Waiting} = State) ->
+ case lists:keytake(Partition, #partition_info.index, Waiting) of
+ false ->
State;
- {N, _OldNode, Tref} ->
- handle_location_down({Partition, N, Node, Tref}, State)
- end.
-
-handle_location_down({Partition, N, Node, Tref},
- #state{reserve_retries=Retries0,
- partition_queue=PQueue0,
- whereis_waiting=Waiting0} = State) ->
- lager:info("Partition ~p is unavailable on cluster ~p",
- [Partition, State#state.other_cluster]),
-
- RetryLimit = app_helper:get_env(riak_repl,
- max_reserve_retries,
- ?DEFAULT_RESERVE_RETRIES),
-
- Retries = dict:update_counter(Partition, 1, Retries0),
-
- _ = erlang:cancel_timer(Tref),
-
- case dict:fetch(Partition, Retries) of
- X when X > RetryLimit, is_integer(RetryLimit) ->
- lager:warning("Fullsync dropping partition: ~p, ~p location_down failed retries",
- [Partition, RetryLimit]),
- Waiting = proplists:delete(Partition, Waiting0),
- ErrorExits = State#state.error_exits + 1,
- State2 = State#state{whereis_waiting = Waiting,
- error_exits = ErrorExits,
- reserve_retries = Retries},
- start_up_reqs(State2);
- _ ->
- lager:warning("Fssource rescheduling partition after location_down: ~p ~p < ~p",
- [Partition, N, RetryLimit]),
- Waiting = proplists:delete(Partition, Waiting0),
- Partition2 = {Partition, N, Node},
- PQueue = queue:in(Partition2, PQueue0),
- RetryExits = State#state.retry_exits + 1,
- State2 = State#state{whereis_waiting=Waiting,
- partition_queue = PQueue,
- reserve_retries = Retries,
- retry_exits = RetryExits},
- start_up_reqs(State2)
+ {value, PartitionInfo, Waiting2} ->
+ Tref = PartitionInfo#partition_info.whereis_tref,
+ _ = erlang:cancel_timer(Tref),
+ RetryLimit = app_helper:get_env(riak_repl, max_reserve_retries, ?DEFAULT_RESERVE_RETRIES),
+ lager:info("Partition ~p is unavailable on cluster ~p", [Partition, State#state.other_cluster]),
+ State2 = State#state{whereis_waiting = Waiting2},
+ {RetriedCount, State3} = increment_error_dict(PartitionInfo, #state.reserve_retries, State2),
+ State4 = case RetriedCount of
+ N when N > RetryLimit, is_integer(N) ->
+ lager:warning("Fullsync dropping partition ~p, ~p location_down failed retries", [PartitionInfo#partition_info.index, RetryLimit]),
+ Dropped = [Partition | State#state.dropped],
+ #state{retry_exits = RetryExits,
+ error_exits = ErrorExits} = State,
+ State3#state{dropped = Dropped,
+ error_exits = ErrorExits + 1,
+ retry_exits = RetryExits + 1};
+ _ ->
+ PQueue = queue:in(PartitionInfo, State3#state.partition_queue),
+ #state{retry_exits = RetryExits} = State,
+ State3#state{partition_queue = PQueue,
+ retry_exits = RetryExits + 1}
+ end,
+ start_up_reqs(State4)
end.
% try our best to reach maximum capacity by sending as many whereis requests
@@ -654,7 +703,17 @@ start_up_reqs(State) ->
Running = length(State#state.running_sources),
Waiting = length(State#state.whereis_waiting),
StartupCount = Max - Running - Waiting,
- start_up_reqs(State, StartupCount).
+ State2 = maybe_pop_from_purgatory(State),
+ start_up_reqs(State2, StartupCount).
+
+%% Move at most one partition from the front of purgatory to the back of
+%% partition_queue; the state is returned untouched when purgatory is empty.
+maybe_pop_from_purgatory(#state{purgatory = Parked, partition_queue = PQueue} = State) ->
+    case queue:out(Parked) of
+        {empty, _} -> State;
+        {{value, Partition}, StillParked} ->
+            State#state{purgatory = StillParked, partition_queue = queue:in(Partition, PQueue)}
+    end.
start_up_reqs(State, N) when N < 1 ->
State;
@@ -693,14 +752,16 @@ send_next_whereis_req(State) ->
% one of those to finish and try again
{defer, State#state{partition_queue = Queue}};
- {Pval, N, RemoteNode} = P ->
+ P when is_record(P, partition_info) ->
+ #partition_info{index = Pval} = P,
#state{transport = Transport, socket = Socket, whereis_waiting = Waiting} = State,
Tref = erlang:send_after(?WAITING_TIMEOUT, self(), {Pval, whereis_timeout}),
- Waiting2 = [{Pval, {N, RemoteNode, Tref}} | Waiting],
+ PartitionInfo2 = P#partition_info{whereis_tref = Tref},
+ Waiting2 = [PartitionInfo2 | Waiting],
{ok, {PeerIP, PeerPort}} = Transport:peername(Socket),
lager:debug("Sending whereis request for partition ~p", [P]),
Transport:send(Socket,
- term_to_binary({whereis, element(1, P), PeerIP, PeerPort})),
+ term_to_binary({whereis, Pval, PeerIP, PeerPort})),
{ok, State#state{partition_queue = Queue, whereis_waiting =
Waiting2}}
end
@@ -740,13 +801,14 @@ below_max_sources(State) ->
Max = app_helper:get_env(riak_repl, max_fssource_cluster, ?DEFAULT_SOURCE_PER_CLUSTER),
( length(State#state.running_sources) + length(State#state.whereis_waiting) ) < Max.
-node_available({Partition, _, _}, Owners, Waiting) ->
+node_available(PartitionInfo, Owners, Waiting) ->
+ Partition = PartitionInfo#partition_info.index,
LocalNode = proplists:get_value(Partition, Owners),
Max = app_helper:get_env(riak_repl, max_fssource_node, ?DEFAULT_SOURCE_PER_NODE),
try riak_repl2_fssource_sup:enabled(LocalNode) of
RunningList ->
PartsSameNode = [Part || {Part, PNode} <- Owners, PNode =:= LocalNode],
- PartsWaiting = [Part || {Part, _} <- Waiting, lists:member(Part, PartsSameNode)],
+ PartsWaiting = [Part || #partition_info{index = Part} <- Waiting, lists:member(Part, PartsSameNode)],
if
( length(PartsWaiting) + length(RunningList) ) < Max ->
case proplists:get_value(Partition, RunningList) of
@@ -765,20 +827,20 @@ node_available({Partition, _, _}, Owners, Waiting) ->
skip
end.
-remote_node_available({_Partition, _, undefined}, _Busies) ->
+remote_node_available(Partition, _Busies) when Partition#partition_info.node =:= undefined ->
true;
-remote_node_available({_Partition, _, RemoteNode}, Busies) ->
- not sets:is_element(RemoteNode, Busies).
+remote_node_available(Partition, Busies) ->
+ not sets:is_element(Partition#partition_info.node, Busies).
-start_fssource(Partition2={Partition,_,_} = PartitionVal, Ip, Port, State) ->
+start_fssource(PartitionVal, Ip, Port, State) ->
+ Partition = PartitionVal#partition_info.index,
#state{owners = Owners} = State,
LocalNode = proplists:get_value(Partition, Owners),
- lager:info("Starting fssource for ~p on ~p to ~p", [Partition, LocalNode,
- Ip]),
case riak_repl2_fssource_sup:enable(LocalNode, Partition, {Ip, Port}) of
{ok, Pid} ->
link(Pid),
- Running = orddict:store(Pid, PartitionVal, State#state.running_sources),
+ _ = riak_repl2_fssource:soft_link(Pid),
+ Running = lists:keystore(Pid, #partition_info.running_source, State#state.running_sources, PartitionVal#partition_info{running_source = Pid}),
State#state{running_sources = Running};
{error, Reason} ->
case Reason of
@@ -796,7 +858,7 @@ start_fssource(Partition2={Partition,_,_} = PartitionVal, Ip, Port, State) ->
end,
#state{transport = Transport, socket = Socket} = State,
Transport:send(Socket, term_to_binary({unreserve, Partition})),
- PQueue = queue:in(Partition2, State#state.partition_queue),
+ PQueue = queue:in(PartitionVal, State#state.partition_queue),
State#state{partition_queue=PQueue}
end.
@@ -822,7 +884,7 @@ sort_partitions(Ring) ->
sort_partitions(OffsetPartitions, BigN, []).
sort_partitions([], _, Acc) ->
- [{P,N,undefined} || {P,N} <- lists:reverse(Acc)];
+ [#partition_info{index = P} || {P,_N} <- lists:reverse(Acc)];
sort_partitions(In, N, Acc) ->
Split = min(length(In), N) - 1,
{A, [P|B]} = lists:split(Split, In),
@@ -880,7 +942,8 @@ gather_source_stats(PDict) ->
gather_source_stats([], Acc) ->
lists:reverse(Acc);
-gather_source_stats([{Pid, _} | Tail], Acc) ->
+gather_source_stats([PartitionInfo | Tail], Acc) ->
+ Pid = PartitionInfo#partition_info.running_source,
try riak_repl2_fssource:legacy_status(Pid, infinity) of
Stats ->
gather_source_stats(Tail, [{riak_repl_util:safe_pid_to_list(Pid), Stats} | Acc])
@@ -892,10 +955,11 @@ gather_source_stats([{Pid, _} | Tail], Acc) ->
is_fullsync_in_progress(State) ->
QEmpty = queue:is_empty(State#state.partition_queue),
+ PurgatoryEmpty = queue:is_empty(State#state.purgatory),
Waiting = State#state.whereis_waiting,
Running = State#state.running_sources,
- case {QEmpty, Waiting, Running} of
- {true, [], []} ->
+ case {QEmpty, PurgatoryEmpty, Waiting, Running} of
+ {true, true, [], []} ->
false;
_ ->
true
@@ -904,9 +968,10 @@ is_fullsync_in_progress(State) ->
maybe_complete_fullsync(Running, State) ->
EmptyRunning = Running == [],
QEmpty = queue:is_empty(State#state.partition_queue),
+ PurgatoryEmpty = queue:is_empty(State#state.purgatory),
Waiting = State#state.whereis_waiting,
- case {EmptyRunning, QEmpty, Waiting} of
- {true, true, []} ->
+ case {EmptyRunning, QEmpty, PurgatoryEmpty, Waiting} of
+ {true, true, true, []} ->
MyClusterName = riak_core_connection:symbolic_clustername(),
lager:info("Fullsync complete from ~s to ~s",
[MyClusterName, State#state.other_cluster]),
@@ -922,7 +987,8 @@ maybe_complete_fullsync(Running, State) ->
{noreply, State2#state{running_sources = Running,
fullsyncs_completed = TotalFullsyncs,
fullsync_start_time = undefined,
- last_fullsync_duration=ElapsedSeconds
+ last_fullsync_duration=ElapsedSeconds,
+ last_fullsync_completed = Finish
}};
_ ->
% there's something waiting for a response.
@@ -971,3 +1037,39 @@ notify_rt_dirty_nodes(State = #state{dirty_nodes = DirtyNodes,
nodeset_to_string_list(Set) ->
string:join([erlang:atom_to_list(V) || V <- ordsets:to_list(Set)],",").
+
+%% Bump the per-partition counter held in one of the dict fields of #state{}
+%% and return {NewCount, NewState}. The first argument is a partition index
+%% or a #partition_info{} (its index is used). The second argument is either
+%% an exit type (soft_exit -> soft_retries, 'EXIT' -> retries) or the tuple
+%% position of the dict field itself, e.g. #state.reserve_retries.
+increment_error_dict(#partition_info{index = Index}, ExitType, State) ->
+    increment_error_dict(Index, ExitType, State);
+increment_error_dict(PartitionIndex, soft_exit, State) ->
+    increment_error_dict(PartitionIndex, #state.soft_retries, State);
+increment_error_dict(PartitionIndex, 'EXIT', State) ->
+    increment_error_dict(PartitionIndex, #state.retries, State);
+increment_error_dict(PartitionIndex, FieldPos, State) when is_integer(FieldPos) ->
+    Counters = dict:update_counter(PartitionIndex, 1, element(FieldPos, State)),
+    {dict:fetch(PartitionIndex, Counters), setelement(FieldPos, State, Counters)}.
+
+% If we are linked to a remote pid, it is possible for disterl to
+% reconnect at a bad time and lose the exit message, so we cannot rely
+% solely on receiving the exit message in order to flush it. Instead we
+% monitor the process. 'DOWN' messages always come in after the exit
+% message, so if we get the 'DOWN' message first, we know the exit message
+% is never going to arrive and is effectively flushed. If we get the exit
+% first, we can flush the 'DOWN' message next, since one is delivered in
+% any case (as noproc when the process is already gone).
+flush_exit_message(Pid) ->
+    Mon = erlang:monitor(process, Pid),
+    receive
+        {'EXIT', Pid, _} ->
+            receive
+                {'DOWN', Mon, process, Pid, _} ->
+                    ok
+            end;
+        {'DOWN', Mon, process, Pid, _} ->
+            ok
+    end.
+
diff --git a/src/riak_repl2_fscoordinator_serv.erl b/src/riak_repl2_fscoordinator_serv.erl
index 71216d75..a11bd32b 100644
--- a/src/riak_repl2_fscoordinator_serv.erl
+++ b/src/riak_repl2_fscoordinator_serv.erl
@@ -48,18 +48,8 @@ start_link(Socket, Transport, Proto, Props) ->
%% @doc Get the stats for every serv.
%% @see status/1
status() ->
- LeaderNode = riak_repl2_leader:leader_node(),
- case LeaderNode of
- undefined ->
- {[], []};
- _ ->
- case riak_repl2_fscoordinator_serv_sup:started() of
- [] ->
- [];
- Repls ->
- [status(Pid) || {_Remote, Pid} <- Repls]
- end
- end.
+ Repls = riak_repl2_fscoordinator_serv_sup:started(),
+ [status(Pid) || {_Remove, Pid} <- Repls].
%% @doc Get the status for the given serv.
-spec status(Pid :: pid()) -> [tuple()].
diff --git a/src/riak_repl2_fssource.erl b/src/riak_repl2_fssource.erl
index 013cf1ee..38492826 100644
--- a/src/riak_repl2_fssource.erl
+++ b/src/riak_repl2_fssource.erl
@@ -3,8 +3,9 @@
-behaviour(gen_server).
%% API
--export([start_link/2, connected/6, connect_failed/3, start_fullsync/1,
- stop_fullsync/1, cluster_name/1, legacy_status/2, fullsync_complete/1]).
+-export([start_link/2, start_link/3, connected/6, connect_failed/3,
+ start_fullsync/1, stop_fullsync/1, fullsync_complete/1,
+ cluster_name/1, legacy_status/2, soft_link/1]).
%% gen_server callbacks
-export([init/1, handle_call/3, handle_cast/2, handle_info/2,
@@ -19,11 +20,15 @@
connection_ref,
fullsync_worker,
work_dir,
- strategy
+ strategy,
+ owner
}).
start_link(Partition, IP) ->
- gen_server:start_link(?MODULE, [Partition, IP], []).
+ start_link(Partition, IP, undefined).
+%% Like start_link/2, but also hands Owner to init/1, which stores it in the state.
+start_link(Partition, IP, Owner) ->
+    gen_server:start_link(?MODULE, [Partition, IP, Owner], []).
%% connection manager callbacks
connected(Socket, Transport, Endpoint, Proto, Pid, Props) ->
@@ -51,9 +56,33 @@ cluster_name(Pid) ->
legacy_status(Pid, Timeout) ->
gen_server:call(Pid, legacy_status, Timeout).
+%% @doc Create a 'soft link' between the calling process and the fssource.
+%% A soft link allows a soft_exit message to be sent, before a normal
+%% exit, to the process that created the soft link. Only one link is
+%% held at a time, and a link works in only one direction (the fssource
+%% reports to the calling process).
+soft_link(Pid) ->
+    % not using default long timeout because this is primarily used by the
+    % fscoordinator, and we don't want to potentially block that for up to
+    % 2 minutes. 15 seconds is bad enough in a worst case scenario.
+    try gen_server:call(Pid, {soft_link, self()}, timer:seconds(15)) of
+        ok -> % older versions returned 'ok' for the catchall
+            false;
+        true ->
+            true
+    catch
+        _What:Reason ->
+            lager:debug("Could not create soft link to ~p from ~p due to ~p", [Pid, self(), Reason]),
+            {error, Reason}
+    end.
+
%% gen server
init([Partition, IP]) ->
+ init([Partition, IP, undefined]);
+
+init([Partition, IP, Owner]) ->
+
RequestedStrategy = app_helper:get_env(riak_repl,
fullsync_strategy,
?DEFAULT_FULLSYNC_STRATEGY),
@@ -72,8 +101,8 @@ init([Partition, IP]) ->
case connect(IP, SupportedStrategy, Partition) of
{error, Reason} ->
{stop, Reason};
- Result ->
- Result
+ {ok, State}->
+ {ok, State#state{owner = Owner}}
end;
{error, Reason} ->
%% the vnode is probably busy. Try again later.
@@ -118,7 +147,7 @@ handle_call({connected, Socket, Transport, _Endpoint, Proto, Props},
{ok, FullsyncWorker} = riak_repl_aae_source:start_link(Cluster,
Client, Transport,
Socket, Partition,
- self(), ClientVer),
+ self(), Proto),
%% Give control of socket to AAE worker. It will consume all TCP messages.
ok = Transport:controlling_process(Socket, FullsyncWorker),
riak_repl_aae_source:start_exchange(FullsyncWorker),
@@ -149,9 +178,22 @@ handle_call(stop_fullsync, _From, State=#state{fullsync_worker=FSW,
handle_call(legacy_status, _From, State=#state{fullsync_worker=FSW,
socket=Socket,
strategy=Strategy}) ->
- Res = case is_pid(FSW) andalso is_process_alive(FSW) of
- true -> gen_fsm:sync_send_all_state_event(FSW, status, infinity);
- false -> []
+ lager:debug("Sending status to ~p", [FSW]),
+ Res = case is_pid(FSW) of
+ true ->
+ % try/catch because there may be a message in the pid's
+ % mailbox that will cause it to exit before it gets to our
+ % status request message.
+ try gen_fsm:sync_send_all_state_event(FSW, status, infinity) of
+ SyncSendRes ->
+ SyncSendRes
+ catch
+ What:Why ->
+ lager:notice("Error getting fullsync worker ~p status: ~p:~p", [FSW, What, Why]),
+ []
+ end;
+ false ->
+ []
end,
SocketStats = riak_core_tcp_mon:format_socket_stats(
riak_core_tcp_mon:socket_status(Socket), []),
@@ -172,6 +214,10 @@ handle_call(cluster_name, _From, State) ->
ClusterName
end,
{reply, Name, State};
+handle_call({soft_link, NewOwner}, _From, State) ->
+ lager:debug("Changing soft_link from ~p to ~p", [State#state.owner, NewOwner]),
+ State2 = State#state{owner = NewOwner},
+ {reply, true, State2};
handle_call(_Msg, _From, State) ->
{reply, ok, State}.
@@ -221,16 +267,20 @@ handle_info({Proto, Socket, Data},
gen_fsm:send_event(State#state.fullsync_worker, Msg),
{noreply, State}
end;
+handle_info({soft_exit, Pid, Reason}, State = #state{fullsync_worker = Pid}) ->
+ lager:debug("Fullsync worker exited normally, but really wanted it to be ~p", [Reason]),
+ maybe_soft_exit(Reason, State);
handle_info(_Msg, State) ->
{noreply, State}.
terminate(_Reason, #state{fullsync_worker=FSW, work_dir=WorkDir}) ->
- %% check if process alive only if it's defined
- case is_pid(FSW) andalso is_process_alive(FSW) of
+ %% try to exit the fullsync worker; if we're dying because it did,
+ %% don't worry about the error (cause it's already dead).
+ case is_pid(FSW) of
false ->
ok;
true ->
- gen_fsm:sync_send_all_state_event(FSW, stop)
+ catch gen_fsm:sync_send_all_state_event(FSW, stop)
end,
case WorkDir of
undefined -> ok;
@@ -287,7 +337,7 @@ maybe_exchange_caps(_, Caps, Socket, Transport) ->
%% Start a connection to the remote sink node at IP, using the given fullsync strategy,
%% for the given partition. The protocol version will be determined from the strategy.
connect(IP, Strategy, Partition) ->
- lager:info("Connecting to remote ~p for partition ~p", [IP, Partition]),
+ lager:debug("Connecting to remote ~p for partition ~p", [IP, Partition]),
TcpOptions = [{keepalive, true},
{nodelay, true},
{packet, 4},
@@ -306,3 +356,13 @@ connect(IP, Strategy, Partition) ->
lager:warning("Error connecting to remote ~p for partition ~p", [IP, Partition]),
{error, Reason}
end.
+
+%% Decide how to stop once a soft exit has been requested. Without an owner
+%% the server stops with Reason itself; with an owner, the owner is sent
+%% {soft_exit, self(), Reason} and the server then stops with reason normal.
+maybe_soft_exit(Reason, #state{owner = undefined} = State) ->
+    {stop, Reason, State};
+maybe_soft_exit(Reason, #state{owner = Owner} = State) ->
+    Owner ! {soft_exit, self(), Reason},
+    {stop, normal, State}.
+
diff --git a/src/riak_repl2_ip.erl b/src/riak_repl2_ip.erl
index f930dee7..cf92cd75 100644
--- a/src/riak_repl2_ip.erl
+++ b/src/riak_repl2_ip.erl
@@ -539,7 +539,7 @@ determine_netmask_test_() ->
natmap_test_() ->
error_logger:tty(false),
- [
+ [{timeout, 30,
{"forward lookups work",
fun() ->
Map = [
@@ -555,8 +555,8 @@ natmap_test_() ->
maybe_apply_nat_map({10, 0, 0, 10}, 9090, Map)),
ok
end
- },
- {"forward lookups with ports work",
+ }},
+ {timeout, 30, {"forward lookups with ports work",
fun() ->
Map = [
{{{65, 172, 243, 10}, 10080}, {10, 0, 0, 10}},
@@ -571,8 +571,8 @@ natmap_test_() ->
maybe_apply_nat_map({10, 0, 0, 10}, 9090, Map)),
ok
end
- },
- {"reverse lookups work",
+ }},
+ {timeout, 30, {"reverse lookups work",
fun() ->
Map = [
{{65, 172, 243, 10}, {10, 0, 0, 10}},
@@ -587,8 +587,8 @@ natmap_test_() ->
apply_reverse_nat_map({10, 0, 0, 20}, 9090, Map)),
ok
end
- },
- {"reverse lookups with ports work",
+ }},
+ {timeout, 30, {"reverse lookups with ports work",
fun() ->
Map = [
{{{65, 172, 243, 10}, 10080}, {10, 0, 0, 10}},
@@ -603,8 +603,8 @@ natmap_test_() ->
apply_reverse_nat_map({10, 0, 0, 20}, 9080, Map)),
ok
end
- },
- {"forward lookups with hostnames work",
+ }},
+ {timeout, 30, {"forward lookups with hostnames work",
fun() ->
Map = [
{{65, 172, 243, 10}, "localhost"},
@@ -615,8 +615,8 @@ natmap_test_() ->
maybe_apply_nat_map({65, 172, 243, 10}, 9080, Map)),
ok
end
- },
- {"reverse lookups with hostnames work",
+ }},
+ {timeout, 30, {"reverse lookups with hostnames work",
fun() ->
{ok, {hostent, "basho.com", _, inet, 4, Addresses}} = inet_res:gethostbyname("basho.com"),
Map = [
@@ -628,7 +628,7 @@ natmap_test_() ->
apply_reverse_nat_map(hd(Addresses), 9080, Map)),
ok
end
- }
+ }}
].
-endif.
diff --git a/src/riak_repl2_rt.erl b/src/riak_repl2_rt.erl
index 49bf504f..8935eca1 100644
--- a/src/riak_repl2_rt.erl
+++ b/src/riak_repl2_rt.erl
@@ -79,7 +79,8 @@ ensure_rt(WantEnabled0, WantStarted0) ->
Status = riak_repl2_rtq:status(),
CStatus = proplists:get_value(consumers, Status, []),
Enabled = lists:sort([Remote || {Remote, _Stats} <- CStatus]),
- Started = lists:sort([Remote || {Remote, _Pid} <- riak_repl2_rtsource_conn_sup:enabled()]),
+ Connections = riak_repl2_rtsource_conn_sup:enabled(),
+ Started = lists:sort([Remote || {Remote, _Pid} <- Connections]),
ToEnable = WantEnabled -- Enabled,
ToDisable = Enabled -- WantEnabled,
@@ -96,6 +97,16 @@ ensure_rt(WantEnabled0, WantStarted0) ->
application:set_env(riak_repl, rtenabled, true)
end,
+ %% For each connection to validate, call maybe_rebalance_delayed to handle
+ %% the potential need to rebalance connections.
+ ToValidate = Started -- ToStop,
+ _ = [case lists:keyfind(Remote, 1, Connections) of
+ {_, PID} ->
+ riak_repl2_rtsource_conn:maybe_rebalance_delayed(PID);
+ false ->
+ ok
+ end || Remote <- ToValidate ],
+
case ToEnable ++ ToDisable ++ ToStart ++ ToStop of
[] ->
[];
@@ -110,9 +121,9 @@ ensure_rt(WantEnabled0, WantStarted0) ->
%% Stop running sources, re-register to get rid of pending
%% deliver functions
_ = [begin
- _ = riak_repl2_rtsource_conn_sup:disable(Remote),
- riak_repl2_rtq:register(Remote)
- end || Remote <- ToStop],
+ _ = riak_repl2_rtsource_conn_sup:disable(Remote),
+ riak_repl2_rtq:register(Remote)
+ end || Remote <- ToStop],
%% Unregister disabled sources, freeing up the queue
_ = [riak_repl2_rtq:unregister(Remote) || Remote <- ToDisable],
@@ -124,9 +135,11 @@ ensure_rt(WantEnabled0, WantStarted0) ->
end.
register_remote_locator() ->
- Locator = fun(Name, _Policy) ->
- riak_core_cluster_mgr:get_ipaddrs_of_cluster(Name)
- end,
+ Locator = fun(_, {use_only, Addrs}) ->
+ {ok, Addrs};
+ (Name, _Policy) ->
+ riak_core_cluster_mgr:get_ipaddrs_of_cluster(Name)
+ end,
ok = riak_core_connection_mgr:register_locator(rt_repl, Locator).
%% Register an active realtime sink (supervised under ranch)
@@ -149,12 +162,13 @@ postcommit(RObj) ->
Objects = Objects0 ++ [RObj],
Meta = set_bucket_meta(RObj),
- BinObjs = case orddict:fetch(?BT_META_TYPED_BUCKET, Meta) of
- false ->
- riak_repl_util:to_wire(w1, Objects);
- true ->
- riak_repl_util:to_wire(w2, Objects)
- end,
+ BinObjs =
+ case orddict:fetch(?BT_META_TYPED_BUCKET, Meta) of
+ false ->
+ riak_repl_util:to_wire(w1, Objects);
+ true ->
+ riak_repl_util:to_wire(w2, Objects)
+ end,
%% try the proxy first, avoids race conditions with unregister()
%% during shutdown
case whereis(riak_repl2_rtq_proxy) of
@@ -170,7 +184,7 @@ postcommit(RObj) ->
%% gen_server callbacks
init([]) ->
- {ok, #state{}}.
+ {ok, #state{}}.
handle_call(status, _From, State = #state{sinks = SinkPids}) ->
Timeout = app_helper:get_env(riak_repl, status_timeout, 5000),
diff --git a/src/riak_repl2_rtq.erl b/src/riak_repl2_rtq.erl
index 1085617c..2cb84d26 100644
--- a/src/riak_repl2_rtq.erl
+++ b/src/riak_repl2_rtq.erl
@@ -100,7 +100,7 @@ start_link() ->
-type start_option() :: overload_threshold_option() | overload_recover_option().
-type start_options() :: [start_option()].
%% @doc Start linked, registers to module name, with given options. This makes
-%% testing some options a bit easier as it removes a dependence on `app_helper'.
+%% testing some options a bit easier as it removes a dependence on app_helper.
-spec start_link(Options :: start_options()) -> {'ok', pid()}.
start_link(Options) ->
case ets:info(?overload_ets) of
diff --git a/src/riak_repl2_rtsource_conn.erl b/src/riak_repl2_rtsource_conn.erl
index a6dddb63..100d70a3 100644
--- a/src/riak_repl2_rtsource_conn.erl
+++ b/src/riak_repl2_rtsource_conn.erl
@@ -38,12 +38,14 @@
-ifdef(TEST).
-include_lib("eunit/include/eunit.hrl").
+-export([riak_core_connection_mgr_connect/1]).
-endif.
%% API
-export([start_link/1,
stop/1,
status/1, status/2,
+ address/1, maybe_rebalance_delayed/1,
legacy_status/1, legacy_status/2]).
%% connection manager callbacks
@@ -57,6 +59,23 @@
-define(DEFAULT_HBINTERVAL, 15).
-define(DEFAULT_HBTIMEOUT, 15).
+-define(TCP_OPTIONS, [{keepalive, true},
+ {nodelay, true},
+ {packet, 0},
+ {active, false}]).
+
+%% nodes running 1.3.1 have a bug in the service_mgr module.
+%% this bug prevents it from being able to negotiate a version list longer
+%% than 2. Until we no longer support communicating with that version,
+%% we need to artificially truncate the version list.
+%% TODO: expand version list or remove comment when we no
+%% longer support 1.3.1
+%% preferred version list: [{2,0}, {1,5}, {1,1}, {1,0}]
+
+
+-define(CLIENT_SPEC, {{realtime,[{3,0}, {2,0}, {1,5}]},
+ {?TCP_OPTIONS, ?MODULE, self()}}).
+
-record(state, {remote, % remote name
address, % {IP, Port}
connection_ref, % reference handed out by connection manager
@@ -72,6 +91,7 @@
hb_timeout_tref,% heartbeat timeout timer reference
hb_sent_q, % queue of heartbeats now() that were sent
hb_rtt, % RTT in milliseconds for last completed heartbeat
+ rb_timeout_tref,% Rebalance timeout timer reference
cont = <<>>}). % continuation from previous TCP buffer
%% API - start trying to send realtime repl to remote site
@@ -81,6 +101,16 @@ start_link(Remote) ->
stop(Pid) ->
gen_server:call(Pid, stop, ?LONG_TIMEOUT).
+address(Pid) ->
+ gen_server:call(Pid, address, ?LONG_TIMEOUT).
+
+%% @doc Check if we need to rebalance.
+%% If we do, delay some time, recheck that we still
+%% need to rebalance, and if we still do, then execute
+%% reconnection to the better sink node.
+maybe_rebalance_delayed(Pid) ->
+ gen_server:cast(Pid, rebalance_delayed).
+
status(Pid) ->
status(Pid, infinity).
@@ -117,22 +147,9 @@ connect_failed(_ClientProto, Reason, RtSourcePid) ->
%% Initialize
init([Remote]) ->
- TcpOptions = [{keepalive, true},
- {nodelay, true},
- {packet, 0},
- {active, false}],
- % nodes running 1.3.1 have a bug in the service_mgr module.
- % this bug prevents it from being able to negotiate a version list longer
- % than 2. Until we no longer support communicating with that version,
- % we need to artifically truncate the version list.
- % TODO: expand version list or remove comment when we no
- % longer support 1.3.1
- % prefered version list: [{2,0}, {1,5}, {1,1}, {1,0}]
- ClientSpec = {{realtime,[{3,0}, {2,0}, {1,5}]}, {TcpOptions, ?MODULE, self()}},
-
%% Todo: check for bad remote name
lager:debug("connecting to remote ~p", [Remote]),
- case riak_core_connection_mgr:connect({rt_repl, Remote}, ClientSpec) of
+ case riak_core_connection_mgr:connect({rt_repl, Remote}, ?CLIENT_SPEC) of
{ok, Ref} ->
lager:debug("connection ref ~p", [Ref]),
{ok, #state{remote = Remote, connection_ref = Ref}};
@@ -143,6 +160,8 @@ init([Remote]) ->
handle_call(stop, _From, State) ->
{stop, normal, ok, State};
+handle_call(address, _From, State = #state{ address=A }) ->
+ {reply, {ok, A}, State};
handle_call(status, _From, State =
#state{remote = R, address = _A, transport = T, socket = S,
helper_pid = H,
@@ -210,7 +229,7 @@ handle_call({connected, Socket, Transport, EndPoint, Proto}, _From,
SocketTag = riak_repl_util:generate_socket_tag("rt_source", Transport, Socket),
lager:debug("Keeping stats for " ++ SocketTag),
riak_core_tcp_mon:monitor(Socket, {?TCP_MON_RT_APP, source,
- SocketTag}, Transport),
+ SocketTag}, Transport),
State2 = State#state{transport = Transport,
socket = Socket,
address = EndPoint,
@@ -219,7 +238,7 @@ handle_call({connected, Socket, Transport, EndPoint, Proto}, _From,
helper_pid = HelperPid,
ver = Ver},
lager:info("Established realtime connection to site ~p address ~s",
- [Remote, peername(State2)]),
+ [Remote, peername(State2)]),
case Proto of
{realtime, _OurVer, {1, 0}} ->
@@ -246,14 +265,18 @@ handle_cast({connect_failed, _HelperPid, Reason},
State = #state{remote = Remote}) ->
lager:warning("Realtime replication connection to site ~p failed - ~p\n",
[Remote, Reason]),
- {stop, normal, State}.
+ {stop, normal, State};
+
+handle_cast(rebalance_delayed, State) ->
+ {noreply, maybe_rebalance(State, delayed)}.
+
handle_info({Proto, _S, TcpBin}, State= #state{cont = Cont})
- when Proto == tcp; Proto == ssl ->
+ when Proto == tcp; Proto == ssl ->
recv(<>, State);
handle_info({Closed, _S},
State = #state{remote = Remote, cont = Cont})
- when Closed == tcp_closed; Closed == ssl_closed ->
+ when Closed == tcp_closed; Closed == ssl_closed ->
case size(Cont) of
0 ->
ok;
@@ -268,7 +291,7 @@ handle_info({Closed, _S},
{stop, normal, State};
handle_info({Error, _S, Reason},
State = #state{remote = Remote, cont = Cont})
- when Error == tcp_error; Error == ssl_error ->
+ when Error == tcp_error; Error == ssl_error ->
riak_repl_stats:rt_source_errors(),
lager:warning("Realtime connection ~s to ~p network error ~p - ~b bytes pending\n",
[peername(State), Remote, Reason, size(Cont)]),
@@ -297,6 +320,8 @@ handle_info({heartbeat_timeout, HBSent}, State = #state{hb_sent_q = HBSentQ,
lager:debug("hb_sent_q_len after heartbeat_timeout: ~p", [queue:len(HBSentQ)]),
{stop, normal, State}
end;
+handle_info(rebalance_now, State) ->
+ {noreply, maybe_rebalance(State#state{rb_timeout_tref = undefined}, now)};
handle_info(Msg, State) ->
lager:warning("Unhandled info: ~p", [Msg]),
{noreply, State}.
@@ -308,6 +333,68 @@ terminate(_Reason, #state{helper_pid=_HelperPid, remote=Remote}) ->
code_change(_OldVsn, State, _Extra) ->
{ok, State}.
+maybe_rebalance(State, now) ->
+ case should_rebalance(State) of
+ no ->
+ State;
+ {yes, UsefulAddrs} ->
+ reconnect(State, UsefulAddrs)
+ end;
+maybe_rebalance(State, delayed) ->
+ case State#state.rb_timeout_tref of
+ undefined ->
+ RbTimeoutTref = erlang:send_after(rebalance_delay_millis(), self(), rebalance_now),
+ State#state{rb_timeout_tref = RbTimeoutTref};
+ _ ->
+ %% Already sent a "rebalance_now"
+ State
+ end.
+
+should_rebalance(#state{address=ConnectedAddr, remote=Remote}) ->
+ {ok, Ring} = riak_core_ring_manager:get_my_ring(),
+ Addrs = riak_repl_ring:get_clusterIpAddrs(Ring, Remote),
+ {ok, ShuffledAddrs} = riak_core_cluster_mgr:shuffle_remote_ipaddrs(Addrs),
+ lager:debug("ShuffledAddrs: ~p, ConnectedAddr: ~p", [ShuffledAddrs, ConnectedAddr]),
+ case (ShuffledAddrs /= []) andalso same_ipaddr(ConnectedAddr, hd(ShuffledAddrs)) of
+ true ->
+ no; % we're already connected to the ideal buddy
+ false ->
+ %% compute the addrs that are "better" than the currently connected addr
+ BetterAddrs = lists:filter(fun(A) -> not same_ipaddr(ConnectedAddr, A) end,
+ ShuffledAddrs),
+ %% remove those that are blacklisted anyway
+ UsefulAddrs = riak_core_connection_mgr:filter_blacklisted_ipaddrs(BetterAddrs),
+ lager:debug("BetterAddrs: ~p, UsefulAddrs ~p", [BetterAddrs, UsefulAddrs]),
+ case UsefulAddrs of
+ [] ->
+ no;
+ UsefulAddrs ->
+ {yes, UsefulAddrs}
+ end
+ end.
+
+rebalance_delay_millis() ->
+ MaxDelaySecs =
+ app_helper:get_env(riak_repl, realtime_connection_rebalance_max_delay_secs, 5*60),
+ round(MaxDelaySecs * crypto:rand_uniform(0, 1000)).
+
+%% @doc Drop any pending connection attempt to Remote, then ask the connection
+%% manager to connect using only BetterAddrs. Returns the #state{} itself (not a
+%% gen_server reply tuple): maybe_rebalance/2 hands the result on as the new state.
+reconnect(State=#state{remote=Remote}, BetterAddrs) ->
+    lager:info("trying reconnect to one of: ~p", [BetterAddrs]),
+    riak_core_connection_mgr:disconnect({rt_repl, Remote}),
+    lager:debug("re-connecting to remote ~p", [Remote]),
+    case riak_core_connection_mgr:connect({rt_repl, Remote}, ?CLIENT_SPEC, {use_only, BetterAddrs}) of
+        {ok, Ref} ->
+            lager:debug("connecting ref ~p", [Ref]),
+            State#state{connection_ref = Ref};
+        {error, Reason} ->
+            lager:warning("Error connecting to remote ~p (ignoring as we're reconnecting)", [Reason]),
+            State
+    end.
+
+
cancel_timer(undefined) -> ok;
cancel_timer(TRef) -> _ = erlang:cancel_timer(TRef).
@@ -372,8 +459,11 @@ send_heartbeat(State = #state{hb_interval = undefined}) ->
send_heartbeat(State = #state{hb_timeout = HBTimeout,
hb_sent_q = SentQ,
helper_pid = HelperPid}) when is_integer(HBTimeout) ->
- Now = now(), % using now as need a unique reference for this heartbeat
- % to spot late heartbeat timeout messages
+
+ % Using now as need a unique reference for this heartbeat
+ % to spot late heartbeat timeout messages
+ Now = now(),
+
riak_repl2_rtsource_helper:send_heartbeat(HelperPid),
TRef = erlang:send_after(timer:seconds(HBTimeout), self(), {heartbeat_timeout, Now}),
State2 = State#state{hb_interval_tref = undefined, hb_timeout_tref = TRef,
@@ -386,7 +476,7 @@ send_heartbeat(State) ->
%% Schedule the next heartbeat
schedule_heartbeat(State = #state{hb_interval_tref = undefined,
- hb_interval = HBInterval}) when is_integer(HBInterval) ->
+ hb_interval = HBInterval}) when is_integer(HBInterval) ->
TRef = erlang:send_after(timer:seconds(HBInterval), self(), send_heartbeat),
State#state{hb_interval_tref = TRef};
@@ -394,6 +484,14 @@ schedule_heartbeat(State) ->
lager:warning("Heartbeat is misconfigured and is not a valid integer."),
State.
+same_ipaddr({IP,Port}, {IP,Port}) ->
+ true;
+same_ipaddr({_IP1,_Port1}, {_IP2,_Port2}) ->
+ false;
+same_ipaddr(X,Y) ->
+ lager:warning("ipaddrs have unexpected format! ~p, ~p", [X,Y]),
+ false.
+
%% ===================================================================
%% EUnit tests
%% ===================================================================
@@ -454,30 +552,29 @@ cache_peername_test_case() ->
%% Set up the test
setup_connection_for_peername() ->
riak_repl_test_util:reset_meck(riak_core_connection_mgr, [no_link, passthrough]),
- meck:expect(riak_core_connection_mgr, connect, fun(_ServiceAndRemote, ClientSpec) ->
- proc_lib:spawn_link(fun() ->
- Version = stateful:version(),
- {_Proto, {TcpOpts, Module, Pid}} = ClientSpec,
- {ok, Socket} = gen_tcp:connect("127.0.0.1", ?SINK_PORT, [binary | TcpOpts]),
-
- ok = Module:connected(Socket, gen_tcp, {"127.0.0.1", ?SINK_PORT}, Version, Pid, []),
-
- % simulate local socket problem
- inet:close(Socket),
+ meck:expect(riak_core_connection_mgr, connect,
+ fun(_ServiceAndRemote, ClientSpec) ->
+ proc_lib:spawn_link(?MODULE, riak_core_connection_mgr_connect, [ClientSpec]),
+ {ok, make_ref()}
+ end).
+riak_core_connection_mgr_connect(ClientSpec) ->
+ Version = stateful:version(),
+ {_Proto, {TcpOpts, Module, Pid}} = ClientSpec,
+ {ok, Socket} = gen_tcp:connect("127.0.0.1", ?SINK_PORT, [binary | TcpOpts]),
- % get the State from the source connection.
- {status,Pid,_,[_,_,_,_,[_,_,{data,[{_,State}]}]]} = sys:get_status(Pid),
+ ok = Module:connected(Socket, gen_tcp, {"127.0.0.1", ?SINK_PORT}, Version, Pid, []),
- % getting the peername from the socket should produce error string
- ?assertEqual("error:einval", peername(inet, Socket)),
+ % simulate local socket problem
+ inet:close(Socket),
- % while getting the peername from the State should produce the cached string
- ?assertEqual("127.0.0.1:5007", peername(State))
+ % get the State from the source connection.
+ {status,Pid,_,[_,_,_,_,[_,_,{data,[{_,State}]}]]} = sys:get_status(Pid),
- end),
+ % getting the peername from the socket should produce error string
+ ?assertEqual("error:einval", peername(inet, Socket)),
- {ok, make_ref()}
- end).
+ % while getting the peername from the State should produce the cached string
+ ?assertEqual("127.0.0.1:5007", peername(State)).
%% Connect to the 'fake' sink
connect(RemoteName) ->
@@ -500,7 +597,7 @@ connect(RemoteName) ->
{sink_started, SinkPid} ->
{SourcePid, SinkPid}
after 1000 ->
- {error, timeout}
+ {error, timeout}
end.
-endif.
diff --git a/src/riak_repl_aae_source.erl b/src/riak_repl_aae_source.erl
index cae1fa7e..9f1e2bf0 100644
--- a/src/riak_repl_aae_source.erl
+++ b/src/riak_repl_aae_source.erl
@@ -25,8 +25,6 @@
-export([init/1, handle_event/3, handle_sync_event/4, handle_info/3,
terminate/3, code_change/4]).
--export([replicate_diff/3]).
-
-type index() :: non_neg_integer().
-type index_n() :: {index(), pos_integer()}.
@@ -51,6 +49,7 @@
timeout :: pos_integer(),
wire_ver :: atom(),
diff_batch_size = 1000 :: non_neg_integer(),
+ estimated_nr_keys :: non_neg_integer(),
local_lock = false :: boolean(),
owner :: pid(),
proto :: term(),
@@ -64,6 +63,9 @@
%% filter, but simply sent to the remote site directly.
-define(GET_OBJECT_LIMIT, 1000).
+%% Diff percentage needed to use bloom filter
+-define(DIFF_PERCENTAGE, 5).
+
%%%===================================================================
%%% API
%%%===================================================================
@@ -85,9 +87,11 @@ cancel_fullsync(Pid) ->
%%%===================================================================
init([Cluster, Client, Transport, Socket, Partition, OwnerPid, Proto]) ->
- lager:info("AAE fullsync source worker started for partition ~p",
+ lager:debug("AAE fullsync source worker started for partition ~p",
[Partition]),
+ Ver = riak_repl_util:deduce_wire_version_from_proto(Proto),
+ {_, ClientVer, _} = Proto,
State = #state{cluster=Cluster,
client=Client,
transport=Transport,
@@ -95,8 +99,8 @@ init([Cluster, Client, Transport, Socket, Partition, OwnerPid, Proto]) ->
index=Partition,
built=0,
owner=OwnerPid,
- wire_ver=w1,
- proto=Proto},
+ wire_ver=Ver,
+ proto=ClientVer},
{ok, prepare_exchange, State}.
handle_event(_Event, StateName, State) ->
@@ -120,7 +124,8 @@ handle_info({'DOWN', TreeMref, process, Pid, Why}, _StateName, State=#state{tree
%% Local hashtree process went down. Stop exchange.
lager:info("Monitored pid ~p, AAE Hashtree process went down because: ~p", [Pid, Why]),
send_complete(State),
- {stop, {aae_hashtree_went_down, Why}, State};
+ State#state.owner ! {soft_exit, self(), {aae_hastree_went_down, Why}},
+ {stop, normal, State};
handle_info(Error={'DOWN', _, _, _, _}, _StateName, State) ->
%% Something else exited. Stop exchange.
lager:info("Something went down ~p", [Error]),
@@ -195,7 +200,8 @@ prepare_exchange(start_exchange, State0=#state{transport=Transport,
Error ->
lager:info("AAE source failed get_lock for partition ~p, got ~p",
[Partition, Error]),
- {stop, Error, State}
+ State#state.owner ! {soft_exit, self(), Error},
+ {stop, normal, State}
end;
{error, wrong_node} ->
{stop, wrong_node, State0}
@@ -209,7 +215,8 @@ prepare_exchange(start_exchange, State=#state{index=Partition}) ->
lager:info("Remote lock tree for partition ~p failed, got ~p",
[Partition, Error]),
send_complete(State),
- {stop, {remote, Error}, State}
+ State#state.owner ! {soft_exit, self(), {remote, Error}},
+ {stop, normal, State}
end.
%% @doc Now that locks have been acquired, ask both the local and remote
@@ -217,77 +224,92 @@ prepare_exchange(start_exchange, State=#state{index=Partition}) ->
%% a timely manner, the exchange will timeout. Since the trees will
%% continue to finish the update even after the exchange times out,
%% a future exchange should eventually make progress.
-update_trees(init, State) ->
- NumKeys = 10000000,
- {ok, Bloom} = ebloom:new(NumKeys, 0.01, random:uniform(1000)),
- Limit = app_helper:get_env(riak_repl, fullsync_direct_limit, ?GET_OBJECT_LIMIT),
- Mode = app_helper:get_env(riak_repl, fullsync_direct_mode, inline),
- Buffer = case Mode of
- inline ->
- undefined;
- buffered ->
- ets:new(?MODULE, [public, set])
- end,
- Exchange = #exchange{mode=Mode,
- buffer=Buffer,
- bloom=Bloom,
- limit=Limit,
- count=0},
- State2 = State#state{exchange=Exchange},
- update_trees(start_exchange, State2);
+update_trees(init, State=#state{indexns=IndexNs}) ->
+ update_trees({start_exchange, IndexNs}, State);
update_trees(cancel_fullsync, State) ->
lager:info("AAE fullsync source cancelled for partition ~p", [State#state.index]),
send_complete(State),
{stop, normal, State};
-update_trees(finish_fullsync, State=#state{owner=Owner}) ->
- send_complete(State),
- lager:info("AAE fullsync source completed partition ~p",
- [State#state.index]),
- riak_repl2_fssource:fullsync_complete(Owner),
- %% TODO: Why stay in update_trees? Should we stop instead?
- {next_state, update_trees, State};
-update_trees(continue, State=#state{indexns=IndexNs}) ->
- case IndexNs of
- [_] ->
- send_diffs(init, State);
- [_|RestNs] ->
- State2 = State#state{built=0, indexns=RestNs},
- gen_fsm:send_event(self(), start_exchange),
- {next_state, update_trees, State2}
- end;
-update_trees(start_exchange, State=#state{tree_pid=TreePid,
- index=Partition,
- indexns=[IndexN|_IndexNs]}) ->
- lager:debug("Start exchange for partition,IndexN ~p,~p", [Partition, IndexN]),
- update_request(TreePid, {Partition, undefined}, IndexN),
- case send_synchronous_msg(?MSG_UPDATE_TREE, IndexN, State) of
+update_trees({start_exchange, [IndexHead|IndexTail]}, State=#state{tree_pid=TreePid,
+ index=Partition,
+ owner=Owner}) ->
+ update_request(TreePid, {Partition, undefined}, IndexHead),
+ case send_synchronous_msg(?MSG_UPDATE_TREE, IndexHead, State) of
ok ->
- update_trees({tree_built, Partition, IndexN}, State);
+ gen_fsm:send_event(self(), tree_built),
+ case IndexTail of
+ [] -> ok;
+ _ -> gen_fsm:send_event(self(), {start_exchange, IndexTail})
+ end,
+ {next_state, update_trees, State};
not_responsible ->
- update_trees({not_responsible, Partition, IndexN}, State)
+ lager:debug("Skipping AAE fullsync tree update for vnode ~p because"
+ " it is not responsible for the preflist ~p", [Partition, IndexHead]),
+ gen_server:cast(Owner, not_responsible),
+ {stop, normal, State}
end;
update_trees({not_responsible, Partition, IndexN}, State=#state{owner=Owner}) ->
lager:debug("VNode ~p does not cover preflist ~p", [Partition, IndexN]),
gen_server:cast(Owner, not_responsible),
{stop, normal, State};
-update_trees({tree_built, _, _}, State) ->
+
+update_trees(tree_built, State = #state{indexns=IndexNs}) ->
Built = State#state.built + 1,
+ NeededBuilts = length(IndexNs) * 2, %% All local and remote
case Built of
- 2 ->
+ NeededBuilts ->
+ %% Trees built now we can estimate how many keys
+ {ok, EstimatedNrKeys} = riak_kv_index_hashtree:estimate_keys(State#state.tree_pid),
+ lager:debug("EstimatedNrKeys ~p for partition ~p", [EstimatedNrKeys, State#state.index]),
+
lager:debug("Moving to key exchange state"),
- gen_fsm:send_event(self(), start_key_exchange),
- {next_state, key_exchange, State};
+ key_exchange(init, State#state{built=Built, estimated_nr_keys = EstimatedNrKeys});
_ ->
{next_state, update_trees, State#state{built=Built}}
end.
%% @doc Now that locks have been acquired and both hashtrees have been updated,
%% perform a key exchange and trigger replication for any divergent keys.
+key_exchange(init, State) ->
+ EstimatedNrKeys = State#state.estimated_nr_keys,
+ Limit = app_helper:get_env(riak_repl, fullsync_direct_limit, ?GET_OBJECT_LIMIT),
+ PercentLimit = app_helper:get_env(riak_repl, fullsync_direct_percentage_limit, ?DIFF_PERCENTAGE),
+ UsedLimit = max(Limit, EstimatedNrKeys * PercentLimit div 100),
+ Mode = app_helper:get_env(riak_repl, fullsync_direct_mode, inline),
+ Buffer = case Mode of
+ inline ->
+ undefined;
+ buffered ->
+ ets:new(?MODULE, [public, set])
+ end,
+ Exchange = #exchange{mode=Mode,
+ buffer=Buffer,
+ bloom=undefined,
+ limit=UsedLimit,
+ count=0},
+ State2 = State#state{exchange=Exchange},
+ key_exchange(start_key_exchange, State2);
key_exchange(cancel_fullsync, State) ->
lager:info("AAE fullsync source cancelled for partition ~p", [State#state.index]),
send_complete(State),
{stop, normal, State};
+key_exchange(finish_fullsync, State=#state{owner=Owner}) ->
+ send_complete(State),
+ lager:debug("AAE fullsync source completed partition ~p",
+ [State#state.index]),
+ riak_repl2_fssource:fullsync_complete(Owner),
+ %% TODO: Why stay in key_exchange? Should we stop instead?
+ {next_state, key_exchange, State};
+key_exchange(continue, State=#state{indexns=IndexNs}) ->
+ case IndexNs of
+ [_] ->
+ send_diffs(init, State);
+ [_|RestNs] ->
+ State2 = State#state{built=0, indexns=RestNs},
+ gen_fsm:send_event(self(), start_key_exchange),
+ {next_state, key_exchange, State2}
+ end;
key_exchange(start_key_exchange, State=#state{cluster=Cluster,
transport=Transport,
socket=Socket,
@@ -354,8 +376,8 @@ key_exchange(start_key_exchange, State=#state{cluster=Cluster,
spawn_link(fun() ->
StageStart=os:timestamp(),
Exchange2 = riak_kv_index_hashtree:compare(IndexN, Remote, AccFun, Exchange, TreePid),
- lager:info("Full-sync with site ~p; fullsync difference generator for ~p complete (completed in ~p secs)",
- [State#state.cluster, Partition, riak_repl_util:elapsed_secs(StageStart)]),
+ lager:debug("Full-sync with site ~p; fullsync difference generator for ~p complete (completed in ~p secs)",
+ [State#state.cluster, Partition, riak_repl_util:elapsed_secs(StageStart)]),
gen_fsm:send_event(SourcePid, {'$aae_src', done, Exchange2})
end),
@@ -390,7 +412,7 @@ compute_differences({'$aae_src', done, Exchange}, State) ->
%% Just move on to the next tree exchange since we accumulate
%% differences across all trees.
State2 = State#state{exchange=Exchange},
- update_trees(continue, State2).
+ key_exchange(continue, State2).
maybe_send_direct(#exchange{mode=inline, count=Count, limit=Limit},
#state{index=Partition}) ->
@@ -404,7 +426,7 @@ maybe_send_direct(#exchange{buffer=Buffer}, State=#state{index=Partition}) ->
true = ets:delete(Buffer),
Sorted = lists:sort(Keys),
Count = length(Sorted),
- lager:info("Directly sending ~p differences for partition ~p", [Count, Partition]),
+ lager:debug("Directly sending ~p differences for partition ~p", [Count, Partition]),
_ = [send_missing(Bucket, Key, State) || {Bucket, Key} <- Sorted],
ok.
@@ -421,7 +443,7 @@ send_diffs(init, State=#state{exchange=Exchange}) ->
%% this will start a worker process, which will tell us it's done with
%% diffs_done once all differences are sent.
_ = maybe_send_direct(Exchange, State),
- _ = finish_sending_differences(Exchange#exchange.bloom, State),
+ _ = finish_sending_differences(Exchange, State),
%% wait for differences from bloom_folder or to be done
{next_state, send_diffs, State};
@@ -429,22 +451,29 @@ send_diffs(init, State=#state{exchange=Exchange}) ->
%% All indexes in this Partition are done.
%% Note: recv'd from an async send event
send_diffs(diff_done, State) ->
- update_trees(finish_fullsync, State).
+ key_exchange(finish_fullsync, State).
%%%===================================================================
%%% Internal functions
%%%===================================================================
-
-finish_sending_differences(Bloom, #state{index=Partition}) ->
- case ebloom:elements(Bloom) == 0 of
- true ->
- lager:info("No differences, skipping bloom fold for partition ~p", [Partition]),
+finish_sending_differences(#exchange{bloom=undefined, count=DiffCnt},
+ #state{index=Partition, estimated_nr_keys=EstimatedNrKeys}) ->
+ lager:info("Syncing without bloom ~p/~p differences for partition ~p with EstimatedNrKeys ~p",
+ [0, DiffCnt, Partition, EstimatedNrKeys]),
+ gen_fsm:send_event(self(), diff_done);
+
+finish_sending_differences(#exchange{bloom=Bloom, count=DiffCnt},
+ #state{index=Partition, estimated_nr_keys=EstimatedNrKeys}) ->
+ case ebloom:elements(Bloom) of
+ Count = 0 ->
+ lager:info("Syncing without bloom ~p/~p differences for partition ~p with EstimatedNrKeys ~p",
+ [Count, DiffCnt, Partition, EstimatedNrKeys]),
gen_fsm:send_event(self(), diff_done);
- false ->
+ Count ->
{ok, Ring} = riak_core_ring_manager:get_my_ring(),
OwnerNode = riak_core_ring:index_owner(Ring, Partition),
- Count = ebloom:elements(Bloom),
- lager:info("Bloom folding over ~p differences for partition ~p", [Count, Partition]),
+ lager:info("Bloom folding over ~p/~p differences for partition ~p with EstimatedNrKeys ~p",
+ [Count, DiffCnt, Partition, EstimatedNrKeys]),
Self = self(),
Worker = fun() ->
FoldRef = make_ref(),
@@ -477,6 +506,13 @@ finish_sending_differences(Bloom, #state{index=Partition}) ->
spawn_link(Worker) %% this isn't the Pid we need because it's just the vnode:fold
end.
+maybe_create_bloom(#exchange{bloom=undefined}, State) ->
+ EstimatedNrKeys = State#state.estimated_nr_keys,
+ {ok, Bloom} = ebloom:new(max(10000, EstimatedNrKeys), 0.01, random:uniform(1000)),
+ Bloom;
+maybe_create_bloom(#exchange{bloom=Bloom}, _State) ->
+ Bloom.
+
bloom_fold({B, K}, V, {MPid, Bloom}) ->
case ebloom:contains(Bloom, <>) of
true ->
@@ -487,45 +523,6 @@ bloom_fold({B, K}, V, {MPid, Bloom}) ->
end,
{MPid, Bloom}.
-%% @private
-%% Returns accumulator as a list of one element that is the count of
-%% keys that differed. Initial value of Acc is always [].
-replicate_diff(KeyDiff, Acc, State=#state{index=Partition}) ->
- NumObjects =
- case KeyDiff of
- {remote_missing, Bin} ->
- %% send object and related objects to remote
- {Bucket,Key} = binary_to_term(Bin),
- lager:debug("Keydiff: remote partition ~p remote missing: ~p:~p",
- [Partition, Bucket, Key]),
- send_missing(Bucket, Key, State);
- {different, Bin} ->
- %% send object and related objects to remote
- {Bucket,Key} = binary_to_term(Bin),
- lager:debug("Keydiff: remote partition ~p different: ~p:~p",
- [Partition, Bucket, Key]),
- send_missing(Bucket, Key, State);
- {missing, Bin} ->
- %% remote has a key we don't have. Ignore it.
- {Bucket,Key} = binary_to_term(Bin),
- lager:debug("Keydiff: remote partition ~p local missing: ~p:~p (ignored)",
- [Partition, Bucket, Key]),
- 0;
- Other ->
- lager:warning("Unexpected error keydiff: ~p (ignored)", [Other]),
- 0
- end,
-
- case Acc of
- [] ->
- [1];
- [Count] ->
- %% accrue number of differences sent from this segment
- [Count+NumObjects];
- _Other ->
- Acc
- end.
-
accumulate_diff(KeyDiff, Exchange, State=#state{index=Partition}) ->
case KeyDiff of
{remote_missing, Bin} ->
@@ -553,7 +550,6 @@ accumulate_diff(KeyDiff, Exchange, State=#state{index=Partition}) ->
maybe_accumulate_key(Bucket, Key,
Exchange=#exchange{mode=Mode,
- bloom=Bloom,
count=Count,
limit=Limit},
State) ->
@@ -563,8 +559,9 @@ maybe_accumulate_key(Bucket, Key,
Exchange2#exchange{count=Count+1};
true ->
%% Past threshold, add to bloom filter for future bloom fold
+ Bloom = maybe_create_bloom(Exchange, State),
ebloom:insert(Bloom, <>),
- Exchange#exchange{count=Count+1}
+ Exchange#exchange{bloom=Bloom, count=Count+1}
end.
handle_direct(inline, Bucket, Key, Exchange, State) ->
@@ -655,7 +652,7 @@ update_request(Tree, {Index, _}, IndexN) ->
as_event(fun() ->
case riak_kv_index_hashtree:update(IndexN, Tree) of
ok ->
- {tree_built, Index, IndexN};
+ tree_built;
not_responsible ->
{not_responsible, Index, IndexN}
end
diff --git a/src/riak_repl_app.erl b/src/riak_repl_app.erl
index 9d3bc88c..4b536598 100644
--- a/src/riak_repl_app.erl
+++ b/src/riak_repl_app.erl
@@ -8,6 +8,7 @@
% versions < 1.3
-export([get_matching_address/2,
cluster_mgr_member_fun/1,
+ cluster_mgr_all_member_fun/1,
cluster_mgr_write_cluster_members_to_ring/2,
cluster_mgr_read_cluster_targets_from_ring/0]).
@@ -73,6 +74,11 @@ start(_Type, _StartArgs) ->
riak_core_cluster_mgr:register_member_fun(
fun cluster_mgr_member_fun/1),
+ %% register functions for cluster manager to find its own
+ %% nodes' ip addrs
+ riak_core_cluster_mgr:register_all_member_fun(
+ fun cluster_mgr_all_member_fun/1),
+
%% cluster manager leader will follow repl leader
riak_repl2_leader:register_notify_fun(
fun riak_core_cluster_mgr:set_leader/2),
@@ -156,7 +162,19 @@ prune_old_workdirs(WorkRoot) ->
end.
%% Get the list of nodes of our ring
+%% This list includes all up-nodes, that host the riak_kv service
cluster_mgr_member_fun({IP, Port}) ->
+ lists_shuffle([ {XIP,XPort} || {_Node,{XIP,XPort}} <- cluster_mgr_members({IP, Port}, riak_core_node_watcher:nodes(riak_kv)),
+ is_integer(XPort) ]).
+
+%% this list includes *all* members of the ring (even those marked down).
+%% returns a list [ { node(), {IP, Port} | unreachable }, ... ]
+cluster_mgr_all_member_fun({IP, Port}) ->
+ {ok, Ring} = riak_core_ring_manager:get_my_ring(),
+ cluster_mgr_members({IP, Port}, riak_core_ring:all_members(Ring)).
+
+cluster_mgr_members({IP, Port}, Nodes) ->
+
%% find the subnet for the interface we connected to
{ok, MyIPs} = inet:getifaddrs(),
{ok, NormIP} = riak_repl_util:normalize_ip(IP),
@@ -170,12 +188,11 @@ cluster_mgr_member_fun({IP, Port}) ->
lager:warning("Connected IP not present locally, must be NAT. Returning ~p",
[{IP,Port}]),
%% might as well return the one IP we know will work
- [{IP, Port}];
+ [{node(), {IP, Port}}];
CIDR ->
?TRACE(lager:notice("CIDR is ~p", [CIDR])),
%AddressMask = riak_repl2_ip:mask_address(NormIP, MyMask),
%?TRACE(lager:notice("address mask is ~p", [AddressMask])),
- Nodes = riak_core_node_watcher:nodes(riak_kv),
{Results, BadNodes} = rpc:multicall(Nodes, riak_repl2_ip,
get_matching_address, [RealIP, CIDR]),
% when this code was written, a multicall will list the results
@@ -187,27 +204,28 @@ cluster_mgr_member_fun({IP, Port}) ->
case RealIP == NormIP of
true ->
%% No nat, just return the results
- lists_shuffle(Results2);
+ Results2;
false ->
%% NAT is in effect
- NatRes = lists:foldl(fun({XIP, XPort}, Acc) ->
+ NatRes = lists:foldl(fun({XNode, {XIP, XPort}}, Acc) ->
case riak_repl2_ip:apply_reverse_nat_map(XIP, XPort, Map) of
error ->
%% there's no NAT configured for this IP!
%% location_down is the closest thing we
%% can reply with.
lager:warning("There's no NAT mapping for"
- "~p:~b to an external IP",
- [XIP, XPort]),
+ "~p:~b to an external IP (node: ~p)",
+ [XIP, XPort, XNode]),
Acc;
{ExternalIP, ExternalPort} ->
- [{ExternalIP, ExternalPort}|Acc];
+ [{XNode, {ExternalIP, ExternalPort}} | Acc];
ExternalIP ->
- [{ExternalIP, XPort}|Acc]
+ [{XNode, {ExternalIP, XPort}}|Acc]
end
end, [], Results2),
- lager:debug("~p -> ~p", [Results2, NatRes]),
- lists_shuffle(NatRes)
+ Results3 = NatRes ++ [ {Node, unreachable} || Node <- BadNodes ],
+ lager:debug("nat: ~p -> ~p", [Results2, Results3]),
+ Results3
end
end.
@@ -218,9 +236,9 @@ maybe_retry_ip_rpc(Results, Nodes, BadNodes, Args) ->
({{badrpc, {'EXIT', {undef, _StrackTrace}}}, Node}) ->
RPCResult = riak_core_util:safe_rpc(Node, riak_repl_app, get_matching_address, Args),
lager:debug("rpc to get_matching_address: ~p", [RPCResult]),
- RPCResult;
- ({Result, _Node}) ->
- Result
+ {Node, RPCResult};
+ ({Result, Node}) ->
+ {Node, Result}
end,
lists:map(MaybeRetry, Zipped).
diff --git a/src/riak_repl_fullsync_helper.erl b/src/riak_repl_fullsync_helper.erl
index d23386e6..ecbfb32e 100644
--- a/src/riak_repl_fullsync_helper.erl
+++ b/src/riak_repl_fullsync_helper.erl
@@ -393,7 +393,7 @@ missing_key(PBKey, DiffState) ->
%% the same, then a badfun error will occur since the MD5s of the
%% modules are not the same.
%%
-%% See [http://www.javalimit.com/2010/05/passing-funs-to-other-erlang-nodes.html]
+%% See http://www.javalimit.com/2010/05/passing-funs-to-other-erlang-nodes.html
keylist_fold({B,Key}=K, V, {MPid, Count, Total}) ->
try
H = hash_object(B,Key,V),
diff --git a/src/riak_repl_stats.erl b/src/riak_repl_stats.erl
index f654ce49..8924c99b 100644
--- a/src/riak_repl_stats.erl
+++ b/src/riak_repl_stats.erl
@@ -151,11 +151,8 @@ rt_dirty() ->
get_stats() ->
case erlang:whereis(riak_repl_stats) of
Pid when is_pid(Pid) ->
- case riak_core_stat_cache:get_stats(?APP) of
- {ok, Stats, _TS} ->
- Stats;
- Error -> Error
- end;
+ {ok, Stats, _TS} = riak_core_stat_cache:get_stats(?APP),
+ Stats;
_ -> []
end.
diff --git a/src/riak_repl_wm_stats.erl b/src/riak_repl_wm_stats.erl
index 4302080e..02e9a43c 100644
--- a/src/riak_repl_wm_stats.erl
+++ b/src/riak_repl_wm_stats.erl
@@ -125,7 +125,6 @@ format_pid_stat(Pair) ->
jsonify_stats([], Acc) ->
- %?debugFmt("Got []: Acc: ~w", [Acc]),
lists:flatten(lists:reverse(Acc));
jsonify_stats([{fullsync, Num, _Left}|T], Acc) ->
diff --git a/test/riak_core_cluster_conn_eqc.erl b/test/riak_core_cluster_conn_eqc.erl
index 79348e81..e6486be0 100644
--- a/test/riak_core_cluster_conn_eqc.erl
+++ b/test/riak_core_cluster_conn_eqc.erl
@@ -175,7 +175,8 @@ postcondition(connected, connected, _S ,{call, ?MODULE, status, _}, R) ->
ExpectedStatus = {fake_socket,
ranch_tcp,
"overtherainbow",
- [{clustername, "FARFARAWAY"}]},
+ [{clustername, "FARFARAWAY"}],
+ {1,0}},
{_, status, Status} = R,
?P(Status =:= ExpectedStatus);
postcondition(State, State, _S ,{call, ?MODULE, status, [Pid]}, R) ->
@@ -226,7 +227,8 @@ connected_to_remote(Pid) ->
fake_socket,
ranch_tcp,
"overtherainbow",
- [{clustername, "FARFARAWAY"}]},
+ [{clustername, "FARFARAWAY"}],
+ {1,0}},
gen_fsm:send_event(Pid, Event).
cluster_name(Pid) ->
diff --git a/test/riak_core_cluster_mgr_tests.erl b/test/riak_core_cluster_mgr_tests.erl
index 980793fe..7340edd4 100644
--- a/test/riak_core_cluster_mgr_tests.erl
+++ b/test/riak_core_cluster_mgr_tests.erl
@@ -106,13 +106,13 @@ single_node_test_() ->
?assertEqual({ok, [?REMOTE_CLUSTER_NAME]}, Knowners)
end},
+ %% The address list should come back in the same order on every call,
+ %% starting with {"127.0.0.1",5002}, since the local node is always nonode@nohost
{"get ipaddres of cluster", fun() ->
- Original = [{"127.0.0.1",5001}, {"127.0.0.1",5002}, {"127.0.0.1",5003}],
- Rotated1 = [{"127.0.0.1",5002}, {"127.0.0.1",5003}, {"127.0.0.1",5001}],
- Rotated2 = [{"127.0.0.1",5003}, {"127.0.0.1",5001}, {"127.0.0.1",5002}],
- ?assert({ok,Original} == riak_core_cluster_mgr:get_ipaddrs_of_cluster(?REMOTE_CLUSTER_NAME)),
- ?assert({ok,Rotated1} == riak_core_cluster_mgr:get_ipaddrs_of_cluster(?REMOTE_CLUSTER_NAME)),
- ?assert({ok,Rotated2} == riak_core_cluster_mgr:get_ipaddrs_of_cluster(?REMOTE_CLUSTER_NAME))
+ Original = [{"127.0.0.1",5002}, {"127.0.0.1",5001}, {"127.0.0.1",5003}],
+ ?assertEqual({ok,Original},riak_core_cluster_mgr:get_ipaddrs_of_cluster(?REMOTE_CLUSTER_NAME)),
+ ?assertEqual({ok,Original},riak_core_cluster_mgr:get_ipaddrs_of_cluster(?REMOTE_CLUSTER_NAME)),
+ ?assertEqual({ok,Original},riak_core_cluster_mgr:get_ipaddrs_of_cluster(?REMOTE_CLUSTER_NAME))
end}
] end }.
diff --git a/test/riak_repl_test_util.erl b/test/riak_repl_test_util.erl
index 3e0616ea..176389ed 100644
--- a/test/riak_repl_test_util.erl
+++ b/test/riak_repl_test_util.erl
@@ -105,9 +105,9 @@ maybe_start_lager(_) ->
start_lager().
start_lager() ->
- %error_logger:tty(false),
{ok, Started} = application:ensure_all_started(lager),
- lager:set_loglevel(lager_console_backend, debug),
+ % Keep test output quiet: the console backend only logs emergency-level messages
+ lager:set_loglevel(lager_console_backend, '=emergency'),
Started.
%% @doc Stop the applications listsed. The list is assumed to be in the