diff --git a/api/include/opentelemetry/metrics/observer_result.h b/api/include/opentelemetry/metrics/observer_result.h index 355b5a4a31..84893deef0 100644 --- a/api/include/opentelemetry/metrics/observer_result.h +++ b/api/include/opentelemetry/metrics/observer_result.h @@ -26,13 +26,13 @@ class ObserverResult public: virtual void Observe(T value) noexcept = 0; - virtual void Observer(T value, const common::KeyValueIterable &attributes) noexcept = 0; + virtual void Observe(T value, const common::KeyValueIterable &attributes) noexcept = 0; template ::value> * = nullptr> void Observe(T value, const U &attributes) noexcept { - this->Observe(value, common::KeyValueIterableView{attributes}); + this->Observe(value, common::KeyValueIterableView{attributes}); } void Observe(T value, diff --git a/sdk/include/opentelemetry/sdk/metrics/aggregation/default_aggregation.h b/sdk/include/opentelemetry/sdk/metrics/aggregation/default_aggregation.h index cad43d7aa5..f193864926 100644 --- a/sdk/include/opentelemetry/sdk/metrics/aggregation/default_aggregation.h +++ b/sdk/include/opentelemetry/sdk/metrics/aggregation/default_aggregation.h @@ -47,6 +47,49 @@ class DefaultAggregation return std::move(std::unique_ptr(new DropAggregation())); }; } + + static std::unique_ptr CreateAggregation(AggregationType aggregation_type, + InstrumentDescriptor instrument_descriptor) + { + switch (aggregation_type) + { + case AggregationType::kDrop: + return std::unique_ptr(new DropAggregation()); + break; + case AggregationType::kHistogram: + if (instrument_descriptor.value_type_ == InstrumentValueType::kLong) + { + return std::unique_ptr(new LongHistogramAggregation()); + } + else + { + return std::unique_ptr(new DoubleHistogramAggregation()); + } + break; + case AggregationType::kLastValue: + if (instrument_descriptor.value_type_ == InstrumentValueType::kLong) + { + return std::unique_ptr(new LongLastValueAggregation()); + } + else + { + return std::unique_ptr(new DoubleLastValueAggregation()); + } + break; + case AggregationType::kSum: + if (instrument_descriptor.value_type_ == InstrumentValueType::kLong) + { + return std::unique_ptr(new LongSumAggregation(true)); + } + else + { + return std::unique_ptr(new DoubleSumAggregation(true)); + } + break; + default: + return DefaultAggregation::CreateAggregation(instrument_descriptor); + } + } }; } // namespace metrics diff --git a/sdk/include/opentelemetry/sdk/metrics/measurement_processor.h b/sdk/include/opentelemetry/sdk/metrics/measurement_processor.h index 7713848cf8..ffd47c5c92 100644 --- a/sdk/include/opentelemetry/sdk/metrics/measurement_processor.h +++ b/sdk/include/opentelemetry/sdk/metrics/measurement_processor.h @@ -4,12 +4,12 @@ #pragma once #ifndef ENABLE_METRICS_PREVIEW +# include # include "opentelemetry/common/key_value_iterable_view.h" # include "opentelemetry/sdk/metrics/instruments.h" # include "opentelemetry/sdk/metrics/metric_reader.h" # include "opentelemetry/sdk/metrics/state/sync_metric_storage.h" - -# include +# include "opentelemetry/sdk/metrics/view/attributes_processor.h" OPENTELEMETRY_BEGIN_NAMESPACE namespace sdk @@ -50,7 +50,7 @@ class DefaultMeasurementProcessor : public MeasurementProcessor InstrumentDescriptor instr_desc = {"name", "desc", "1unit", InstrumentType::kCounter, InstrumentValueType::kLong}; metric_storages_[MakeKey(reader)] = std::unique_ptr( - new SyncMetricStorage(instr_desc, AggregationType::kSum)); + new SyncMetricStorage(instr_desc, AggregationType::kSum, new DefaultAttributesProcessor())); return true; } diff --git a/sdk/include/opentelemetry/sdk/metrics/observer_result.h b/sdk/include/opentelemetry/sdk/metrics/observer_result.h new file mode 100644 index 0000000000..ca7227bc5b --- /dev/null +++ b/sdk/include/opentelemetry/sdk/metrics/observer_result.h @@ -0,0 +1,48 @@ +// Copyright The OpenTelemetry Authors +// SPDX-License-Identifier: Apache-2.0 + +#pragma once +#ifndef ENABLE_METRICS_PREVIEW +# include "opentelemetry/common/key_value_iterable.h" +# include "opentelemetry/metrics/observer_result.h" +# include "opentelemetry/sdk/metrics/state/attributes_hashmap.h" +# include "opentelemetry/sdk/metrics/view/attributes_processor.h" + +# include + +OPENTELEMETRY_BEGIN_NAMESPACE +namespace sdk +{ +namespace metrics +{ +template +class ObserverResult final : public opentelemetry::metrics::ObserverResult +{ +public: + ObserverResult(const AttributesProcessor *attributes_processor) + : attributes_processor_(attributes_processor) + {} + + void Observe(T value) noexcept override { data_.insert({{}, value}); } + + void Observe(T value, const opentelemetry::common::KeyValueIterable &attributes) noexcept override + { + auto attr = attributes_processor_->process(attributes); + data_.insert({attr, value}); + } + + const std::unordered_map &GetMeasurements() + { + return data_; + } + +private: + std::unordered_map data_; + + const AttributesProcessor *attributes_processor_; +}; +} // namespace metrics +} // namespace sdk + +OPENTELEMETRY_END_NAMESPACE +#endif diff --git a/sdk/include/opentelemetry/sdk/metrics/state/async_metric_storage.h b/sdk/include/opentelemetry/sdk/metrics/state/async_metric_storage.h new file mode 100644 index 0000000000..d5491e91cb --- /dev/null +++ b/sdk/include/opentelemetry/sdk/metrics/state/async_metric_storage.h @@ -0,0 +1,80 @@ +// Copyright The OpenTelemetry Authors +// SPDX-License-Identifier: Apache-2.0 + +#pragma once +#ifndef ENABLE_METRICS_PREVIEW +# include "opentelemetry/sdk/common/attributemap_hash.h" +# include "opentelemetry/sdk/metrics/instruments.h" + +# include "opentelemetry/sdk/instrumentationlibrary/instrumentation_library.h" +# include "opentelemetry/sdk/metrics/aggregation/default_aggregation.h" +# include "opentelemetry/sdk/metrics/state/attributes_hashmap.h" +# include "opentelemetry/sdk/metrics/state/metric_storage.h" +# include "opentelemetry/sdk/metrics/view/attributes_processor.h" +# include "opentelemetry/sdk/resource/resource.h" + +# include +# include "opentelemetry/sdk/metrics/observer_result.h" + +OPENTELEMETRY_BEGIN_NAMESPACE +namespace sdk +{ +namespace metrics +{ + +template +class AsyncMetricStorage : public MetricStorage +{ +public: + AsyncMetricStorage(InstrumentDescriptor instrument_descriptor, + const AggregationType aggregation_type, + void (*measurement_callback)(opentelemetry::metrics::ObserverResult &), + const AttributesProcessor *attributes_processor) + : instrument_descriptor_(instrument_descriptor), + aggregation_type_{aggregation_type}, + measurement_collection_callback_{measurement_callback}, + attributes_processor_{attributes_processor}, + active_attributes_hashmap_(new AttributesHashMap()) + {} + + bool Collect( + MetricCollector *collector, + nostd::span collectors, + opentelemetry::sdk::instrumentationlibrary::InstrumentationLibrary *instrumentation_library, + opentelemetry::sdk::resource::Resource *resource, + nostd::function_ref metric_collection_callback) noexcept override + { + opentelemetry::sdk::metrics::ObserverResult ob_res(attributes_processor_); + + // read the measurement using configured callback + measurement_collection_callback_(ob_res); + + // process the read measurements - aggregate and store in hashmap + for (auto &measurement : ob_res.GetMeasurements()) + { + auto agg = DefaultAggregation::CreateAggregation(aggregation_type_, instrument_descriptor_); + agg->Aggregate(measurement.second); + active_attributes_hashmap_->Set(measurement.first, std::move(agg)); + } + + // TBD -> read aggregation from hashmap, and perform metric collection + MetricData metric_data; + if (metric_collection_callback(metric_data)) + { + return true; + } + return false; + } + +private: + InstrumentDescriptor instrument_descriptor_; + AggregationType aggregation_type_; + void (*measurement_collection_callback_)(opentelemetry::metrics::ObserverResult &); + const AttributesProcessor *attributes_processor_; + std::unique_ptr active_attributes_hashmap_; +}; + +} // namespace metrics +} // namespace sdk +OPENTELEMETRY_END_NAMESPACE +#endif \ No newline at end of file diff --git a/sdk/include/opentelemetry/sdk/metrics/state/metric_storage.h b/sdk/include/opentelemetry/sdk/metrics/state/metric_storage.h index c2213a701b..fbff04c47f 100644 --- a/sdk/include/opentelemetry/sdk/metrics/state/metric_storage.h +++ b/sdk/include/opentelemetry/sdk/metrics/state/metric_storage.h @@ -23,7 +23,7 @@ class MetricStorage nostd::span collectors, opentelemetry::sdk::instrumentationlibrary::InstrumentationLibrary *instrumentation_library, opentelemetry::sdk::resource::Resource *resource, - nostd::function_ref callback) noexcept = 0; + nostd::function_ref callback) noexcept = 0; }; class WritableMetricStorage @@ -48,7 +48,7 @@ class NoopMetricStorage : public MetricStorage nostd::span collectors, opentelemetry::sdk::instrumentationlibrary::InstrumentationLibrary *instrumentation_library, opentelemetry::sdk::resource::Resource *resource, - nostd::function_ref callback) noexcept override + nostd::function_ref callback) noexcept override { MetricData metric_data; if (callback(metric_data)) diff --git a/sdk/include/opentelemetry/sdk/metrics/state/sync_metric_storage.h b/sdk/include/opentelemetry/sdk/metrics/state/sync_metric_storage.h index ee64c5fba4..e3ee673339 100644 --- a/sdk/include/opentelemetry/sdk/metrics/state/sync_metric_storage.h +++ b/sdk/include/opentelemetry/sdk/metrics/state/sync_metric_storage.h @@ -25,17 +25,17 @@ class SyncMetricStorage : public MetricStorage, public WritableMetricStorage { public: - SyncMetricStorage( - InstrumentDescriptor instrument_descriptor, - const AggregationType aggregation_type, - const AttributesProcessor *attributes_processor = new DefaultAttributesProcessor()) + SyncMetricStorage(InstrumentDescriptor instrument_descriptor, + const AggregationType aggregation_type, + const AttributesProcessor *attributes_processor) : instrument_descriptor_(instrument_descriptor), aggregation_type_{aggregation_type}, attributes_hashmap_(new AttributesHashMap()), attributes_processor_{attributes_processor} { create_default_aggregation_ = [&]() -> std::unique_ptr { - return std::move(this->create_aggregation()); + return std::move( + DefaultAggregation::CreateAggregation(aggregation_type_, instrument_descriptor_)); }; } @@ -45,8 +45,7 @@ class SyncMetricStorage : public MetricStorage, public WritableMetricStorage { return; } - auto aggregation = attributes_hashmap_->GetOrSetDefault({}, create_default_aggregation_); - aggregation->Aggregate(value); + attributes_hashmap_->GetOrSetDefault({}, create_default_aggregation_)->Aggregate(value); } void RecordLong(long value, @@ -57,9 +56,8 @@ class SyncMetricStorage : public MetricStorage, public WritableMetricStorage return; } - auto attr = attributes_processor_->process(attributes); - auto aggregation = attributes_hashmap_->GetOrSetDefault(attr, create_default_aggregation_); - aggregation->Aggregate(value); + auto attr = attributes_processor_->process(attributes); + attributes_hashmap_->GetOrSetDefault(attr, create_default_aggregation_)->Aggregate(value); } void RecordDouble(double value) noexcept override @@ -69,8 +67,7 @@ class SyncMetricStorage : public MetricStorage, public WritableMetricStorage return; } - auto aggregation = attributes_hashmap_->GetOrSetDefault({}, create_default_aggregation_); - aggregation->Aggregate(value); + attributes_hashmap_->GetOrSetDefault({}, create_default_aggregation_)->Aggregate(value); } void RecordDouble(double value, @@ -81,9 +78,8 @@ class SyncMetricStorage : public MetricStorage, public WritableMetricStorage return; } - auto attr = attributes_processor_->process(attributes); - auto aggregation = attributes_hashmap_->GetOrSetDefault(attr, create_default_aggregation_); - aggregation->Aggregate(value); + auto attr = attributes_processor_->process(attributes); + attributes_hashmap_->GetOrSetDefault(attr, create_default_aggregation_)->Aggregate(value); } bool Collect( @@ -91,10 +87,10 @@ class SyncMetricStorage : public MetricStorage, public WritableMetricStorage nostd::span collectors, opentelemetry::sdk::instrumentationlibrary::InstrumentationLibrary *instrumentation_library, opentelemetry::sdk::resource::Resource *resource, - nostd::function_ref callback) noexcept override + nostd::function_ref callback) noexcept override { - - if (callback(MetricData())) + MetricData metric_data; + if (callback(metric_data)) { return true; } @@ -107,48 +103,6 @@ class SyncMetricStorage : public MetricStorage, public WritableMetricStorage std::unique_ptr attributes_hashmap_; const AttributesProcessor *attributes_processor_; std::function()> create_default_aggregation_; - - std::unique_ptr create_aggregation() - { - switch (aggregation_type_) - { - case AggregationType::kDrop: - return std::move(std::unique_ptr(new DropAggregation())); - break; - case AggregationType::kHistogram: - if (instrument_descriptor_.value_type_ == InstrumentValueType::kLong) - { - return std::move(std::unique_ptr(new LongHistogramAggregation())); - } - else - { - return std::move(std::unique_ptr(new DoubleHistogramAggregation())); - } - break; - case AggregationType::kLastValue: - if (instrument_descriptor_.value_type_ == InstrumentValueType::kLong) - { - return std::move(std::unique_ptr(new LongLastValueAggregation())); - } - else - { - return std::move(std::unique_ptr(new DoubleLastValueAggregation())); - } - break; - case AggregationType::kSum: - if (instrument_descriptor_.value_type_ == InstrumentValueType::kLong) - { - return std::move(std::unique_ptr(new LongSumAggregation(true))); - } - else - { - return std::move(std::unique_ptr(new DoubleSumAggregation(true))); - } - break; - default: - return std::move(DefaultAggregation::CreateAggregation(instrument_descriptor_)); - } - } }; } // namespace metrics diff --git a/sdk/test/metrics/BUILD b/sdk/test/metrics/BUILD index d49d855d8f..cf780b1120 100644 --- a/sdk/test/metrics/BUILD +++ b/sdk/test/metrics/BUILD @@ -64,6 +64,38 @@ cc_test( ], ) +cc_test( + name = "async_metric_storage_test", + srcs = [ + "async_metric_storage_test.cc", + ], + tags = [ + "metrics", + "test", + ], + deps = [ + "//sdk/src/metrics", + "//sdk/src/resource", + "@com_google_googletest//:gtest_main", + ], +) + +cc_test( + name = "observer_result_test", + srcs = [ + "observer_result_test.cc", + ], + tags = [ + "metrics", + "test", + ], + deps = [ + "//sdk/src/metrics", + "//sdk/src/resource", + "@com_google_googletest//:gtest_main", + ], +) + cc_test( name = "multi_metric_storage_test", srcs = [ diff --git a/sdk/test/metrics/CMakeLists.txt b/sdk/test/metrics/CMakeLists.txt index fb823feaca..a2080b6e46 100644 --- a/sdk/test/metrics/CMakeLists.txt +++ b/sdk/test/metrics/CMakeLists.txt @@ -6,7 +6,9 @@ foreach( attributes_processor_test attributes_hashmap_test sync_metric_storage_test + async_metric_storage_test multi_metric_storage_test + observer_result_test sync_instruments_test async_instruments_test) add_executable(${testname} "${testname}.cc") diff --git a/sdk/test/metrics/async_metric_storage_test.cc b/sdk/test/metrics/async_metric_storage_test.cc new file mode 100644 index 0000000000..fd2a41d325 --- /dev/null +++ b/sdk/test/metrics/async_metric_storage_test.cc @@ -0,0 +1,41 @@ +// Copyright The OpenTelemetry Authors +// SPDX-License-Identifier: Apache-2.0 + +#ifndef ENABLE_METRICS_PREVIEW +# include "opentelemetry/sdk/metrics/state/async_metric_storage.h" +# include "opentelemetry/common/key_value_iterable_view.h" +# include "opentelemetry/sdk/metrics/instruments.h" +# include "opentelemetry/sdk/metrics/observer_result.h" + +# include "opentelemetry/sdk/instrumentationlibrary/instrumentation_library.h" +# include "opentelemetry/sdk/resource/resource.h" + +# include +# include + +using namespace opentelemetry::sdk::metrics; +using namespace opentelemetry::sdk::instrumentationlibrary; +using namespace opentelemetry::sdk::resource; + +void measurement_fetch(opentelemetry::metrics::ObserverResult &observer_result) +{ + observer_result.Observe(20l); + observer_result.Observe(10l); +} + +TEST(AsyncMetricStorageTest, BasicTests) +{ + auto metric_callback = [](MetricData &metric_data) { return true; }; + InstrumentDescriptor instr_desc = {"name", "desc", "1unit", InstrumentType::kCounter, + InstrumentValueType::kLong}; + auto instrumentation_library = InstrumentationLibrary::Create("instr_lib"); + auto resource = Resource::Create({}); + MetricCollector collector; + std::vector collectors; + + opentelemetry::sdk::metrics::AsyncMetricStorage storage( + instr_desc, AggregationType::kSum, &measurement_fetch, new DefaultAttributesProcessor()); + EXPECT_NO_THROW(storage.Collect(&collector, collectors, instrumentation_library.get(), &resource, + metric_callback)); +} +#endif \ No newline at end of file diff --git a/sdk/test/metrics/observer_result_test.cc b/sdk/test/metrics/observer_result_test.cc new file mode 100644 index 0000000000..a4cc28ae54 --- /dev/null +++ b/sdk/test/metrics/observer_result_test.cc @@ -0,0 +1,38 @@ +// Copyright The OpenTelemetry Authors +// SPDX-License-Identifier: Apache-2.0 + +#ifndef ENABLE_METRICS_PREVIEW +# include "opentelemetry/sdk/metrics/observer_result.h" +# include "opentelemetry/sdk/metrics/view/attributes_processor.h" + +# include + +using namespace opentelemetry::sdk::metrics; +TEST(ObserverResult, BasicTests) +{ + const AttributesProcessor *attributes_processor = new DefaultAttributesProcessor(); + + ObserverResult observer_result(attributes_processor); + + observer_result.Observe(10l); + observer_result.Observe(20l); + EXPECT_EQ(observer_result.GetMeasurements().size(), 1); + + std::map m1 = {{"k2", 12}}; + observer_result.Observe( + 30l, opentelemetry::common::KeyValueIterableView>(m1)); + EXPECT_EQ(observer_result.GetMeasurements().size(), 2); + + observer_result.Observe( + 40l, opentelemetry::common::KeyValueIterableView>(m1)); + EXPECT_EQ(observer_result.GetMeasurements().size(), 2); + + std::map m2 = {{"k2", 12}, {"k4", 12}}; + observer_result.Observe( + 40l, opentelemetry::common::KeyValueIterableView>(m2)); + EXPECT_EQ(observer_result.GetMeasurements().size(), 3); + + delete attributes_processor; +} + +#endif \ No newline at end of file diff --git a/sdk/test/metrics/sync_metric_storage_test.cc b/sdk/test/metrics/sync_metric_storage_test.cc index 916425dfa1..23ef20b11d 100644 --- a/sdk/test/metrics/sync_metric_storage_test.cc +++ b/sdk/test/metrics/sync_metric_storage_test.cc @@ -5,6 +5,7 @@ # include "opentelemetry/sdk/metrics/state/sync_metric_storage.h" # include "opentelemetry/common/key_value_iterable_view.h" # include "opentelemetry/sdk/metrics/instruments.h" +# include "opentelemetry/sdk/metrics/view/attributes_processor.h" # include # include @@ -17,7 +18,8 @@ TEST(WritableMetricStorageTest, BasicTests) InstrumentDescriptor instr_desc = {"name", "desc", "1unit", InstrumentType::kCounter, InstrumentValueType::kLong}; - opentelemetry::sdk::metrics::SyncMetricStorage storage(instr_desc, AggregationType::kSum); + opentelemetry::sdk::metrics::SyncMetricStorage storage(instr_desc, AggregationType::kSum, + new DefaultAttributesProcessor()); EXPECT_NO_THROW(storage.RecordLong(10l)); EXPECT_NO_THROW(storage.RecordDouble(10.10)); EXPECT_NO_THROW(storage.RecordLong(