diff --git a/model/fn-execution/src/main/proto/beam_fn_api.proto b/model/fn-execution/src/main/proto/beam_fn_api.proto index 97edb710dd94..28c755950249 100644 --- a/model/fn-execution/src/main/proto/beam_fn_api.proto +++ b/model/fn-execution/src/main/proto/beam_fn_api.proto @@ -307,6 +307,12 @@ message Metrics { int64 max = 4; } + // Data associated with a Gauge metric. + message GaugeData { + int64 value = 1; + google.protobuf.Timestamp timestamp = 2; + } + // (Required) The identifier for this metric. MetricName metric_name = 1; @@ -314,6 +320,7 @@ message Metrics { oneof data { CounterData counter_data = 1001; DistributionData distribution_data = 1002; + GaugeData gauge_data = 1003; } } diff --git a/sdks/python/apache_beam/metrics/cells.py b/sdks/python/apache_beam/metrics/cells.py index b601050c6d72..8271a5c64d52 100644 --- a/sdks/python/apache_beam/metrics/cells.py +++ b/sdks/python/apache_beam/metrics/cells.py @@ -28,6 +28,8 @@ import threading import time +from google.protobuf import timestamp_pb2 + from apache_beam.metrics.metricbase import Counter from apache_beam.metrics.metricbase import Distribution from apache_beam.metrics.metricbase import Gauge @@ -348,7 +350,17 @@ def combine(self, other): def singleton(value, timestamp=None): return GaugeData(value, timestamp=timestamp) - #TODO(pabloem) - Add to_runner_api, and from_runner_api + def to_runner_api(self): + seconds = int(self.timestamp) + nanos = int((self.timestamp - seconds) * 10**9) + gauge_timestamp = timestamp_pb2.Timestamp(seconds=seconds, nanos=nanos) + return beam_fn_api_pb2.Metrics.User.GaugeData( + value=self.value, timestamp=gauge_timestamp) + + @staticmethod + def from_runner_api(proto): + gauge_timestamp = proto.timestamp.seconds + proto.timestamp.nanos / 10**9 + return GaugeData(proto.value, timestamp=gauge_timestamp) class DistributionData(object): diff --git a/sdks/python/apache_beam/metrics/execution.py b/sdks/python/apache_beam/metrics/execution.py index c697926fac2c..f6c790de5d4b 100644 --- a/sdks/python/apache_beam/metrics/execution.py +++ b/sdks/python/apache_beam/metrics/execution.py @@ -212,8 +212,11 @@ def to_runner_api(self): [beam_fn_api_pb2.Metrics.User( metric_name=k.to_runner_api(), distribution_data=v.get_cumulative().to_runner_api()) - for k, v in self.distributions.items()] - #TODO(pabloem): Add gauge metrics. + for k, v in self.distributions.items()] + + [beam_fn_api_pb2.Metrics.User( + metric_name=k.to_runner_api(), + gauge_data=v.get_cumulative().to_runner_api()) + for k, v in self.gauges.items()] ) diff --git a/sdks/python/apache_beam/runners/dataflow/dataflow_metrics.py b/sdks/python/apache_beam/runners/dataflow/dataflow_metrics.py index 6cacd907e602..2e0bc8209ecf 100644 --- a/sdks/python/apache_beam/runners/dataflow/dataflow_metrics.py +++ b/sdks/python/apache_beam/runners/dataflow/dataflow_metrics.py @@ -170,12 +170,8 @@ def _get_metric_value(self, metric): lambda x: x.key == 'min').value.integer_value dist_max = _get_match(metric.distribution.object_value.properties, lambda x: x.key == 'max').value.integer_value - dist_mean = _get_match(metric.distribution.object_value.properties, - lambda x: x.key == 'mean').value.integer_value - # Calculating dist_sum with a hack, as distribution sum is not yet - # available in the Dataflow API. - # TODO(pabloem) Switch to "sum" field once it's available in the API - dist_sum = dist_count * dist_mean + dist_sum = _get_match(metric.distribution.object_value.properties, + lambda x: x.key == 'sum').value.integer_value return DistributionResult( DistributionData( dist_sum, dist_count, dist_min, dist_max)) diff --git a/sdks/python/apache_beam/runners/portability/fn_api_runner.py b/sdks/python/apache_beam/runners/portability/fn_api_runner.py index 63e4a68536eb..7e89d9aa7575 100644 --- a/sdks/python/apache_beam/runners/portability/fn_api_runner.py +++ b/sdks/python/apache_beam/runners/portability/fn_api_runner.py @@ -1181,6 +1181,7 @@ class FnApiMetrics(metrics.metric.MetricResults): def __init__(self, step_metrics): self._counters = {} self._distributions = {} + self._gauges = {} for step_metric in step_metrics.values(): for ptransform_id, ptransform in step_metric.ptransforms.items(): for proto in ptransform.user: @@ -1194,6 +1195,11 @@ def __init__(self, step_metrics): key] = metrics.cells.DistributionResult( metrics.cells.DistributionData.from_runner_api( proto.distribution_data)) + elif proto.HasField('gauge_data'): + self._gauges[ + key] = metrics.cells.GaugeResult( + metrics.cells.GaugeData.from_runner_api( + proto.gauge_data)) def query(self, filter=None): counters = [metrics.execution.MetricResult(k, v, v) @@ -1202,9 +1208,13 @@ def query(self, filter=None): distributions = [metrics.execution.MetricResult(k, v, v) for k, v in self._distributions.items() if self.matches(filter, k)] + gauges = [metrics.execution.MetricResult(k, v, v) + for k, v in self._gauges.items() + if self.matches(filter, k)] return {'counters': counters, - 'distributions': distributions} + 'distributions': distributions, + 'gauges': gauges} class RunnerResult(runner.PipelineResult): diff --git a/sdks/python/apache_beam/runners/portability/fn_api_runner_test.py b/sdks/python/apache_beam/runners/portability/fn_api_runner_test.py index 50dc8f837962..e7b865cb631c 100644 --- a/sdks/python/apache_beam/runners/portability/fn_api_runner_test.py +++ b/sdks/python/apache_beam/runners/portability/fn_api_runner_test.py @@ -125,11 +125,14 @@ def test_metrics(self): counter = beam.metrics.Metrics.counter('ns', 'counter') distribution = beam.metrics.Metrics.distribution('ns', 'distribution') + gauge = beam.metrics.Metrics.gauge('ns', 'gauge') + pcoll = p | beam.Create(['a', 'zzz']) # pylint: disable=expression-not-assigned pcoll | 'count1' >> beam.FlatMap(lambda x: counter.inc()) pcoll | 'count2' >> beam.FlatMap(lambda x: counter.inc(len(x))) pcoll | 'dist' >> beam.FlatMap(lambda x: distribution.update(len(x))) + pcoll | 'gauge' >> beam.FlatMap(lambda x: gauge.set(len(x))) res = p.run() res.wait_until_finish() @@ -141,9 +144,12 @@ def test_metrics(self): self.assertEqual(c2.committed, 4) dist, = res.metrics().query(beam.metrics.MetricsFilter().with_step('dist'))[ 'distributions'] + gaug, = res.metrics().query( + beam.metrics.MetricsFilter().with_step('gauge'))['gauges'] self.assertEqual( dist.committed.data, beam.metrics.cells.DistributionData(4, 2, 1, 3)) self.assertEqual(dist.committed.mean, 2.0) + self.assertEqual(gaug.committed.value, 3) def test_progress_metrics(self): p = self.create_pipeline()