diff --git a/sdks/python/apache_beam/runners/portability/portable_runner.py b/sdks/python/apache_beam/runners/portability/portable_runner.py index 94a467d5a249..c386d90da19a 100644 --- a/sdks/python/apache_beam/runners/portability/portable_runner.py +++ b/sdks/python/apache_beam/runners/portability/portable_runner.py @@ -316,6 +316,10 @@ def _optimize_pipeline( # Eventually remove the 'lift_combiners' phase from 'default'. translations.pack_combiners, translations.lift_combiners, + # Expand SDF so that portable runners that don't support SDFs + # natively (e.g. Spark) can still parallelize Read transforms. + # See https://github.com/apache/beam/issues/24422 + translations.expand_sdf, translations.sort_stages ] partial = True @@ -332,7 +336,7 @@ def _optimize_pipeline( phases = [] for phase_name in pre_optimize.split(','): # For now, these are all we allow. - if phase_name in ('pack_combiners', 'lift_combiners'): + if phase_name in ('pack_combiners', 'lift_combiners', 'expand_sdf'): phases.append(getattr(translations, phase_name)) else: raise ValueError( diff --git a/sdks/python/apache_beam/runners/portability/portable_runner_test.py b/sdks/python/apache_beam/runners/portability/portable_runner_test.py index 31293a4d43ec..b1197e8141ee 100644 --- a/sdks/python/apache_beam/runners/portability/portable_runner_test.py +++ b/sdks/python/apache_beam/runners/portability/portable_runner_test.py @@ -460,6 +460,155 @@ def create_options(self): return options +class PortableRunnerOptimizationTest(unittest.TestCase): + """Tests for PortableRunner._optimize_pipeline.""" + def test_default_optimize_expands_sdf(self): + """Verify that expand_sdf is applied in the default pre_optimize setting. + + See https://github.com/apache/beam/issues/24422. + """ + from apache_beam.io import restriction_trackers + from apache_beam.portability import common_urns + + class ExpandStringsProvider(beam.transforms.core.RestrictionProvider): + def initial_restriction(self, element): + return restriction_trackers.OffsetRange(0, len(element)) + + def create_tracker(self, restriction): + return restriction_trackers.OffsetRestrictionTracker(restriction) + + def restriction_size(self, element, restriction): + return restriction.size() + + class ExpandingStringsDoFn(beam.DoFn): + def process( + self, + element, + restriction_tracker=beam.DoFn.RestrictionParam( + ExpandStringsProvider())): + cur = restriction_tracker.current_restriction().start + while restriction_tracker.try_claim(cur): + yield element[cur] + cur += 1 + + p = beam.Pipeline() + _ = (p | beam.Create(['abc']) | beam.ParDo(ExpandingStringsDoFn())) + proto = p.to_runner_api() + + # Default options (no pre_optimize experiment set). + options = PipelineOptions() + optimized = PortableRunner._optimize_pipeline(proto, options) + + transform_urns = set() + for t in optimized.components.transforms.values(): + if t.spec.urn: + transform_urns.add(t.spec.urn) + + self.assertIn( + common_urns.sdf_components.PAIR_WITH_RESTRICTION.urn, transform_urns) + self.assertIn( + common_urns.sdf_components.SPLIT_AND_SIZE_RESTRICTIONS.urn, + transform_urns) + self.assertIn( + common_urns.sdf_components.PROCESS_SIZED_ELEMENTS_AND_RESTRICTIONS.urn, + transform_urns) + + def test_custom_optimize_expand_sdf(self): + """Verify that expand_sdf can be requested explicitly.""" + from apache_beam.io import restriction_trackers + from apache_beam.portability import common_urns + + class ExpandStringsProvider(beam.transforms.core.RestrictionProvider): + def initial_restriction(self, element): + return restriction_trackers.OffsetRange(0, len(element)) + + def create_tracker(self, restriction): + return restriction_trackers.OffsetRestrictionTracker(restriction) + + def restriction_size(self, element, restriction): + return restriction.size() + + class ExpandingStringsDoFn(beam.DoFn): + def process( + self, + element, + restriction_tracker=beam.DoFn.RestrictionParam( + ExpandStringsProvider())): + cur = restriction_tracker.current_restriction().start + while restriction_tracker.try_claim(cur): + yield element[cur] + cur += 1 + + p = beam.Pipeline() + _ = (p | beam.Create(['abc']) | beam.ParDo(ExpandingStringsDoFn())) + proto = p.to_runner_api() + + options = PipelineOptions(['--experiments=pre_optimize=expand_sdf']) + optimized = PortableRunner._optimize_pipeline(proto, options) + + transform_urns = set() + for t in optimized.components.transforms.values(): + if t.spec.urn: + transform_urns.add(t.spec.urn) + + self.assertIn( + common_urns.sdf_components.PAIR_WITH_RESTRICTION.urn, transform_urns) + self.assertIn( + common_urns.sdf_components.SPLIT_AND_SIZE_RESTRICTIONS.urn, + transform_urns) + self.assertIn( + common_urns.sdf_components.PROCESS_SIZED_ELEMENTS_AND_RESTRICTIONS.urn, + transform_urns) + + def test_default_optimize_expands_bounded_read(self): + """Verify that iobase.Read(BoundedSource) is expanded by default. + + This is the end-to-end scenario from + https://github.com/apache/beam/issues/24422: Read transforms like + ReadFromParquet use SDFs internally. Without expand_sdf in the default + optimization, these arrive at the Spark job server as a single ParDo, + executing on one partition with no parallelization. + """ + from apache_beam.io import iobase + from apache_beam.portability import common_urns + + class _FakeBoundedSource(iobase.BoundedSource): + def get_range_tracker(self, start_position, stop_position): + return None + + def read(self, range_tracker): + return iter([]) + + def estimate_size(self): + return 0 + + p = beam.Pipeline() + _ = p | beam.io.Read(_FakeBoundedSource()) + proto = p.to_runner_api() + + # Default options (no pre_optimize experiment set). + options = PipelineOptions() + optimized = PortableRunner._optimize_pipeline(proto, options) + + transform_urns = set() + for t in optimized.components.transforms.values(): + if t.spec.urn: + transform_urns.add(t.spec.urn) + + # The SDFBoundedSourceReader DoFn should have been expanded into + # SDF component stages. + self.assertIn( + common_urns.sdf_components.PAIR_WITH_RESTRICTION.urn, transform_urns) + self.assertIn( + common_urns.sdf_components.SPLIT_AND_SIZE_RESTRICTIONS.urn, + transform_urns) + self.assertIn( + common_urns.sdf_components.PROCESS_SIZED_ELEMENTS_AND_RESTRICTIONS.urn, + transform_urns) + # Reshuffle should be present to enable parallelization. + self.assertIn(common_urns.composites.RESHUFFLE.urn, transform_urns) + + if __name__ == '__main__': logging.getLogger().setLevel(logging.INFO) unittest.main()