Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -124,7 +124,8 @@ def process_fd(
fd,
log: Logger,
process_line_callback: Callable[[str], None] | None = None,
is_dataflow_job_id_exist_callback: Callable[[], bool] | None = None,
*,
read_all_available: bool = False,
):
"""
Print output to logs.
Expand All @@ -134,20 +135,30 @@ def process_fd(
:param process_line_callback: Optional callback which can be used to process
stdout and stderr to detect job id.
:param log: logger.
:param read_all_available: If ``True``, read and process all currently buffered lines
until EOF. If ``False``, process a single line and return.
"""
if fd not in (proc.stdout, proc.stderr):
raise AirflowException("No data in stderr or in stdout.")

fd_to_log = {proc.stderr: log.warning, proc.stdout: log.info}
func_log = fd_to_log[fd]

for line_raw in iter(fd.readline, b""):
if read_all_available:
line_iter = iter(fd.readline, b"")
else:
line_raw = fd.readline()
line_iter = iter((line_raw,)) if line_raw else iter(())

for line_raw in line_iter:
line = line_raw.decode()
if process_line_callback:
process_line_callback(line)
func_log(line.rstrip("\n"))
if is_dataflow_job_id_exist_callback and is_dataflow_job_id_exist_callback():
return


def _should_return_early(callback: Callable[[], bool] | None) -> bool:
return bool(callback and callback())


def run_beam_command(
Expand Down Expand Up @@ -180,23 +191,27 @@ def run_beam_command(
log.info("Start waiting for Apache Beam process to complete.")
reads = [proc.stderr, proc.stdout]
while True:
if _should_return_early(is_dataflow_job_id_exist_callback):
return

# Wait for at least one available fd.
readable_fds, _, _ = select.select(reads, [], [], 5)
if readable_fds is None:
log.info("Waiting for Apache Beam process to complete.")
continue

for readable_fd in readable_fds:
process_fd(proc, readable_fd, log, process_line_callback, is_dataflow_job_id_exist_callback)
if is_dataflow_job_id_exist_callback and is_dataflow_job_id_exist_callback():
process_fd(proc, readable_fd, log, process_line_callback)

if _should_return_early(is_dataflow_job_id_exist_callback):
return

if _should_return_early(is_dataflow_job_id_exist_callback):
return

if proc.poll() is not None:
break

# Corner case: check if more output was created between the last read and the process termination
for readable_fd in reads:
process_fd(proc, readable_fd, log, process_line_callback, is_dataflow_job_id_exist_callback)
process_fd(proc, readable_fd, log, process_line_callback, read_all_available=True)

log.info("Process exited with return code: %s", proc.returncode)

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -154,7 +154,27 @@ def set_current_dataflow_job_id(job_id):

def __is_dataflow_job_id_exist_callback(self) -> Callable[[], bool]:
def is_dataflow_job_id_exist() -> bool:
return True if self.dataflow_job_id else False
if self.dataflow_job_id:
return True

if not self.dataflow_hook:
return False

resolved_job_id = self.dataflow_hook.get_job_id_by_name(
job_name=self.dataflow_job_name,
location=self.dataflow_config.location or DEFAULT_DATAFLOW_LOCATION,
project_id=self.dataflow_config.project_id,
)
if not resolved_job_id:
return False

self.dataflow_job_id = resolved_job_id
self.log.info(
"Resolved Dataflow job ID %s by name prefix '%s' while waiting for the launcher output.",
resolved_job_id,
self.dataflow_job_name,
)
return True

return is_dataflow_job_id_exist

Expand Down Expand Up @@ -468,6 +488,31 @@ def execute_on_dataflow(self, context: Context):
)

if self.deferrable:
if not self.dataflow_job_id:
# The Beam launcher's stdout did not contain the job-id line (e.g. the pipeline did
# not configure INFO-level logging so the Beam SDK's "Created job with id: [...]" line
# was never emitted). Resolve the ID via the Dataflow API before deferring so the
# trigger has a valid job_id to poll — matching the fallback already used by
# wait_for_done() in the synchronous path.
resolved_id = self.dataflow_hook.get_job_id_by_name(
job_name=self.dataflow_job_name,
location=location,
project_id=self.dataflow_config.project_id,
)
if resolved_id:
self.dataflow_job_id = resolved_id
self.log.info(
"Resolved Dataflow job ID %s by name prefix '%s' "
"(job-id line was absent from launcher stdout).",
resolved_id,
self.dataflow_job_name,
)
else:
self.log.warning(
"Could not resolve a Dataflow job ID for name prefix '%s'. "
"Deferring without a job ID; the trigger may fail.",
self.dataflow_job_name,
)
trigger_args = {
"job_id": self.dataflow_job_id,
"project_id": self.dataflow_config.project_id,
Expand Down Expand Up @@ -662,6 +707,28 @@ def execute_on_dataflow(self, context: Context):
project_id=self.dataflow_config.project_id,
)
if self.deferrable:
if not self.dataflow_job_id:
# Same stdout-scraping fallback as BeamRunPythonPipelineOperator: resolve
# by name when the Beam launcher produced no job-id line.
resolved_id = self.dataflow_hook.get_job_id_by_name(
job_name=self.dataflow_job_name,
location=self.dataflow_config.location,
project_id=self.dataflow_config.project_id,
)
if resolved_id:
self.dataflow_job_id = resolved_id
self.log.info(
"Resolved Dataflow job ID %s by name prefix '%s' "
"(job-id line was absent from launcher stdout).",
resolved_id,
self.dataflow_job_name,
)
else:
self.log.warning(
"Could not resolve a Dataflow job ID for name prefix '%s'. "
"Deferring without a job ID; the trigger may fail.",
self.dataflow_job_name,
)
trigger_args = {
"job_id": self.dataflow_job_id,
"project_id": self.dataflow_config.project_id,
Expand Down
43 changes: 28 additions & 15 deletions providers/apache/beam/tests/unit/apache/beam/hooks/test_beam.py
Original file line number Diff line number Diff line change
Expand Up @@ -405,19 +405,11 @@ def test_beam_wait_for_done_logging(self, mock_select, mock_popen, caplog):

mock_proc.stderr = fake_stderr_fd
mock_proc.stdout = fake_stdout_fd
fake_stderr_fd.readline.side_effect = [
b"apache-beam-stderr-1",
b"apache-beam-stderr-2",
StopIteration,
b"apache-beam-stderr-3",
StopIteration,
b"apache-beam-other-stderr",
]
fake_stdout_fd.readline.side_effect = [b"apache-beam-stdout", StopIteration]
fake_stderr_fd.readline.side_effect = [b"apache-beam-stderr-1", b""]
fake_stdout_fd.readline.side_effect = [b"apache-beam-stdout", b""]
mock_select.side_effect = [
([fake_stderr_fd], None, None),
(None, None, None),
([fake_stderr_fd], None, None),
([fake_stderr_fd, fake_stdout_fd], None, None),
([], None, None),
]
mock_proc.poll.side_effect = [None, True]
mock_proc.returncode = 1
Expand All @@ -436,9 +428,30 @@ def test_beam_wait_for_done_logging(self, mock_select, mock_popen, caplog):

warn_messages = [rt[2] for rt in caplog.record_tuples if rt[0] == logger_name and rt[1] == 30]
assert "apache-beam-stderr-1" in warn_messages
assert "apache-beam-stderr-2" in warn_messages
assert "apache-beam-stderr-3" in warn_messages
assert "apache-beam-other-stderr" in warn_messages

@mock.patch("subprocess.Popen")
@mock.patch("select.select", return_value=([], None, None))
def test_beam_returns_when_job_id_is_resolved_without_output(self, mock_select, mock_popen):
logger_name = "fake-beam-resolved-job-id-logger"
fake_logger = logging.getLogger(logger_name)

cmd = ["fake", "cmd"]
mock_proc = MagicMock(name="FakeProc")
mock_proc.stderr = MagicMock(name="FakeStderr")
mock_proc.stdout = MagicMock(name="FakeStdout")
mock_proc.poll.return_value = None
mock_popen.return_value = mock_proc
is_dataflow_job_id_exist_callback = MagicMock(side_effect=[False, True])

run_beam_command(
cmd,
fake_logger,
is_dataflow_job_id_exist_callback=is_dataflow_job_id_exist_callback,
)

mock_select.assert_called_once_with([mock_proc.stderr, mock_proc.stdout], [], [], 5)
mock_proc.poll.assert_not_called()
assert is_dataflow_job_id_exist_callback.call_count == 2


class TestBeamOptionsToArgs:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1067,6 +1067,44 @@ def test_on_kill_direct_runner(self, _, dataflow_mock, __):
op.on_kill()
dataflow_cancel_job.assert_not_called()

@mock.patch(BEAM_OPERATOR_PATH.format("DataflowJobLink.persist"))
@mock.patch(BEAM_OPERATOR_PATH.format("BeamHook"))
@mock.patch(BEAM_OPERATOR_PATH.format("GCSHook"))
@mock.patch(BEAM_OPERATOR_PATH.format("DataflowHook"))
def test_exec_dataflow_runner_defers_with_resolved_job_id_when_stdout_missing(
self, dataflow_hook_mock, gcs_hook_mock, beam_hook_mock, persist_mock
):
"""Regression test for #68279: when the Beam launcher's stdout contains no job-id line
(e.g. the pipeline suppresses INFO logging), the deferrable path must resolve the job
ID from the Dataflow API by name before deferring instead of deferring with job_id=None.
"""
resolved_job_id = "resolved-job-id-from-api"
dataflow_hook_mock.return_value.get_job_id_by_name.return_value = resolved_job_id

dataflow_config = DataflowConfiguration(project_id=TEST_PROJECT)
op = BeamRunPythonPipelineOperator(
runner="DataflowRunner",
dataflow_config=dataflow_config,
**self.default_op_kwargs,
)
beam_hook_mock.return_value.start_python_pipeline.side_effect = lambda **kwargs: kwargs[
"is_dataflow_job_id_exist_callback"
]()
# Simulate launcher producing no job-id line: dataflow_job_id stays None.
assert op.dataflow_job_id is None

with pytest.raises(TaskDeferred) as exc_info:
op.execute_on_dataflow(context=mock.MagicMock())

dataflow_hook_mock.return_value.get_job_id_by_name.assert_called_once()
persist_mock.assert_called_once_with(
context=mock.ANY,
region=dataflow_config.location,
job_id=resolved_job_id,
project_id=dataflow_config.project_id,
)
assert exc_info.value.trigger.job_id == resolved_job_id


class TestBeamRunJavaPipelineOperatorAsync:
@pytest.fixture(autouse=True)
Expand Down Expand Up @@ -1183,3 +1221,41 @@ def test_on_kill_direct_runner(self, _, dataflow_mock, __):
op.execute(mock.MagicMock())
op.on_kill()
dataflow_cancel_job.assert_not_called()

@mock.patch(BEAM_OPERATOR_PATH.format("DataflowJobLink.persist"))
@mock.patch(BEAM_OPERATOR_PATH.format("BeamHook"))
@mock.patch(BEAM_OPERATOR_PATH.format("GCSHook"))
@mock.patch(BEAM_OPERATOR_PATH.format("DataflowHook"))
def test_exec_dataflow_runner_defers_with_resolved_job_id_when_stdout_missing(
self, dataflow_hook_mock, gcs_hook_mock, beam_hook_mock, persist_mock
):
"""Regression test for #68279: when the Beam launcher's stdout contains no job-id line,
the deferrable Java path must resolve the job ID from the Dataflow API before deferring.
"""
resolved_job_id = "resolved-java-job-id-from-api"
dataflow_hook_mock.return_value.is_job_dataflow_running.return_value = False
dataflow_hook_mock.return_value.get_job_id_by_name.return_value = resolved_job_id

dataflow_config = DataflowConfiguration(project_id=TEST_PROJECT)
op = BeamRunJavaPipelineOperator(
runner="DataflowRunner",
dataflow_config=dataflow_config,
**self.default_op_kwargs,
)
beam_hook_mock.return_value.start_java_pipeline.side_effect = lambda **kwargs: kwargs[
"is_dataflow_job_id_exist_callback"
]()
# Simulate no job-id captured from stdout.
assert op.dataflow_job_id is None

with pytest.raises(TaskDeferred) as exc_info:
op.execute_on_dataflow(context=mock.MagicMock())

dataflow_hook_mock.return_value.get_job_id_by_name.assert_called_once()
persist_mock.assert_called_once_with(
context=mock.ANY,
region=dataflow_config.location,
job_id=resolved_job_id,
project_id=dataflow_config.project_id,
)
assert exc_info.value.trigger.job_id == resolved_job_id
Original file line number Diff line number Diff line change
Expand Up @@ -1246,6 +1246,40 @@ def fetch_job_autoscaling_events_by_id(
)
return jobs_controller.fetch_job_autoscaling_events_by_id(job_id)

@GoogleBaseHook.fallback_to_default_project_id
def get_job_id_by_name(
self,
job_name: str,
location: str,
project_id: str,
) -> str | None:
"""
Return the Dataflow job ID for the most-recently submitted job whose name starts with *job_name*.

This mirrors the fallback performed by the synchronous ``wait_for_done`` path when the
Beam launcher's stdout does not contain the ``Created job with id: [...]`` line — e.g.
because the pipeline did not configure INFO-level logging and the Beam SDK's job-id log
line was therefore filtered out before it could be captured by ``JOB_ID_PATTERN``.

:param job_name: Job name prefix to search for (compared case-insensitively).
:param location: GCP region the job was submitted to.
:param project_id: GCP project ID.
:return: The job ID of the matched job, or ``None`` if no matching job was found.
"""
jobs_controller = _DataflowJobsController(
dataflow=self.get_conn(),
project_number=project_id,
name=job_name,
location=location,
poll_sleep=self.poll_sleep,
num_retries=self.num_retries,
)
jobs = jobs_controller._fetch_jobs_by_prefix_name(job_name.lower())
if not jobs:
return None
# Return the most recently created job (last in the list returned by the API).
return jobs[-1]["id"]

@GoogleBaseHook.fallback_to_default_project_id
def wait_for_done(
self,
Expand Down
Loading