Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
114 changes: 96 additions & 18 deletions src/datadog/config_manager.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -6,18 +6,81 @@

namespace datadog {
namespace tracing {
namespace {

// Trace sampling rules keyed by the span matcher they apply to; each matcher
// maps to the sampling rate used for spans it matches. `SpanMatcher::Hash`
// supplies the hash for the key type.
using Rules =
std::unordered_map<SpanMatcher, TraceSamplerRate, SpanMatcher::Hash>;

// Parse `json_rules`, a JSON array of trace sampling rules, into a map from
// span matcher to sampling rate.
//
// For each element of the array:
//   - the element is handed to `SpanMatcher::from_json` to build the key;
//   - an optional "sample_rate" must be a JSON number accepted by
//     `Rate::from`;
//   - an optional "provenance" must be a JSON string; the values "customer"
//     and "dynamic" select the `REMOTE_RULE` and `REMOTE_ADAPTIVE_RULE`
//     sampling mechanisms respectively.
//
// Return the parsed rules, or an `Error` describing the first problem
// encountered. Parsing stops at the first invalid element.
Expected<Rules> parse_trace_sampling_rules(const nlohmann::json& json_rules) {
  if (!json_rules.is_array()) {
    std::string message = "Trace sampling rules must be an array, but got ";
    message += json_rules.type_name();
    message += '.';
    return Error{Error::TRACE_SAMPLING_RULES_WRONG_TYPE, std::move(message)};
  }

  Rules parsed_rules;

  for (const auto& json_rule : json_rules) {
    auto matcher = SpanMatcher::from_json(json_rule);
    if (auto* error = matcher.if_error()) {
      return error->with_prefix("Unable to parse trace sampling rule: ");
    }

    // When "sample_rate" or "provenance" is absent, the corresponding field
    // keeps whatever default `TraceSamplerRate` gives it.
    TraceSamplerRate rate;
    if (auto sample_rate = json_rule.find("sample_rate");
        sample_rate != json_rule.end()) {
      if (!sample_rate->is_number()) {
        std::string message =
            "Trace sampling rule \"sample_rate\" must be a number, but got ";
        message += sample_rate->type_name();
        message += '.';
        return Error{Error::TRACE_SAMPLING_RULES_SAMPLE_RATE_WRONG_TYPE,
                     std::move(message)};
      }

      // Extract the number explicitly instead of relying on the JSON
      // library's implicit conversion.
      auto maybe_rate = Rate::from(sample_rate->get<double>());
      if (auto* error = maybe_rate.if_error()) {
        return error->with_prefix(
            "Trace sampling rule \"sample_rate\" is invalid: ");
      }

      rate.value = *maybe_rate;
    }

    if (auto provenance_it = json_rule.find("provenance");
        provenance_it != json_rule.cend()) {
      if (!provenance_it->is_string()) {
        std::string message =
            "Trace sampling rule \"provenance\" must be a string, but got ";
        message += provenance_it->type_name();
        message += '.';
        // NOTE(review): this reuses the "sample_rate" error code for a
        // "provenance" failure; a dedicated code would be clearer if one
        // exists.
        return Error{Error::TRACE_SAMPLING_RULES_SAMPLE_RATE_WRONG_TYPE,
                     std::move(message)};
      }

      // Any other provenance value leaves `rate.mechanism` untouched.
      const auto provenance = provenance_it->get<std::string_view>();
      if (provenance == "customer") {
        rate.mechanism = SamplingMechanism::REMOTE_RULE;
      } else if (provenance == "dynamic") {
        rate.mechanism = SamplingMechanism::REMOTE_ADAPTIVE_RULE;
      }
    }

    // `emplace` does not overwrite: if two rules produce equal matchers, the
    // first one parsed is kept.
    parsed_rules.emplace(std::move(*matcher), std::move(rate));
  }

  return parsed_rules;
}

} // namespace

// Build a `ConfigManager` from the finalized tracer configuration. The clock,
// default metadata, trace sampling rules, span defaults and `report_traces`
// setting are taken from `config`, and the initial `TraceSampler` is created
// from `config.trace_sampler`.
ConfigManager::ConfigManager(const FinalizedTracerConfig& config)
: clock_(config.clock),
default_metadata_(config.metadata),
// The sampler is given the `clock_` member, which is initialized above.
trace_sampler_(
std::make_shared<TraceSampler>(config.trace_sampler, clock_)),
// Copy of the configured rules; `update()` inserts these into the rule set
// it hands to the sampler.
rules_(config.trace_sampler.rules),
span_defaults_(std::make_shared<SpanDefaults>(config.defaults)),
report_traces_(config.report_traces) {}

// Return the trace sampler currently in use. `mutex_` is held while the
// shared pointer is copied.
std::shared_ptr<TraceSampler> ConfigManager::trace_sampler() {
  std::lock_guard<std::mutex> lock(mutex_);
  return trace_sampler_;
}

std::shared_ptr<const SpanDefaults> ConfigManager::span_defaults() {
Expand All @@ -35,32 +98,48 @@ std::vector<ConfigMetadata> ConfigManager::update(const ConfigUpdate& conf) {

std::lock_guard<std::mutex> lock(mutex_);

decltype(rules_) rules;

if (!conf.trace_sampling_rate) {
reset_config(ConfigName::TRACE_SAMPLING_RATE, trace_sampler_, metadata);
auto found = default_metadata_.find(ConfigName::TRACE_SAMPLING_RATE);
if (found != default_metadata_.cend()) {
metadata.push_back(found->second);
}
} else {
ConfigMetadata trace_sampling_metadata(
ConfigName::TRACE_SAMPLING_RATE,
to_string(*conf.trace_sampling_rate, 1),
ConfigMetadata::Origin::REMOTE_CONFIG);

TraceSamplerConfig trace_sampler_cfg;
trace_sampler_cfg.sample_rate = *conf.trace_sampling_rate;
auto rate = Rate::from(*conf.trace_sampling_rate);
rules[catch_all] = TraceSamplerRate{*rate, SamplingMechanism::RULE};

metadata.emplace_back(std::move(trace_sampling_metadata));
}

auto finalized_trace_sampler_cfg = finalize_config(trace_sampler_cfg);
if (auto error = finalized_trace_sampler_cfg.if_error()) {
trace_sampling_metadata.error = *error;
if (!conf.trace_sampling_rules) {
auto found = default_metadata_.find(ConfigName::TRACE_SAMPLING_RULES);
if (found != default_metadata_.cend()) {
metadata.emplace_back(found->second);
}
} else {
ConfigMetadata trace_sampling_rules_metadata(
ConfigName::TRACE_SAMPLING_RULES, conf.trace_sampling_rules->dump(),
Comment thread
dmehala marked this conversation as resolved.
ConfigMetadata::Origin::REMOTE_CONFIG);

auto trace_sampler =
std::make_shared<TraceSampler>(*finalized_trace_sampler_cfg, clock_);
auto maybe_rules = parse_trace_sampling_rules(*conf.trace_sampling_rules);
if (auto error = maybe_rules.if_error()) {
trace_sampling_rules_metadata.error = std::move(*error);
} else {
rules.merge(*maybe_rules);
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm looking at the docs for this and, if I'm understanding correctly, items already in rules will not be overridden by those in maybe_rules. Is that the intended behavior?

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Nope, good catch.

}

// This resets rate limiting, and `TraceSampler` has no `operator==`.
// TODO: Instead of creating another `TraceSampler`, we should
// update the default sampling rate.
trace_sampler_ = std::move(trace_sampler);
metadata.emplace_back(std::move(trace_sampling_metadata));
metadata.emplace_back(std::move(trace_sampling_rules_metadata));
}

rules.insert(rules_.cbegin(), rules_.cend());
trace_sampler_->set_rules(rules);

if (!conf.tags) {
reset_config(ConfigName::TAGS, span_defaults_, metadata);
} else {
Expand Down Expand Up @@ -109,10 +188,9 @@ std::vector<ConfigMetadata> ConfigManager::reset() { return update({}); }

// Return a JSON object describing the current configuration, with the keys
// "defaults", "trace_sampler" and "report_traces". `mutex_` is held while the
// members are read.
nlohmann::json ConfigManager::config_json() const {
  std::lock_guard<std::mutex> lock(mutex_);
  return nlohmann::json{{"defaults", to_json(*span_defaults_.value())},
                        {"trace_sampler", trace_sampler_->config_json()},
                        {"report_traces", report_traces_.value()}};
}

} // namespace tracing
Expand Down
4 changes: 3 additions & 1 deletion src/datadog/config_manager.h
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,9 @@ class ConfigManager {
Clock clock_;
std::unordered_map<ConfigName, ConfigMetadata> default_metadata_;

DynamicConfig<std::shared_ptr<TraceSampler>> trace_sampler_;
std::shared_ptr<TraceSampler> trace_sampler_;
std::unordered_map<SpanMatcher, TraceSamplerRate, SpanMatcher::Hash> rules_;

DynamicConfig<std::shared_ptr<const SpanDefaults>> span_defaults_;
DynamicConfig<bool> report_traces_;

Expand Down
1 change: 1 addition & 0 deletions src/datadog/config_update.h
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ struct ConfigUpdate {
Optional<bool> report_traces;
Optional<double> trace_sampling_rate;
Optional<std::vector<StringView>> tags;
const nlohmann::json* trace_sampling_rules = nullptr;
};

} // namespace tracing
Expand Down
11 changes: 6 additions & 5 deletions src/datadog/datadog_agent_config.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ Expected<DatadogAgentConfig> load_datadog_agent_env_config() {

if (auto raw_rc_poll_interval_value =
lookup(environment::DD_REMOTE_CONFIG_POLL_INTERVAL_SECONDS)) {
auto res = parse_int(*raw_rc_poll_interval_value, 10);
auto res = parse_double(*raw_rc_poll_interval_value);
if (auto error = res.if_error()) {
return error->with_prefix(
"DatadogAgent: Remote Configuration poll interval error ");
Expand Down Expand Up @@ -114,12 +114,13 @@ Expected<FinalizedDatadogAgentConfig> finalize_config(
"milliseconds."};
}

if (int rc_poll_interval_seconds =
if (double rc_poll_interval_seconds =
value_or(env_config->remote_configuration_poll_interval_seconds,
user_config.remote_configuration_poll_interval_seconds, 5);
rc_poll_interval_seconds > 0) {
user_config.remote_configuration_poll_interval_seconds, 5.0);
rc_poll_interval_seconds >= 0.0) {
result.remote_configuration_poll_interval =
std::chrono::seconds(rc_poll_interval_seconds);
std::chrono::duration_cast<std::chrono::milliseconds>(
std::chrono::duration<double>(rc_poll_interval_seconds));
} else {
return Error{Error::DATADOG_AGENT_INVALID_REMOTE_CONFIG_POLL_INTERVAL,
"DatadogAgent: Remote Configuration poll interval must be a "
Expand Down
2 changes: 1 addition & 1 deletion src/datadog/datadog_agent_config.h
Original file line number Diff line number Diff line change
Expand Up @@ -61,7 +61,7 @@ struct DatadogAgentConfig {
Optional<bool> remote_configuration_enabled;
// How often, in seconds, to query the Datadog Agent for remote configuration
// updates.
Optional<int> remote_configuration_poll_interval_seconds;
Optional<double> remote_configuration_poll_interval_seconds;

static Expected<HTTPClient::URL> parse(StringView);
};
Expand Down
11 changes: 9 additions & 2 deletions src/datadog/remote_config.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,8 @@ namespace {
// Capability bit flags; they are OR-ed together and encoded by
// `capabilities_byte_array` to form `k_apm_capabilities`. Each enumerator
// sets a single bit. The shifts are done on a `uint64_t` so that bit
// positions of 31 and above remain well defined.
enum CapabilitiesFlag : uint64_t {
  APM_TRACING_SAMPLE_RATE = uint64_t{1} << 12,
  APM_TRACING_TAGS = uint64_t{1} << 15,
  APM_TRACING_ENABLED = uint64_t{1} << 19,
  APM_TRACING_SAMPLE_RULES = uint64_t{1} << 29,
};

constexpr std::array<uint8_t, sizeof(uint64_t)> capabilities_byte_array(
Expand All @@ -46,7 +47,7 @@ constexpr std::array<uint8_t, sizeof(uint64_t)> capabilities_byte_array(

constexpr std::array<uint8_t, sizeof(uint64_t)> k_apm_capabilities =
capabilities_byte_array(APM_TRACING_SAMPLE_RATE | APM_TRACING_TAGS |
APM_TRACING_ENABLED);
APM_TRACING_ENABLED | APM_TRACING_SAMPLE_RULES);

// String constants for the "APM_TRACING" product: its bare name and the same
// name wrapped in slashes.
// NOTE(review): presumably the slash-wrapped form is searched for inside
// configuration paths; the usage is outside this view, so confirm.
constexpr StringView k_apm_product = "APM_TRACING";
constexpr StringView k_apm_product_path_substring = "/APM_TRACING/";
Expand All @@ -69,6 +70,12 @@ ConfigUpdate parse_dynamic_config(const nlohmann::json& j) {
config_update.report_traces = tracing_enabled_it->get<bool>();
}

if (auto tracing_sampling_rules_it = j.find("tracing_sampling_rules");
tracing_sampling_rules_it != j.cend() &&
tracing_sampling_rules_it->is_array()) {
config_update.trace_sampling_rules = &(*tracing_sampling_rules_it);
}

return config_update;
}

Expand Down
7 changes: 7 additions & 0 deletions src/datadog/sampling_mechanism.h
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,13 @@ enum class SamplingMechanism {
// Individual span kept by a matching span sampling rule when the enclosing
// trace was dropped.
SPAN_RULE = 8,
// Reserved for future use.
OTLP_RULE = 9,
// Sampling rule configured by user via remote configuration.
REMOTE_RULE = 11,
// Adaptive sampling rule automatically computed by Datadog backend and sent
// via remote configuration.
REMOTE_ADAPTIVE_RULE = 12,
};

} // namespace tracing
Expand Down
16 changes: 16 additions & 0 deletions src/datadog/span_matcher.h
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,23 @@ struct SpanMatcher {
nlohmann::json to_json() const;

static Expected<SpanMatcher> from_json(const nlohmann::json&);

// Two matchers are equal when their `service`, `name`, `resource` and `tags`
// all compare equal.
bool operator==(const SpanMatcher& other) const {
  if (!(service == other.service)) {
    return false;
  }
  if (!(name == other.name)) {
    return false;
  }
  if (!(resource == other.resource)) {
    return false;
  }
  return tags == other.tags;
}

// TODO: add tags
// Hash functor for `SpanMatcher`. Combines the string hashes of `service`,
// `name` (shifted left by one bit) and `resource` (shifted left by two bits)
// with XOR. `tags` does not contribute to the hash.
struct Hash {
  size_t operator()(const SpanMatcher& rule) const {
    const std::hash<std::string> hasher{};
    const size_t service_hash = hasher(rule.service);
    const size_t name_hash = hasher(rule.name) << 1;
    const size_t resource_hash = hasher(rule.resource) << 2;
    return service_hash ^ name_hash ^ resource_hash;
  }
};
};

// Default-constructed matcher; `ConfigManager::update` uses it as the key for
// the rule built from the remotely configured sampling rate. Declared
// `inline` so that every translation unit including this header shares one
// object instead of each getting its own internal-linkage copy.
inline const SpanMatcher catch_all;

} // namespace tracing
} // namespace datadog
7 changes: 6 additions & 1 deletion src/datadog/span_sampler_config.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,12 @@ namespace {
std::string to_string(const std::vector<SpanSamplerConfig::Rule> &rules) {
nlohmann::json res;
for (const auto &r : rules) {
res.emplace_back(r.to_json());
auto j = r.to_json();
j["sample_rate"] = r.sample_rate;
if (r.max_per_second) {
j["max_per_second"] = *r.max_per_second;
}
res.emplace_back(std::move(j));
Comment on lines +19 to +24
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Curious to know, as they are being set in to_json, why is it necessary to set them again?

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

to_json is not overloaded by SpanSamplerConfig::Rule, meaning SpanMatcher::to_json is called instead.

}

return res.dump();
Expand Down
27 changes: 18 additions & 9 deletions src/datadog/trace_sampler.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -21,25 +21,32 @@ TraceSampler::TraceSampler(const FinalizedTraceSamplerConfig& config,
limiter_(clock, config.max_per_second),
limiter_max_per_second_(config.max_per_second) {}

// Replace the sampler's rule set with `rules`. The argument is taken by value
// and moved into `rules_` while `mutex_` is held.
// NOTE(review): `decide()` searches `rules_` before it acquires `mutex_`;
// confirm that readers are synchronized with this writer.
void TraceSampler::set_rules(
    std::unordered_map<SpanMatcher, TraceSamplerRate, SpanMatcher::Hash>
        rules) {
  const std::lock_guard<std::mutex> guard(mutex_);
  rules_ = std::move(rules);
}

SamplingDecision TraceSampler::decide(const SpanData& span) {
SamplingDecision decision;
decision.origin = SamplingDecision::Origin::LOCAL;

// First check sampling rules.
auto found_rule =
std::find_if(rules_.begin(), rules_.end(),
[&](const auto& rule) { return rule.match(span); });
const auto found_rule =
std::find_if(rules_.cbegin(), rules_.cend(),
[&](const auto& it) { return it.first.match(span); });

// `mutex_` protects `limiter_`, `collector_sample_rates_`, and
// `collector_default_sample_rate_`, so let's lock it here.
std::lock_guard lock(mutex_);

if (found_rule != rules_.end()) {
const auto& rule = *found_rule;
decision.mechanism = int(SamplingMechanism::RULE);
const auto& [rule, rate] = *found_rule;
decision.mechanism = int(rate.mechanism);
decision.limiter_max_per_second = limiter_max_per_second_;
decision.configured_rate = rule.sample_rate;
const std::uint64_t threshold = max_id_from_rate(rule.sample_rate);
decision.configured_rate = rate.value;
const std::uint64_t threshold = max_id_from_rate(rate.value);
if (knuth_hash(span.trace_id.low) < threshold) {
const auto result = limiter_.allow();
if (result.allowed) {
Expand Down Expand Up @@ -99,8 +106,10 @@ void TraceSampler::handle_collector_response(

nlohmann::json TraceSampler::config_json() const {
std::vector<nlohmann::json> rules;
for (const auto& rule : rules_) {
rules.push_back(to_json(rule));
for (const auto& [rule, rate] : rules_) {
nlohmann::json j = rule.to_json();
j["sampling_rate"] = rate.value.value();
rules.push_back(std::move(j));
}

return nlohmann::json::object({
Expand Down
8 changes: 6 additions & 2 deletions src/datadog/trace_sampler.h
Original file line number Diff line number Diff line change
Expand Up @@ -102,18 +102,22 @@ struct SamplingDecision;
struct SpanData;

class TraceSampler {
private:
std::mutex mutex_;

Optional<Rate> collector_default_sample_rate_;
std::unordered_map<std::string, Rate> collector_sample_rates_;

std::vector<FinalizedTraceSamplerConfig::Rule> rules_;
std::unordered_map<SpanMatcher, TraceSamplerRate, SpanMatcher::Hash> rules_;
Limiter limiter_;
double limiter_max_per_second_;

public:
TraceSampler(const FinalizedTraceSamplerConfig& config, const Clock& clock);

void set_rules(
std::unordered_map<SpanMatcher, TraceSamplerRate, SpanMatcher::Hash>
rules);

// Return a sampling decision for the specified root span.
SamplingDecision decide(const SpanData&);

Expand Down
Loading