Skip to content

Resolve Beam Dataflow job id by name after launcher returns#67711

Closed
evgeniy-b wants to merge 1 commit into
apache:mainfrom
evgeniy-b:fix-beam-dataflow-job-id-resolve-by-name
Closed

Resolve Beam Dataflow job id by name after launcher returns#67711
evgeniy-b wants to merge 1 commit into
apache:mainfrom
evgeniy-b:fix-beam-dataflow-job-id-resolve-by-name

Conversation

@evgeniy-b

Copy link
Copy Markdown
Contributor

Resolve dataflow_job_id on BeamRun{Python,Java,Go}PipelineOperator by looking it up via the Dataflow API after the Beam launcher subprocess returns, instead of relying on the Beam SDK stdout regex (JOB_ID_PATTERN) which silently leaves the id as None when the line is missing or formatted differently and breaks deferred polling, on_kill, and xcom consumers downstream.

Adds DataflowHook.fetch_job_id_by_name alongside the existing name-based lookups (is_job_dataflow_running, cancel_job, get_job): lists active jobs whose name starts with the configured dataflow_job_name and returns the id when exactly one match is found. Lookup failures are logged and swallowed.


Was generative AI tooling used to co-author this PR?
  • Yes (Claude Code)

Generated-by: Claude Code following the guidelines


  • Read the Pull Request Guidelines for more information. Note: commit author/co-author name and email in commits become permanently public when merged.
  • For fundamental code changes, an Airflow Improvement Proposal (AIP) is needed.
  • When adding dependency, check compliance with the ASF 3rd Party License Policy.
  • For significant user-facing changes create newsfragment: {pr_number}.significant.rst, in airflow-core/newsfragments. You can add this file in a follow-up commit after the PR is created so you know the PR number.

@evgeniy-b evgeniy-b requested a review from shahar1 as a code owner May 29, 2026 12:01
@boring-cyborg boring-cyborg Bot added area:providers provider:apache-beam provider:google Google (including GCP) related issues labels May 29, 2026
`process_line_and_extract_dataflow_job_id_callback` in
`airflow.providers.google.cloud.hooks.dataflow` extracts the Dataflow
job id from the Beam SDK's stdout via `JOB_ID_PATTERN`. When the line is
missing or formatted differently, `dataflow_job_id` stays `None` and any
downstream call that requires it (deferred polling, on_kill, xcom
consumers) fails.

Drop the stdout scrape from
`BeamRunPythonPipelineOperator.execute_on_dataflow`,
`BeamRunJavaPipelineOperator.execute_on_dataflow`, and
`BeamRunGoPipelineOperator.execute_on_dataflow`, and look the job id up
once via the Dataflow API after the Beam launcher subprocess returns.
Add `DataflowHook.fetch_job_id_by_name` alongside the other name-based
lookups (`is_job_dataflow_running`, `cancel_job`, `get_job`): list
active jobs whose name starts with the configured `dataflow_job_name`
and return the id when exactly one match is found. Lookup failures are
logged and swallowed.
@evgeniy-b evgeniy-b force-pushed the fix-beam-dataflow-job-id-resolve-by-name branch from 18b7035 to ebf385d Compare May 29, 2026 12:40

@MaksYermak MaksYermak left a comment

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@evgeniy-b could you run system tests for Dataflow and provide screenshots from Airflow UI that there are passed successfully?

Comment on lines +1148 to +1149
if len(jobs) != 1:
return None

@MaksYermak MaksYermak May 29, 2026

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@evgeniy-b as I understand in case when users run in parallel 2 or more Jobs with the same name or on Dataflow the Job with this name already present than this code returns None as JobID value, please correct me if I am wrong?

In the current logic with callbacks the code parse Apache Beam logs for availability of JobID and when getting it then starts the waiting process in deferrable or non-deferable mode. It means that we always have unique Job ID.

This new logic looks for me as a breaking change because returns None as JobID in case when in Dataflow the users have 2 or more Jobs with the same name. It is possible scenario for the most of our users because in Dataflow is impossible to remove finished Jobs the user can only archived it. And our _fetch_all_jobs method does not sort Jobs by finished or running and returns all Jobs with the same name.

@evgeniy-b evgeniy-b May 29, 2026

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Let me explain a bit how I arrived here. On an airflow cluster I maintain I noticed python beam jobs running with deferrable=False, so I switched that flag to true to not waste worker resources. On the next day the jobs failed while transitioning to async triggers because their STDOUT didn't contain the job ID. In the sync mode a missing job ID doesn't prevent the task from succeeding:

_DataflowJobsController.wait_for_done polls self._refresh_jobs():

def wait_for_done(self) -> None:
"""Wait for result of submitted job."""
self.log.info("Start waiting for done.")
self._refresh_jobs()
while self._jobs and not all(
self.job_reached_terminal_state(job, self._wait_until_finished, self._expected_terminal_state)
for job in self._jobs
):
self.log.info("Waiting for done. Sleep %s s", self._poll_sleep)
time.sleep(self._poll_sleep)
self._refresh_jobs()

_refresh_jobs calls self._get_current_jobs():

def _refresh_jobs(self) -> None:
"""
Get all jobs by name.
:return: jobs
"""
self._jobs = self._get_current_jobs()

_get_current_jobs — with no _job_id — calls self._fetch_jobs_by_prefix_name(self._job_name.lower()):

def _get_current_jobs(self) -> list[dict]:
"""
Get list of jobs that start with job name or id.
:return: list of jobs including id's
"""
if not self._multiple_jobs and self._job_id:
return [self.fetch_job_by_id(self._job_id)]
if self._jobs:
return [self.fetch_job_by_id(job["id"]) for job in self._jobs]
if self._job_name:
jobs = self._fetch_jobs_by_prefix_name(self._job_name.lower())

_fetch_jobs_by_prefix_name calls self._fetch_all_jobs() and returns every prefix-matched job (archived + running, no terminal-state filter):

def _fetch_jobs_by_prefix_name(self, prefix_name: str) -> list[dict]:
jobs = self._fetch_all_jobs()
jobs = [job for job in jobs if job["name"].startswith(prefix_name)]
return jobs

So today's sync path already silently picks up every prefix-matched job whenever the regex misses.

With default append_job_name=True the job name will be unique and job ID will be retrieved.
But you are right, it is a degradation: for jobs without unique names but printing out their IDs to console, the job ID will become missing.

I guess an alternative could be to replicate the sync mode's behavior in the async path which currently fails without job_id. However it means that xcom and a link to the job will stay broken.

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think job ID in output detection should be reverted. While it is awkward in principle, it is the only way (?) to reliably get ID when job names are not unique. Then name-based ID detection can be used as a fallback but only when append_job_name=True. And if the trigger receives empty job ID it should fallback to polling status of all jobs matching the name (and not in terminal status).
@MaksYermak what's your take on this?

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@evgeniy-b I do not like idea using job name for checking job status, because, as I already mentioned, is not unique and all manipulation with a code looks like workarounds when we try to introduce additional parameters for making job name kind unique, but it still not.

For example when user start two parallel tasks with Jobs which will have the same Job name and unique JobIDs for this case what Job this code grab for checking the status? As I understand not a single one or, maybe, the first JobID from the job list then both task will monitor the same job which is wrong. I do not see any solution how we can distinguish two Job with the same name between the tasks in parallel run and how task should understand what Job to pick. This solution with callbacks was introduced in the beginner of life for Apache Beam operators and removing it completely is breaking change for users.

About problem which you mentioned.
What version of Apache Beam provider do you use on your Airflow cluster? Because problem which you described should not happened because of this code. This code use callback for getting Job ID from STDOUT for Dataflow runner before stating to wait in non-deferrable or deferrable modes. It means that changing value for deferrable flag from False to True does not apply to callback logic at all, because the code always use callbacks for Dataflow runner. And only after getting Job ID decides in what mode wait for result in deferrable or non-deferrable.

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I understand that job names are not unique and totally agree that using names for status checks is awkward.

This solution with callbacks was introduced in the beginner of life for Apache Beam operators and removing it completely is breaking change for users.

Fair. I'm not proposing to remove it anymore because it would be a regression.

This code use callback for getting Job ID from STDOUT for Dataflow runner before stating to wait in non-deferrable or deferrable modes.

It works only when STDOUT contains the job ID. In my case the job's output didn't include it. So jobId=None. The divergence is in how deferrable/worker mode treat missing job IDs. Trigger fails immediately while the sync worker path lists all jobs by the name and monitors statuses of all matching jobs (I linked exact code lines in the previous comment).
It's a real bug: all tasks in deferrable mode whose job outputs didn't match Job ID regex will fail.

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It works only when STDOUT contains the job ID. In my case the job's output didn't include it. So jobId=None. The divergence is in how deferrable/worker mode treat missing job IDs. Trigger fails immediately while the sync worker path lists all jobs by the name and monitors statuses of all matching jobs (I linked exact code lines in the previous comment).
It's a real bug: all tasks in deferrable mode whose job outputs didn't match Job ID regex will fail.

Hmm I am still do not understand how it can be possible, in your case, to start deferrable mode without JobID. Because in the current code we have this logic for process_fd and this logic for run_beam_command. As you can see, in the code we have while True loop which reads logs from Beam run process till the Job finished. And only in case, when JobID presents the code leaves this loop and starts waiting process using Dataflow API via deferrable or non-deferrable modes. Otherwise, if you do not have JobID then the code runs your Job in non-deferrable mode till the end and never use Dataflow API for checkin status.

I see only one scenario when the fail in deferrable mode can be possible when without JobID this infinite loop goes to the end and successfully finished the Job. And after that Operator tries to start deferrable mode and failed because the JobID is empty. And in non-deferrable mode everything is fine because for wait_for_done, the JobID can be None. I think this can be your's scenario, but I need equivalent of your's Pipeline script for reproduction.

Could you please share Apache Beam provider version which you use and the code for reproduction this issue?

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I see only one scenario when the fail in deferrable mode can be possible when without JobID this infinite loop goes to the end and successfully finished the Job. And after that Operator tries to start deferrable mode and failed because the JobID is empty.

Right. I think this is exactly what did happen. It matches the logs: job starts at 03:19, completes at 04:49 and defers. The error is raised only at 05:30 because the task runs in a pool with limited concurrency.

2026-05-28T03:19:52.878371238Z	2	INFO	Beam version: 2.71.0
2026-05-28T03:19:52.878596782Z	2	INFO	Running command: python3 /tmp/xxx.py --runner=DataflowRunner --job_name=xxx-e8245706 --service_account=xxx@yyy.iam.gserviceaccount.com --project=xxx --region=europe-west1 --labels=airflow-version=v3-1-7-composer ...
2026-05-28T03:19:52.879618883Z	2	INFO	Start waiting for Apache Beam process to complete.
2026-05-28T03:19:54.525101900Z	2	WARNING	WARNING:root:crcmod package not found. This package is required if python-snappy or google-crc32c are not installed. To ensure crcmod is installed, install the tfrecord extra: pip install apache-beam[tfrecord]
2026-05-28T04:15:13.948502540Z	2	WARNING	WARNING:google_auth_httplib2:httplib2 transport does not support per-request timeout. Set the timeout when constructing the httplib2.Http instance.
2026-05-28T04:15:13.949025630Z	2	WARNING	WARNING:google_auth_httplib2:httplib2 transport does not support per-request timeout. Set the timeout when constructing the httplib2.Http instance.
2026-05-28T04:49:18.090466022Z	2	INFO	Process exited with return code: 0
2026-05-28T04:49:18.126811027Z	2	INFO	Pausing task as DEFERRED.  [dag_id=yyy] [task_id=xxx] [run_id=scheduled__2026-05-26T00:00:00+00:00]
2026-05-28T04:49:18.309519529Z	2	INFO	Task finished [task_instance_id=019e6c93-6986-78e0-8590-72a67d0c1bf9] [exit_code=0] [duration=5392.726224065] [final_state=deferred]
2026-05-28T05:30:02.155142307Z	2	INFO	Getting connection using `google.auth.default()` since no explicit credentials are provided.
2026-05-28T05:30:02.159145593Z	2	INFO	Secrets backends loaded for worker [count=2] [backend_classes=['CloudSecretManagerBackend', 'EnvironmentVariablesBackend']]
2026-05-28T05:30:04.757926940Z	2	INFO	DAG bundles loaded: dags-folder
2026-05-28T05:30:04.758570909Z	2	INFO	Filling up the DagBag from /home/airflow/gcs/dags/recommendations/yyy/yyy.py
2026-05-28T05:30:24.896252393Z	2	ERROR	Task failed with exception
Traceback (most recent call last):
  File "/opt/python3.11/lib/python3.11/site-packages/airflow/sdk/execution_time/task_runner.py", line 1068, in run
    result = _execute_task(context=context, ti=ti, log=log)
             ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/opt/python3.11/lib/python3.11/site-packages/airflow/sdk/execution_time/task_runner.py", line 1472, in _execute_task
    result = ctx.run(execute, context=context)
             ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/opt/python3.11/lib/python3.11/site-packages/airflow/sdk/bases/operator.py", line 1633, in resume_execution
    return execute_callable(context, **next_kwargs)
           ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/opt/python3.11/lib/python3.11/site-packages/airflow/providers/apache/beam/operators/beam.py", line 300, in execute_complete
    raise AirflowException(event["message"])
airflow.exceptions.AirflowException: 400 Request must contain a job and project id.

I can prepare an example to reproduce. Should I open it as a new bug ticket?
I can also create a PR right away if we agree on the fix approach. WDYT?

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@evgeniy-b in my opinion it makes sense to create an airflow issue with all reproduction steps and then continue discussion about fix

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Makes sense! Here is the ticket #68279

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

area:providers provider:apache-beam provider:google Google (including GCP) related issues ready for maintainer review Set after triaging when all criteria pass.

Projects

None yet

Development

Successfully merging this pull request may close these issues.

3 participants