Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 2 additions & 3 deletions sdks/python/apache_beam/examples/complete/autocomplete.py
Original file line number Diff line number Diff line change
Expand Up @@ -58,9 +58,8 @@ def format_result(prefix_candidates):

class TopPerPrefix(beam.PTransform):
def __init__(self, count):
# TODO(BEAM-6158): Revert the workaround once we can pickle super() on py3.
# super().__init__()
beam.PTransform.__init__(self)
super().__init__()

self._count = count

def expand(self, words):
Expand Down
14 changes: 5 additions & 9 deletions sdks/python/apache_beam/examples/complete/game/game_stats.py
Original file line number Diff line number Diff line change
Expand Up @@ -105,9 +105,7 @@ class ParseGameEventFn(beam.DoFn):
The human-readable time string is not used here.
"""
def __init__(self):
# TODO(BEAM-6158): Revert the workaround once we can pickle super() on py3.
# super().__init__()
beam.DoFn.__init__(self)
super().__init__()
self.num_parse_errors = Metrics.counter(self.__class__, 'num_parse_errors')

def process(self, elem):
Expand All @@ -131,9 +129,8 @@ class ExtractAndSumScore(beam.PTransform):
extracted.
"""
def __init__(self, field):
# TODO(BEAM-6158): Revert the workaround once we can pickle super() on py3.
# super().__init__()
beam.PTransform.__init__(self)
super().__init__()

self.field = field

def expand(self, pcoll):
Expand Down Expand Up @@ -171,9 +168,8 @@ def __init__(self, table_name, dataset, schema, project):
schema: Dictionary in the format {'column_name': 'bigquery_type'}
project: Name of the Cloud project containing BigQuery table.
"""
# TODO(BEAM-6158): Revert the workaround once we can pickle super() on py3.
# super().__init__()
beam.PTransform.__init__(self)
super().__init__()

self.table_name = table_name
self.dataset = dataset
self.schema = schema
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -105,9 +105,7 @@ class ParseGameEventFn(beam.DoFn):
The human-readable time string is not used here.
"""
def __init__(self):
# TODO(BEAM-6158): Revert the workaround once we can pickle super() on py3.
# super().__init__()
beam.DoFn.__init__(self)
super().__init__()
self.num_parse_errors = Metrics.counter(self.__class__, 'num_parse_errors')

def process(self, elem):
Expand All @@ -131,9 +129,8 @@ class ExtractAndSumScore(beam.PTransform):
extracted.
"""
def __init__(self, field):
# TODO(BEAM-6158): Revert the workaround once we can pickle super() on py3.
# super().__init__()
beam.PTransform.__init__(self)
super().__init__()

self.field = field

def expand(self, pcoll):
Expand Down Expand Up @@ -171,9 +168,8 @@ def __init__(self, table_name, dataset, schema, project):
schema: Dictionary in the format {'column_name': 'bigquery_type'}
project: Name of the Cloud project containing BigQuery table.
"""
# TODO(BEAM-6158): Revert the workaround once we can pickle super() on py3.
# super().__init__()
beam.PTransform.__init__(self)
super().__init__()

self.table_name = table_name
self.dataset = dataset
self.schema = schema
Expand All @@ -196,9 +192,8 @@ def expand(self, pcoll):
# [START main]
class HourlyTeamScore(beam.PTransform):
def __init__(self, start_min, stop_min, window_duration):
# TODO(BEAM-6158): Revert the workaround once we can pickle super() on py3.
# super().__init__()
beam.PTransform.__init__(self)
super().__init__()

self.start_timestamp = str2timestamp(start_min)
self.stop_timestamp = str2timestamp(stop_min)
self.window_duration_in_seconds = window_duration * 60
Expand Down
24 changes: 9 additions & 15 deletions sdks/python/apache_beam/examples/complete/game/leader_board.py
Original file line number Diff line number Diff line change
Expand Up @@ -114,9 +114,7 @@ class ParseGameEventFn(beam.DoFn):
The human-readable time string is not used here.
"""
def __init__(self):
# TODO(BEAM-6158): Revert the workaround once we can pickle super() on py3.
# super().__init__()
beam.DoFn.__init__(self)
super().__init__()
self.num_parse_errors = Metrics.counter(self.__class__, 'num_parse_errors')

def process(self, elem):
Expand All @@ -140,9 +138,8 @@ class ExtractAndSumScore(beam.PTransform):
extracted.
"""
def __init__(self, field):
# TODO(BEAM-6158): Revert the workaround once we can pickle super() on py3.
# super().__init__()
beam.PTransform.__init__(self)
super().__init__()

self.field = field

def expand(self, pcoll):
Expand Down Expand Up @@ -180,9 +177,8 @@ def __init__(self, table_name, dataset, schema, project):
schema: Dictionary in the format {'column_name': 'bigquery_type'}
project: Name of the Cloud project containing BigQuery table.
"""
# TODO(BEAM-6158): Revert the workaround once we can pickle super() on py3.
# super().__init__()
beam.PTransform.__init__(self)
super().__init__()

self.table_name = table_name
self.dataset = dataset
self.schema = schema
Expand Down Expand Up @@ -210,9 +206,8 @@ class CalculateTeamScores(beam.PTransform):
default.
"""
def __init__(self, team_window_duration, allowed_lateness):
# TODO(BEAM-6158): Revert the workaround once we can pickle super() on py3.
# super().__init__()
beam.PTransform.__init__(self)
super().__init__()

self.team_window_duration = team_window_duration * 60
self.allowed_lateness_seconds = allowed_lateness * 60

Expand Down Expand Up @@ -242,9 +237,8 @@ class CalculateUserScores(beam.PTransform):
global windowing. Get periodic updates on all users' running scores.
"""
def __init__(self, allowed_lateness):
# TODO(BEAM-6158): Revert the workaround once we can pickle super() on py3.
# super().__init__()
beam.PTransform.__init__(self)
super().__init__()

self.allowed_lateness_seconds = allowed_lateness * 60

def expand(self, pcoll):
Expand Down
9 changes: 3 additions & 6 deletions sdks/python/apache_beam/examples/complete/game/user_score.py
Original file line number Diff line number Diff line change
Expand Up @@ -97,9 +97,7 @@ class ParseGameEventFn(beam.DoFn):
The human-readable time string is not used here.
"""
def __init__(self):
# TODO(BEAM-6158): Revert the workaround once we can pickle super() on py3.
# super().__init__()
beam.DoFn.__init__(self)
super().__init__()
self.num_parse_errors = Metrics.counter(self.__class__, 'num_parse_errors')

def process(self, elem):
Expand All @@ -124,9 +122,8 @@ class ExtractAndSumScore(beam.PTransform):
extracted.
"""
def __init__(self, field):
# TODO(BEAM-6158): Revert the workaround once we can pickle super() on py3.
# super().__init__()
beam.PTransform.__init__(self)
super().__init__()

self.field = field

def expand(self, pcoll):
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -111,9 +111,8 @@ def format_output(element, window=beam.DoFn.WindowParam):
class ComputeTopSessions(beam.PTransform):
"""Computes the top user sessions for each month."""
def __init__(self, sampling_threshold):
# TODO(BEAM-6158): Revert the workaround once we can pickle super() on py3.
# super().__init__()
beam.PTransform.__init__(self)
super().__init__()

self.sampling_threshold = sampling_threshold

def expand(self, pcoll):
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -66,9 +66,7 @@ class GenerateTestRows(beam.PTransform):

"""
def __init__(self, number, project_id=None, instance_id=None, table_id=None):
# TODO(BEAM-6158): Revert the workaround once we can pickle super() on py3.
# super().__init__()
beam.PTransform.__init__(self)
super().__init__()
self.number = number
self.rand = random.choice(string.ascii_letters + string.digits)
self.column_family_id = 'cf1'
Expand Down
4 changes: 1 addition & 3 deletions sdks/python/apache_beam/examples/wordcount_debugging.py
Original file line number Diff line number Diff line change
Expand Up @@ -79,9 +79,7 @@
class FilterTextFn(beam.DoFn):
"""A DoFn that filters for a specific key based on a regular expression."""
def __init__(self, pattern):
# TODO(BEAM-6158): Revert the workaround once we can pickle super() on py3.
# super().__init__()
beam.DoFn.__init__(self)
super().__init__()
self.pattern = pattern
# A custom metric can track values in your pipeline as it runs. Those
# values will be available in the monitoring system of the runner used
Expand Down
4 changes: 1 addition & 3 deletions sdks/python/apache_beam/examples/wordcount_with_metrics.py
Original file line number Diff line number Diff line change
Expand Up @@ -53,9 +53,7 @@
class WordExtractingDoFn(beam.DoFn):
"""Parse each line of input text into words."""
def __init__(self):
# TODO(BEAM-6158): Revert the workaround once we can pickle super() on py3.
# super().__init__()
beam.DoFn.__init__(self)
super().__init__()
self.words_counter = Metrics.counter(self.__class__, 'words')
self.word_lengths_counter = Metrics.counter(self.__class__, 'word_lengths')
self.word_lengths_dist = Metrics.distribution(
Expand Down
Loading