Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions api/include/opentelemetry/metrics/observer_result.h
Original file line number Diff line number Diff line change
Expand Up @@ -26,13 +26,13 @@ class ObserverResult
public:
virtual void Observe(T value) noexcept = 0;

virtual void Observer(T value, const common::KeyValueIterable &attributes) noexcept = 0;
virtual void Observe(T value, const common::KeyValueIterable &attributes) noexcept = 0;

template <class U,
nostd::enable_if_t<common::detail::is_key_value_iterable<U>::value> * = nullptr>
void Observe(T value, const U &attributes) noexcept
{
this->Observe(value, common::KeyValueIterableView<T>{attributes});
this->Observe(value, common::KeyValueIterableView<U>{attributes});
}

void Observe(T value,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,49 @@ class DefaultAggregation
return std::move(std::unique_ptr<Aggregation>(new DropAggregation()));
};
}

static std::unique_ptr<Aggregation> CreateAggregation(AggregationType aggregation_type,
InstrumentDescriptor instrument_descriptor)
{
switch (aggregation_type)
{
case AggregationType::kDrop:
return std::unique_ptr<Aggregation>(new DropAggregation());
break;
case AggregationType::kHistogram:
if (instrument_descriptor.value_type_ == InstrumentValueType::kLong)
{
return std::unique_ptr<Aggregation>(new LongHistogramAggregation());
}
else
{
return std::unique_ptr<Aggregation>(new DoubleHistogramAggregation());
}
break;
case AggregationType::kLastValue:
if (instrument_descriptor.value_type_ == InstrumentValueType::kLong)
{
return std::unique_ptr<Aggregation>(new LongLastValueAggregation());
}
else
{
return std::unique_ptr<Aggregation>(new DoubleLastValueAggregation());
}
break;
case AggregationType::kSum:
if (instrument_descriptor.value_type_ == InstrumentValueType::kLong)
{
return std::unique_ptr<Aggregation>(new LongSumAggregation(true));
}
else
{
return std::unique_ptr<Aggregation>(new DoubleSumAggregation(true));
}
break;
default:
return DefaultAggregation::CreateAggregation(instrument_descriptor);
}
}
};

} // namespace metrics
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,12 +4,12 @@
#pragma once
#ifndef ENABLE_METRICS_PREVIEW

# include <map>
# include "opentelemetry/common/key_value_iterable_view.h"
# include "opentelemetry/sdk/metrics/instruments.h"
# include "opentelemetry/sdk/metrics/metric_reader.h"
# include "opentelemetry/sdk/metrics/state/sync_metric_storage.h"

# include <map>
# include "opentelemetry/sdk/metrics/view/attributes_processor.h"

OPENTELEMETRY_BEGIN_NAMESPACE
namespace sdk
Expand Down Expand Up @@ -50,7 +50,7 @@ class DefaultMeasurementProcessor : public MeasurementProcessor
InstrumentDescriptor instr_desc = {"name", "desc", "1unit", InstrumentType::kCounter,
InstrumentValueType::kLong};
metric_storages_[MakeKey(reader)] = std::unique_ptr<SyncMetricStorage>(
new SyncMetricStorage(instr_desc, AggregationType::kSum));
new SyncMetricStorage(instr_desc, AggregationType::kSum, new DefaultAttributesProcessor()));
return true;
}

Expand Down
48 changes: 48 additions & 0 deletions sdk/include/opentelemetry/sdk/metrics/observer_result.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,48 @@
// Copyright The OpenTelemetry Authors
// SPDX-License-Identifier: Apache-2.0

#pragma once
#ifndef ENABLE_METRICS_PREVIEW
# include "opentelemetry/common/key_value_iterable.h"
# include "opentelemetry/metrics/observer_result.h"
# include "opentelemetry/sdk/metrics/state/attributes_hashmap.h"
# include "opentelemetry/sdk/metrics/view/attributes_processor.h"

# include <map>

OPENTELEMETRY_BEGIN_NAMESPACE
namespace sdk
{
namespace metrics
{
template <class T>
class ObserverResult final : public opentelemetry::metrics::ObserverResult<T>
{
public:
ObserverResult(const AttributesProcessor *attributes_processor)
: attributes_processor_(attributes_processor)
{}

void Observe(T value) noexcept override { data_.insert({{}, value}); }

void Observe(T value, const opentelemetry::common::KeyValueIterable &attributes) noexcept override
{
auto attr = attributes_processor_->process(attributes);
data_.insert({attr, value});
}

const std::unordered_map<MetricAttributes, T, AttributeHashGenerator> &GetMeasurements()
{
return data_;
}

private:
std::unordered_map<MetricAttributes, T, AttributeHashGenerator> data_;

const AttributesProcessor *attributes_processor_;
};
} // namespace metrics
} // namespace sdk

OPENTELEMETRY_END_NAMESPACE
#endif
Original file line number Diff line number Diff line change
@@ -0,0 +1,80 @@
// Copyright The OpenTelemetry Authors
// SPDX-License-Identifier: Apache-2.0

#pragma once
#ifndef ENABLE_METRICS_PREVIEW
# include "opentelemetry/sdk/common/attributemap_hash.h"
# include "opentelemetry/sdk/metrics/instruments.h"

# include "opentelemetry/sdk/instrumentationlibrary/instrumentation_library.h"
# include "opentelemetry/sdk/metrics/aggregation/default_aggregation.h"
# include "opentelemetry/sdk/metrics/state/attributes_hashmap.h"
# include "opentelemetry/sdk/metrics/state/metric_storage.h"
# include "opentelemetry/sdk/metrics/view/attributes_processor.h"
# include "opentelemetry/sdk/resource/resource.h"

# include <memory>
# include "opentelemetry/sdk/metrics/observer_result.h"

OPENTELEMETRY_BEGIN_NAMESPACE
namespace sdk
{
namespace metrics
{

template <class T>
class AsyncMetricStorage : public MetricStorage
{
public:
AsyncMetricStorage(InstrumentDescriptor instrument_descriptor,
const AggregationType aggregation_type,
void (*measurement_callback)(opentelemetry::metrics::ObserverResult<T> &),
const AttributesProcessor *attributes_processor)
: instrument_descriptor_(instrument_descriptor),
Comment thread
esigo marked this conversation as resolved.
aggregation_type_{aggregation_type},
measurement_collection_callback_{measurement_callback},
attributes_processor_{attributes_processor},
active_attributes_hashmap_(new AttributesHashMap())
{}

bool Collect(
MetricCollector *collector,
nostd::span<MetricCollector *> collectors,
opentelemetry::sdk::instrumentationlibrary::InstrumentationLibrary *instrumentation_library,
opentelemetry::sdk::resource::Resource *resource,
nostd::function_ref<bool(MetricData &)> metric_collection_callback) noexcept override
{
opentelemetry::sdk::metrics::ObserverResult<T> ob_res(attributes_processor_);

// read the measurement using configured callback
measurement_collection_callback_(ob_res);

// process the read measurements - aggregate and store in hashmap
for (auto &measurement : ob_res.GetMeasurements())
{
auto agg = DefaultAggregation::CreateAggregation(aggregation_type_, instrument_descriptor_);
agg->Aggregate(measurement.second);
active_attributes_hashmap_->Set(measurement.first, std::move(agg));
}

// TBD -> read aggregation from hashmap, and perform metric collection
MetricData metric_data;
if (metric_collection_callback(metric_data))
{
return true;
}
return false;
}

private:
InstrumentDescriptor instrument_descriptor_;
AggregationType aggregation_type_;
void (*measurement_collection_callback_)(opentelemetry::metrics::ObserverResult<T> &);
const AttributesProcessor *attributes_processor_;
std::unique_ptr<AttributesHashMap> active_attributes_hashmap_;
};

} // namespace metrics
} // namespace sdk
OPENTELEMETRY_END_NAMESPACE
#endif
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ class MetricStorage
nostd::span<MetricCollector *> collectors,
opentelemetry::sdk::instrumentationlibrary::InstrumentationLibrary *instrumentation_library,
opentelemetry::sdk::resource::Resource *resource,
nostd::function_ref<bool(MetricData)> callback) noexcept = 0;
nostd::function_ref<bool(MetricData &)> callback) noexcept = 0;
};

class WritableMetricStorage
Expand All @@ -48,7 +48,7 @@ class NoopMetricStorage : public MetricStorage
nostd::span<MetricCollector *> collectors,
opentelemetry::sdk::instrumentationlibrary::InstrumentationLibrary *instrumentation_library,
opentelemetry::sdk::resource::Resource *resource,
nostd::function_ref<bool(MetricData)> callback) noexcept override
nostd::function_ref<bool(MetricData &)> callback) noexcept override
{
MetricData metric_data;
if (callback(metric_data))
Expand Down
74 changes: 14 additions & 60 deletions sdk/include/opentelemetry/sdk/metrics/state/sync_metric_storage.h
Original file line number Diff line number Diff line change
Expand Up @@ -25,17 +25,17 @@ class SyncMetricStorage : public MetricStorage, public WritableMetricStorage
{

public:
SyncMetricStorage(
InstrumentDescriptor instrument_descriptor,
const AggregationType aggregation_type,
const AttributesProcessor *attributes_processor = new DefaultAttributesProcessor())
SyncMetricStorage(InstrumentDescriptor instrument_descriptor,
const AggregationType aggregation_type,
const AttributesProcessor *attributes_processor)
: instrument_descriptor_(instrument_descriptor),
aggregation_type_{aggregation_type},
attributes_hashmap_(new AttributesHashMap()),
attributes_processor_{attributes_processor}
{
create_default_aggregation_ = [&]() -> std::unique_ptr<Aggregation> {
return std::move(this->create_aggregation());
return std::move(
DefaultAggregation::CreateAggregation(aggregation_type_, instrument_descriptor_));
};
}

Expand All @@ -45,8 +45,7 @@ class SyncMetricStorage : public MetricStorage, public WritableMetricStorage
{
return;
}
auto aggregation = attributes_hashmap_->GetOrSetDefault({}, create_default_aggregation_);
aggregation->Aggregate(value);
attributes_hashmap_->GetOrSetDefault({}, create_default_aggregation_)->Aggregate(value);
}

void RecordLong(long value,
Expand All @@ -57,9 +56,8 @@ class SyncMetricStorage : public MetricStorage, public WritableMetricStorage
return;
}

auto attr = attributes_processor_->process(attributes);
auto aggregation = attributes_hashmap_->GetOrSetDefault(attr, create_default_aggregation_);
aggregation->Aggregate(value);
auto attr = attributes_processor_->process(attributes);
attributes_hashmap_->GetOrSetDefault(attr, create_default_aggregation_)->Aggregate(value);
}

void RecordDouble(double value) noexcept override
Expand All @@ -69,8 +67,7 @@ class SyncMetricStorage : public MetricStorage, public WritableMetricStorage
return;
}

auto aggregation = attributes_hashmap_->GetOrSetDefault({}, create_default_aggregation_);
aggregation->Aggregate(value);
attributes_hashmap_->GetOrSetDefault({}, create_default_aggregation_)->Aggregate(value);
}

void RecordDouble(double value,
Expand All @@ -81,20 +78,19 @@ class SyncMetricStorage : public MetricStorage, public WritableMetricStorage
return;
}

auto attr = attributes_processor_->process(attributes);
auto aggregation = attributes_hashmap_->GetOrSetDefault(attr, create_default_aggregation_);
aggregation->Aggregate(value);
auto attr = attributes_processor_->process(attributes);
attributes_hashmap_->GetOrSetDefault(attr, create_default_aggregation_)->Aggregate(value);
}

bool Collect(
MetricCollector *collector,
nostd::span<MetricCollector *> collectors,
opentelemetry::sdk::instrumentationlibrary::InstrumentationLibrary *instrumentation_library,
opentelemetry::sdk::resource::Resource *resource,
nostd::function_ref<bool(MetricData)> callback) noexcept override
nostd::function_ref<bool(MetricData &)> callback) noexcept override
{

if (callback(MetricData()))
MetricData metric_data;
if (callback(metric_data))
{
return true;
}
Expand All @@ -107,48 +103,6 @@ class SyncMetricStorage : public MetricStorage, public WritableMetricStorage
std::unique_ptr<AttributesHashMap> attributes_hashmap_;
const AttributesProcessor *attributes_processor_;
std::function<std::unique_ptr<Aggregation>()> create_default_aggregation_;

std::unique_ptr<Aggregation> create_aggregation()
{
switch (aggregation_type_)
{
case AggregationType::kDrop:
return std::move(std::unique_ptr<Aggregation>(new DropAggregation()));
break;
case AggregationType::kHistogram:
if (instrument_descriptor_.value_type_ == InstrumentValueType::kLong)
{
return std::move(std::unique_ptr<Aggregation>(new LongHistogramAggregation()));
}
else
{
return std::move(std::unique_ptr<Aggregation>(new DoubleHistogramAggregation()));
}
break;
case AggregationType::kLastValue:
if (instrument_descriptor_.value_type_ == InstrumentValueType::kLong)
{
return std::move(std::unique_ptr<Aggregation>(new LongLastValueAggregation()));
}
else
{
return std::move(std::unique_ptr<Aggregation>(new DoubleLastValueAggregation()));
}
break;
case AggregationType::kSum:
if (instrument_descriptor_.value_type_ == InstrumentValueType::kLong)
{
return std::move(std::unique_ptr<Aggregation>(new LongSumAggregation(true)));
}
else
{
return std::move(std::unique_ptr<Aggregation>(new DoubleSumAggregation(true)));
}
break;
default:
return std::move(DefaultAggregation::CreateAggregation(instrument_descriptor_));
}
}
};

} // namespace metrics
Expand Down
Loading