diff --git a/runners/google-cloud-dataflow-java/arm/build.gradle b/runners/google-cloud-dataflow-java/arm/build.gradle index 2e74d7727f21..531300bb63ea 100644 --- a/runners/google-cloud-dataflow-java/arm/build.gradle +++ b/runners/google-cloud-dataflow-java/arm/build.gradle @@ -118,7 +118,7 @@ task printrunnerV2PipelineOptionsARM { dependsOn buildAndPushDockerJavaMultiarchContainer doLast { - println "To run a Dataflow job with runner V2 on ARM, add the following pipeline options to your command-line:" + println "To run a Dataflow job with Dataflow Portable Runner on ARM, add the following pipeline options to your command-line:" println runnerV2PipelineOptionsARM.join(' ') } } diff --git a/runners/google-cloud-dataflow-java/build.gradle b/runners/google-cloud-dataflow-java/build.gradle index 93fe9cb227bb..943a01daa7d0 100644 --- a/runners/google-cloud-dataflow-java/build.gradle +++ b/runners/google-cloud-dataflow-java/build.gradle @@ -431,7 +431,7 @@ task printRunnerV2PipelineOptions { dependsOn buildAndPushDockerJavaContainer doLast { - println "To run a Dataflow job with runner V2, add the following pipeline options to your command-line:" + println "To run a Dataflow job with Dataflow Portable Runner, add the following pipeline options to your command-line:" println runnerV2PipelineOptions.join(' ') println "Please delete your image upon completion with the following command:" println "docker rmi ${dockerJavaImageName}; gcloud container images delete --force-delete-tags ${dockerJavaImageName}" @@ -471,7 +471,7 @@ def validatesRunnerStreamingConfig = [ excludedTests: [ // TODO(https://github.com/apache/beam/issues/21472) 'org.apache.beam.sdk.transforms.GroupByKeyTest$BasicTests.testAfterProcessingTimeContinuationTriggerUsingState', - // GroupIntoBatches.withShardedKey not supported on streaming runner v1 + // GroupIntoBatches.withShardedKey not supported on Streaming Java Runner // https://github.com/apache/beam/issues/22592 
'org.apache.beam.sdk.transforms.GroupIntoBatchesTest.testWithShardedKeyInGlobalWindow', @@ -570,7 +570,7 @@ createCrossLanguageValidatesRunnerTask( task validatesRunnerV2 { group = "Verification" - description = "Runs the ValidatesRunner tests on Dataflow Runner V2" + description = "Runs the ValidatesRunner tests on Dataflow Portable Runner" dependsOn(createRunnerV2ValidatesRunnerTest( name: 'validatesRunnerV2Test', pipelineOptions: runnerV2PipelineOptions, @@ -610,7 +610,7 @@ task validatesRunnerV2 { task validatesRunnerV2Streaming { group = "Verification" - description = "Runs the ValidatesRunner tests on Dataflow Runner V2 forcing streaming mode" + description = "Runs the ValidatesRunner tests on Dataflow Portable Runner forcing streaming mode" dependsOn(createRunnerV2ValidatesRunnerTest( name: 'validatesRunnerV2TestStreaming', pipelineOptions: runnerV2PipelineOptions + ['--streaming', '--experiments=enable_streaming_engine'], @@ -880,7 +880,7 @@ task postCommit { task postCommitRunnerV2 { group = "Verification" - description = "Various integration tests using the Dataflow runner V2." + description = "Various integration tests using the Dataflow Portable Runner." 
dependsOn googleCloudPlatformRunnerV2IntegrationTest dependsOn coreSDKJavaRunnerV2IntegrationTest } diff --git a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowRunner.java b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowRunner.java index 299e7fa21ed1..d04afb351e44 100644 --- a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowRunner.java +++ b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowRunner.java @@ -1242,12 +1242,12 @@ private static boolean includesTransformUpgrades(Pipeline pipeline) { @Override public DataflowPipelineJob run(Pipeline pipeline) { // Multi-language pipelines and pipelines that include upgrades should automatically be upgraded - // to Runner v2. + // to Dataflow Portable Runner. if (DataflowRunner.isMultiLanguagePipeline(pipeline) || includesTransformUpgrades(pipeline)) { if (!useUnifiedWorker(options)) { List experiments = firstNonNull(options.getExperiments(), Collections.emptyList()); LOG.info( - "Automatically enabling Dataflow Runner v2 since the pipeline used cross-language" + "Automatically enabling Dataflow Portable Runner since the pipeline used cross-language" + " transforms or pipeline needed a transform upgrade."); options.setExperiments( ImmutableList.builder().addAll(experiments).add("use_runner_v2").build()); @@ -1260,7 +1260,7 @@ public DataflowPipelineJob run(Pipeline pipeline) { || hasExperiment(options, "disable_portable_runner") || hasExperiment(options, "enable_streaming_java_runner")) { throw new IllegalArgumentException( - "Runner V2 both disabled and enabled: at least one of ['beam_fn_api', 'use_unified_worker', 'use_runner_v2', 'use_portable_job_submission'] is set and also one of ['disable_runner_v2', 'disable_runner_v2_until_2023', 'disable_prime_runner_v2'] is set."); + "Dataflow Portable Runner both disabled and enabled: at least one of 
['enable_portable_runner', 'beam_fn_api', 'use_unified_worker', 'use_runner_v2', 'use_portable_job_submission'] is set and also one of ['enable_streaming_java_runner', 'disable_portable_runner', 'disable_runner_v2', 'disable_runner_v2_until_2023', 'disable_prime_runner_v2'] is set."); } List experiments = new ArrayList<>(options.getExperiments()); // non-null if useUnifiedWorker is true @@ -1374,10 +1374,13 @@ public DataflowPipelineJob run(Pipeline pipeline) { options.as(SdkHarnessOptions.class).setPipelineProtoHash(pipelineProtoHash); if (useUnifiedWorker(options)) { - LOG.info("Skipping v1 transform replacements since job will run on v2."); + LOG.info( + "Skipping Dataflow Streaming Java Runner transform replacements since job will run on Dataflow Portable Runner."); } else { - // Now rewrite things to be as needed for v1 (mutates the pipeline) - // This way the job submitted is valid for v1 and v2, simultaneously + // Now rewrite things to be as needed for Dataflow Streaming Java Runner (mutates the + // pipeline) + // This way the job submitted is valid for Dataflow Streaming Java Runner and Dataflow + // Portable Runner, simultaneously replaceV1Transforms(pipeline); } // Capture the SdkComponents for look up during step translations @@ -1388,7 +1391,7 @@ public DataflowPipelineJob run(Pipeline pipeline) { .addAllDependencies(getDefaultArtifacts()) .addAllCapabilities(Environments.getJavaCapabilities()) .build()); - // No need to perform transform upgrading for the Runner v1 proto. + // No need to perform transform upgrading for the Dataflow Streaming Java Runner proto. RunnerApi.Pipeline dataflowV1PipelineProto = PipelineTranslation.toProto(pipeline, dataflowV1Components, true, false); @@ -1544,7 +1547,7 @@ public DataflowPipelineJob run(Pipeline pipeline) { options.setExperiments(experiments); LOG.warn( "The upload_graph experiment was specified, but it does not apply " - + "to runner v2 jobs. 
Option has been automatically removed."); + + "to Dataflow Portable Runner jobs. Option has been automatically removed."); } // Upload the job to GCS and remove the graph object from the API call. The graph diff --git a/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowRunnerTest.java b/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowRunnerTest.java index ab3b62a0aa1b..9f812438451b 100644 --- a/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowRunnerTest.java +++ b/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowRunnerTest.java @@ -1847,7 +1847,10 @@ public void testSettingConflictingEnableAndDisableExperimentsThrowsException() t ExperimentalOptions.addExperiment(options, disabledExperiment); Pipeline p = Pipeline.create(options); p.apply(Create.of("A")); - assertThrows("Runner V2 both disabled and enabled", IllegalArgumentException.class, p::run); + assertThrows( + "Dataflow Portable Runner both disabled and enabled", + IllegalArgumentException.class, + p::run); } } } diff --git a/runners/google-cloud-dataflow-java/worker/windmill/src/main/proto/windmill.proto b/runners/google-cloud-dataflow-java/worker/windmill/src/main/proto/windmill.proto index b7579cbacb8e..8a1f602105cc 100644 --- a/runners/google-cloud-dataflow-java/worker/windmill/src/main/proto/windmill.proto +++ b/runners/google-cloud-dataflow-java/worker/windmill/src/main/proto/windmill.proto @@ -979,7 +979,7 @@ enum ConnectivityType { CONNECTIVITY_TYPE_DIRECTPATH = 2; } -// Settings to control runtime behavior of the java runner v1 user worker. +// Settings to control runtime behavior of the Streaming Java Runner user worker. 
message UserWorkerRunnerV1Settings { optional UserWorkerGrpcFlowControlSettings flow_control_settings = 3; diff --git a/sdks/go/pkg/beam/runners/dataflow/dataflow.go b/sdks/go/pkg/beam/runners/dataflow/dataflow.go index e968911fcca1..ce5e99b0a8de 100644 --- a/sdks/go/pkg/beam/runners/dataflow/dataflow.go +++ b/sdks/go/pkg/beam/runners/dataflow/dataflow.go @@ -334,7 +334,7 @@ func getJobOptions(ctx context.Context, streaming bool) (*dataflowlib.JobOptions experiments := jobopts.GetExperiments() // Ensure that we enable the same set of experiments across all SDKs - // for runner v2. + // for Dataflow Portable Runner. var fnApiSet, v2set, uwSet, portaSubmission, seSet, wsSet bool for _, e := range experiments { if strings.Contains(e, "beam_fn_api") { @@ -349,8 +349,9 @@ func getJobOptions(ctx context.Context, streaming bool) (*dataflowlib.JobOptions if strings.Contains(e, "use_portable_job_submission") { portaSubmission = true } - if strings.Contains(e, "disable_runner_v2") || strings.Contains(e, "disable_runner_v2_until_2023") || strings.Contains(e, "disable_prime_runner_v2") { - return nil, errors.New("detected one of the following experiments: disable_runner_v2 | disable_runner_v2_until_2023 | disable_prime_runner_v2. Disabling runner v2 is no longer supported as of Beam version 2.45.0+") + // enable_portable_runner is not documented and hence won't be set by default. This will be fixed in later versions. + if strings.Contains(e, "disable_runner_v2") || strings.Contains(e, "disable_runner_v2_until_2023") || strings.Contains(e, "disable_prime_runner_v2") || strings.Contains(e, "disable_portable_runner") || strings.Contains(e, "enable_streaming_java_runner") { + return nil, errors.New("detected one of the following experiments: disable_runner_v2 | disable_runner_v2_until_2023 | disable_prime_runner_v2 | disable_portable_runner | enable_streaming_java_runner. 
Disabling Dataflow Portable Runner is no longer supported as of Beam version 2.45.0+") } } // Enable default experiments. @@ -368,7 +369,7 @@ func getJobOptions(ctx context.Context, streaming bool) (*dataflowlib.JobOptions } // Ensure that streaming specific experiments are set for streaming pipelines - // since runner v2 only supports using streaming engine. + // since Dataflow Portable Runner only supports using streaming engine. if streaming { if !seSet { experiments = append(experiments, "enable_streaming_engine") diff --git a/sdks/go/pkg/beam/runners/dataflow/dataflow_test.go b/sdks/go/pkg/beam/runners/dataflow/dataflow_test.go index 23dcd034120a..83d3c0108439 100644 --- a/sdks/go/pkg/beam/runners/dataflow/dataflow_test.go +++ b/sdks/go/pkg/beam/runners/dataflow/dataflow_test.go @@ -20,6 +20,7 @@ import ( "flag" "reflect" "sort" + "strings" "testing" "github.com/apache/beam/sdks/v2/go/pkg/beam/core" @@ -227,7 +228,7 @@ func TestGetJobOptions_NoExperimentsSetStreaming(t *testing.T) { } } -func TestGetJobOptions_DisableRunnerV2ExperimentsSet(t *testing.T) { +func TestGetJobOptions_DisableRunnerV2ExperimentsSetFailJob(t *testing.T) { resetGlobals() *stagingLocation = "gs://testStagingLocation" *gcpopts.Project = "testProject" @@ -238,6 +239,46 @@ func TestGetJobOptions_DisableRunnerV2ExperimentsSet(t *testing.T) { if err == nil { t.Error("getJobOptions() returned error nil, want an error") + } else if !strings.Contains(err.Error(), "Disabling Dataflow Portable Runner is no longer supported") { + t.Errorf("getJobOptions() returned wrong error %q, want it to mention %q", err.Error(), "Disabling Dataflow Portable Runner is no longer supported") + } + if opts != nil { + t.Errorf("getJobOptions() returned JobOptions when it should not have, got %#v, want nil", opts) + } +} + +func TestGetJobOptions_DisablePortableRunnerExperimentsSetFailJob(t *testing.T) { + resetGlobals() + *stagingLocation = "gs://testStagingLocation" + *gcpopts.Project = "testProject" + 
*gcpopts.Region = "testRegion" + *jobopts.Experiments = "disable_portable_runner" + + opts, err := getJobOptions(context.Background(), false) + + if err == nil { + t.Error("getJobOptions() returned error nil, want an error") + } else if !strings.Contains(err.Error(), "Disabling Dataflow Portable Runner is no longer supported") { + t.Errorf("getJobOptions() returned wrong error %q, want it to mention %q", err.Error(), "Disabling Dataflow Portable Runner is no longer supported") + } + if opts != nil { + t.Errorf("getJobOptions() returned JobOptions when it should not have, got %#v, want nil", opts) + } +} + +func TestGetJobOptions_EnableStreamingJavaRunnerExperimentsSetFailJob(t *testing.T) { + resetGlobals() + *stagingLocation = "gs://testStagingLocation" + *gcpopts.Project = "testProject" + *gcpopts.Region = "testRegion" + *jobopts.Experiments = "enable_streaming_java_runner" + + opts, err := getJobOptions(context.Background(), false) + + if err == nil { + t.Error("getJobOptions() returned error nil, want an error") + } else if !strings.Contains(err.Error(), "Disabling Dataflow Portable Runner is no longer supported") { + t.Errorf("getJobOptions() returned wrong error %q, want it to mention %q", err.Error(), "Disabling Dataflow Portable Runner is no longer supported") } if opts != nil { t.Errorf("getJobOptions() returned JobOptions when it should not have, got %#v, want nil", opts) diff --git a/sdks/python/apache_beam/examples/cookbook/ordered_window_elements/streaming.py b/sdks/python/apache_beam/examples/cookbook/ordered_window_elements/streaming.py index aed1400bc4d8..97e93fb0bc3b 100644 --- a/sdks/python/apache_beam/examples/cookbook/ordered_window_elements/streaming.py +++ b/sdks/python/apache_beam/examples/cookbook/ordered_window_elements/streaming.py @@ -135,7 +135,7 @@ def _state_read_range(self, buffer_state, range_lo, range_hi): def _state_clear_range(self, buffer_state, range_lo, range_hi): """Clears a specified range of elements from the buffer 
state.""" - # TODO: Dataflow runner v2 gets stuck when MIN_TIMESTAMP is used + # TODO: Dataflow Portable Runner gets stuck when MIN_TIMESTAMP is used # as the lower bound for clear_range. Investigate this further. buffer_state.clear_range(range_lo, range_hi) diff --git a/sdks/python/apache_beam/examples/kafkataxi/README.md b/sdks/python/apache_beam/examples/kafkataxi/README.md index 86924200b1f2..ae1b6b9ba39e 100644 --- a/sdks/python/apache_beam/examples/kafkataxi/README.md +++ b/sdks/python/apache_beam/examples/kafkataxi/README.md @@ -67,7 +67,7 @@ Perform Beam runner specific setup. ℹ️ Note that cross-language transforms require portable implementations of Spark/Flink/Direct runners. Dataflow requires -[runner V2](https://cloud.google.com/dataflow/docs/guides/deploying-a-pipeline#dataflow-runner-v2). +[Dataflow Portable Runner](https://cloud.google.com/dataflow/docs/guides/deploying-a-pipeline#dataflow-runner-v2). See [here](https://beam.apache.org/documentation/runners/dataflow/) for instructions for setting up Dataflow. 
diff --git a/sdks/python/apache_beam/io/external/xlang_bigqueryio_it_test.py b/sdks/python/apache_beam/io/external/xlang_bigqueryio_it_test.py index bc012bd7be9d..49725d54e990 100644 --- a/sdks/python/apache_beam/io/external/xlang_bigqueryio_it_test.py +++ b/sdks/python/apache_beam/io/external/xlang_bigqueryio_it_test.py @@ -563,7 +563,7 @@ def test_streaming_with_fixed_num_streams(self): @unittest.skip( "Streaming to the Storage Write API sink with autosharding is broken " - "with Dataflow Runner V2.") + "with Dataflow Portable Runner.") def test_streaming_with_auto_sharding(self): self.skip_if_not_dataflow_runner() table = 'streaming_with_auto_sharding' diff --git a/sdks/python/apache_beam/runners/dataflow/dataflow_metrics.py b/sdks/python/apache_beam/runners/dataflow/dataflow_metrics.py index 87456124b816..8f72d45060ca 100644 --- a/sdks/python/apache_beam/runners/dataflow/dataflow_metrics.py +++ b/sdks/python/apache_beam/runners/dataflow/dataflow_metrics.py @@ -103,7 +103,7 @@ def _translate_step_name(self, internal_name): user_step_name = None if (self._job_graph and internal_name in self._job_graph.proto_pipeline.components.transforms.keys()): - # Dataflow Runner v2 with portable job submission uses proto transform map + # Dataflow Portable Runner with portable job submission uses proto transform map # IDs for step names. Also PTransform.unique_name maps to user step names. # Hence we lookup user step names based on the proto. user_step_name = self._job_graph.proto_pipeline.components.transforms[ diff --git a/sdks/python/apache_beam/runners/dataflow/dataflow_runner.py b/sdks/python/apache_beam/runners/dataflow/dataflow_runner.py index 0c23e6024dc6..c6e2a7bec3d8 100644 --- a/sdks/python/apache_beam/runners/dataflow/dataflow_runner.py +++ b/sdks/python/apache_beam/runners/dataflow/dataflow_runner.py @@ -65,7 +65,7 @@ BQ_SOURCE_UW_ERROR = ( 'The Read(BigQuerySource(...)) transform is not supported with newer stack ' - 'features (Fn API, Dataflow Runner V2, etc). 
Please use the transform ' + 'features (Fn API, Dataflow Portable Runner, etc). Please use the transform ' 'apache_beam.io.gcp.bigquery.ReadFromBigQuery instead.') @@ -320,7 +320,7 @@ def visit_transform(self, applied_transform): raise ValueError( 'CombineFn.setup and CombineFn.teardown are ' 'not supported with non-portable Dataflow ' - 'runner. Please use Dataflow Runner V2 instead.') + 'runner. Please use Dataflow Portable Runner instead.') @staticmethod def _overrides_setup_or_teardown(combinefn): @@ -342,7 +342,7 @@ def run_pipeline(self, pipeline, options, pipeline_proto=None): """Remotely executes entire pipeline or parts reachable from node.""" if _is_runner_v2_disabled(options): raise ValueError( - 'Disabling Runner V2 no longer supported ' + 'Disabling Dataflow Portable Runner no longer supported ' 'using Beam Python %s.' % beam.version.__version__) # Label goog-dataflow-notebook if job is started from notebook. @@ -591,6 +591,8 @@ def _add_runner_v2_missing_options(options): debug_options.add_experiment('use_unified_worker') debug_options.add_experiment('use_runner_v2') debug_options.add_experiment('use_portable_job_submission') + # enable_portable_runner is not added by default as it is not documented. + # This behavior will be fixed in later versions. def _check_and_add_missing_options(options): @@ -648,8 +650,8 @@ def _check_and_add_missing_streaming_options(options): :param options: PipelineOptions for this pipeline. """ - # Streaming only supports using runner v2 (aka unified worker). - # Runner v2 only supports using streaming engine (aka windmill service) + # Streaming only supports using Dataflow Portable Runner (aka unified worker, runner v2). 
+ # Dataflow Portable Runner only supports using streaming engine (aka windmill service) if options.view_as(StandardOptions).streaming: debug_options = options.view_as(DebugOptions) debug_options.add_experiment('enable_streaming_engine') @@ -659,9 +661,11 @@ def _check_and_add_missing_streaming_options(options): def _is_runner_v2_disabled(options): # Type: (PipelineOptions) -> bool - """Returns true if runner v2 is disabled.""" + """Returns true if Dataflow Portable Runner is disabled.""" debug_options = options.view_as(DebugOptions) return ( + debug_options.lookup_experiment('disable_portable_runner') or + debug_options.lookup_experiment('enable_streaming_java_runner') or debug_options.lookup_experiment('disable_runner_v2') or debug_options.lookup_experiment('disable_runner_v2_until_2023') or debug_options.lookup_experiment('disable_runner_v2_until_v2.50') or diff --git a/sdks/python/apache_beam/runners/dataflow/dataflow_runner_test.py b/sdks/python/apache_beam/runners/dataflow/dataflow_runner_test.py index e1b8be6682f9..d848b5ffe595 100644 --- a/sdks/python/apache_beam/runners/dataflow/dataflow_runner_test.py +++ b/sdks/python/apache_beam/runners/dataflow/dataflow_runner_test.py @@ -41,6 +41,7 @@ from apache_beam.runners.dataflow.dataflow_runner import DataflowRuntimeException from apache_beam.runners.dataflow.dataflow_runner import _check_and_add_missing_options from apache_beam.runners.dataflow.dataflow_runner import _check_and_add_missing_streaming_options +from apache_beam.runners.dataflow.dataflow_runner import _is_runner_v2_disabled from apache_beam.runners.dataflow.internal.clients import dataflow as dataflow_api from apache_beam.runners.internal import names from apache_beam.runners.runner import PipelineState @@ -733,6 +734,25 @@ def test_explicit_streaming_no_unbounded(self): p.result.job.proto.type, apiclient.dataflow.Job.TypeValueValuesEnum.JOB_TYPE_STREAMING) + def test_runner_v2_disabled_experiments_raise(self): + disable_experiments = [ + 
'disable_portable_runner', + 'enable_streaming_java_runner', + 'disable_runner_v2', + 'disable_runner_v2_until_2023', + 'disable_runner_v2_until_v2.50', + 'disable_prime_runner_v2', + ] + for experiment in disable_experiments: + options = PipelineOptions([f'--experiments={experiment}']) + self.assertTrue( + _is_runner_v2_disabled(options), + f'Expected {experiment} to disable Portable Runner') + with self.assertRaisesRegex( + ValueError, + 'Disabling Dataflow Portable Runner no longer supported.*'): + DataflowRunner().run_pipeline(None, options) + if __name__ == '__main__': unittest.main() diff --git a/sdks/python/apache_beam/runners/dataflow/internal/apiclient.py b/sdks/python/apache_beam/runners/dataflow/internal/apiclient.py index 4a7c61901de3..2cf0e79731ac 100644 --- a/sdks/python/apache_beam/runners/dataflow/internal/apiclient.py +++ b/sdks/python/apache_beam/runners/dataflow/internal/apiclient.py @@ -1153,7 +1153,7 @@ def to_split_int(n): # TODO: Used in legacy batch worker. Move under MetricUpdateTranslators -# after Runner V2 transition. +# after Dataflow Portable Runner transition. def translate_distribution(distribution_update, metric_update_proto): """Translate metrics DistributionUpdate to dataflow distribution update. @@ -1174,7 +1174,7 @@ def translate_distribution(distribution_update, metric_update_proto): metric_update_proto.distribution = dist_update_proto -# TODO: Used in legacy batch worker. Delete after Runner V2 transition. +# TODO: Used in legacy batch worker. Delete after Dataflow Portable Runner transition. def translate_value(value, metric_update_proto): metric_update_proto.integer = to_split_int(value) @@ -1203,8 +1203,8 @@ def get_container_image_from_options(pipeline_options): if worker_options.sdk_container_image: return worker_options.sdk_container_image - # Legacy and runner v2 exist in different repositories. - # Set to legacy format, override if runner v2 + # Dataflow Legacy and Portable Runner exist in different repositories. 
+ # Set to legacy format, override if Dataflow Portable Runner container_repo = names.DATAFLOW_CONTAINER_IMAGE_REPOSITORY image_name = '{repository}/beam_python{major}.{minor}_sdk'.format( repository=container_repo, diff --git a/sdks/python/apache_beam/transforms/core.py b/sdks/python/apache_beam/transforms/core.py index 55a2cf40b64a..cd01253cade7 100644 --- a/sdks/python/apache_beam/transforms/core.py +++ b/sdks/python/apache_beam/transforms/core.py @@ -1096,7 +1096,7 @@ def setup(self, *args, **kwargs): before executing any of the other methods. The resources can then be disposed of in ``CombineFn.teardown``. - If you are using Dataflow, you need to enable Dataflow Runner V2 + If you are using Dataflow, you need to enable Dataflow Portable Runner before using this feature. Args: @@ -1194,7 +1194,7 @@ def extract_output(self, accumulator, *args, **kwargs): def teardown(self, *args, **kwargs): """Called to clean up an instance before it is discarded. - If you are using Dataflow, you need to enable Dataflow Runner V2 + If you are using Dataflow, you need to enable Dataflow Portable Runner before using this feature. Args: