diff --git a/docs/getting_started/prometheus_example.py b/docs/getting_started/prometheus_example.py index 0377e570b86..d7111df969c 100644 --- a/docs/getting_started/prometheus_example.py +++ b/docs/getting_started/prometheus_example.py @@ -27,9 +27,9 @@ # Start Prometheus client start_http_server(port=8000, addr="localhost") -batcher_mode = "stateful" +processor_mode = "stateful" metrics.set_meter_provider(MeterProvider()) -meter = metrics.get_meter(__name__, batcher_mode == "stateful") +meter = metrics.get_meter(__name__, processor_mode == "stateful") exporter = PrometheusMetricsExporter("MyAppPrefix") controller = PushController(meter, exporter, 5) diff --git a/docs/sdk/metrics.export.batcher.rst b/docs/sdk/metrics.export.processor.rst similarity index 56% rename from docs/sdk/metrics.export.batcher.rst rename to docs/sdk/metrics.export.processor.rst index dab2dd3415b..cdae8b2fbe7 100644 --- a/docs/sdk/metrics.export.batcher.rst +++ b/docs/sdk/metrics.export.processor.rst @@ -1,11 +1,11 @@ -opentelemetry.sdk.metrics.export.batcher +opentelemetry.sdk.metrics.export.processor ========================================== .. toctree:: metrics.export -.. automodule:: opentelemetry.sdk.metrics.export.batcher +.. automodule:: opentelemetry.sdk.metrics.export.processor :members: :undoc-members: :show-inheritance: diff --git a/docs/sdk/metrics.rst b/docs/sdk/metrics.rst index 7030285982e..8e34be5a4bc 100644 --- a/docs/sdk/metrics.rst +++ b/docs/sdk/metrics.rst @@ -7,7 +7,7 @@ Submodules .. toctree:: metrics.export.aggregate - metrics.export.batcher + metrics.export.processor util.instrumentation .. automodule:: opentelemetry.sdk.metrics diff --git a/exporter/opentelemetry-exporter-opencensus/src/opentelemetry/exporter/opencensus/metrics_exporter/__init__.py b/exporter/opentelemetry-exporter-opencensus/src/opentelemetry/exporter/opencensus/metrics_exporter/__init__.py index 204a7c5476d..db7af753aa7 100644 --- a/exporter/opentelemetry-exporter-opencensus/src/opentelemetry/exporter/opencensus/metrics_exporter/__init__.py +++ b/exporter/opentelemetry-exporter-opencensus/src/opentelemetry/exporter/opencensus/metrics_exporter/__init__.py @@ -139,7 +139,7 @@ def translate_to_collector( # If cumulative and stateful, explicitly set the start_timestamp to # exporter start time. - if metric_record.instrument.meter.batcher.stateful: + if metric_record.instrument.meter.processor.stateful: start_timestamp = exporter_start_timestamp else: start_timestamp = None diff --git a/opentelemetry-sdk/CHANGELOG.md b/opentelemetry-sdk/CHANGELOG.md index 71864282885..d2834986122 100644 --- a/opentelemetry-sdk/CHANGELOG.md +++ b/opentelemetry-sdk/CHANGELOG.md @@ -10,6 +10,8 @@ ([#1105](https://github.com/open-telemetry/opentelemetry-python/pull/1120)) - Allow for Custom Trace and Span IDs Generation - `IdsGenerator` for TracerProvider ([#1153](https://github.com/open-telemetry/opentelemetry-python/pull/1153)) +- Renaming metrics Batcher to Processor + ([#1203](https://github.com/open-telemetry/opentelemetry-python/pull/1203)) ## Version 0.13b0 diff --git a/opentelemetry-sdk/src/opentelemetry/sdk/metrics/__init__.py b/opentelemetry-sdk/src/opentelemetry/sdk/metrics/__init__.py index 092f456fafe..01242b7d074 100644 --- a/opentelemetry-sdk/src/opentelemetry/sdk/metrics/__init__.py +++ b/opentelemetry-sdk/src/opentelemetry/sdk/metrics/__init__.py @@ -23,8 +23,8 @@ MetricsExporter, ) from opentelemetry.sdk.metrics.export.aggregate import Aggregator -from opentelemetry.sdk.metrics.export.batcher import Batcher from opentelemetry.sdk.metrics.export.controller import PushController +from opentelemetry.sdk.metrics.export.processor import Processor from opentelemetry.sdk.metrics.view import ( ViewData, ViewManager, @@ -325,7 +325,7 @@ class ValueObserver(Observer, metrics_api.ValueObserver): class Record: - """Container class used for processing in the `Batcher`""" + """Container class used for processing in the `Processor`""" def __init__( self, @@ -352,7 +352,7 @@ def __init__( instrumentation_info: "InstrumentationInfo", ): self.instrumentation_info = instrumentation_info - self.batcher = Batcher(source.stateful) + self.processor = Processor(source.stateful) self.resource = source.resource self.metrics = set() self.observers = set() @@ -363,7 +363,7 @@ def __init__( def collect(self) -> None: """Collects all the metrics created with this `Meter` for export. - Utilizes the batcher to create checkpoints of the current values in + Utilizes the processor to create checkpoints of the current values in each aggregator belonging to the metrics that were created with this meter instance. """ @@ -385,7 +385,7 @@ def _collect_metrics(self) -> None: record = Record( metric, view_data.labels, view_data.aggregator ) - self.batcher.process(record) + self.processor.process(record) if bound_instrument.ref_count() == 0: to_remove.append(labels) @@ -405,7 +405,7 @@ def _collect_observers(self) -> None: for labels, aggregator in observer.aggregators.items(): record = Record(observer, labels, aggregator) - self.batcher.process(record) + self.processor.process(record) def record_batch( self, diff --git a/opentelemetry-sdk/src/opentelemetry/sdk/metrics/export/controller.py b/opentelemetry-sdk/src/opentelemetry/sdk/metrics/export/controller.py index 7448f353c45..e095ebbb72c 100644 --- a/opentelemetry-sdk/src/opentelemetry/sdk/metrics/export/controller.py +++ b/opentelemetry-sdk/src/opentelemetry/sdk/metrics/export/controller.py @@ -57,7 +57,7 @@ def tick(self): self.meter.collect() # Export the collected metrics token = attach(set_value("suppress_instrumentation", True)) - self.exporter.export(self.meter.batcher.checkpoint_set()) + self.exporter.export(self.meter.processor.checkpoint_set()) detach(token) # Perform post-exporting logic - self.meter.batcher.finished_collection() + self.meter.processor.finished_collection() diff --git a/opentelemetry-sdk/src/opentelemetry/sdk/metrics/export/batcher.py b/opentelemetry-sdk/src/opentelemetry/sdk/metrics/export/processor.py similarity index 88% rename from opentelemetry-sdk/src/opentelemetry/sdk/metrics/export/batcher.py rename to opentelemetry-sdk/src/opentelemetry/sdk/metrics/export/processor.py index 1c1858ebbae..c012d7382bf 100644 --- a/opentelemetry-sdk/src/opentelemetry/sdk/metrics/export/batcher.py +++ b/opentelemetry-sdk/src/opentelemetry/sdk/metrics/export/processor.py @@ -18,18 +18,18 @@ from opentelemetry.sdk.util import get_dict_as_key -class Batcher: - """Base class for all batcher types. +class Processor: + """Base class for all processor types. - The batcher is responsible for storing the aggregators and aggregated + The processor is responsible for storing the aggregators and aggregated values received from updates from metrics in the meter. The stored values will be sent to an exporter for exporting. """ def __init__(self, stateful: bool): self._batch_map = {} - # stateful=True indicates the batcher computes checkpoints from over - # the process lifetime. False indicates the batcher computes + # stateful=True indicates the processor computes checkpoints from over + # the process lifetime. False indicates the processor computes # checkpoints which describe the updates of a single collection period # (deltas) self.stateful = stateful @@ -38,7 +38,7 @@ def checkpoint_set(self) -> Sequence[MetricRecord]: """Returns a list of MetricRecords used for exporting. The list of MetricRecords is a snapshot created from the current - data in all of the aggregators in this batcher. + data in all of the aggregators in this processor. """ metric_records = [] # pylint: disable=W0612 @@ -52,7 +52,7 @@ def checkpoint_set(self) -> Sequence[MetricRecord]: def finished_collection(self): """Performs certain post-export logic. - For batchers that are stateless, resets the batch map. + For processors that are stateless, resets the batch map. """ if not self.stateful: self._batch_map = {} diff --git a/opentelemetry-sdk/tests/metrics/export/test_export.py b/opentelemetry-sdk/tests/metrics/export/test_export.py index 99aa9c4a629..efa6bcd24e1 100644 --- a/opentelemetry-sdk/tests/metrics/export/test_export.py +++ b/opentelemetry-sdk/tests/metrics/export/test_export.py @@ -29,8 +29,8 @@ SumAggregator, ValueObserverAggregator, ) -from opentelemetry.sdk.metrics.export.batcher import Batcher from opentelemetry.sdk.metrics.export.controller import PushController +from opentelemetry.sdk.metrics.export.processor import Processor # pylint: disable=protected-access @@ -61,10 +61,10 @@ def test_export(self): mock_stdout.write.assert_any_call(result) -class TestBatcher(unittest.TestCase): +class TestProcessor(unittest.TestCase): def test_checkpoint_set(self): meter = metrics.MeterProvider().get_meter(__name__) - batcher = Batcher(True) + processor = Processor(True) aggregator = SumAggregator() metric = metrics.Counter( "available memory", "available memory", "bytes", int, meter @@ -73,21 +73,21 @@ def test_checkpoint_set(self): labels = () _batch_map = {} _batch_map[(metric, SumAggregator, tuple(), labels)] = aggregator - batcher._batch_map = _batch_map - records = batcher.checkpoint_set() + processor._batch_map = _batch_map + records = processor.checkpoint_set() self.assertEqual(len(records), 1) self.assertEqual(records[0].instrument, metric) self.assertEqual(records[0].labels, labels) self.assertEqual(records[0].aggregator, aggregator) def test_checkpoint_set_empty(self): - batcher = Batcher(True) - records = batcher.checkpoint_set() + processor = Processor(True) + records = processor.checkpoint_set() self.assertEqual(len(records), 0) def test_finished_collection_stateless(self): meter = metrics.MeterProvider().get_meter(__name__) - batcher = Batcher(False) + processor = Processor(False) aggregator = SumAggregator() metric = metrics.Counter( "available memory", "available memory", "bytes", int, meter @@ -96,13 +96,13 @@ def test_finished_collection_stateless(self): labels = () _batch_map = {} _batch_map[(metric, SumAggregator, tuple(), labels)] = aggregator - batcher._batch_map = _batch_map - batcher.finished_collection() - self.assertEqual(len(batcher._batch_map), 0) + processor._batch_map = _batch_map + processor.finished_collection() + self.assertEqual(len(processor._batch_map), 0) def test_finished_collection_stateful(self): meter = metrics.MeterProvider().get_meter(__name__) - batcher = Batcher(True) + processor = Processor(True) aggregator = SumAggregator() metric = metrics.Counter( "available memory", "available memory", "bytes", int, meter @@ -111,13 +111,13 @@ def test_finished_collection_stateful(self): labels = () _batch_map = {} _batch_map[(metric, SumAggregator, tuple(), labels)] = aggregator - batcher._batch_map = _batch_map - batcher.finished_collection() - self.assertEqual(len(batcher._batch_map), 1) + processor._batch_map = _batch_map + processor.finished_collection() + self.assertEqual(len(processor._batch_map), 1) - def test_batcher_process_exists(self): + def test_processor_process_exists(self): meter = metrics.MeterProvider().get_meter(__name__) - batcher = Batcher(True) + processor = Processor(True) aggregator = SumAggregator() aggregator2 = SumAggregator() metric = metrics.Counter( @@ -128,17 +128,17 @@ def test_batcher_process_exists(self): batch_key = (metric, SumAggregator, tuple(), labels) _batch_map[batch_key] = aggregator aggregator2.update(1.0) - batcher._batch_map = _batch_map + processor._batch_map = _batch_map record = metrics.Record(metric, labels, aggregator2) - batcher.process(record) - self.assertEqual(len(batcher._batch_map), 1) - self.assertIsNotNone(batcher._batch_map.get(batch_key)) - self.assertEqual(batcher._batch_map.get(batch_key).current, 0) - self.assertEqual(batcher._batch_map.get(batch_key).checkpoint, 1.0) + processor.process(record) + self.assertEqual(len(processor._batch_map), 1) + self.assertIsNotNone(processor._batch_map.get(batch_key)) + self.assertEqual(processor._batch_map.get(batch_key).current, 0) + self.assertEqual(processor._batch_map.get(batch_key).checkpoint, 1.0) - def test_batcher_process_not_exists(self): + def test_processor_process_not_exists(self): meter = metrics.MeterProvider().get_meter(__name__) - batcher = Batcher(True) + processor = Processor(True) aggregator = SumAggregator() metric = metrics.Counter( "available memory", "available memory", "bytes", int, meter @@ -147,17 +147,17 @@ def test_batcher_process_not_exists(self): _batch_map = {} batch_key = (metric, SumAggregator, tuple(), labels) aggregator.update(1.0) - batcher._batch_map = _batch_map + processor._batch_map = _batch_map record = metrics.Record(metric, labels, aggregator) - batcher.process(record) - self.assertEqual(len(batcher._batch_map), 1) - self.assertIsNotNone(batcher._batch_map.get(batch_key)) - self.assertEqual(batcher._batch_map.get(batch_key).current, 0) - self.assertEqual(batcher._batch_map.get(batch_key).checkpoint, 1.0) + processor.process(record) + self.assertEqual(len(processor._batch_map), 1) + self.assertIsNotNone(processor._batch_map.get(batch_key)) + self.assertEqual(processor._batch_map.get(batch_key).current, 0) + self.assertEqual(processor._batch_map.get(batch_key).checkpoint, 1.0) - def test_batcher_process_not_stateful(self): + def test_processor_process_not_stateful(self): meter = metrics.MeterProvider().get_meter(__name__) - batcher = Batcher(True) + processor = Processor(True) aggregator = SumAggregator() metric = metrics.Counter( "available memory", "available memory", "bytes", int, meter @@ -166,13 +166,13 @@ def test_batcher_process_not_stateful(self): _batch_map = {} batch_key = (metric, SumAggregator, tuple(), labels) aggregator.update(1.0) - batcher._batch_map = _batch_map + processor._batch_map = _batch_map record = metrics.Record(metric, labels, aggregator) - batcher.process(record) - self.assertEqual(len(batcher._batch_map), 1) - self.assertIsNotNone(batcher._batch_map.get(batch_key)) - self.assertEqual(batcher._batch_map.get(batch_key).current, 0) - self.assertEqual(batcher._batch_map.get(batch_key).checkpoint, 1.0) + processor.process(record) + self.assertEqual(len(processor._batch_map), 1) + self.assertIsNotNone(processor._batch_map.get(batch_key)) + self.assertEqual(processor._batch_map.get(batch_key).current, 0) + self.assertEqual(processor._batch_map.get(batch_key).checkpoint, 1.0) class TestSumAggregator(unittest.TestCase): diff --git a/opentelemetry-sdk/tests/metrics/test_metrics.py b/opentelemetry-sdk/tests/metrics/test_metrics.py index 8e412f3c5cb..2f833f47820 100644 --- a/opentelemetry-sdk/tests/metrics/test_metrics.py +++ b/opentelemetry-sdk/tests/metrics/test_metrics.py @@ -28,7 +28,7 @@ class TestMeterProvider(unittest.TestCase): def test_stateful(self): meter_provider = metrics.MeterProvider(stateful=False) meter = meter_provider.get_meter(__name__) - self.assertIs(meter.batcher.stateful, False) + self.assertIs(meter.processor.stateful, False) def test_resource(self): resource = resources.Resource.create({}) @@ -74,8 +74,8 @@ def test_extends_api(self): def test_collect_metrics(self): meter = metrics.MeterProvider().get_meter(__name__) - batcher_mock = mock.Mock() - meter.batcher = batcher_mock + processor_mock = mock.Mock() + meter.processor = processor_mock counter = meter.create_metric( "name", "desc", "unit", float, metrics.Counter ) @@ -83,40 +83,40 @@ def test_collect_metrics(self): meter.register_view(View(counter, SumAggregator)) counter.add(1.0, labels) meter.collect() - self.assertTrue(batcher_mock.process.called) + self.assertTrue(processor_mock.process.called) def test_collect_no_metrics(self): meter = metrics.MeterProvider().get_meter(__name__) - batcher_mock = mock.Mock() - meter.batcher = batcher_mock + processor_mock = mock.Mock() + meter.processor = processor_mock meter.collect() - self.assertFalse(batcher_mock.process.called) + self.assertFalse(processor_mock.process.called) def test_collect_not_registered(self): meter = metrics.MeterProvider().get_meter(__name__) - batcher_mock = mock.Mock() - meter.batcher = batcher_mock + processor_mock = mock.Mock() + meter.processor = processor_mock counter = metrics.Counter("name", "desc", "unit", float, meter) labels = {"key1": "value1"} counter.add(1.0, labels) meter.collect() - self.assertFalse(batcher_mock.process.called) + self.assertFalse(processor_mock.process.called) def test_collect_disabled_metric(self): meter = metrics.MeterProvider().get_meter(__name__) - batcher_mock = mock.Mock() - meter.batcher = batcher_mock + processor_mock = mock.Mock() + meter.processor = processor_mock counter = metrics.Counter("name", "desc", "unit", float, meter, False) labels = {"key1": "value1"} meter.register_view(View(counter, SumAggregator)) counter.add(1.0, labels) meter.collect() - self.assertFalse(batcher_mock.process.called) + self.assertFalse(processor_mock.process.called) def test_collect_observers(self): meter = metrics.MeterProvider().get_meter(__name__) - batcher_mock = mock.Mock() - meter.batcher = batcher_mock + processor_mock = mock.Mock() + meter.processor = processor_mock def callback(observer): self.assertIsInstance(observer, metrics_api.Observer) @@ -128,7 +128,7 @@ def callback(observer): meter.observers.add(observer) meter.collect() - self.assertTrue(batcher_mock.process.called) + self.assertTrue(processor_mock.process.called) def test_record_batch(self): meter = metrics.MeterProvider().get_meter(__name__)