Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions src/riak_repl_app.erl
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,8 @@ start(_Type, _StartArgs) ->
%% Spin up supervisor
case riak_repl_sup:start_link() of
{ok, Pid} ->
%% register stats
riak_repl_stats:register_stats(),
ok = riak_core_ring_events:add_guarded_handler(riak_repl_ring_handler, []),
{ok, Pid};
{error, Reason} ->
Expand Down
2 changes: 1 addition & 1 deletion src/riak_repl_console.erl
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,7 @@ status(quiet) ->

status2(Verbose) ->
Config = get_config(),
Stats1 = lists:sort(ets:tab2list(riak_repl_stats)),
Stats1 = lists:sort(riak_repl_stats:get_stats()),
LeaderStats = leader_stats(),
ClientStats = client_stats(),
ServerStats = server_stats(),
Expand Down
253 changes: 166 additions & 87 deletions src/riak_repl_stats.erl
Original file line number Diff line number Diff line change
Expand Up @@ -4,11 +4,16 @@
-author('Andy Gross <andy@basho.com>').
-behaviour(gen_server).
-include("riak_repl.hrl").
-export([init/1,
handle_call/3,
handle_cast/2,

-ifdef(TEST).
-include_lib("eunit/include/eunit.hrl").
-endif.

-export([init/1,
handle_call/3,
handle_cast/2,
handle_info/2,
terminate/2,
terminate/2,
code_change/3]).
-export([start_link/0,
client_bytes_sent/1,
Expand All @@ -27,20 +32,17 @@
objects_forwarded/0,
elections_elected/0,
elections_leader_changed/0,
add_counter/1,
add_counter/2,
increment_counter/1,
increment_counter/2]).
-record(state, {t,
last_report,
last_client_bytes_sent=0,
last_client_bytes_recv=0,
last_server_bytes_sent=0,
last_server_bytes_recv=0
}).
register_stats/0,
get_stats/0]).

-define(APP, riak_repl).

start_link() -> gen_server:start_link({local, ?MODULE}, ?MODULE, [], []).

register_stats() ->
[register_stat(Name, Type) || {Name, Type} <- stats()],
folsom_metrics:notify_existing_metric({?APP, last_report}, now(), gauge).

client_bytes_sent(Bytes) ->
increment_counter(client_bytes_sent, Bytes).

Expand All @@ -49,7 +51,7 @@ client_bytes_recv(Bytes) ->

client_connects() ->
increment_counter(client_connects).

client_connect_errors() ->
increment_counter(client_connect_errors).

Expand All @@ -69,7 +71,7 @@ server_connect_errors() ->
increment_counter(server_connect_errors).

server_fullsyncs() ->
increment_counter(server_fullsyncs).
increment_counter(server_fullsyncs).

objects_dropped_no_clients() ->
increment_counter(objects_dropped_no_clients).
Expand All @@ -78,91 +80,99 @@ objects_dropped_no_leader() ->
increment_counter(objects_dropped_no_leader).

objects_sent() ->
increment_counter(objects_sent).
increment_counter(objects_sent).

objects_forwarded() ->
increment_counter(objects_forwarded).
increment_counter(objects_forwarded).

elections_elected() ->
increment_counter(elections_elected).
increment_counter(elections_elected).

elections_leader_changed() ->
increment_counter(elections_leader_changed).

init([]) ->
T = ets:new(?MODULE, [public, named_table, set, {write_concurrency, true}]),
[ets:insert(T, {Stat, 0}) || Stat <- [server_bytes_sent,
server_bytes_recv,
server_connects,
server_connect_errors,
server_fullsyncs,
client_bytes_sent,
client_bytes_recv,
client_connects,
client_connect_errors,
client_redirect,
objects_dropped_no_clients,
objects_dropped_no_leader,
objects_sent,
objects_forwarded,
elections_elected,
elections_leader_changed]],
[ets:insert(T, {Stat, []}) || Stat <- [client_rx_kbps,
client_tx_kbps,
server_rx_kbps,
server_tx_kbps]],
schedule_report_bw(),
{ok, #state{t=T,last_report=now()}}.
increment_counter(elections_leader_changed).

add_counter(Name) ->
add_counter(Name, 0).
get_stats() ->
lists:flatten([backwards_compat(Stat, Type) ||
{Stat, Type} <- stats()]).

add_counter(Name, InitVal) when is_atom(Name) andalso is_integer(InitVal) ->
gen_server:call(?MODULE, {add_counter, Name, InitVal}, infinity).
init([]) ->
schedule_report_bw(),
{ok, ok}.

register_stat(Name, counter) ->
folsom_metrics:new_counter({?APP, Name});
register_stat(Name, history) ->
BwHistoryLen = get_bw_history_len(),
folsom_metrics:new_history({?APP, Name}, BwHistoryLen);
register_stat(Name, gauge) ->
folsom_metrics:new_gauge({?APP, Name}).

stats() ->
[{server_bytes_sent, counter},
{server_bytes_recv, counter},
{server_connects, counter},
{server_connect_errors, counter},
{server_fullsyncs, counter},
{client_bytes_sent, counter},
{client_bytes_recv, counter},
{client_connects, counter},
{client_connect_errors, counter},
{client_redirect, counter},
{objects_dropped_no_clients, counter},
{objects_dropped_no_leader, counter},
{objects_sent, counter},
{objects_forwarded, counter},
{elections_elected, counter},
{elections_leader_changed, counter},
{client_rx_kbps, history},
{client_tx_kbps, history},
{server_rx_kbps, history},
{server_tx_kbps, history},
{last_report, gauge},
{last_client_bytes_sent, gauge},
{last_client_bytes_recv, gauge},
{last_server_bytes_sent, gauge},
{last_server_bytes_recv, gauge}].

increment_counter(Name) ->
increment_counter(Name, 1).

increment_counter(Name, IncrBy) when is_atom(Name) andalso is_integer(IncrBy) ->
%gen_server:cast(?MODULE, {increment_counter, Name, IncrBy}).
catch ets:update_counter(?MODULE, Name, IncrBy).
folsom_metrics:notify_existing_metric({?APP, Name}, {inc, IncrBy}, counter).

handle_call({add_counter, Name, InitVal}, _From, State=#state{t=T}) ->
ets:insert(T, {Name, InitVal}),
handle_call(_Req, _From, State) ->
{reply, ok, State}.
handle_cast({increment_counter, Name, IncrBy}, State=#state{t=T}) ->
catch ets:update_counter(T, Name, IncrBy),

handle_cast(_Msg, State) ->
{noreply, State}.

handle_info(report_bw, State=#state{last_client_bytes_sent=LastClientBytesSent,
last_client_bytes_recv=LastClientBytesRecv,
last_server_bytes_sent=LastServerBytesSent,
last_server_bytes_recv=LastServerBytesRecv}) ->
handle_info(report_bw, State) ->
ThisClientBytesSent=lookup_stat(client_bytes_sent),
ThisClientBytesRecv=lookup_stat(client_bytes_recv),
ThisServerBytesSent=lookup_stat(server_bytes_sent),
ThisServerBytesRecv=lookup_stat(server_bytes_recv),

Now = now(),
DeltaSecs = now_diff(Now, State#state.last_report),
ClientTx = bytes_to_kbits_per_sec(ThisClientBytesSent, LastClientBytesSent, DeltaSecs),
ClientRx = bytes_to_kbits_per_sec(ThisClientBytesRecv, LastClientBytesRecv, DeltaSecs),
ServerTx = bytes_to_kbits_per_sec(ThisServerBytesSent, LastServerBytesSent, DeltaSecs),
ServerRx = bytes_to_kbits_per_sec(ThisServerBytesRecv, LastServerBytesRecv, DeltaSecs),

BwHistoryLen = app_helper:get_env(riak_repl, bw_history_len, 8),

update_list(client_tx_kbps, ClientTx, BwHistoryLen, State#state.t),
update_list(client_rx_kbps, ClientRx, BwHistoryLen, State#state.t),
update_list(server_tx_kbps, ServerTx, BwHistoryLen, State#state.t),
update_list(server_rx_kbps, ServerRx, BwHistoryLen, State#state.t),

DeltaSecs = now_diff(Now, lookup_stat(last_report)),
ClientTx = bytes_to_kbits_per_sec(ThisClientBytesSent, lookup_stat(last_client_bytes_sent), DeltaSecs),
ClientRx = bytes_to_kbits_per_sec(ThisClientBytesRecv, lookup_stat(last_client_bytes_recv), DeltaSecs),
ServerTx = bytes_to_kbits_per_sec(ThisServerBytesSent, lookup_stat(last_server_bytes_sent), DeltaSecs),
ServerRx = bytes_to_kbits_per_sec(ThisServerBytesRecv, lookup_stat(last_server_bytes_recv), DeltaSecs),

[folsom_metrics:notify_existing_metric({?APP, Metric}, Reading, history)
|| {Metric, Reading} <- [{client_tx_kbps, ClientTx},
{client_rx_kbps, ClientRx},
{server_tx_kbps, ServerTx},
{server_rx_kbps, ServerRx}]],

[folsom_metrics:notify_existing_metric({?APP, Metric}, Reading, gauge)
|| {Metric, Reading} <- [{last_client_bytes_sent, ThisClientBytesSent},
{last_client_bytes_recv, ThisClientBytesRecv},
{last_server_bytes_sent, ThisServerBytesSent},
{last_server_bytes_recv, ThisServerBytesRecv}]],

schedule_report_bw(),
{noreply, State#state{last_report = Now,
last_client_bytes_sent = ThisClientBytesSent,
last_client_bytes_recv = ThisClientBytesRecv,
last_server_bytes_sent = ThisServerBytesSent,
last_server_bytes_recv = ThisServerBytesRecv}};
{noreply, State};
handle_info(_Info, State) -> {noreply, State}.

terminate(_Reason, _State) -> ok.
Expand All @@ -171,21 +181,90 @@ code_change(_OldVsn, State, _Extra) -> {ok, State}.
schedule_report_bw() ->
BwHistoryInterval = app_helper:get_env(riak_repl, bw_history_interval, 60000),
timer:send_after(BwHistoryInterval, report_bw).


%% Convert two values in bytes to a kbits/sec
bytes_to_kbits_per_sec(This, Last, Delta) ->
trunc((This - Last) / (128 * Delta)). %% x8/1024 = x/128

update_list(Name, Entry, MaxLen, Tid) ->
Current = lookup_stat(Name),
Updated = [Entry | lists:sublist(Current, MaxLen-1)],
ets:insert(Tid, {Name, Updated}).

lookup_stat(Name) ->
[{Name,Val}]=ets:lookup(?MODULE, Name),
Val.
folsom_metrics:get_metric_value({?APP, Name}).

now_diff({Lmega,Lsecs,_Lmicro}, {Emega,Esecs,_Emicro}) ->
1000000*(Lmega-Emega)+(Lsecs-Esecs).


get_bw_history_len() ->
app_helper:get_env(riak_repl, bw_history_len, 8).

backwards_compat(Name, history) ->
Stats = folsom_metrics:get_history_values({?APP, Name}, get_bw_history_len()),
Readings = [[Reading || {event, Reading} <- Events] || {_Moment, Events} <- Stats],
{Name, Readings};
backwards_compat(_Name, gauge) ->
[];
backwards_compat(Name, _Type) ->
{Name, lookup_stat(Name)}.

-ifdef(TEST).

repl_stats_test_() ->
{setup, fun() ->
folsom:start(),
{ok, Pid} = riak_repl_stats:start_link(),
Pid end,
fun(Pid) ->
folsom:stop(),
exit(Pid, kill) end,
[{"Register stats", fun test_register_stats/0},
{"Populate stats", fun test_populate_stats/0},
{"Check stats", fun test_check_stats/0}]
}.

test_register_stats() ->
register_stats(),
RegisteredReplStats = [Stat || {App, Stat} <- folsom_metrics:get_metrics(),
App == riak_repl],
{Stats, _Types} = lists:unzip(stats()),
?assertEqual(lists:sort(Stats), lists:sort(RegisteredReplStats)).

test_populate_stats() ->
Bytes = 1000,
ok = client_bytes_sent(Bytes),
ok = client_bytes_recv(Bytes),
ok = client_connects(),
ok = client_connect_errors(),
ok = client_redirect(),
ok = server_bytes_sent(Bytes),
ok = server_bytes_recv(Bytes),
ok = server_connects(),
ok = server_connect_errors(),
ok = server_fullsyncs(),
ok = objects_dropped_no_clients(),
ok = objects_dropped_no_leader(),
ok = objects_sent(),
ok = objects_forwarded(),
ok = elections_elected(),
ok = elections_leader_changed().

test_check_stats() ->
?assertEqual([{server_bytes_sent,1000},
{server_bytes_recv,1000},
{server_connects,1},
{server_connect_errors,1},
{server_fullsyncs,1},
{client_bytes_sent,1000},
{client_bytes_recv,1000},
{client_connects,1},
{client_connect_errors,1},
{client_redirect,1},
{objects_dropped_no_clients,1},
{objects_dropped_no_leader,1},
{objects_sent,1},
{objects_forwarded,1},
{elections_elected,1},
{elections_leader_changed,1},
{client_rx_kbps,[]},
{client_tx_kbps,[]},
{server_rx_kbps,[]},
{server_tx_kbps,[]}], get_stats()).

-endif.