Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
29 changes: 23 additions & 6 deletions include/riak_control.hrl
Original file line number Diff line number Diff line change
Expand Up @@ -58,10 +58,27 @@
partition :: integer(),
owner :: owner(),
vnodes :: services(),
handoffs :: handoffs()
}).
handoffs :: handoffs() }).

-define(PARTITION_INFO, #partition_info).
-type partition() :: ?PARTITION_INFO{}.
-type partitions() :: [partition()].

%% Riak 1.3
-record(member_info,
{ node :: atom(),
status :: status(),
reachable :: boolean(),
vnodes :: vnodes(),
handoffs :: handoffs(),
ring_pct :: float(),
pending_pct :: float(),
mem_total :: integer(),
mem_used :: integer(),
mem_erlang :: integer() }).

%% Riak 1.4.1+
-record(member_info_v2,
{ node :: atom(),
status :: status(),
reachable :: boolean(),
Expand All @@ -73,11 +90,11 @@
mem_used :: integer(),
mem_erlang :: integer(),
action :: action(),
replacement :: node()
}).
replacement :: node() }).

-type partitions() :: [#partition_info{}].
-type members() :: [#member_info{}].
-define(MEMBER_INFO, #member_info_v2).
-type member() :: ?MEMBER_INFO{}.
-type members() :: [member()].

%% These two should always match, in terms of webmachine dispatcher
%% logic, and ADMIN_BASE_PATH should always end with a /
Expand Down
19 changes: 19 additions & 0 deletions priv/admin/js/core.js
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,25 @@ minispade.register('core', function() {
riak_pipe_vnode_status: DS.attr("string"),
riak_search_vnode_status: DS.attr("string"),

/**
* Return status of whether the node is incompatible or not.
*
* @returns {Boolean}
*/
incompatible: function() {
return this.get('status') === 'incompatible';
}.property('status'),

/**
* Consider an available node one which is compatible, and
* reachable.
*
* @returns {Boolean}
*/
available: function() {
return !this.get('incompatible') && this.get('reachable');
}.property('status', 'reachable'),

/**
* Coerce vnode status into representations that are useful
* for the user interface.
Expand Down
4 changes: 2 additions & 2 deletions priv/admin/js/generated/templates.js

Large diffs are not rendered by default.

2 changes: 1 addition & 1 deletion priv/admin/js/templates/current_cluster_item.hbs
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@
<div class="clear"></div>
</div>
<div class="item4 gui-text memory-box">
{{#if reachable}}
{{#if available}}
<div class="membar-bg">
<div class="mem-colors">
<div class="erlang-mem mem-color" {{bindAttr style="memErlangStyle"}} {{bindAttr name="memErlangCeil"}}></div>
Expand Down
2 changes: 1 addition & 1 deletion priv/admin/js/templates/current_nodes_item.hbs
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@
<div class="clear"></div>
</div>
<div class="item5 gui-text memory-box">
{{#if reachable}}
{{#if available}}
<div class="membar-bg">
<div class="mem-colors">
<div class="erlang-mem mem-color" {{bindAttr style="memErlangStyle"}} {{bindAttr name="memErlangCeil"}}></div>
Expand Down
2 changes: 1 addition & 1 deletion src/riak_control_formatting.erl
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,7 @@ action_result(_,Req,C) ->
%% return a proplist of details for a given index
node_ring_details (P=#partition_info{index=Index,vnodes=Vnodes},Nodes) ->
case lists:keyfind(P#partition_info.owner,2,Nodes) of
#member_info{node=Node,status=Status,reachable=Reachable} ->
?MEMBER_INFO{node=Node,status=Status,reachable=Reachable} ->
Handoffs = P#partition_info.handoffs,
VnodeStatuses = [{atom_to_list(VnodeName) ++
"_vnode_status", vnode_status(VnodeName, VnodeStatus, Handoffs)}
Expand Down
76 changes: 57 additions & 19 deletions src/riak_control_session.erl
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,8 @@
code_change/3]).

%% exported for RPC calls.
-export([get_my_info/0]).
-export([get_my_info/0,
get_my_info_v2/0]).

-record(state, {vsn :: version(),
services :: services(),
Expand Down Expand Up @@ -272,7 +273,7 @@ update_partitions(State=#state{ring=Ring}) ->
State#state{partitions=Partitions}.

%% @doc Ping and retrieve vnode workers.
-spec get_member_info({node(), status()}, ring()) -> #member_info{}.
-spec get_member_info({node(), status()}, ring()) -> member().
get_member_info(_Member={Node, Status}, Ring) ->
RingSize = riak_core_ring:num_partitions(Ring),

Expand All @@ -283,45 +284,64 @@ get_member_info(_Member={Node, Status}, Ring) ->
PctPending = length(FutureIndices) / RingSize,

%% try and get a list of all the vnodes running on the node
case rpc:call(Node, riak_control_session, get_my_info, []) of
case rpc:call(Node, riak_control_session, get_my_info_v2, []) of
{badrpc,nodedown} ->
#member_info{node = Node,
?MEMBER_INFO{node = Node,
status = Status,
reachable = false,
vnodes = [],
handoffs = [],
ring_pct = PctRing,
pending_pct = PctPending};
{badrpc,_Reason} ->
#member_info{node = Node,
?MEMBER_INFO{node = Node,
status = incompatible,
reachable = true,
vnodes = [],
handoffs = [],
ring_pct = PctRing,
pending_pct = PctPending};
MemberInfo = #member_info{} ->
%% there is a race condition here, when a node is stopped
%% gracefully (e.g. `riak stop`) the event will reach us
%% before the node is actually down and the rpc call will
%% succeed, but since it's shutting down it won't have any
%% vnode workers running...
MemberInfo#member_info{status = Status,
MemberInfo = ?MEMBER_INFO{} ->
MemberInfo?MEMBER_INFO{status = Status,
ring_pct = PctRing,
pending_pct = PctPending}
pending_pct = PctPending};
MemberInfo0 = #member_info{} ->
%% Upgrade older member information record.
MemberInfo = upgrade_member_info(MemberInfo0),
MemberInfo?MEMBER_INFO{status = Status,
ring_pct = PctRing,
pending_pct = PctPending};
_ ->

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

What's wrong with matching against #member_info{} in this case? It seems the main difference is that the new record has additional fields.

A capability might also help, allowing new nodes to send the original record to old nodes.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can't match member_info because we can't guarantee a specific format for it. I've added a capability for handling legacy record formats, but we still don't have a solution for 1.4.0/1.4.1 compatibility.

%% default case where a record incompatibility causes a
%% failure matching the record format.
?MEMBER_INFO{node = Node,
status = incompatible,
reachable = true,
vnodes = [],
handoffs = [],
ring_pct = PctRing,
pending_pct = PctPending}
end.

%% @doc Return current nodes information.
-spec get_my_info() -> #member_info{}.
-spec get_my_info() -> member().
get_my_info() ->
erlang:throw({badrpc, incompatible}).

%% @doc Return current nodes information.
-spec get_my_info_v2() -> member().
get_my_info_v2() ->
{Total, Used} = get_my_memory(),
#member_info{node = node(),
Handoffs = get_handoff_status(),
VNodes = riak_core_vnode_manager:all_vnodes(),
ErlangMemory = proplists:get_value(total,erlang:memory()),
?MEMBER_INFO{node = node(),
reachable = true,
mem_total = Total,
mem_used = Used,
mem_erlang = proplists:get_value(total,erlang:memory()),
vnodes = riak_core_vnode_manager:all_vnodes(),
handoffs = get_handoff_status()}.
mem_erlang = ErlangMemory,
vnodes = VNodes,
handoffs = Handoffs}.

%% @doc Return current nodes memory.
-spec get_my_memory() -> {term(), term()}.
Expand Down Expand Up @@ -364,7 +384,7 @@ get_handoff_status() ->
%% @doc Get handoffs for every node.
-spec get_all_handoffs(#state{}) -> handoffs().
get_all_handoffs(#state{nodes=Members}) ->
lists:flatten([HS || #member_info{handoffs=HS} <- Members]).
lists:flatten([HS || ?MEMBER_INFO{handoffs=HS} <- Members]).

%% @doc Get information for a particular index.
-spec get_partition_details(#state{}, {integer(), term()}, handoffs())
Expand Down Expand Up @@ -469,3 +489,21 @@ maybe_stage_change(Node, Action, Replacement) ->
stop ->
rpc:call(Node, riak_core, stop, [])
end.

%% @doc Conditionally upgrade member info records once they cross node
%% boundaries.
-spec upgrade_member_info(member() | #member_info{}) -> member().
upgrade_member_info(MemberInfo = ?MEMBER_INFO{}) ->
MemberInfo;
upgrade_member_info(MemberInfo = #member_info{}) ->
?MEMBER_INFO{
node = MemberInfo#member_info.node,
status = MemberInfo#member_info.status,
reachable = MemberInfo#member_info.reachable,
vnodes = MemberInfo#member_info.vnodes,
handoffs = MemberInfo#member_info.handoffs,
ring_pct = MemberInfo#member_info.ring_pct,
pending_pct = MemberInfo#member_info.pending_pct,
mem_total = MemberInfo#member_info.mem_total,
mem_used = MemberInfo#member_info.mem_used,
mem_erlang = MemberInfo#member_info.mem_erlang}.
46 changes: 23 additions & 23 deletions src/riak_control_wm_cluster.erl
Original file line number Diff line number Diff line change
Expand Up @@ -196,7 +196,7 @@ to_json(ReqData, Context) ->

%% Get the current node list.
{ok, _V, Nodes} = riak_control_session:get_nodes(),
Current = [jsonify_node(Node) || Node=#member_info{} <- Nodes],
Current = [jsonify_node(Node) || Node=?MEMBER_INFO{} <- Nodes],

%% Get the current list of planned changes and updated claim.
Planned = case riak_control_session:get_plan() of
Expand All @@ -215,7 +215,7 @@ to_json(ReqData, Context) ->
{mochijson2:encode({struct,[{cluster,Clusters}]}), ReqData, Context}.

%% @doc Generate a new "planned" cluster which outlines transitions.
-spec merge_transitions(list(#member_info{}), list(), list()) ->
-spec merge_transitions(list(member()), list(), list()) ->
[{struct, list()}].
merge_transitions(Nodes, Changes, Claim) ->
lists:foldl(fun(Node, TransitionedNodes) ->
Expand All @@ -224,32 +224,32 @@ merge_transitions(Nodes, Changes, Claim) ->
end, [], Nodes).

%% @doc Merge change into member info record.
-spec apply_changes(#member_info{}, list(), list()) -> #member_info{}.
-spec apply_changes(member(), list(), list()) -> member().
apply_changes(Node, Changes, Claim) ->
apply_status_change(apply_claim_change(Node, Claim), Changes).

%% @doc Merge change into member info record.
-spec apply_status_change(#member_info{}, list()) -> #member_info{}.
-spec apply_status_change(member(), list()) -> member().
apply_status_change(Node, Changes) ->
Name = Node#member_info.node,
Name = Node?MEMBER_INFO.node,

case lists:keyfind(Name, 1, Changes) of
false ->
Node;
{_, {Action, Replacement}} ->
Node#member_info{action=Action, replacement=Replacement};
Node?MEMBER_INFO{action=Action, replacement=Replacement};
{_, Action} ->
Node#member_info{action=Action}
Node?MEMBER_INFO{action=Action}
end.

%% @doc Merge change into member info record.
-spec apply_claim_change(#member_info{}, list()) -> #member_info{}.
-spec apply_claim_change(member(), list()) -> member().
apply_claim_change(Node, Claim) ->
Name = Node#member_info.node,
Name = Node?MEMBER_INFO.node,

case lists:keyfind(Name, 1, Claim) of
false ->
Node#member_info{ring_pct=0.0, pending_pct=0.0};
Node?MEMBER_INFO{ring_pct=0.0, pending_pct=0.0};
{_, {_, Future}} ->
%% @doc Hack until core returns normalized values.
Normalized = if
Expand All @@ -258,29 +258,29 @@ apply_claim_change(Node, Claim) ->
true ->
Future
end,
Node#member_info{ring_pct=Normalized, pending_pct=Normalized}
Node?MEMBER_INFO{ring_pct=Normalized, pending_pct=Normalized}
end.

%% @doc Turn a node into a proper struct for serialization.
-spec jsonify_node(#member_info{}) -> {struct, list()}.
-spec jsonify_node(member()) -> {struct, list()}.
jsonify_node(Node) ->
LWM=app_helper:get_env(riak_control,low_mem_watermark,0.1),
MemUsed = Node#member_info.mem_used,
MemTotal = Node#member_info.mem_total,
Reachable = Node#member_info.reachable,
MemUsed = Node?MEMBER_INFO.mem_used,
MemTotal = Node?MEMBER_INFO.mem_total,
Reachable = Node?MEMBER_INFO.reachable,
LowMem = low_mem(Reachable, MemUsed, MemTotal, LWM),
{struct,[{"name",Node#member_info.node},
{"status",Node#member_info.status},
{struct,[{"name",Node?MEMBER_INFO.node},
{"status",Node?MEMBER_INFO.status},
{"reachable",Reachable},
{"ring_pct",Node#member_info.ring_pct},
{"pending_pct",Node#member_info.pending_pct},
{"ring_pct",Node?MEMBER_INFO.ring_pct},
{"pending_pct",Node?MEMBER_INFO.pending_pct},
{"mem_total",MemTotal},
{"mem_used",MemUsed},
{"mem_erlang",Node#member_info.mem_erlang},
{"mem_erlang",Node?MEMBER_INFO.mem_erlang},
{"low_mem",LowMem},
{"me",Node#member_info.node == node()},
{"action",Node#member_info.action},
{"replacement",Node#member_info.replacement}]}.
{"me",Node?MEMBER_INFO.node == node()},
{"action",Node?MEMBER_INFO.action},
{"replacement",Node?MEMBER_INFO.replacement}]}.

%% @doc Given a struct/proplist that we've received via JSON,
%% recursively turn the keys into atoms from binaries.
Expand Down
49 changes: 32 additions & 17 deletions src/riak_control_wm_nodes.erl
Original file line number Diff line number Diff line change
Expand Up @@ -79,31 +79,46 @@ to_json(ReqData, Context) ->
%% Get the current node list.
{ok, _V, RawNodes} = riak_control_session:get_nodes(),

Nodes = [jsonify_node(Node) || Node=#member_info{} <- RawNodes],
Nodes = [jsonify_node(Node) || Node=?MEMBER_INFO{} <- RawNodes],
Encoded = mochijson2:encode({struct, [{nodes, Nodes}]}),

{Encoded, ReqData, Context}.

%% @doc Turn a node into a proper struct for serialization.
-spec jsonify_node(#member_info{}) -> {struct, list()}.
-spec jsonify_node(member()) -> {struct, list()}.
jsonify_node(Node) ->
LWM=app_helper:get_env(riak_control,low_mem_watermark,0.1),
MemUsed = Node#member_info.mem_used,
MemTotal = Node#member_info.mem_total,
Reachable = Node#member_info.reachable,
LowMem = case Reachable of
false ->
false;
true ->
1.0 - (MemUsed/MemTotal) < LWM
end,
{struct,[{"name",Node#member_info.node},
{"status",Node#member_info.status},
MemUsed = Node?MEMBER_INFO.mem_used,
MemTotal = Node?MEMBER_INFO.mem_total,
Reachable = Node?MEMBER_INFO.reachable,
LowMem = low_mem(Reachable, MemUsed, MemTotal, LWM),
{struct,[{"name",Node?MEMBER_INFO.node},
{"status",Node?MEMBER_INFO.status},
{"reachable",Reachable},
{"ring_pct",Node#member_info.ring_pct},
{"pending_pct",Node#member_info.pending_pct},
{"ring_pct",Node?MEMBER_INFO.ring_pct},
{"pending_pct",Node?MEMBER_INFO.pending_pct},
{"mem_total",MemTotal},
{"mem_used",MemUsed},
{"mem_erlang",Node#member_info.mem_erlang},
{"mem_erlang",Node?MEMBER_INFO.mem_erlang},
{"low_mem",LowMem},
{"me",Node#member_info.node == node()}]}.
{"me",Node?MEMBER_INFO.node == node()},
{"action",Node?MEMBER_INFO.action},
{"replacement",Node?MEMBER_INFO.replacement}]}.

%% @doc Determine if a node has low memory.
-spec low_mem(boolean(), number() | atom(), number() | atom(), number())
-> boolean().
low_mem(Reachable, MemUsed, MemTotal, LWM) ->
case Reachable of
false ->
false;
true ->
%% There is a race where the node is online, but memsup is
%% still starting so memory is unavailable.
case MemTotal of
undefined ->
false;
_ ->
1.0 - (MemUsed/MemTotal) < LWM
end
end.