44-author ('Andy Gross <andy@basho.com>' ).
55-behaviour (gen_server ).
66-include (" riak_repl.hrl" ).
7- -export ([init /1 ,
8- handle_call /3 ,
9- handle_cast /2 ,
7+
8+ -ifdef (TEST ).
9+ -include_lib (" eunit/include/eunit.hrl" ).
10+ -endif .
11+
12+ -export ([init /1 ,
13+ handle_call /3 ,
14+ handle_cast /2 ,
1015 handle_info /2 ,
11- terminate /2 ,
16+ terminate /2 ,
1217 code_change /3 ]).
1318-export ([start_link /0 ,
1419 client_bytes_sent /1 ,
2732 objects_forwarded /0 ,
2833 elections_elected /0 ,
2934 elections_leader_changed /0 ,
30- add_counter /1 ,
31- add_counter /2 ,
32- increment_counter /1 ,
33- increment_counter /2 ]).
34- -record (state , {t ,
35- last_report ,
36- last_client_bytes_sent = 0 ,
37- last_client_bytes_recv = 0 ,
38- last_server_bytes_sent = 0 ,
39- last_server_bytes_recv = 0
40- }).
35+ register_stats /0 ,
36+ get_stats /0 ]).
37+
38+ -define (APP , riak_repl ).
4139
4240start_link () -> gen_server :start_link ({local , ? MODULE }, ? MODULE , [], []).
4341
42+ register_stats () ->
43+ [register_stat (Name , Type ) || {Name , Type } <- stats ()],
44+ folsom_metrics :notify_existing_metric ({? APP , last_report }, now (), gauge ).
45+
4446client_bytes_sent (Bytes ) ->
4547 increment_counter (client_bytes_sent , Bytes ).
4648
@@ -49,7 +51,7 @@ client_bytes_recv(Bytes) ->
4951
5052client_connects () ->
5153 increment_counter (client_connects ).
52-
54+
5355client_connect_errors () ->
5456 increment_counter (client_connect_errors ).
5557
@@ -69,7 +71,7 @@ server_connect_errors() ->
6971 increment_counter (server_connect_errors ).
7072
7173server_fullsyncs () ->
72- increment_counter (server_fullsyncs ).
74+ increment_counter (server_fullsyncs ).
7375
7476objects_dropped_no_clients () ->
7577 increment_counter (objects_dropped_no_clients ).
@@ -78,91 +80,99 @@ objects_dropped_no_leader() ->
7880 increment_counter (objects_dropped_no_leader ).
7981
8082objects_sent () ->
81- increment_counter (objects_sent ).
83+ increment_counter (objects_sent ).
8284
8385objects_forwarded () ->
84- increment_counter (objects_forwarded ).
86+ increment_counter (objects_forwarded ).
8587
8688elections_elected () ->
87- increment_counter (elections_elected ).
89+ increment_counter (elections_elected ).
8890
8991elections_leader_changed () ->
90- increment_counter (elections_leader_changed ).
91-
92- init ([]) ->
93- T = ets :new (? MODULE , [public , named_table , set , {write_concurrency , true }]),
94- [ets :insert (T , {Stat , 0 }) || Stat <- [server_bytes_sent ,
95- server_bytes_recv ,
96- server_connects ,
97- server_connect_errors ,
98- server_fullsyncs ,
99- client_bytes_sent ,
100- client_bytes_recv ,
101- client_connects ,
102- client_connect_errors ,
103- client_redirect ,
104- objects_dropped_no_clients ,
105- objects_dropped_no_leader ,
106- objects_sent ,
107- objects_forwarded ,
108- elections_elected ,
109- elections_leader_changed ]],
110- [ets :insert (T , {Stat , []}) || Stat <- [client_rx_kbps ,
111- client_tx_kbps ,
112- server_rx_kbps ,
113- server_tx_kbps ]],
114- schedule_report_bw (),
115- {ok , # state {t = T ,last_report = now ()}}.
92+ increment_counter (elections_leader_changed ).
11693
117- add_counter (Name ) ->
118- add_counter (Name , 0 ).
94+ get_stats () ->
95+ lists :flatten ([backwards_compat (Stat , Type ) ||
96+ {Stat , Type } <- stats ()]).
11997
120- add_counter (Name , InitVal ) when is_atom (Name ) andalso is_integer (InitVal ) ->
121- gen_server :call (? MODULE , {add_counter , Name , InitVal }, infinity ).
98+ init ([]) ->
99+ schedule_report_bw (),
100+ {ok , ok }.
101+
102+ register_stat (Name , counter ) ->
103+ folsom_metrics :new_counter ({? APP , Name });
104+ register_stat (Name , history ) ->
105+ BwHistoryLen = get_bw_history_len (),
106+ folsom_metrics :new_history ({? APP , Name }, BwHistoryLen );
107+ register_stat (Name , gauge ) ->
108+ folsom_metrics :new_gauge ({? APP , Name }).
109+
110+ stats () ->
111+ [{server_bytes_sent , counter },
112+ {server_bytes_recv , counter },
113+ {server_connects , counter },
114+ {server_connect_errors , counter },
115+ {server_fullsyncs , counter },
116+ {client_bytes_sent , counter },
117+ {client_bytes_recv , counter },
118+ {client_connects , counter },
119+ {client_connect_errors , counter },
120+ {client_redirect , counter },
121+ {objects_dropped_no_clients , counter },
122+ {objects_dropped_no_leader , counter },
123+ {objects_sent , counter },
124+ {objects_forwarded , counter },
125+ {elections_elected , counter },
126+ {elections_leader_changed , counter },
127+ {client_rx_kbps , history },
128+ {client_tx_kbps , history },
129+ {server_rx_kbps , history },
130+ {server_tx_kbps , history },
131+ {last_report , gauge },
132+ {last_client_bytes_sent , gauge },
133+ {last_client_bytes_recv , gauge },
134+ {last_server_bytes_sent , gauge },
135+ {last_server_bytes_recv , gauge }].
122136
123137increment_counter (Name ) ->
124138 increment_counter (Name , 1 ).
125139
126140increment_counter (Name , IncrBy ) when is_atom (Name ) andalso is_integer (IncrBy ) ->
127- % gen_server:cast(?MODULE, {increment_counter, Name, IncrBy}).
128- catch ets :update_counter (? MODULE , Name , IncrBy ).
141+ folsom_metrics :notify_existing_metric ({? APP , Name }, {inc , IncrBy }, counter ).
129142
130- handle_call ({add_counter , Name , InitVal }, _From , State = # state {t = T }) ->
131- ets :insert (T , {Name , InitVal }),
143+ handle_call (_Req , _From , State ) ->
132144 {reply , ok , State }.
133- handle_cast ({ increment_counter , Name , IncrBy }, State = # state { t = T }) ->
134- catch ets : update_counter ( T , Name , IncrBy ),
145+
146+ handle_cast ( _Msg , State ) ->
135147 {noreply , State }.
136148
137- handle_info (report_bw , State = # state {last_client_bytes_sent = LastClientBytesSent ,
138- last_client_bytes_recv = LastClientBytesRecv ,
139- last_server_bytes_sent = LastServerBytesSent ,
140- last_server_bytes_recv = LastServerBytesRecv }) ->
149+ handle_info (report_bw , State ) ->
141150 ThisClientBytesSent = lookup_stat (client_bytes_sent ),
142151 ThisClientBytesRecv = lookup_stat (client_bytes_recv ),
143152 ThisServerBytesSent = lookup_stat (server_bytes_sent ),
144153 ThisServerBytesRecv = lookup_stat (server_bytes_recv ),
145154
146155 Now = now (),
147- DeltaSecs = now_diff (Now , State # state .last_report ),
148- ClientTx = bytes_to_kbits_per_sec (ThisClientBytesSent , LastClientBytesSent , DeltaSecs ),
149- ClientRx = bytes_to_kbits_per_sec (ThisClientBytesRecv , LastClientBytesRecv , DeltaSecs ),
150- ServerTx = bytes_to_kbits_per_sec (ThisServerBytesSent , LastServerBytesSent , DeltaSecs ),
151- ServerRx = bytes_to_kbits_per_sec (ThisServerBytesRecv , LastServerBytesRecv , DeltaSecs ),
152-
153- BwHistoryLen = app_helper :get_env (riak_repl , bw_history_len , 8 ),
154-
155- update_list (client_tx_kbps , ClientTx , BwHistoryLen , State # state .t ),
156- update_list (client_rx_kbps , ClientRx , BwHistoryLen , State # state .t ),
157- update_list (server_tx_kbps , ServerTx , BwHistoryLen , State # state .t ),
158- update_list (server_rx_kbps , ServerRx , BwHistoryLen , State # state .t ),
159-
156+ DeltaSecs = now_diff (Now , lookup_stat (last_report )),
157+ ClientTx = bytes_to_kbits_per_sec (ThisClientBytesSent , lookup_stat (last_client_bytes_sent ), DeltaSecs ),
158+ ClientRx = bytes_to_kbits_per_sec (ThisClientBytesRecv , lookup_stat (last_client_bytes_recv ), DeltaSecs ),
159+ ServerTx = bytes_to_kbits_per_sec (ThisServerBytesSent , lookup_stat (last_server_bytes_sent ), DeltaSecs ),
160+ ServerRx = bytes_to_kbits_per_sec (ThisServerBytesRecv , lookup_stat (last_server_bytes_recv ), DeltaSecs ),
161+
162+ [folsom_metrics :notify_existing_metric ({? APP , Metric }, Reading , history )
163+ || {Metric , Reading } <- [{client_tx_kbps , ClientTx },
164+ {client_rx_kbps , ClientRx },
165+ {server_tx_kbps , ServerTx },
166+ {server_rx_kbps , ServerRx }]],
167+
168+ [folsom_metrics :notify_existing_metric ({? APP , Metric }, Reading , gauge )
169+ || {Metric , Reading } <- [{last_client_bytes_sent , ThisClientBytesSent },
170+ {last_client_bytes_recv , ThisClientBytesRecv },
171+ {last_server_bytes_sent , ThisServerBytesSent },
172+ {last_server_bytes_recv , ThisServerBytesRecv }]],
173+
160174 schedule_report_bw (),
161- {noreply , State # state {last_report = Now ,
162- last_client_bytes_sent = ThisClientBytesSent ,
163- last_client_bytes_recv = ThisClientBytesRecv ,
164- last_server_bytes_sent = ThisServerBytesSent ,
165- last_server_bytes_recv = ThisServerBytesRecv }};
175+ {noreply , State };
166176handle_info (_Info , State ) -> {noreply , State }.
167177
168178terminate (_Reason , _State ) -> ok .
@@ -171,21 +181,90 @@ code_change(_OldVsn, State, _Extra) -> {ok, State}.
171181schedule_report_bw () ->
172182 BwHistoryInterval = app_helper :get_env (riak_repl , bw_history_interval , 60000 ),
173183 timer :send_after (BwHistoryInterval , report_bw ).
174-
175184
176185% % Convert two values in bytes to a kbits/sec
177186bytes_to_kbits_per_sec (This , Last , Delta ) ->
178187 trunc ((This - Last ) / (128 * Delta )). % % x8/1024 = x/128
179188
180- update_list (Name , Entry , MaxLen , Tid ) ->
181- Current = lookup_stat (Name ),
182- Updated = [Entry | lists :sublist (Current , MaxLen - 1 )],
183- ets :insert (Tid , {Name , Updated }).
184-
185189lookup_stat (Name ) ->
186- [{Name ,Val }]= ets :lookup (? MODULE , Name ),
187- Val .
190+ folsom_metrics :get_metric_value ({? APP , Name }).
188191
189192now_diff ({Lmega ,Lsecs ,_Lmicro }, {Emega ,Esecs ,_Emicro }) ->
190193 1000000 * (Lmega - Emega )+ (Lsecs - Esecs ).
191-
194+
195+ get_bw_history_len () ->
196+ app_helper :get_env (riak_repl , bw_history_len , 8 ).
197+
198+ backwards_compat (Name , history ) ->
199+ Stats = folsom_metrics :get_history_values ({? APP , Name }, get_bw_history_len ()),
200+ Readings = [[Reading || {event , Reading } <- Events ] || {_Moment , Events } <- Stats ],
201+ {Name , Readings };
202+ backwards_compat (_Name , gauge ) ->
203+ [];
204+ backwards_compat (Name , _Type ) ->
205+ {Name , lookup_stat (Name )}.
206+
207+ -ifdef (TEST ).
208+
209+ repl_stats_test_ () ->
210+ {setup , fun () ->
211+ folsom :start (),
212+ {ok , Pid } = riak_repl_stats :start_link (),
213+ Pid end ,
214+ fun (Pid ) ->
215+ folsom :stop (),
216+ exit (Pid , kill ) end ,
217+ [{" Register stats" , fun test_register_stats /0 },
218+ {" Populate stats" , fun test_populate_stats /0 },
219+ {" Check stats" , fun test_check_stats /0 }]
220+ }.
221+
222+ test_register_stats () ->
223+ register_stats (),
224+ RegisteredReplStats = [Stat || {App , Stat } <- folsom_metrics :get_metrics (),
225+ App == riak_repl ],
226+ {Stats , _Types } = lists :unzip (stats ()),
227+ ? assertEqual (lists :sort (Stats ), lists :sort (RegisteredReplStats )).
228+
229+ test_populate_stats () ->
230+ Bytes = 1000 ,
231+ ok = client_bytes_sent (Bytes ),
232+ ok = client_bytes_recv (Bytes ),
233+ ok = client_connects (),
234+ ok = client_connect_errors (),
235+ ok = client_redirect (),
236+ ok = server_bytes_sent (Bytes ),
237+ ok = server_bytes_recv (Bytes ),
238+ ok = server_connects (),
239+ ok = server_connect_errors (),
240+ ok = server_fullsyncs (),
241+ ok = objects_dropped_no_clients (),
242+ ok = objects_dropped_no_leader (),
243+ ok = objects_sent (),
244+ ok = objects_forwarded (),
245+ ok = elections_elected (),
246+ ok = elections_leader_changed ().
247+
248+ test_check_stats () ->
249+ ? assertEqual ([{server_bytes_sent ,1000 },
250+ {server_bytes_recv ,1000 },
251+ {server_connects ,1 },
252+ {server_connect_errors ,1 },
253+ {server_fullsyncs ,1 },
254+ {client_bytes_sent ,1000 },
255+ {client_bytes_recv ,1000 },
256+ {client_connects ,1 },
257+ {client_connect_errors ,1 },
258+ {client_redirect ,1 },
259+ {objects_dropped_no_clients ,1 },
260+ {objects_dropped_no_leader ,1 },
261+ {objects_sent ,1 },
262+ {objects_forwarded ,1 },
263+ {elections_elected ,1 },
264+ {elections_leader_changed ,1 },
265+ {client_rx_kbps ,[]},
266+ {client_tx_kbps ,[]},
267+ {server_rx_kbps ,[]},
268+ {server_tx_kbps ,[]}], get_stats ()).
269+
270+ -endif .
0 commit comments