
Add deferrable mode to SFTPOperator #68298

Open
sunildataengineer wants to merge 6 commits into apache:main from sunildataengineer:sftp-deferrable-clean

Conversation

@sunildataengineer

@sunildataengineer sunildataengineer commented Jun 9, 2026


Closes: #65475


Was generative AI tooling used to co-author this PR?
  • Yes (please specify the tool below)


@sunildataengineer

Author

@srchilukoori @potiuk @dabla

I’m opening a new PR as a continuation/replacement for accidentally closed PR #65480. While rebasing and cleaning up the branch history against upstream/main, I mistakenly performed a reset/rebase sequence that rewrote the branch history and caused GitHub to show “0 commits,” which made the original PR impossible to reopen properly.

This was completely unintentional, and I sincerely apologize for the confusion and extra noise caused during review. Over the past couple of months, I worked extensively on this contribution — implementing deferrable support for SFTPOperator, adding async trigger support, moving transfer logic into hooks, addressing multiple rounds of review feedback, fixing CI/lint/test failures, updating docs/newsfragments, and continuously rebasing on latest main.

Thankfully, the commits were still recoverable through git reflog, so I recreated the contribution cleanly from a fresh branch based on current upstream main. This new PR contains the same intended changes and review fixes from #65480, but with a clean branch history and correct diff.

Thank you again for all the reviews, feedback, patience, and guidance throughout this process. I truly appreciate the maintainers and reviewers taking another look at the contribution 🙏

@sunildataengineer

Author

@dabla @potiuk — the "200 changed files" in the GitHub Files tab
was showing all commits including upstream merge commits.

The actual diff vs apache:main is only 11 files:

providers/sftp/src/.../constants.py
providers/sftp/src/.../hooks/sftp.py
providers/sftp/src/.../operators/sftp.py
providers/sftp/src/.../triggers/sftp.py
providers/sftp/tests/.../test_sftp.py
providers/sftp/tests/.../test_constants.py
providers/sftp/pyproject.toml
providers/sftp/docs/index.rst
providers/sftp/newsfragments/65480.feature.rst
scripts/ci/prek/known_airflow_exceptions.txt
uv.lock
I'll squash the commits to make the PR cleaner. 🙏

@dabla

dabla commented Jun 9, 2026

Contributor

You could have simply renamed your old corrupt branch, created a new branch with the same name, and force-pushed it. That way the existing PR (with its reviews and comments) would have been kept, which would have made the review easier for us.

Now the context is unfortunately lost when creating a new PR, and we reviewers have to check back with the original PR. So please take that into consideration for the future. Everyone makes mistakes, me included; I have run into the same problem myself multiple times, which is how I know the above trick is ideal in such situations.

@sunildataengineer

Author

@dabla — thank you for the explanation and guidance. That makes complete sense now, and I really appreciate you sharing the branch recovery approach. I wasn’t aware that recreating the branch with the same name would preserve the original PR context and review history.

I’ll definitely keep this workflow in mind going forward. Apologies again for the inconvenience and extra review overhead caused by the rewritten branch history.

Thank you again for the patience and for continuing to review the contribution 🙏

@sunildataengineer

Author

@dabla @potiuk @srchilukoori — I'm aware of PR #68520 implementing
the same feature. My PR #68298 represents 3+ months of work with
all your review feedback already incorporated across multiple cycles.

I've now fixed the branch cleanly — single commit, correct diff
(11 files), no conflicts. CI is running.

I respectfully ask that this PR be reviewed ahead of #68520 given
the prior review history and incorporated feedback. 🙏

- Add deferrable=True parameter to SFTPOperator
- Implement SFTPTrigger for async file transfers via asgiref sync_to_async
- Add transfer() method to SFTPHook and SFTPHookAsync (DRY principle)
- Operator and trigger delegate to hook.transfer()
- Add asgiref>=3.5.2 dependency
- Add newsfragment and update docs requirements table
- Add tests for deferrable mode and SFTPTrigger
Continuation of work from PR apache#65480
@sunildataengineer

Author

@dabla @potiuk — branch fixed. All files now present and clean:
constants.py, hooks/sftp.py, operators/sftp.py, triggers/sftp.py,
test_sftp.py, test_constants.py, pyproject.toml, docs/index.rst,
newsfragment. No conflict markers, single clean commit. CI running now.

Given the 3+ months of review history already incorporated here,
I'd really appreciate a review pass once CI is green. Thank you
for your patience 🙏

Comment thread providers/sftp/src/airflow/providers/sftp/hooks/sftp.py
Comment thread providers/sftp/src/airflow/providers/sftp/triggers/sftp.py Outdated
@sunildataengineer

Author

@dabla — both addressed:

  1. SFTPHookAsync.transfer() now uses native async I/O (retrieve_file/
    store_file/sftp.unlink) instead of the sync_to_async wrapper around
    SFTPHook.

  2. SFTPOperatorTrigger.run() now delegates directly to
    SFTPHookAsync.transfer() — _do_transfer removed entirely.

Newsfragment renamed to match this PR (68298.feature.rst).

Ready for re-review 🙏
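
To illustrate the delegation described in this comment, here is a minimal sketch in plain Python. It is not the PR's code and does not import Airflow: `SketchEvent`, `SketchAsyncHook`, `SketchTrigger`, and `collect_events` are hypothetical stand-ins, and the `transfer()` signature is an assumption made for the example. The point is only the shape: the trigger holds no transfer logic and yields a single event describing the outcome.

```python
import asyncio
from dataclasses import dataclass
from typing import Any, AsyncIterator


@dataclass
class SketchEvent:
    """Hypothetical stand-in for Airflow's TriggerEvent (just carries a payload)."""

    payload: dict[str, Any]


class SketchAsyncHook:
    """Hypothetical stand-in for an async SFTP hook; performs no real I/O."""

    def __init__(self, fail: bool = False) -> None:
        self.fail = fail
        self.calls: list[tuple[str, str, str]] = []

    async def transfer(self, operation: str, local_path: str, remote_path: str) -> None:
        await asyncio.sleep(0)  # placeholder for native async SFTP calls
        if self.fail:
            raise OSError("simulated transfer failure")
        self.calls.append((operation, local_path, remote_path))


class SketchTrigger:
    """Holds no transfer logic of its own; it only delegates to the hook."""

    def __init__(
        self, hook: SketchAsyncHook, operation: str, local_path: str, remote_path: str
    ) -> None:
        self.hook = hook
        self.operation = operation
        self.local_path = local_path
        self.remote_path = remote_path

    async def run(self) -> AsyncIterator[SketchEvent]:
        try:
            await self.hook.transfer(self.operation, self.local_path, self.remote_path)
        except Exception as exc:
            # Report the failure as an event instead of letting it escape.
            yield SketchEvent({"status": "error", "message": str(exc)})
            return
        yield SketchEvent({"status": "success"})


async def collect_events(trigger: SketchTrigger) -> list[SketchEvent]:
    """Drain the trigger's async generator into a list (helper for the sketch)."""
    return [event async for event in trigger.run()]
```

Keeping the transfer logic in the hook means the synchronous and deferred paths can share one implementation, which appears to be the intent behind removing `_do_transfer`.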

@sunildataengineer sunildataengineer requested a review from dabla June 22, 2026 23:28
@dabla

dabla commented Jun 23, 2026

Contributor

@dabla — both addressed:

  1. SFTPHookAsync.transfer() now uses native async I/O (retrieve_file/
    store_file/sftp.unlink) instead of the sync_to_async wrapper around
    SFTPHook.
  2. SFTPOperatorTrigger.run() now delegates directly to
    SFTPHookAsync.transfer() — _do_transfer removed entirely.

Newsfragment renamed to match this PR (68298.feature.rst).

Ready for re-review 🙏

Do not forget to resolve comments once addressed...

Comment thread providers/sftp/src/airflow/providers/sftp/triggers/sftp.py Outdated
Comment thread providers/sftp/src/airflow/providers/sftp/hooks/sftp.py Outdated
Comment thread providers/sftp/src/airflow/providers/sftp/hooks/sftp.py Outdated
Comment thread providers/sftp/src/airflow/providers/sftp/hooks/sftp.py Outdated
Comment thread providers/sftp/src/airflow/providers/sftp/operators/sftp.py Outdated
…ion concurrent transfer() with asyncio.gather, use __name__ for method_name
@sunildataengineer

Author

@dabla — all 3 addressed:

  1. Renamed SFTPOperatorTrigger → SFTPOperationTrigger across all files
  2. transfer() now opens a single SSH connection per call, with
    asyncio.Semaphore(concurrency) + asyncio.gather for bounded
    concurrent transfers — exactly per your suggested implementation
  3. method_name=self.execute_complete.__name__ instead of the string literal

Ready for re-review 🙏
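
The bounded-concurrency pattern mentioned in item 2 can be sketched with the standard library alone. This is an illustration under assumptions, not the PR's implementation: `bounded_transfers` and `PeakTracker` are hypothetical names, and the fake transfer only sleeps instead of touching an SSH connection.

```python
import asyncio
from typing import Awaitable, Callable, Sequence


async def bounded_transfers(
    paths: Sequence[str],
    transfer_one: Callable[[str], Awaitable[str]],
    concurrency: int = 4,
) -> list[str]:
    """Run transfer_one(path) for every path, at most `concurrency` at a time."""
    semaphore = asyncio.Semaphore(concurrency)

    async def guarded(path: str) -> str:
        # Each task waits here until one of the `concurrency` slots is free.
        async with semaphore:
            return await transfer_one(path)

    # gather returns results in the same order as the input paths.
    return list(await asyncio.gather(*(guarded(p) for p in paths)))


class PeakTracker:
    """Hypothetical fake transfer that records how many calls overlap."""

    def __init__(self) -> None:
        self.active = 0
        self.peak = 0

    async def __call__(self, path: str) -> str:
        self.active += 1
        self.peak = max(self.peak, self.active)
        await asyncio.sleep(0.01)  # stands in for the actual file transfer
        self.active -= 1
        return f"done:{path}"
```

In a real hook the single shared connection would be opened once outside `gather` and passed to each transfer; that part is omitted here because it depends on the SSH library in use.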

@eladkal eladkal requested a review from dabla June 25, 2026 08:40
@sunildataengineer

Author

@ashb @dabla @potiuk @bugraoz93 @gopidesupavan @amoghrajesh @jason810496 @jscheffl

Thank you for the detailed feedback! I'm addressing the async optimization suggestions from @dabla. I'll update the implementation to:

  1. Use asyncio.gather() and semaphores for better concurrent transfers
  2. Reuse connections instead of creating new ones per transfer
  3. Rename the trigger class accordingly

Pushing updates shortly. Appreciate the thorough review!

@bugraoz93 bugraoz93 left a comment

Contributor


Thanks for the PR and the updates! I have two points.

One is the package version. The other is the enum approach: after finishing the review and seeing that the values are neither used nor regenerated, this should definitely be an enum. If others hold, a dataclass.

Comment thread providers/sftp/pyproject.toml
Comment thread providers/sftp/src/airflow/providers/sftp/constants.py
Comment thread providers/sftp/src/airflow/providers/sftp/hooks/sftp.py
Comment thread providers/sftp/src/airflow/providers/sftp/triggers/sftp.py
@sunildataengineer

Author

@bugraoz93 Thank you for the detailed review. I've addressed all your concerns:

Issue 1: asgiref Version Bump

  • Changed: asgiref>=3.5.2 to asgiref>=3.11.1

  • Location: providers/sftp/pyproject.toml line 67

  • Reason: You're right, we should use the latest stable version instead of the 2022 release

Issue 2: SFTPOperation Should Be an Enum

Created providers/sftp/src/airflow/providers/sftp/constants.py:

```python
from enum import Enum


class SFTPOperation(str, Enum):
    """Operation that can be used with SFTP."""

    PUT = "put"
    GET = "get"
    DELETE = "delete"
```
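
As a rough illustration of why a `str`-based enum fits here, the sketch below repeats the enum so it runs on its own and adds a hypothetical `parse_operation` helper (not part of the PR) that validates user input against the members.

```python
from enum import Enum


class SFTPOperation(str, Enum):
    """Operation that can be used with SFTP."""

    PUT = "put"
    GET = "get"
    DELETE = "delete"


def parse_operation(value: str) -> SFTPOperation:
    """Hypothetical helper: map a user-supplied string to an enum member."""
    try:
        # Enum lookup by value; lower() makes "PUT" and "put" equivalent.
        return SFTPOperation(value.lower())
    except ValueError:
        valid = ", ".join(op.value for op in SFTPOperation)
        raise ValueError(
            f"Unsupported operation {value!r}; expected one of: {valid}"
        ) from None
```

Because the members also subclass `str`, comparisons such as `SFTPOperation.GET == "get"` still hold, so existing callers that pass plain strings should keep working while typos fail early with a clear message.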
    
