Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
54 commits
Select commit Hold shift + click to select a range
fa11a67
Improve AAE fullsync by estimating number of keys
Sep 8, 2014
e6d1ec1
Add timeout to natmap_test_
Sep 15, 2014
c8ec0d9
Use deduce_wire_version_from_proto for AAE
Sep 17, 2014
a13c865
Keep legacy for riak_repl_aae_source state.proto
Sep 17, 2014
3045282
Rename fullsync config parameters
krestenkrab Sep 18, 2014
d1c4130
Merge branch 'miklix_estimate_keys-2.0' into feature/aae-fullsync-est…
jtuple Oct 6, 2014
779c1ec
Remove unused riak_repl_aae_source:replicate_diff/3
jtuple Oct 6, 2014
83dd192
Remove redundant and unnecessary logging
Oct 16, 2014
e43257a
Revert some of "Remove redundant and unnecessary logging"
Oct 16, 2014
67b9535
Fix xref and dialyzer
Oct 21, 2014
1114a70
Implement soft_exit, primarily for aae_fullsyn.
lordnull Sep 29, 2014
a1166e1
Fixed invalid call to update error count track.
lordnull Nov 21, 2014
c36c3ca
Fix error/retry exit counts on location down msgs
engelsanchez Nov 26, 2014
5479089
Fix dialyzer warnings
engelsanchez Dec 5, 2014
75ef166
Merge pull request #640 from basho/feature/chatty-aae-transient-fs-fa…
borshop Dec 8, 2014
4bf87a6
Added last_fullsync_completed stat tracking.
lordnull Dec 8, 2014
38b7b4b
Revert riak_kv deps changes
Dec 9, 2014
ce0c1b4
Update logging after review
Dec 10, 2014
da47f0a
Cancel directly on not_responsible from remote cluster
Dec 15, 2014
2da227c
Address some minor bugs around establishing SSL connections
kellymclaughlin Dec 15, 2014
fa4478f
Merge pull request #644 from basho/bugfix/ssl-test-issues
borshop Dec 15, 2014
6bdc099
handle not_responsible for local partitions
Dec 16, 2014
90733b6
Added test and fix to coord_serv not givin list for status.
lordnull Dec 17, 2014
39fa59f
Removed pointless check for leader.
lordnull Dec 17, 2014
df42bcf
Merge pull request #623 from basho/feature/aae-fullsync-estimate-keys
borshop Dec 19, 2014
9cd1ed0
Merge pull request #645 from basho/bugfix/mw/snmp-stats-crash-when-no…
borshop Dec 19, 2014
d5aa377
Merge pull request #642 from basho/feature/mw/last-fullsync-completed
borshop Dec 19, 2014
8f079a2
Remove partition from purgatory when giving up
engelsanchez Dec 23, 2014
31a1657
Silence noisy lager output during tests
engelsanchez Dec 23, 2014
8585a89
Merge pull request #648 from basho/bugfix/remove-from-purgatory
borshop Dec 24, 2014
982eb52
Fixes to make edoc happy
macintux Aug 19, 2014
05822f4
Add new cluster membership function
krestenkrab Oct 6, 2014
b0a173f
Add client code for {1,1} cluster protocol
krestenkrab Oct 6, 2014
3c9f7ee
Register default all_member_fun
krestenkrab Oct 7, 2014
570420d
Add rtsource_conn:address, and :reconnect
krestenkrab Oct 7, 2014
3efd0d7
Expose filter_blacklisted_ipaddrs function
krestenkrab Oct 7, 2014
06ebbe5
Extra code in ring change hook to maybe reconnect
krestenkrab Oct 7, 2014
eb0bd2c
improve ip address comparison
krestenkrab Oct 7, 2014
651ee80
fixes to make riak start
Oct 7, 2014
981ec6a
improved logging
Oct 9, 2014
c1b18ab
change blacklist behaviour to not blacklist unkown endpoints
Oct 9, 2014
b4df4c9
delayed rebalance of rt connections
Oct 9, 2014
ef29773
refac
Oct 9, 2014
3d33b54
indent
Oct 9, 2014
4b8b350
remove spamming log statement
Oct 9, 2014
b54555e
Only send one rebalance_now.
Oct 13, 2014
51c417b
Remove extra multiplication with 1000
Oct 13, 2014
a6873bd
Remove duplicated code
Oct 13, 2014
610508e
Refactor maybe_rebalance
Oct 13, 2014
5188180
Fix eunit tests.
Oct 15, 2014
5c71490
Indent & Refac
Oct 22, 2014
6e61348
Fix dialyzer
Oct 24, 2014
1bbea37
Bug: takewhile -> filter
Nov 26, 2014
23e81b5
Fix exometer induced dialyzer errors
engelsanchez Dec 15, 2014
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 1 addition & 3 deletions dialyzer.ignore-warnings
Original file line number Diff line number Diff line change
Expand Up @@ -4,9 +4,7 @@ riak_repl_keylist_client.erl:267: The call application:unset_env('riak_repl',{'p
riak_repl_keylist_client.erl:120: The call application:unset_env('riak_repl',{'progress',_}) breaks the contract (Application,Par) -> 'ok' when is_subtype(Application,atom()), is_subtype(Par,atom())
riak_repl_keylist_client.erl:132: The call application:set_env('riak_repl',{'progress',_},nonempty_maybe_improper_list()) breaks the contract (Application,Par,Val) -> 'ok' when is_subtype(Application,atom()), is_subtype(Par,atom()), is_subtype(Val,term())
riak_core_connection.erl:108: Function exchange_handshakes_with/4 has no local return
riak_core_connection.erl:171: The call ranch_tcp:send(Socket::port(),Hello::binary()) breaks the contract (inet:socket(),iolist()) -> 'ok' | {'error',atom()}
riak_core_connection.erl:189: Function try_ssl/4 will never be called
riak_core_connection.erl:225: Function negotiate_proto_with_server/3 will never be called
riak_core_connection.erl:172: The call ranch_tcp:send(Socket::port(),Hello::binary()) breaks the contract (inet:socket(),iolist()) -> 'ok' | {'error',atom()}
riak_repl_keylist_client.erl:106: The call application:unset_env('riak_repl',{'progress',_}) breaks the contract (Application,Par) -> 'ok' when is_subtype(Application,atom()), is_subtype(Par,atom())
riak_repl_keylist_client.erl:216: The call application:unset_env('riak_repl',{'progress',_}) breaks the contract (Application,Par) -> 'ok' when is_subtype(Application,atom()), is_subtype(Par,atom())
riak_repl_keylist_client.erl:265: The call application:unset_env('riak_repl',{'progress',_}) breaks the contract (Application,Par) -> 'ok' when is_subtype(Application,atom()), is_subtype(Par,atom())
Expand Down
3 changes: 2 additions & 1 deletion include/riak_core_connection.hrl
Original file line number Diff line number Diff line change
Expand Up @@ -22,12 +22,13 @@

%% handshake messages to safely initiate a connection. Let's not accept
%% a connection to a telnet session by accident!
-define(CTRL_REV, {1,0}).
-define(CTRL_REV, {1,1}).
-define(CTRL_HELLO, <<"riak-ctrl:hello">>).
-define(CTRL_TELL_IP_ADDR, <<"riak-ctrl:ip_addr">>).
-define(CTRL_ACK, <<"riak-ctrl:ack">>).
-define(CTRL_ASK_NAME, <<"riak-ctrl:ask_name">>).
-define(CTRL_ASK_MEMBERS, <<"riak-ctrl:ask_members">>).
-define(CTRL_ALL_MEMBERS, <<"riak-ctrl:all_members">>).


-define(CONNECTION_SETUP_TIMEOUT, 10000).
Expand Down
3 changes: 3 additions & 0 deletions include/riak_repl.hrl
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,9 @@
-define(DEFAULT_SOURCE_RETRIES, infinity).
%% How many times we should retry when failing a reservation
-define(DEFAULT_RESERVE_RETRIES, 0).
%% How many times during a fullsync we should retry a partion that has sent
%% a 'soft_exit' message to the coordinator
-define(DEFAULT_SOURCE_SOFT_RETRIES, infinity).
%% 20 seconds. sources should claim within 5 seconds, but give them a little more time
-define(RESERVATION_TIMEOUT, (20 * 1000)).
-define(DEFAULT_MAX_FS_BUSIES_TOLERATED, 10).
Expand Down
93 changes: 78 additions & 15 deletions src/riak_core_cluster_conn.erl
Original file line number Diff line number Diff line change
Expand Up @@ -75,19 +75,21 @@

-type remote() :: {cluster_by_name, clustername()} | {cluster_by_addr, ip_addr()}.
-type peer_address() :: {string(), pos_integer()}.
-type node_address() :: {atom(), peer_address()}.
-type ranch_transport_messages() :: {atom(), atom(), atom()}.
-record(state, {mode :: atom(),
remote :: remote(),
socket :: port(),
name :: clustername(),
previous_name="undefined" :: clustername(),
members=[] :: [peer_address()],
members=[] :: [peer_address() | node_address()],
connection_ref :: reference(),
connection_timeout :: timeout(),
transport :: atom(),
address :: peer_address(),
connection_props :: proplists:proplist(),
transport_msgs :: ranch_transport_messages()}).
transport_msgs :: ranch_transport_messages(),
proto_version :: {non_neg_integer(), non_neg_integer()} }).
-type state() :: #state{}.

%%%===================================================================
Expand Down Expand Up @@ -121,14 +123,17 @@ status(Ref, Timeout) ->
connected(Socket,
Transport,
Addr,
{?REMOTE_CLUSTER_PROTO_ID, _MyVer, _RemoteVer},
{?REMOTE_CLUSTER_PROTO_ID,
_MyVer ={CommonMajor,LocalMinor},
_RemoteVer={CommonMajor,RemoteMinor}},
{_Remote, Client},
Props) ->
%% give control over the socket to the `Client' process.
%% tell client we're connected and to whom
Transport:controlling_process(Socket, Client),
gen_fsm:send_event(Client,
{connected_to_remote, Socket, Transport, Addr, Props}).
{connected_to_remote, Socket, Transport, Addr, Props,
{CommonMajor, min(LocalMinor,RemoteMinor)}}).

-spec connect_failed({term(), term()}, {error, term()}, {_, atom() | pid() | port() | {atom(), _} | {via, _, _}}) -> ok.
connect_failed({_Proto, _Vers}, {error, _}=Error, {_Remote, Client}) ->
Expand Down Expand Up @@ -189,7 +194,7 @@ connecting({connect_failed, Error}, State=#state{remote=Remote}) ->
%% This is fatal! We are being supervised by conn_sup and if we
%% die, it will restart us.
{stop, Error, State};
connecting({connected_to_remote, Socket, Transport, Addr, Props}, State) ->
connecting({connected_to_remote, Socket, Transport, Addr, Props, ProtoVersion}, State) ->
RemoteName = proplists:get_value(clustername, Props),
_ = lager:debug("Cluster Manager control channel client connected to"
" remote ~p at ~p named ~p",
Expand All @@ -202,7 +207,9 @@ connecting({connected_to_remote, Socket, Transport, Addr, Props}, State) ->
transport=Transport,
address=Addr,
connection_props=Props,
transport_msgs = TransportMsgs},
transport_msgs = TransportMsgs,
proto_version=ProtoVersion
},
_ = request_cluster_name(UpdState),
{next_state, waiting_for_cluster_name, UpdState, ?CONNECTION_SETUP_TIMEOUT};
connecting(poll_cluster, State) ->
Expand Down Expand Up @@ -240,13 +247,21 @@ waiting_for_cluster_name(_, _From, _State) ->
{reply, ok, waiting_for_cluster_name, _State}.

%% Async message handling for the `waiting_for_cluster_members' state
waiting_for_cluster_members({cluster_members, Members}, State) ->
waiting_for_cluster_members({cluster_members, NewMembers}, State = #state{ proto_version={1,0} }) ->
#state{address=Addr,
name=Name,
previous_name=PreviousName,
members=OldMembers,
remote=Remote} = State,
%% This is the first time we're updating the cluster manager
%% with the name of this cluster, so it's old name is undefined.
%% this is 1.0 code. NewMembers is list of {IP,Port}

SortedNew = ordsets:from_list(NewMembers),
Members =
NewMembers ++ lists:filter(fun(Mem) ->
not ordsets:is_element(Mem, SortedNew)
end,
OldMembers),

ClusterUpdatedMsg = {cluster_updated,
PreviousName,
Name,
Expand All @@ -255,14 +270,48 @@ waiting_for_cluster_members({cluster_members, Members}, State) ->
Remote},
gen_server:cast(?CLUSTER_MANAGER_SERVER, ClusterUpdatedMsg),
{next_state, connected, State#state{members=Members}};
waiting_for_cluster_members({all_cluster_members, NewMembers}, State) ->
#state{address=Addr,
name=Name,
previous_name=PreviousName,
members=OldMembers,
remote=Remote} = State,

%% this is 1.1+ code. Members is list of {node,{IP,Port}}

Members =
lists:foldl(fun(Elm={_Node,{_Ip,Port}}, Acc) when is_integer(Port) ->
[Elm|Acc];
({Node,_}, Acc) ->
case lists:keyfind(Node, 1, OldMembers) of
Elm={Node,{_IP,Port}} when is_integer(Port) ->
[Elm|Acc];
_ ->
Acc
end
end,
[],
NewMembers ),

ClusterUpdatedMsg = {cluster_updated,
PreviousName,
Name,
[Member || {_Node,Member} <- Members],
Addr,
Remote},
gen_server:cast(?CLUSTER_MANAGER_SERVER, ClusterUpdatedMsg),
{next_state, connected, State#state{members=Members}};
waiting_for_cluster_members(_, _State) ->
{next_state, waiting_for_cluster_members, _State}.

%% Sync message handling for the `waiting_for_cluster_members' state
waiting_for_cluster_members(status, _From, State) ->
{reply, {waiting_for_cluster_members, State#state.name}, waiting_for_cluster_members, State};
waiting_for_cluster_members(_, _From, _State) ->
{reply, ok, waiting_for_cluster_members, _State}.
waiting_for_cluster_members(Other, _From, State) ->
_ = lager:error("cluster_conn: client got unexpected "
"msg from remote: ~p, ~p",
[State#state.remote, Other]),
{reply, ok, waiting_for_cluster_members, State}.

%% Async message handling for the `connected' state
connected(poll_cluster, State) ->
Expand Down Expand Up @@ -307,11 +356,18 @@ handle_info({TransOK, Socket, Name},
{next_state, waiting_for_cluster_name, State};
handle_info({TransOK, Socket, Members},
waiting_for_cluster_members,
State=#state{socket=Socket, transport_msgs = {TransOK, _, _}}) ->
State=#state{socket=Socket, transport_msgs = {TransOK, _, _}, proto_version={1,0}}) ->
Transport = State#state.transport,
gen_fsm:send_event(self(), {cluster_members, binary_to_term(Members)}),
_ = Transport:setopts(Socket, [{active, once}]),
{next_state, waiting_for_cluster_members, State};
handle_info({TransOK, Socket, Members},
waiting_for_cluster_members,
State=#state{socket=Socket, transport_msgs = {TransOK, _, _}}) ->
Transport = State#state.transport,
gen_fsm:send_event(self(), {all_cluster_members, binary_to_term(Members)}),
_ = Transport:setopts(Socket, [{active, once}]),
{next_state, waiting_for_cluster_members, State};
handle_info({TransOK, Socket, Data},
StateName,
State=#state{address=Addr,
Expand Down Expand Up @@ -362,18 +418,25 @@ code_change(_OldVsn, StateName, State, _Extra) ->
request_cluster_name(#state{mode=test}) ->
ok;
request_cluster_name(#state{socket=Socket, transport=Transport}) ->
_ = inet:setopts(Socket, [{active, once}]),
_ = Transport:setopts(Socket, [{active, once}]),
Transport:send(Socket, ?CTRL_ASK_NAME).

-spec request_member_ips(state()) -> ok | {error, term()}.
request_member_ips(#state{mode=test}) ->
ok;
request_member_ips(#state{socket=Socket, transport=Transport}) ->
request_member_ips(#state{socket=Socket, transport=Transport, proto_version={1,0}}) ->
Transport:send(Socket, ?CTRL_ASK_MEMBERS),
%% get the IP we think we've connected to
{ok, {PeerIP, PeerPort}} = Transport:peername(Socket),
%% make it a string
PeerIPStr = inet_parse:ntoa(PeerIP),
Transport:send(Socket, term_to_binary({PeerIPStr, PeerPort}));
request_member_ips(#state{socket=Socket, transport=Transport, proto_version={1,1}}) ->
Transport:send(Socket, ?CTRL_ALL_MEMBERS),
%% get the IP we think we've connected to
{ok, {PeerIP, PeerPort}} = Transport:peername(Socket),
%% make it a string
PeerIPStr = inet_parse:ntoa(PeerIP),
Transport:send(Socket, term_to_binary({PeerIPStr, PeerPort})).

initiate_connection(State=#state{mode=test}) ->
Expand All @@ -383,7 +446,7 @@ initiate_connection(State=#state{remote=Remote}) ->
%% `riak_core_connection_mgr::connect/4' is incorrect.
{ok, Ref} = riak_core_connection_mgr:connect(
Remote,
{{?REMOTE_CLUSTER_PROTO_ID, [{1,0}]},
{{?REMOTE_CLUSTER_PROTO_ID, [{1,1},{1,0}]},
{?CTRL_OPTIONS, ?MODULE, {Remote, self()}}},
default),
State#state{connection_ref=Ref}.
Expand Down
Loading