Skip to content

Commit 2641d1f

Browse files
committed
[python] Fix formatting for PortableRunner SDF optimization
1 parent 0dcc8fd commit 2641d1f

File tree

2 files changed

+8
-14
lines changed

2 files changed

+8
-14
lines changed

sdks/python/apache_beam/runners/portability/portable_runner.py

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -336,8 +336,7 @@ def _optimize_pipeline(
336336
phases = []
337337
for phase_name in pre_optimize.split(','):
338338
# For now, these are all we allow.
339-
if phase_name in (
340-
'pack_combiners', 'lift_combiners', 'expand_sdf'):
339+
if phase_name in ('pack_combiners', 'lift_combiners', 'expand_sdf'):
341340
phases.append(getattr(translations, phase_name))
342341
else:
343342
raise ValueError(

sdks/python/apache_beam/runners/portability/portable_runner_test.py

Lines changed: 7 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -462,14 +462,13 @@ def create_options(self):
462462

463463
class PortableRunnerOptimizationTest(unittest.TestCase):
464464
"""Tests for PortableRunner._optimize_pipeline."""
465-
466465
def test_default_optimize_expands_sdf(self):
467466
"""Verify that expand_sdf is applied in the default pre_optimize setting.
468467
469468
See https://github.com/apache/beam/issues/24422.
470469
"""
471-
from apache_beam.portability import common_urns
472470
from apache_beam.io import restriction_trackers
471+
from apache_beam.portability import common_urns
473472

474473
class ExpandStringsProvider(beam.transforms.core.RestrictionProvider):
475474
def initial_restriction(self, element):
@@ -511,14 +510,13 @@ def process(
511510
common_urns.sdf_components.SPLIT_AND_SIZE_RESTRICTIONS.urn,
512511
transform_urns)
513512
self.assertIn(
514-
common_urns.sdf_components
515-
.PROCESS_SIZED_ELEMENTS_AND_RESTRICTIONS.urn,
513+
common_urns.sdf_components.PROCESS_SIZED_ELEMENTS_AND_RESTRICTIONS.urn,
516514
transform_urns)
517515

518516
def test_custom_optimize_expand_sdf(self):
519517
"""Verify that expand_sdf can be requested explicitly."""
520-
from apache_beam.portability import common_urns
521518
from apache_beam.io import restriction_trackers
519+
from apache_beam.portability import common_urns
522520

523521
class ExpandStringsProvider(beam.transforms.core.RestrictionProvider):
524522
def initial_restriction(self, element):
@@ -559,8 +557,7 @@ def process(
559557
common_urns.sdf_components.SPLIT_AND_SIZE_RESTRICTIONS.urn,
560558
transform_urns)
561559
self.assertIn(
562-
common_urns.sdf_components
563-
.PROCESS_SIZED_ELEMENTS_AND_RESTRICTIONS.urn,
560+
common_urns.sdf_components.PROCESS_SIZED_ELEMENTS_AND_RESTRICTIONS.urn,
564561
transform_urns)
565562

566563
def test_default_optimize_expands_bounded_read(self):
@@ -572,8 +569,8 @@ def test_default_optimize_expands_bounded_read(self):
572569
optimization, these arrive at the Spark job server as a single ParDo,
573570
executing on one partition with no parallelization.
574571
"""
575-
from apache_beam.portability import common_urns
576572
from apache_beam.io import iobase
573+
from apache_beam.portability import common_urns
577574

578575
class _FakeBoundedSource(iobase.BoundedSource):
579576
def get_range_tracker(self, start_position, stop_position):
@@ -606,12 +603,10 @@ def estimate_size(self):
606603
common_urns.sdf_components.SPLIT_AND_SIZE_RESTRICTIONS.urn,
607604
transform_urns)
608605
self.assertIn(
609-
common_urns.sdf_components
610-
.PROCESS_SIZED_ELEMENTS_AND_RESTRICTIONS.urn,
606+
common_urns.sdf_components.PROCESS_SIZED_ELEMENTS_AND_RESTRICTIONS.urn,
611607
transform_urns)
612608
# Reshuffle should be present to enable parallelization.
613-
self.assertIn(
614-
common_urns.composites.RESHUFFLE.urn, transform_urns)
609+
self.assertIn(common_urns.composites.RESHUFFLE.urn, transform_urns)
615610

616611

617612
if __name__ == '__main__':

0 commit comments

Comments
 (0)