Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 7 additions & 0 deletions model/fn-execution/src/main/proto/beam_fn_api.proto
Original file line number Diff line number Diff line change
Expand Up @@ -307,13 +307,20 @@ message Metrics {
int64 max = 4;
}

// Data associated with a Gauge metric.
message GaugeData {
// (Required) The value reported for the gauge.
int64 value = 1;
// (Required) The time at which this value was reported.
// NOTE(review): the Python SDK encodes this from a float seconds
// timestamp as (seconds, nanos) — see GaugeData.to_runner_api.
google.protobuf.Timestamp timestamp = 2;
}

// (Required) The identifier for this metric.
MetricName metric_name = 1;

// (Required) The data for this metric.
oneof data {
CounterData counter_data = 1001;
DistributionData distribution_data = 1002;
GaugeData gauge_data = 1003;
}
}

Expand Down
14 changes: 13 additions & 1 deletion sdks/python/apache_beam/metrics/cells.py
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,8 @@
import threading
import time

from google.protobuf import timestamp_pb2

from apache_beam.metrics.metricbase import Counter
from apache_beam.metrics.metricbase import Distribution
from apache_beam.metrics.metricbase import Gauge
Expand Down Expand Up @@ -348,7 +350,17 @@ def combine(self, other):
def singleton(value, timestamp=None):
  """Return a GaugeData holding a single reported value.

  Args:
    value: the value reported for the gauge.
    timestamp: optional time of the report; forwarded unchanged.
  """
  gauge = GaugeData(value, timestamp=timestamp)
  return gauge

#TODO(pabloem) - Add to_runner_api, and from_runner_api
def to_runner_api(self):
  """Encode this GaugeData as a Fn API ``Metrics.User.GaugeData`` proto.

  Splits the float ``self.timestamp`` (seconds since epoch) into the
  whole-second and nanosecond components required by
  ``google.protobuf.Timestamp``.
  """
  whole_seconds = int(self.timestamp)
  fractional_nanos = int((self.timestamp - whole_seconds) * 10**9)
  proto_timestamp = timestamp_pb2.Timestamp(
      seconds=whole_seconds, nanos=fractional_nanos)
  return beam_fn_api_pb2.Metrics.User.GaugeData(
      value=self.value, timestamp=proto_timestamp)

@staticmethod
def from_runner_api(proto):
  """Decode a Fn API ``Metrics.User.GaugeData`` proto into a GaugeData.

  Args:
    proto: a message with an int64 ``value`` and a
      ``google.protobuf.Timestamp`` ``timestamp`` field.

  Returns:
    A GaugeData carrying the value and a float seconds-since-epoch
    timestamp.
  """
  # Coerce nanos to float before dividing: on Python 2, int / int is
  # truncating division, which would silently drop the sub-second part
  # of the timestamp.
  gauge_timestamp = (
      proto.timestamp.seconds + float(proto.timestamp.nanos) / 10**9)
  return GaugeData(proto.value, timestamp=gauge_timestamp)


class DistributionData(object):
Expand Down
7 changes: 5 additions & 2 deletions sdks/python/apache_beam/metrics/execution.py
Original file line number Diff line number Diff line change
Expand Up @@ -212,8 +212,11 @@ def to_runner_api(self):
[beam_fn_api_pb2.Metrics.User(
metric_name=k.to_runner_api(),
distribution_data=v.get_cumulative().to_runner_api())
for k, v in self.distributions.items()]
#TODO(pabloem): Add gauge metrics.
for k, v in self.distributions.items()] +
[beam_fn_api_pb2.Metrics.User(
metric_name=k.to_runner_api(),
gauge_data=v.get_cumulative().to_runner_api())
for k, v in self.gauges.items()]
)


Expand Down
8 changes: 2 additions & 6 deletions sdks/python/apache_beam/runners/dataflow/dataflow_metrics.py
Original file line number Diff line number Diff line change
Expand Up @@ -170,12 +170,8 @@ def _get_metric_value(self, metric):
lambda x: x.key == 'min').value.integer_value
dist_max = _get_match(metric.distribution.object_value.properties,
lambda x: x.key == 'max').value.integer_value
dist_mean = _get_match(metric.distribution.object_value.properties,
lambda x: x.key == 'mean').value.integer_value
# Calculating dist_sum with a hack, as distribution sum is not yet
# available in the Dataflow API.
# TODO(pabloem) Switch to "sum" field once it's available in the API
dist_sum = dist_count * dist_mean
dist_sum = _get_match(metric.distribution.object_value.properties,
lambda x: x.key == 'sum').value.integer_value
return DistributionResult(
DistributionData(
dist_sum, dist_count, dist_min, dist_max))
Expand Down
12 changes: 11 additions & 1 deletion sdks/python/apache_beam/runners/portability/fn_api_runner.py
Original file line number Diff line number Diff line change
Expand Up @@ -1181,6 +1181,7 @@ class FnApiMetrics(metrics.metric.MetricResults):
def __init__(self, step_metrics):
self._counters = {}
self._distributions = {}
self._gauges = {}
for step_metric in step_metrics.values():
for ptransform_id, ptransform in step_metric.ptransforms.items():
for proto in ptransform.user:
Expand All @@ -1194,6 +1195,11 @@ def __init__(self, step_metrics):
key] = metrics.cells.DistributionResult(
metrics.cells.DistributionData.from_runner_api(
proto.distribution_data))
elif proto.HasField('gauge_data'):
self._gauges[
key] = metrics.cells.GaugeResult(
metrics.cells.GaugeData.from_runner_api(
proto.gauge_data))

def query(self, filter=None):
counters = [metrics.execution.MetricResult(k, v, v)
Expand All @@ -1202,9 +1208,13 @@ def query(self, filter=None):
distributions = [metrics.execution.MetricResult(k, v, v)
for k, v in self._distributions.items()
if self.matches(filter, k)]
gauges = [metrics.execution.MetricResult(k, v, v)
for k, v in self._gauges.items()
if self.matches(filter, k)]

return {'counters': counters,
'distributions': distributions}
'distributions': distributions,
'gauges': gauges}


class RunnerResult(runner.PipelineResult):
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -125,11 +125,14 @@ def test_metrics(self):

counter = beam.metrics.Metrics.counter('ns', 'counter')
distribution = beam.metrics.Metrics.distribution('ns', 'distribution')
gauge = beam.metrics.Metrics.gauge('ns', 'gauge')

pcoll = p | beam.Create(['a', 'zzz'])
# pylint: disable=expression-not-assigned
pcoll | 'count1' >> beam.FlatMap(lambda x: counter.inc())
pcoll | 'count2' >> beam.FlatMap(lambda x: counter.inc(len(x)))
pcoll | 'dist' >> beam.FlatMap(lambda x: distribution.update(len(x)))
pcoll | 'gauge' >> beam.FlatMap(lambda x: gauge.set(len(x)))

res = p.run()
res.wait_until_finish()
Expand All @@ -141,9 +144,12 @@ def test_metrics(self):
self.assertEqual(c2.committed, 4)
dist, = res.metrics().query(beam.metrics.MetricsFilter().with_step('dist'))[
'distributions']
gaug, = res.metrics().query(
beam.metrics.MetricsFilter().with_step('gauge'))['gauges']
self.assertEqual(
dist.committed.data, beam.metrics.cells.DistributionData(4, 2, 1, 3))
self.assertEqual(dist.committed.mean, 2.0)
self.assertEqual(gaug.committed.value, 3)

def test_progress_metrics(self):
p = self.create_pipeline()
Expand Down