[AIRFLOW-198] Implement 'only_run_latest' feature in BaseOperator#1562
[AIRFLOW-198] Implement 'only_run_latest' feature in BaseOperator#1562PeterAttardo wants to merge 4 commits into
Conversation
| ).first() | ||
| if ti: | ||
| if self.state in State.runnable(): | ||
| self.set_state(State.FUTURE_SUCCEEDED, session) |
There was a problem hiding this comment.
Having an is_ method change state is a little unintuitive. @bolkedebruin just did some scheduler work to clean up this kind of pattern. Can you split the is_ method from the set call, so that change state is explicit, rather than done via what appears to be a read method?
There was a problem hiding this comment.
Agreed that it's not the cleanest, but I'm not sure how easily it can be extracted. It behaves very similarly to evaluate_trigger_rule() which gets called from are_dependencies_met() in the same chain. Is there a work in progress branch anywhere that shows how state assignments in evaluate_trigger_rule() were brought out of the is_ stack that I could reference? The other change I could make would be to add a flag similar to flag_upstream_failed so that state would only change if the original caller explicitly passed through the flag enabling it.
There was a problem hiding this comment.
you can also just change the name of the method to reflect what is taking place
|
One thing that I notice about this PR is that it doesn't appear to try to execute the most recent execution_date first in a case where there are multiple DAG runs that need to be run. For example, if you have 10 DAG runs that all need to be run, and only_run_latest=True, then it makes sense to run the most recent one first and skip the other 9. @bolkedebruin @jlowin @mistercrunch Do we want to introduce a new state for this (FUTURE_SUCCEEDED) or just use SKIPPED? This PR should include some tests as well. |
|
I considered the issue to be two related issues:
This PR only addresses the first of those. The second piece raised many more architectural questions and potential changes to the way Airflow schedules jobs. I didn't want to commit to a potentially large architectural change before seeing how the maintainers were thinking about designing for this feature. |
| TaskInstance.task_id.in_(task.downstream_task_ids), | ||
| TaskInstance.execution_date == self.execution_date, | ||
| TaskInstance.state == State.SUCCESS, | ||
| TaskInstance.state == State.SUCCESS or TaskInstance.state == State.FUTURE_SUCCEEDED, |
There was a problem hiding this comment.
TaskInstance.state.in_((State.SUCCESS, State.FUTURE_SUCCEEDED))
|
Yes it conflicts, thanks for finding this Max. Under the new model (you can see in my PR that max linked #1525 ) you would create a dependency class for future succeeded. |
|
This has been resolved by add a LatestOnlyOperator in #1752 |
Dear Airflow Maintainers,
Please accept this PR that addresses the following issues: