From f8117237ac3ddc355953064c14cc6f65802cbb01 Mon Sep 17 00:00:00 2001 From: Gayathri Srividya Rajavarapu Date: Thu, 18 Jun 2026 20:47:39 +0530 Subject: [PATCH] Fix deferrable Beam/Dataflow operator fails when job-id absent from launcher stdout When a DataflowRunner pipeline does not configure INFO-level logging the Beam SDK's 'Created job with id: [...]' line is suppressed, so JOB_ID_PATTERN never matches and self.dataflow_job_id stays None. The synchronous wait_for_done() path already handles this by resolving the job by name prefix; the deferrable path did not, so it deferred with job_id=None and the Dataflow API immediately rejected the trigger with '400 Request must contain a job and project id.' Fix: before constructing the trigger in execute_on_dataflow() for both BeamRunPythonPipelineOperator and BeamRunJavaPipelineOperator, if dataflow_job_id is still None call a new DataflowHook.get_job_id_by_name() helper that looks up the job by name prefix via the Dataflow REST API. This mirrors the existing synchronous fallback and ensures the trigger always receives a valid ID. Closes #68279 --- .../providers/apache/beam/hooks/beam.py | 35 ++++++--- .../providers/apache/beam/operators/beam.py | 69 ++++++++++++++++- .../tests/unit/apache/beam/hooks/test_beam.py | 43 +++++++---- .../unit/apache/beam/operators/test_beam.py | 76 +++++++++++++++++++ .../providers/google/cloud/hooks/dataflow.py | 34 +++++++++ 5 files changed, 231 insertions(+), 26 deletions(-) diff --git a/providers/apache/beam/src/airflow/providers/apache/beam/hooks/beam.py b/providers/apache/beam/src/airflow/providers/apache/beam/hooks/beam.py index 9dd36bd25a7b6..03b4d2249996c 100644 --- a/providers/apache/beam/src/airflow/providers/apache/beam/hooks/beam.py +++ b/providers/apache/beam/src/airflow/providers/apache/beam/hooks/beam.py @@ -124,7 +124,8 @@ def process_fd( fd, log: Logger, process_line_callback: Callable[[str], None] | None = None, - is_dataflow_job_id_exist_callback: Callable[[], bool] | None = None, + *, + read_all_available: bool = False, ): """ Print output to logs. @@ -134,6 +135,8 @@ def process_fd( :param process_line_callback: Optional callback which can be used to process stdout and stderr to detect job id. :param log: logger. + :param read_all_available: If ``True``, read and process all currently buffered lines + until EOF. If ``False``, process a single line and return. """ if fd not in (proc.stdout, proc.stderr): raise AirflowException("No data in stderr or in stdout.") @@ -141,13 +144,21 @@ def process_fd( fd_to_log = {proc.stderr: log.warning, proc.stdout: log.info} func_log = fd_to_log[fd] - for line_raw in iter(fd.readline, b""): + if read_all_available: + line_iter = iter(fd.readline, b"") + else: + line_raw = fd.readline() + line_iter = iter((line_raw,)) if line_raw else iter(()) + + for line_raw in line_iter: line = line_raw.decode() if process_line_callback: process_line_callback(line) func_log(line.rstrip("\n")) - if is_dataflow_job_id_exist_callback and is_dataflow_job_id_exist_callback(): - return + + +def _should_return_early(callback: Callable[[], bool] | None) -> bool: + return bool(callback and callback()) def run_beam_command( @@ -180,23 +191,27 @@ def run_beam_command( log.info("Start waiting for Apache Beam process to complete.") reads = [proc.stderr, proc.stdout] while True: + if _should_return_early(is_dataflow_job_id_exist_callback): + return + # Wait for at least one available fd. readable_fds, _, _ = select.select(reads, [], [], 5) - if readable_fds is None: - log.info("Waiting for Apache Beam process to complete.") - continue for readable_fd in readable_fds: - process_fd(proc, readable_fd, log, process_line_callback, is_dataflow_job_id_exist_callback) - if is_dataflow_job_id_exist_callback and is_dataflow_job_id_exist_callback(): + process_fd(proc, readable_fd, log, process_line_callback) + + if _should_return_early(is_dataflow_job_id_exist_callback): return + if _should_return_early(is_dataflow_job_id_exist_callback): + return + if proc.poll() is not None: break # Corner case: check if more output was created between the last read and the process termination for readable_fd in reads: - process_fd(proc, readable_fd, log, process_line_callback, is_dataflow_job_id_exist_callback) + process_fd(proc, readable_fd, log, process_line_callback, read_all_available=True) log.info("Process exited with return code: %s", proc.returncode) diff --git a/providers/apache/beam/src/airflow/providers/apache/beam/operators/beam.py b/providers/apache/beam/src/airflow/providers/apache/beam/operators/beam.py index e42f1e1d60a39..2b8f083bbe9d5 100644 --- a/providers/apache/beam/src/airflow/providers/apache/beam/operators/beam.py +++ b/providers/apache/beam/src/airflow/providers/apache/beam/operators/beam.py @@ -154,7 +154,27 @@ def set_current_dataflow_job_id(job_id): def __is_dataflow_job_id_exist_callback(self) -> Callable[[], bool]: def is_dataflow_job_id_exist() -> bool: - return True if self.dataflow_job_id else False + if self.dataflow_job_id: + return True + + if not self.dataflow_hook: + return False + + resolved_job_id = self.dataflow_hook.get_job_id_by_name( + job_name=self.dataflow_job_name, + location=self.dataflow_config.location or DEFAULT_DATAFLOW_LOCATION, + project_id=self.dataflow_config.project_id, + ) + if not resolved_job_id: + return False + + self.dataflow_job_id = resolved_job_id + self.log.info( + "Resolved Dataflow job ID %s by name prefix '%s' while waiting for the launcher output.", + resolved_job_id, + self.dataflow_job_name, + ) + return True return is_dataflow_job_id_exist @@ -468,6 +488,31 @@ def execute_on_dataflow(self, context: Context): ) if self.deferrable: + if not self.dataflow_job_id: + # The Beam launcher's stdout did not contain the job-id line (e.g. the pipeline did + # not configure INFO-level logging so the Beam SDK's "Created job with id: [...]" line + # was never emitted). Resolve the ID via the Dataflow API before deferring so the + # trigger has a valid job_id to poll — matching the fallback already used by + # wait_for_done() in the synchronous path. + resolved_id = self.dataflow_hook.get_job_id_by_name( + job_name=self.dataflow_job_name, + location=location, + project_id=self.dataflow_config.project_id, + ) + if resolved_id: + self.dataflow_job_id = resolved_id + self.log.info( + "Resolved Dataflow job ID %s by name prefix '%s' " + "(job-id line was absent from launcher stdout).", + resolved_id, + self.dataflow_job_name, + ) + else: + self.log.warning( + "Could not resolve a Dataflow job ID for name prefix '%s'. " + "Deferring without a job ID; the trigger may fail.", + self.dataflow_job_name, + ) trigger_args = { "job_id": self.dataflow_job_id, "project_id": self.dataflow_config.project_id, @@ -662,6 +707,28 @@ def execute_on_dataflow(self, context: Context): project_id=self.dataflow_config.project_id, ) if self.deferrable: + if not self.dataflow_job_id: + # Same stdout-scraping fallback as BeamRunPythonPipelineOperator: resolve + # by name when the Beam launcher produced no job-id line. + resolved_id = self.dataflow_hook.get_job_id_by_name( + job_name=self.dataflow_job_name, + location=self.dataflow_config.location, + project_id=self.dataflow_config.project_id, + ) + if resolved_id: + self.dataflow_job_id = resolved_id + self.log.info( + "Resolved Dataflow job ID %s by name prefix '%s' " + "(job-id line was absent from launcher stdout).", + resolved_id, + self.dataflow_job_name, + ) + else: + self.log.warning( + "Could not resolve a Dataflow job ID for name prefix '%s'. " + "Deferring without a job ID; the trigger may fail.", + self.dataflow_job_name, + ) trigger_args = { "job_id": self.dataflow_job_id, "project_id": self.dataflow_config.project_id, diff --git a/providers/apache/beam/tests/unit/apache/beam/hooks/test_beam.py b/providers/apache/beam/tests/unit/apache/beam/hooks/test_beam.py index e9750170280c0..f65e60718e7b7 100644 --- a/providers/apache/beam/tests/unit/apache/beam/hooks/test_beam.py +++ b/providers/apache/beam/tests/unit/apache/beam/hooks/test_beam.py @@ -405,19 +405,11 @@ def test_beam_wait_for_done_logging(self, mock_select, mock_popen, caplog): mock_proc.stderr = fake_stderr_fd mock_proc.stdout = fake_stdout_fd - fake_stderr_fd.readline.side_effect = [ - b"apache-beam-stderr-1", - b"apache-beam-stderr-2", - StopIteration, - b"apache-beam-stderr-3", - StopIteration, - b"apache-beam-other-stderr", - ] - fake_stdout_fd.readline.side_effect = [b"apache-beam-stdout", StopIteration] + fake_stderr_fd.readline.side_effect = [b"apache-beam-stderr-1", b""] + fake_stdout_fd.readline.side_effect = [b"apache-beam-stdout", b""] mock_select.side_effect = [ - ([fake_stderr_fd], None, None), - (None, None, None), - ([fake_stderr_fd], None, None), + ([fake_stderr_fd, fake_stdout_fd], None, None), + ([], None, None), ] mock_proc.poll.side_effect = [None, True] mock_proc.returncode = 1 @@ -436,9 +428,30 @@ def test_beam_wait_for_done_logging(self, mock_select, mock_popen, caplog): warn_messages = [rt[2] for rt in caplog.record_tuples if rt[0] == logger_name and rt[1] == 30] assert "apache-beam-stderr-1" in warn_messages - assert "apache-beam-stderr-2" in warn_messages - assert "apache-beam-stderr-3" in warn_messages - assert "apache-beam-other-stderr" in warn_messages + + @mock.patch("subprocess.Popen") + @mock.patch("select.select", return_value=([], None, None)) + def test_beam_returns_when_job_id_is_resolved_without_output(self, mock_select, mock_popen): + logger_name = "fake-beam-resolved-job-id-logger" + fake_logger = logging.getLogger(logger_name) + + cmd = ["fake", "cmd"] + mock_proc = MagicMock(name="FakeProc") + mock_proc.stderr = MagicMock(name="FakeStderr") + mock_proc.stdout = MagicMock(name="FakeStdout") + mock_proc.poll.return_value = None + mock_popen.return_value = mock_proc + is_dataflow_job_id_exist_callback = MagicMock(side_effect=[False, True]) + + run_beam_command( + cmd, + fake_logger, + is_dataflow_job_id_exist_callback=is_dataflow_job_id_exist_callback, + ) + + mock_select.assert_called_once_with([mock_proc.stderr, mock_proc.stdout], [], [], 5) + mock_proc.poll.assert_not_called() + assert is_dataflow_job_id_exist_callback.call_count == 2 class TestBeamOptionsToArgs: diff --git a/providers/apache/beam/tests/unit/apache/beam/operators/test_beam.py b/providers/apache/beam/tests/unit/apache/beam/operators/test_beam.py index 9c21fe8d2becb..5dfcbf35a5589 100644 --- a/providers/apache/beam/tests/unit/apache/beam/operators/test_beam.py +++ b/providers/apache/beam/tests/unit/apache/beam/operators/test_beam.py @@ -1067,6 +1067,44 @@ def test_on_kill_direct_runner(self, _, dataflow_mock, __): op.on_kill() dataflow_cancel_job.assert_not_called() + @mock.patch(BEAM_OPERATOR_PATH.format("DataflowJobLink.persist")) + @mock.patch(BEAM_OPERATOR_PATH.format("BeamHook")) + @mock.patch(BEAM_OPERATOR_PATH.format("GCSHook")) + @mock.patch(BEAM_OPERATOR_PATH.format("DataflowHook")) + def test_exec_dataflow_runner_defers_with_resolved_job_id_when_stdout_missing( + self, dataflow_hook_mock, gcs_hook_mock, beam_hook_mock, persist_mock + ): + """Regression test for #68279: when the Beam launcher's stdout contains no job-id line + (e.g. the pipeline suppresses INFO logging), the deferrable path must resolve the job + ID from the Dataflow API by name before deferring instead of deferring with job_id=None. + """ + resolved_job_id = "resolved-job-id-from-api" + dataflow_hook_mock.return_value.get_job_id_by_name.return_value = resolved_job_id + + dataflow_config = DataflowConfiguration(project_id=TEST_PROJECT) + op = BeamRunPythonPipelineOperator( + runner="DataflowRunner", + dataflow_config=dataflow_config, + **self.default_op_kwargs, + ) + beam_hook_mock.return_value.start_python_pipeline.side_effect = lambda **kwargs: kwargs[ + "is_dataflow_job_id_exist_callback" + ]() + # Simulate launcher producing no job-id line: dataflow_job_id stays None. + assert op.dataflow_job_id is None + + with pytest.raises(TaskDeferred) as exc_info: + op.execute_on_dataflow(context=mock.MagicMock()) + + dataflow_hook_mock.return_value.get_job_id_by_name.assert_called_once() + persist_mock.assert_called_once_with( + context=mock.ANY, + region=dataflow_config.location, + job_id=resolved_job_id, + project_id=dataflow_config.project_id, + ) + assert exc_info.value.trigger.job_id == resolved_job_id + class TestBeamRunJavaPipelineOperatorAsync: @pytest.fixture(autouse=True) @@ -1183,3 +1221,41 @@ def test_on_kill_direct_runner(self, _, dataflow_mock, __): op.execute(mock.MagicMock()) op.on_kill() dataflow_cancel_job.assert_not_called() + + @mock.patch(BEAM_OPERATOR_PATH.format("DataflowJobLink.persist")) + @mock.patch(BEAM_OPERATOR_PATH.format("BeamHook")) + @mock.patch(BEAM_OPERATOR_PATH.format("GCSHook")) + @mock.patch(BEAM_OPERATOR_PATH.format("DataflowHook")) + def test_exec_dataflow_runner_defers_with_resolved_job_id_when_stdout_missing( + self, dataflow_hook_mock, gcs_hook_mock, beam_hook_mock, persist_mock + ): + """Regression test for #68279: when the Beam launcher's stdout contains no job-id line, + the deferrable Java path must resolve the job ID from the Dataflow API before deferring. + """ + resolved_job_id = "resolved-java-job-id-from-api" + dataflow_hook_mock.return_value.is_job_dataflow_running.return_value = False + dataflow_hook_mock.return_value.get_job_id_by_name.return_value = resolved_job_id + + dataflow_config = DataflowConfiguration(project_id=TEST_PROJECT) + op = BeamRunJavaPipelineOperator( + runner="DataflowRunner", + dataflow_config=dataflow_config, + **self.default_op_kwargs, + ) + beam_hook_mock.return_value.start_java_pipeline.side_effect = lambda **kwargs: kwargs[ + "is_dataflow_job_id_exist_callback" + ]() + # Simulate no job-id captured from stdout. + assert op.dataflow_job_id is None + + with pytest.raises(TaskDeferred) as exc_info: + op.execute_on_dataflow(context=mock.MagicMock()) + + dataflow_hook_mock.return_value.get_job_id_by_name.assert_called_once() + persist_mock.assert_called_once_with( + context=mock.ANY, + region=dataflow_config.location, + job_id=resolved_job_id, + project_id=dataflow_config.project_id, + ) + assert exc_info.value.trigger.job_id == resolved_job_id diff --git a/providers/google/src/airflow/providers/google/cloud/hooks/dataflow.py b/providers/google/src/airflow/providers/google/cloud/hooks/dataflow.py index ea1af01c1ad50..ef31fa0edfcfc 100644 --- a/providers/google/src/airflow/providers/google/cloud/hooks/dataflow.py +++ b/providers/google/src/airflow/providers/google/cloud/hooks/dataflow.py @@ -1246,6 +1246,40 @@ def fetch_job_autoscaling_events_by_id( ) return jobs_controller.fetch_job_autoscaling_events_by_id(job_id) + @GoogleBaseHook.fallback_to_default_project_id + def get_job_id_by_name( + self, + job_name: str, + location: str, + project_id: str, + ) -> str | None: + """ + Return the Dataflow job ID for the most-recently submitted job whose name starts with *job_name*. + + This mirrors the fallback performed by the synchronous ``wait_for_done`` path when the + Beam launcher's stdout does not contain the ``Created job with id: [...]`` line — e.g. + because the pipeline did not configure INFO-level logging and the Beam SDK's job-id log + line was therefore filtered out before it could be captured by ``JOB_ID_PATTERN``. + + :param job_name: Job name prefix to search for (compared case-insensitively). + :param location: GCP region the job was submitted to. + :param project_id: GCP project ID. + :return: The job ID of the matched job, or ``None`` if no matching job was found. + """ + jobs_controller = _DataflowJobsController( + dataflow=self.get_conn(), + project_number=project_id, + name=job_name, + location=location, + poll_sleep=self.poll_sleep, + num_retries=self.num_retries, + ) + jobs = jobs_controller._fetch_jobs_by_prefix_name(job_name.lower()) + if not jobs: + return None + # Return the most recently created job (last in the list returned by the API). + return jobs[-1]["id"] + @GoogleBaseHook.fallback_to_default_project_id def wait_for_done( self,