Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
33 changes: 33 additions & 0 deletions sdks/python/apache_beam/programming_guide_test.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,33 @@
import unittest

import apache_beam as beam
from apache_beam import metrics
from apache_beam.runners.direct.direct_runner import BundleBasedDirectRunner


class ProgrammingGuideTest(unittest.TestCase):
def test_metrics_example(self):
class MyMetricsDoFn(beam.DoFn):
def __init__(self):
super().__init__()
self.counter = metrics.Metrics.counter("namespace", "counter1")

def process(self, element):
self.counter.inc()
yield element

with beam.Pipeline(runner=BundleBasedDirectRunner()) as p:
_ = (p | beam.Create([1, 2, 3]) | beam.ParDo(MyMetricsDoFn()))

metrics_filter = metrics.MetricsFilter().with_name("counter1")
query_result = p.result.metrics().query(metrics_filter)

for metric in query_result["counters"]:
print(metric)

# Not in example but just to confirm that anything is returned
assert query_result["counters"]


if __name__ == '__main__':
unittest.main()
16 changes: 7 additions & 9 deletions website/www/site/content/en/documentation/programming-guide.md
Original file line number Diff line number Diff line change
Expand Up @@ -6324,22 +6324,20 @@ public class MyMetricsDoFn extends DoFn<Integer, Integer> {
{{< highlight py >}}
class MyMetricsDoFn(beam.DoFn):
def __init__(self):
super().__init__()
self.counter = metrics.Metrics.counter("namespace", "counter1")

def process(self, element):
counter.inc()
self.counter.inc()
yield element

pipeline = beam.Pipeline()

pipeline | beam.ParDo(MyMetricsDoFn())

result = pipeline.run().wait_until_finish()
with beam.Pipeline() as p:
p | beam.Create([1, 2, 3]) | beam.ParDo(MyMetricsDoFn())

metrics = result.metrics().query(
metrics.MetricsFilter.with_namespace("namespace").with_name("counter1"))
metrics_filter = metrics.MetricsFilter().with_name("counter1")
query_result = p.result.metrics().query(metrics_filter)

for metric in metrics["counters"]:
for metric in query_result["counters"]:
print(metric)
{{< /highlight >}}

Expand Down
Loading