99#include " base/notification.h"
1010#include " log/etcd_consistent_store.h"
1111#include " log/logged_certificate.h"
12+ #include " monitoring/event_metric.h"
1213#include " monitoring/monitoring.h"
1314#include " monitoring/latency.h"
1415#include " util/etcd_delete.h"
1516#include " util/executor.h"
1617#include " util/masterelection.h"
1718#include " util/util.h"
1819
20+ DECLARE_int32 (etcd_stats_collection_interval_seconds);
21+
1922DECLARE_int32 (node_state_ttl_seconds);
2023
2124namespace cert_trans {
@@ -33,6 +36,20 @@ static Gauge<std::string>* etcd_total_entries =
3336 Gauge<std::string>::New(" etcd_total_entries" , " type" ,
3437 " Total number of entries in etcd by type." );
3538
39+ static Gauge<std::string>* etcd_store_stats =
40+ Gauge<std::string>::New(" etcd_store_stats" , " name" ,
41+ " Re-export of etcd's store stats." );
42+
43+ static EventMetric<std::string> etcd_throttle_delay_ms (
44+ " etcd_throttle_delay_ms" , " type" ,
45+ " Count and total thottle delay applied to requests, broken down by "
46+ " request type" );
47+
48+ static Counter<std::string>* etcd_rejected_requests =
49+ Counter<std::string>::New(" etcd_rejected_requests" , " type" ,
50+ " Total number of requests rejected due to "
51+ " overload, broken down by request type." );
52+
3653static Latency<std::chrono::milliseconds, std::string> etcd_latency_by_op_ms (
3754 " etcd_latency_by_op_ms" , " operation" ,
3855 " Etcd latency in ms broken down by operation." );
@@ -49,27 +66,74 @@ void CheckMappingIsOrdered(const ct::SequenceMapping& mapping) {
4966}
5067
5168
69+ util::StatusOr<int64_t > GetStat (const std::map<std::string, int64_t >& stats,
70+ const std::string& name) {
71+ const auto & it (stats.find (name));
72+ if (it == stats.end ()) {
73+ return util::Status (util::error::FAILED_PRECONDITION, name + " missing." );
74+ }
75+ return it->second ;
76+ }
77+
78+
79+ util::StatusOr<int64_t > CalculateNumEtcdEntries (
80+ const std::map<std::string, int64_t >& stats) {
81+ util::StatusOr<int64_t > created (GetStat (stats, " createSuccess" ));
82+ if (!created.ok ()) {
83+ return created;
84+ }
85+
86+ util::StatusOr<int64_t > deleted (GetStat (stats, " deleteSuccess" ));
87+ if (!deleted.ok ()) {
88+ return deleted;
89+ }
90+
91+ util::StatusOr<int64_t > compareDeleted (
92+ GetStat (stats, " compareAndDeleteSuccess" ));
93+ if (!compareDeleted.ok ()) {
94+ return compareDeleted;
95+ }
96+ util::StatusOr<int64_t > expired (GetStat (stats, " expireCount" ));
97+ if (!expired.ok ()) {
98+ return expired;
99+ }
100+
101+ const int64_t num_removed (deleted.ValueOrDie () +
102+ compareDeleted.ValueOrDie () +
103+ expired.ValueOrDie ());
104+ return created.ValueOrDie () - num_removed;
105+ }
106+
52107} // namespace
53108
54109
55110template <class Logged >
56111EtcdConsistentStore<Logged>::EtcdConsistentStore(
57- util::Executor* executor, EtcdClient* client,
112+ libevent::Base* base, util::Executor* executor, EtcdClient* client,
58113 const MasterElection* election, const std::string& root,
59114 const std::string& node_id)
60115 : client_(CHECK_NOTNULL(client)),
116+ base_ (CHECK_NOTNULL(base)),
61117 executor_(CHECK_NOTNULL(executor)),
62118 election_(CHECK_NOTNULL(election)),
63119 root_(root),
64120 node_id_(node_id),
65121 serving_sth_watch_task_(CHECK_NOTNULL(executor)),
122+ cluster_config_watch_task_(CHECK_NOTNULL(executor)),
123+ etcd_stats_task_(executor_),
66124 received_initial_sth_(false ),
67125 exiting_(false ) {
68126 // Set up watches on things we're interested in...
69127 WatchServingSTH (
70128 std::bind (&EtcdConsistentStore<Logged>::OnEtcdServingSTHUpdated, this ,
71129 std::placeholders::_1),
72130 serving_sth_watch_task_.task ());
131+ WatchClusterConfig (
132+ std::bind (&EtcdConsistentStore<Logged>::OnClusterConfigUpdated, this ,
133+ std::placeholders::_1),
134+ cluster_config_watch_task_.task ());
135+
136+ StartEtcdStatsFetch ();
73137
74138 // And wait for the initial updates to come back so that we've got a
75139 // view on the current state before proceding...
@@ -84,8 +148,13 @@ template <class Logged>
84148EtcdConsistentStore<Logged>::~EtcdConsistentStore () {
85149 VLOG (1 ) << " Cancelling watch tasks." ;
86150 serving_sth_watch_task_.Cancel ();
151+ cluster_config_watch_task_.Cancel ();
87152 VLOG (1 ) << " Waiting for watch tasks to return." ;
88153 serving_sth_watch_task_.Wait ();
154+ cluster_config_watch_task_.Wait ();
155+ VLOG (1 ) << " Cancelling stats task." ;
156+ etcd_stats_task_.Cancel ();
157+ etcd_stats_task_.Wait ();
89158 VLOG (1 ) << " Joining cleanup thread" ;
90159 {
91160 std::lock_guard<std::mutex> lock (mutex_);
@@ -219,9 +288,15 @@ util::Status EtcdConsistentStore<Logged>::AddPendingEntry(Logged* entry) {
219288
220289 CHECK_NOTNULL (entry);
221290 CHECK (!entry->has_sequence_number ());
291+
292+ util::Status status (MaybeReject (" add_pending_entry" ));
293+ if (!status.ok ()) {
294+ return status;
295+ }
296+
222297 const std::string full_path (GetEntryPath (*entry));
223298 EntryHandle<Logged> handle (full_path, *entry);
224- util::Status status ( CreateEntry (&handle) );
299+ status = CreateEntry (&handle);
225300 if (status.CanonicalCode () == util::error::FAILED_PRECONDITION) {
226301 // Entry with that hash already exists.
227302 EntryHandle<Logged> preexisting_entry;
@@ -690,11 +765,11 @@ void EtcdConsistentStore<Logged>::UpdateLocalServingSTH(
690765template <class Logged >
691766void EtcdConsistentStore<Logged>::OnEtcdServingSTHUpdated(
692767 const Update<ct::SignedTreeHead>& update) {
693- VLOG (1 ) << " Got ServingSTH version " << update.handle_ .Handle () << " : "
694- << update.handle_ .Entry ().DebugString ();
695768 std::unique_lock<std::mutex> lock (mutex_);
696769
697770 if (update.exists_ ) {
771+ VLOG (1 ) << " Got ServingSTH version " << update.handle_ .Handle () << " : "
772+ << update.handle_ .Entry ().DebugString ();
698773 UpdateLocalServingSTH (lock, update.handle_ );
699774 } else {
700775 LOG (WARNING) << " ServingSTH non-existent/deleted." ;
@@ -707,6 +782,21 @@ void EtcdConsistentStore<Logged>::OnEtcdServingSTHUpdated(
707782}
708783
709784
785+ template <class Logged >
786+ void EtcdConsistentStore<Logged>::OnClusterConfigUpdated(
787+ const Update<ct::ClusterConfig>& update) {
788+ if (update.exists_ ) {
789+ VLOG (1 ) << " Got ClusterConfig version " << update.handle_ .Handle () << " : "
790+ << update.handle_ .Entry ().DebugString ();
791+ std::lock_guard<std::mutex> lock (mutex_);
792+ cluster_config_.reset (new ct::ClusterConfig (update.handle_ .Entry ()));
793+ } else {
794+ LOG (WARNING) << " ClusterConfig non-existent/deleted." ;
795+ // TODO(alcutter): What to do here?
796+ }
797+ }
798+
799+
710800template <class Logged >
711801util::StatusOr<int64_t > EtcdConsistentStore<Logged>::CleanupOldEntries() {
712802 ScopedLatency scoped_latency (
@@ -789,6 +879,82 @@ util::StatusOr<int64_t> EtcdConsistentStore<Logged>::CleanupOldEntries() {
789879}
790880
791881
882+ template <class Logged >
883+ void EtcdConsistentStore<Logged>::StartEtcdStatsFetch() {
884+ if (etcd_stats_task_.task ()->CancelRequested ()) {
885+ etcd_stats_task_.task ()->Return (util::Status::CANCELLED);
886+ return ;
887+ }
888+ EtcdClient::StatsResponse* response (new EtcdClient::StatsResponse);
889+ util::Task* stats_task (etcd_stats_task_.task ()->AddChild (
890+ bind (&EtcdConsistentStore<Logged>::EtcdStatsFetchDone, this , response,
891+ std::placeholders::_1)));
892+ client_->GetStoreStats (response, stats_task);
893+ }
894+
895+
896+ template <class Logged >
897+ void EtcdConsistentStore<Logged>::EtcdStatsFetchDone(
898+ EtcdClient::StatsResponse* response, util::Task* task) {
899+ CHECK_NOTNULL (response);
900+ CHECK_NOTNULL (task);
901+ std::unique_ptr<EtcdClient::StatsResponse> response_deleter (response);
902+ if (task->status ().ok ()) {
903+ for (const auto & stat : response->stats ) {
904+ VLOG (2 ) << " etcd stat: " << stat.first << " = " << stat.second ;
905+ etcd_store_stats->Set (stat.first , stat.second );
906+ }
907+ const util::StatusOr<int64_t > num_entries (
908+ CalculateNumEtcdEntries (response->stats ));
909+ if (num_entries.ok ()) {
910+ {
911+ std::lock_guard<std::mutex> lock (mutex_);
912+ num_etcd_entries_ = num_entries.ValueOrDie ();
913+ }
914+ etcd_total_entries->Set (" all" , num_etcd_entries_);
915+ } else {
916+ VLOG (1 ) << " Failed to calculate num_entries: " << num_entries.status ();
917+ }
918+ } else {
919+ LOG (WARNING) << " Etcd stats fetch failed: " << task->status ();
920+ }
921+
922+ base_->Delay (
923+ std::chrono::seconds (FLAGS_etcd_stats_collection_interval_seconds),
924+ etcd_stats_task_.task ()->AddChild (
925+ std::bind (&EtcdConsistentStore<Logged>::StartEtcdStatsFetch, this )));
926+ }
927+
928+ // This method attempts to modulate the incoming traffic in response to the
929+ // number of entries currently in etcd.
930+ //
931+ // Once the number of entries is above reject_threshold, we will start
932+ // returning a RESOURCE_EXHAUSTED status, which should result in a 503 being
933+ // sent to the client.
934+ template <class Logged >
935+ util::Status EtcdConsistentStore<Logged>::MaybeReject(
936+ const std::string& type) const {
937+ std::unique_lock<std::mutex> lock (mutex_);
938+
939+ if (!cluster_config_) {
940+ // No config, whatever.
941+ return util::Status::OK;
942+ }
943+
944+ const int64_t etcd_size (num_etcd_entries_);
945+ const int64_t reject_threshold (
946+ cluster_config_->etcd_reject_add_pending_threshold ());
947+ lock.unlock ();
948+
949+ if (etcd_size >= reject_threshold) {
950+ etcd_rejected_requests->Increment (type);
951+ return util::Status (util::error::RESOURCE_EXHAUSTED,
952+ " Rejected due to high number of pending entries." );
953+ }
954+ return util::Status::OK;
955+ }
956+
957+
792958} // namespace cert_trans
793959
794960#endif // CERT_TRANS_LOG_ETCD_CONSISTENT_STORE_INL_H_
0 commit comments