From 021566c6ef7935c61fe208991ededf853a801494 Mon Sep 17 00:00:00 2001 From: Anish Date: Sun, 18 Jan 2026 15:50:59 -0600 Subject: [PATCH 1/4] 60558 (feature) : added ssh sftp object store --- providers/sftp/docs/filesystems/index.rst | 26 ++ providers/sftp/docs/filesystems/sftp.rst | 244 ++++++++++++++++++ providers/sftp/docs/index.rst | 7 + providers/sftp/provider.yaml | 3 + providers/sftp/pyproject.toml | 3 + .../src/airflow/providers/sftp/fs/__init__.py | 16 ++ .../src/airflow/providers/sftp/fs/sftp.py | 65 +++++ .../providers/sftp/get_provider_info.py | 1 + providers/sftp/tests/unit/sftp/fs/__init__.py | 16 ++ .../sftp/tests/unit/sftp/fs/test_sftp.py | 222 ++++++++++++++++ 10 files changed, 603 insertions(+) create mode 100644 providers/sftp/docs/filesystems/index.rst create mode 100644 providers/sftp/docs/filesystems/sftp.rst create mode 100644 providers/sftp/src/airflow/providers/sftp/fs/__init__.py create mode 100644 providers/sftp/src/airflow/providers/sftp/fs/sftp.py create mode 100644 providers/sftp/tests/unit/sftp/fs/__init__.py create mode 100644 providers/sftp/tests/unit/sftp/fs/test_sftp.py diff --git a/providers/sftp/docs/filesystems/index.rst b/providers/sftp/docs/filesystems/index.rst new file mode 100644 index 0000000000000..eb0036177a3fa --- /dev/null +++ b/providers/sftp/docs/filesystems/index.rst @@ -0,0 +1,26 @@ + .. Licensed to the Apache Software Foundation (ASF) under one + or more contributor license agreements. See the NOTICE file + distributed with this work for additional information + regarding copyright ownership. The ASF licenses this file + to you under the Apache License, Version 2.0 (the + "License"); you may not use this file except in compliance + with the License. You may obtain a copy of the License at + + .. http://www.apache.org/licenses/LICENSE-2.0 + + .. Unless required by applicable law or agreed to in writing, + software distributed under the License is distributed on an + "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + KIND, either express or implied. See the License for the + specific language governing permissions and limitations + under the License. + +Filesystems +=========== + +.. toctree:: + :maxdepth: 1 + :caption: Filesystem Providers + :glob: + + * diff --git a/providers/sftp/docs/filesystems/sftp.rst b/providers/sftp/docs/filesystems/sftp.rst new file mode 100644 index 0000000000000..0b9ce20003777 --- /dev/null +++ b/providers/sftp/docs/filesystems/sftp.rst @@ -0,0 +1,244 @@ + .. Licensed to the Apache Software Foundation (ASF) under one + or more contributor license agreements. See the NOTICE file + distributed with this work for additional information + regarding copyright ownership. The ASF licenses this file + to you under the Apache License, Version 2.0 (the + "License"); you may not use this file except in compliance + with the License. You may obtain a copy of the License at + + .. http://www.apache.org/licenses/LICENSE-2.0 + + .. Unless required by applicable law or agreed to in writing, + software distributed under the License is distributed on an + "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + KIND, either express or implied. See the License for the + specific language governing permissions and limitations + under the License. + +SFTP / SSH Filesystem +===================== + +The SFTP filesystem provides access to remote servers via SSH File Transfer Protocol (SFTP) through +Airflow's ``ObjectStoragePath`` interface. This allows you to perform file operations on remote SSH +servers using the same API as other object storage backends like S3 or GCS. + +Supported URL formats: + +* ``sftp://connection_id@hostname/path/to/file`` +* ``ssh://connection_id@hostname/path/to/file`` + +Installation +------------ + +The SFTP filesystem requires the ``sshfs`` library. Install it with: + +.. code-block:: bash + + pip install apache-airflow-providers-sftp[sshfs] + +Connection Configuration +------------------------ + +The SFTP filesystem uses Airflow's SFTP connection type. Create a connection with the following parameters: + +* **Connection Type**: sftp +* **Host**: Remote server hostname or IP address +* **Port**: SSH port (default: 22) +* **Login**: SSH username +* **Password**: SSH password (if using password authentication) + +Additional configuration via connection extras: + +* **key_file**: Path to the private SSH key file for key-based authentication +* **private_key**: Content of the private key (PEM format) for key-based authentication +* **private_key_passphrase**: Passphrase for the private key (if encrypted) +* **no_host_key_check**: Set to ``true`` to disable host key verification (not recommended for production) + +For more details on connection configuration, see :doc:`/connections/sftp`. + +Connection extra field configuration examples: + +**Using password authentication:** + +.. code-block:: json + + {} + +**Using key file:** + +.. code-block:: json + + { + "key_file": "/path/to/private_key" + } + +**Using private key content:** + +.. code-block:: json + + { + "private_key": "", + "private_key_passphrase": "optional_passphrase" + } + +**Disabling host key verification (use with caution):** + +.. code-block:: json + + { + "no_host_key_check": "true" + } + +Usage Examples +-------------- + +Basic File Operations +^^^^^^^^^^^^^^^^^^^^^ + +.. code-block:: python + + from airflow.sdk import ObjectStoragePath + + # Access a file on a remote SFTP server + path = ObjectStoragePath("sftp://my_sftp_conn@remote-server/home/user/data.csv") + + # Read file content + with path.open("r") as f: + content = f.read() + + # Write to a file + output_path = ObjectStoragePath("sftp://my_sftp_conn@remote-server/home/user/output.txt") + with output_path.open("w") as f: + f.write("Hello from Airflow!") + +Directory Operations +^^^^^^^^^^^^^^^^^^^^ + +.. code-block:: python + + # List directory contents + remote_dir = ObjectStoragePath("sftp://my_sftp_conn@remote-server/home/user/data/") + + for item in remote_dir.iterdir(): + print(f"Found: {item.name}") + if item.is_file(): + print(f" Size: {item.stat().st_size} bytes") + + # Create a directory + new_dir = ObjectStoragePath("sftp://my_sftp_conn@remote-server/home/user/new_folder/") + new_dir.mkdir(parents=True, exist_ok=True) + +Copying Files +^^^^^^^^^^^^^ + +.. code-block:: python + + # Copy from SFTP to local + remote_file = ObjectStoragePath("sftp://my_sftp_conn@remote-server/data/input.csv") + local_file = ObjectStoragePath("file:///tmp/input.csv") + remote_file.copy(local_file) + + # Copy from local to SFTP + local_output = ObjectStoragePath("file:///tmp/output.csv") + remote_output = ObjectStoragePath("sftp://my_sftp_conn@remote-server/data/output.csv") + local_output.copy(remote_output) + + # Copy between different SFTP servers + source = ObjectStoragePath("sftp://server1_conn@server1/data/file.txt") + target = ObjectStoragePath("sftp://server2_conn@server2/backup/file.txt") + source.copy(target) + +Cross-Backend Operations +^^^^^^^^^^^^^^^^^^^^^^^^ + +.. code-block:: python + + # Copy from SFTP to S3 + sftp_file = ObjectStoragePath("sftp://my_sftp_conn@remote-server/exports/data.parquet") + s3_file = ObjectStoragePath("s3://aws_conn@my-bucket/imports/data.parquet") + sftp_file.copy(s3_file) + + # Copy from GCS to SFTP + gcs_file = ObjectStoragePath("gs://gcp_conn@my-bucket/reports/report.pdf") + sftp_target = ObjectStoragePath("sftp://my_sftp_conn@remote-server/incoming/report.pdf") + gcs_file.copy(sftp_target) + +Using in Tasks +^^^^^^^^^^^^^^ + +.. code-block:: python + + from airflow.sdk import ObjectStoragePath + from airflow.decorators import task + + + @task + def process_remote_file(remote_path: ObjectStoragePath) -> dict: + """Process a file from an SFTP server.""" + with remote_path.open("r") as f: + content = f.read() + + return { + "size": remote_path.stat().st_size, + "lines": len(content.splitlines()), + } + + + @task + def upload_results(local_path: ObjectStoragePath, remote_path: ObjectStoragePath): + """Upload processed results to SFTP server.""" + local_path.copy(remote_path) + +Storage Options +--------------- + +You can pass additional options to the underlying ``SSHFileSystem`` via storage options: + +.. code-block:: python + + path = ObjectStoragePath( + "sftp://my_sftp_conn@remote-server/data/file.txt", + storage_options={ + "connect_timeout": 30, # Connection timeout in seconds + }, + ) + +Storage options are merged with connection settings, with storage options taking precedence. + +Security Considerations +----------------------- + +* **Host key verification**: By default, host key verification is enabled. Only disable it + (``no_host_key_check: true``) in development environments or when you understand the security + implications. + +* **Key-based authentication**: Prefer key-based authentication over password authentication + for better security. + +* **Private key storage**: When using ``private_key`` in connection extras, ensure your Airflow + metadata database is properly secured, as the key content is stored there. + +Requirements +------------ + +The SFTP filesystem requires: + +* ``sshfs`` Python package (``>=2023.1.0``) +* Valid SSH/SFTP server access +* Appropriate authentication credentials + +Cross-References +---------------- + +* :doc:`/connections/sftp` - SFTP connection configuration +* :doc:`/sensors/sftp_sensor` - SFTP file sensors +* :doc:`apache-airflow:core-concepts/objectstorage` - ObjectStoragePath documentation + +Reference +--------- + +For further information, see: + +* `sshfs Python package `__ +* `asyncssh documentation `__ +* `SFTP protocol specification `__ diff --git a/providers/sftp/docs/index.rst b/providers/sftp/docs/index.rst index 3b139178161ec..9c41b3f7cce0e 100644 --- a/providers/sftp/docs/index.rst +++ b/providers/sftp/docs/index.rst @@ -29,6 +29,13 @@ Changelog Security +.. toctree:: + :hidden: + :maxdepth: 1 + :caption: Guides + + Filesystems + .. toctree:: :hidden: :maxdepth: 1 diff --git a/providers/sftp/provider.yaml b/providers/sftp/provider.yaml index 47ecfc6044378..ceaa67f702198 100644 --- a/providers/sftp/provider.yaml +++ b/providers/sftp/provider.yaml @@ -123,3 +123,6 @@ triggers: - integration-name: SSH File Transfer Protocol (SFTP) python-modules: - airflow.providers.sftp.triggers.sftp + +filesystems: + - airflow.providers.sftp.fs.sftp diff --git a/providers/sftp/pyproject.toml b/providers/sftp/pyproject.toml index 4d05ea7ed5c72..d562e123d6680 100644 --- a/providers/sftp/pyproject.toml +++ b/providers/sftp/pyproject.toml @@ -72,6 +72,9 @@ dependencies = [ "openlineage" = [ "apache-airflow-providers-openlineage" ] +"sshfs" = [ + "sshfs>=2023.1.0", +] [dependency-groups] dev = [ diff --git a/providers/sftp/src/airflow/providers/sftp/fs/__init__.py b/providers/sftp/src/airflow/providers/sftp/fs/__init__.py new file mode 100644 index 0000000000000..13a83393a9124 --- /dev/null +++ b/providers/sftp/src/airflow/providers/sftp/fs/__init__.py @@ -0,0 +1,16 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. diff --git a/providers/sftp/src/airflow/providers/sftp/fs/sftp.py b/providers/sftp/src/airflow/providers/sftp/fs/sftp.py new file mode 100644 index 0000000000000..69c6056fb45a6 --- /dev/null +++ b/providers/sftp/src/airflow/providers/sftp/fs/sftp.py @@ -0,0 +1,65 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. +from __future__ import annotations + +from typing import TYPE_CHECKING, Any + +from airflow.hooks.base import BaseHook + +if TYPE_CHECKING: + from fsspec import AbstractFileSystem + +schemes = ["sftp", "ssh"] + + +def get_fs(conn_id: str | None, storage_options: dict[str, Any] | None = None) -> AbstractFileSystem: + try: + from sshfs import SSHFileSystem + except ImportError: + raise ImportError( + "Airflow FS SFTP/SSH protocol requires the sshfs library. " + "Install with: pip install apache-airflow-providers-sftp[sshfs]" + ) + + if conn_id is None: + return SSHFileSystem(**(storage_options or {})) + + conn = BaseHook.get_connection(conn_id) + extras = conn.extra_dejson + + options: dict[str, Any] = { + "host": conn.host, + "port": conn.port or 22, + "username": conn.login, + } + + if conn.password: + options["password"] = conn.password + + if key_file := extras.get("key_file"): + options["client_keys"] = [key_file] + + if private_key := extras.get("private_key"): + options["client_keys"] = [private_key] + if passphrase := extras.get("private_key_passphrase"): + options["passphrase"] = passphrase + + if str(extras.get("no_host_key_check", "")).lower() == "true": + options["known_hosts"] = None + + options.update(storage_options or {}) + return SSHFileSystem(**options) diff --git a/providers/sftp/src/airflow/providers/sftp/get_provider_info.py b/providers/sftp/src/airflow/providers/sftp/get_provider_info.py index 09b5746802637..87ce10a7af529 100644 --- a/providers/sftp/src/airflow/providers/sftp/get_provider_info.py +++ b/providers/sftp/src/airflow/providers/sftp/get_provider_info.py @@ -71,4 +71,5 @@ def get_provider_info(): "python-modules": ["airflow.providers.sftp.triggers.sftp"], } ], + "filesystems": ["airflow.providers.sftp.fs.sftp"], } diff --git a/providers/sftp/tests/unit/sftp/fs/__init__.py b/providers/sftp/tests/unit/sftp/fs/__init__.py new file mode 100644 index 0000000000000..13a83393a9124 --- /dev/null +++ b/providers/sftp/tests/unit/sftp/fs/__init__.py @@ -0,0 +1,16 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. diff --git a/providers/sftp/tests/unit/sftp/fs/test_sftp.py b/providers/sftp/tests/unit/sftp/fs/test_sftp.py new file mode 100644 index 0000000000000..7f8da2bb3e495 --- /dev/null +++ b/providers/sftp/tests/unit/sftp/fs/test_sftp.py @@ -0,0 +1,222 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. +from __future__ import annotations + +from unittest.mock import patch + +import pytest + +pytest.importorskip("sshfs") + +TEST_CONN_ID = "sftp_test_conn" + + +@pytest.fixture(scope="module", autouse=True) +def _setup_connections(): + with pytest.MonkeyPatch.context() as mp_ctx: + mp_ctx.setenv( + f"AIRFLOW_CONN_{TEST_CONN_ID}".upper(), + "sftp://testuser:testpass@testhost:2222", + ) + yield + + +class TestSftpFilesystem: + def test_schemes(self): + from airflow.providers.sftp.fs.sftp import schemes + + assert "sftp" in schemes + assert "ssh" in schemes + + @patch("sshfs.SSHFileSystem", autospec=True) + def test_get_fs_with_connection(self, mock_sshfs): + from airflow.providers.sftp.fs.sftp import get_fs + + get_fs(conn_id=TEST_CONN_ID) + + mock_sshfs.assert_called_once() + call_kwargs = mock_sshfs.call_args.kwargs + assert call_kwargs["host"] == "testhost" + assert call_kwargs["port"] == 2222 + assert call_kwargs["username"] == "testuser" + assert call_kwargs["password"] == "testpass" + + @patch("sshfs.SSHFileSystem", autospec=True) + def test_get_fs_without_connection(self, mock_sshfs): + from airflow.providers.sftp.fs.sftp import get_fs + + # When conn_id is None, storage_options are passed directly to SSHFileSystem + storage_options = {"host": "manual-host", "username": "manual-user"} + get_fs(conn_id=None, storage_options=storage_options) + + mock_sshfs.assert_called_once() + call_kwargs = mock_sshfs.call_args.kwargs + assert call_kwargs["host"] == "manual-host" + assert call_kwargs["username"] == "manual-user" + + @patch("sshfs.SSHFileSystem", autospec=True) + def test_get_fs_storage_options_merge(self, mock_sshfs): + from airflow.providers.sftp.fs.sftp import get_fs + + storage_options = {"custom_option": "custom_value"} + get_fs(conn_id=TEST_CONN_ID, storage_options=storage_options) + + mock_sshfs.assert_called_once() + call_kwargs = mock_sshfs.call_args.kwargs + assert call_kwargs["custom_option"] == "custom_value" + assert call_kwargs["host"] == "testhost" + + @patch("sshfs.SSHFileSystem", autospec=True) + def test_get_fs_storage_options_override(self, mock_sshfs): + from airflow.providers.sftp.fs.sftp import get_fs + + storage_options = {"port": 3333} + get_fs(conn_id=TEST_CONN_ID, storage_options=storage_options) + + mock_sshfs.assert_called_once() + call_kwargs = mock_sshfs.call_args.kwargs + assert call_kwargs["port"] == 3333 + + @patch("sshfs.SSHFileSystem", autospec=True) + def test_get_fs_with_key_file(self, mock_sshfs): + from airflow.providers.sftp.fs.sftp import get_fs + + with pytest.MonkeyPatch.context() as mp_ctx: + mp_ctx.setenv( + "AIRFLOW_CONN_SFTP_KEY_FILE", + "sftp://testuser@testhost?key_file=%2Fpath%2Fto%2Fkey", + ) + + get_fs(conn_id="sftp_key_file") + + mock_sshfs.assert_called_once() + call_kwargs = mock_sshfs.call_args.kwargs + assert call_kwargs["client_keys"] == ["/path/to/key"] + + @patch("sshfs.SSHFileSystem", autospec=True) + def test_get_fs_with_private_key(self, mock_sshfs): + from airflow.providers.sftp.fs.sftp import get_fs + + with pytest.MonkeyPatch.context() as mp_ctx: + mp_ctx.setenv( + "AIRFLOW_CONN_SFTP_PRIVATE_KEY", + "sftp://testuser@testhost?private_key=PRIVATE_KEY_CONTENT&private_key_passphrase=secret", + ) + + get_fs(conn_id="sftp_private_key") + + mock_sshfs.assert_called_once() + call_kwargs = mock_sshfs.call_args.kwargs + assert call_kwargs["client_keys"] == ["PRIVATE_KEY_CONTENT"] + assert call_kwargs["passphrase"] == "secret" + + @patch("sshfs.SSHFileSystem", autospec=True) + def test_get_fs_with_private_key_no_passphrase(self, mock_sshfs): + from airflow.providers.sftp.fs.sftp import get_fs + + with pytest.MonkeyPatch.context() as mp_ctx: + mp_ctx.setenv( + "AIRFLOW_CONN_SFTP_PRIVATE_KEY_NO_PASS", + "sftp://testuser@testhost?private_key=PRIVATE_KEY_CONTENT", + ) + + get_fs(conn_id="sftp_private_key_no_pass") + + mock_sshfs.assert_called_once() + call_kwargs = mock_sshfs.call_args.kwargs + assert call_kwargs["client_keys"] == ["PRIVATE_KEY_CONTENT"] + assert "passphrase" not in call_kwargs + + @patch("sshfs.SSHFileSystem", autospec=True) + def test_get_fs_with_no_host_key_check(self, mock_sshfs): + from airflow.providers.sftp.fs.sftp import get_fs + + with pytest.MonkeyPatch.context() as mp_ctx: + mp_ctx.setenv( + "AIRFLOW_CONN_SFTP_NO_HOST_CHECK", + "sftp://testuser@testhost?no_host_key_check=true", + ) + + get_fs(conn_id="sftp_no_host_check") + + mock_sshfs.assert_called_once() + call_kwargs = mock_sshfs.call_args.kwargs + assert call_kwargs["known_hosts"] is None + + @patch("sshfs.SSHFileSystem", autospec=True) + def test_get_fs_default_port(self, mock_sshfs): + from airflow.providers.sftp.fs.sftp import get_fs + + with pytest.MonkeyPatch.context() as mp_ctx: + mp_ctx.setenv( + "AIRFLOW_CONN_SFTP_DEFAULT_PORT", + "sftp://testuser@testhost", + ) + + get_fs(conn_id="sftp_default_port") + + mock_sshfs.assert_called_once() + call_kwargs = mock_sshfs.call_args.kwargs + assert call_kwargs["port"] == 22 + + @patch("sshfs.SSHFileSystem", autospec=True) + def test_get_fs_without_password(self, mock_sshfs): + from airflow.providers.sftp.fs.sftp import get_fs + + with pytest.MonkeyPatch.context() as mp_ctx: + mp_ctx.setenv( + "AIRFLOW_CONN_SFTP_NO_PASSWORD", + "sftp://testuser@testhost", + ) + + get_fs(conn_id="sftp_no_password") + + mock_sshfs.assert_called_once() + call_kwargs = mock_sshfs.call_args.kwargs + assert "password" not in call_kwargs + + @patch("sshfs.SSHFileSystem", autospec=True) + def test_get_fs_host_key_check_enabled_by_default(self, mock_sshfs): + from airflow.providers.sftp.fs.sftp import get_fs + + with pytest.MonkeyPatch.context() as mp_ctx: + mp_ctx.setenv( + "AIRFLOW_CONN_SFTP_HOST_CHECK_DEFAULT", + "sftp://testuser@testhost", + ) + + get_fs(conn_id="sftp_host_check_default") + + mock_sshfs.assert_called_once() + call_kwargs = mock_sshfs.call_args.kwargs + assert "known_hosts" not in call_kwargs + + @patch("sshfs.SSHFileSystem", autospec=True) + def test_get_fs_host_key_check_explicit_false(self, mock_sshfs): + from airflow.providers.sftp.fs.sftp import get_fs + + with pytest.MonkeyPatch.context() as mp_ctx: + mp_ctx.setenv( + "AIRFLOW_CONN_SFTP_HOST_CHECK_FALSE", + "sftp://testuser@testhost?no_host_key_check=false", + ) + + get_fs(conn_id="sftp_host_check_false") + + mock_sshfs.assert_called_once() + call_kwargs = mock_sshfs.call_args.kwargs + assert "known_hosts" not in call_kwargs From 3851a126decfcb4c00d9df67fad02c38a9373cf8 Mon Sep 17 00:00:00 2001 From: Anish Date: Sun, 18 Jan 2026 15:57:12 -0600 Subject: [PATCH 2/4] fix depreciation warning --- providers/sftp/src/airflow/providers/sftp/fs/sftp.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/providers/sftp/src/airflow/providers/sftp/fs/sftp.py b/providers/sftp/src/airflow/providers/sftp/fs/sftp.py index 69c6056fb45a6..6a915db67d931 100644 --- a/providers/sftp/src/airflow/providers/sftp/fs/sftp.py +++ b/providers/sftp/src/airflow/providers/sftp/fs/sftp.py @@ -18,7 +18,7 @@ from typing import TYPE_CHECKING, Any -from airflow.hooks.base import BaseHook +from airflow.sdk.bases.hook import BaseHook if TYPE_CHECKING: from fsspec import AbstractFileSystem From 71e7d1e473656f38e5e2ea46a87d2d471e8a0986 Mon Sep 17 00:00:00 2001 From: Anish Date: Sun, 18 Jan 2026 16:07:52 -0600 Subject: [PATCH 3/4] documentation clean ups --- providers/sftp/docs/filesystems/sftp.rst | 233 +++-------------------- 1 file changed, 26 insertions(+), 207 deletions(-) diff --git a/providers/sftp/docs/filesystems/sftp.rst b/providers/sftp/docs/filesystems/sftp.rst index 0b9ce20003777..4617cc7b86721 100644 --- a/providers/sftp/docs/filesystems/sftp.rst +++ b/providers/sftp/docs/filesystems/sftp.rst @@ -15,230 +15,49 @@ specific language governing permissions and limitations under the License. -SFTP / SSH Filesystem -===================== +SFTP Filesystem +=============== -The SFTP filesystem provides access to remote servers via SSH File Transfer Protocol (SFTP) through -Airflow's ``ObjectStoragePath`` interface. This allows you to perform file operations on remote SSH -servers using the same API as other object storage backends like S3 or GCS. - -Supported URL formats: - -* ``sftp://connection_id@hostname/path/to/file`` -* ``ssh://connection_id@hostname/path/to/file`` - -Installation ------------- - -The SFTP filesystem requires the ``sshfs`` library. Install it with: +Use ``ObjectStoragePath`` with SFTP/SSH servers via the `sshfs `__ library. .. code-block:: bash pip install apache-airflow-providers-sftp[sshfs] -Connection Configuration ------------------------- - -The SFTP filesystem uses Airflow's SFTP connection type. Create a connection with the following parameters: - -* **Connection Type**: sftp -* **Host**: Remote server hostname or IP address -* **Port**: SSH port (default: 22) -* **Login**: SSH username -* **Password**: SSH password (if using password authentication) - -Additional configuration via connection extras: - -* **key_file**: Path to the private SSH key file for key-based authentication -* **private_key**: Content of the private key (PEM format) for key-based authentication -* **private_key_passphrase**: Passphrase for the private key (if encrypted) -* **no_host_key_check**: Set to ``true`` to disable host key verification (not recommended for production) - -For more details on connection configuration, see :doc:`/connections/sftp`. - -Connection extra field configuration examples: - -**Using password authentication:** - -.. code-block:: json - - {} - -**Using key file:** - -.. code-block:: json - - { - "key_file": "/path/to/private_key" - } - -**Using private key content:** - -.. code-block:: json - - { - "private_key": "", - "private_key_passphrase": "optional_passphrase" - } - -**Disabling host key verification (use with caution):** - -.. code-block:: json - - { - "no_host_key_check": "true" - } - -Usage Examples --------------- - -Basic File Operations -^^^^^^^^^^^^^^^^^^^^^ - -.. code-block:: python - - from airflow.sdk import ObjectStoragePath - - # Access a file on a remote SFTP server - path = ObjectStoragePath("sftp://my_sftp_conn@remote-server/home/user/data.csv") - - # Read file content - with path.open("r") as f: - content = f.read() - - # Write to a file - output_path = ObjectStoragePath("sftp://my_sftp_conn@remote-server/home/user/output.txt") - with output_path.open("w") as f: - f.write("Hello from Airflow!") +URL format: ``sftp://connection_id@hostname/path/to/file`` (also supports ``ssh://``) -Directory Operations -^^^^^^^^^^^^^^^^^^^^ +Configuration +------------- -.. code-block:: python - - # List directory contents - remote_dir = ObjectStoragePath("sftp://my_sftp_conn@remote-server/home/user/data/") - - for item in remote_dir.iterdir(): - print(f"Found: {item.name}") - if item.is_file(): - print(f" Size: {item.stat().st_size} bytes") - - # Create a directory - new_dir = ObjectStoragePath("sftp://my_sftp_conn@remote-server/home/user/new_folder/") - new_dir.mkdir(parents=True, exist_ok=True) - -Copying Files -^^^^^^^^^^^^^ - -.. code-block:: python - - # Copy from SFTP to local - remote_file = ObjectStoragePath("sftp://my_sftp_conn@remote-server/data/input.csv") - local_file = ObjectStoragePath("file:///tmp/input.csv") - remote_file.copy(local_file) +Uses the standard SFTP connection. The following extras are supported: - # Copy from local to SFTP - local_output = ObjectStoragePath("file:///tmp/output.csv") - remote_output = ObjectStoragePath("sftp://my_sftp_conn@remote-server/data/output.csv") - local_output.copy(remote_output) +* ``key_file`` - path to private key file +* ``private_key`` - private key content (PEM format) +* ``private_key_passphrase`` - passphrase for encrypted keys +* ``no_host_key_check`` - set to ``true`` to skip host key verification - # Copy between different SFTP servers - source = ObjectStoragePath("sftp://server1_conn@server1/data/file.txt") - target = ObjectStoragePath("sftp://server2_conn@server2/backup/file.txt") - source.copy(target) +See :doc:`/connections/sftp` for details. -Cross-Backend Operations -^^^^^^^^^^^^^^^^^^^^^^^^ - -.. code-block:: python - - # Copy from SFTP to S3 - sftp_file = ObjectStoragePath("sftp://my_sftp_conn@remote-server/exports/data.parquet") - s3_file = ObjectStoragePath("s3://aws_conn@my-bucket/imports/data.parquet") - sftp_file.copy(s3_file) - - # Copy from GCS to SFTP - gcs_file = ObjectStoragePath("gs://gcp_conn@my-bucket/reports/report.pdf") - sftp_target = ObjectStoragePath("sftp://my_sftp_conn@remote-server/incoming/report.pdf") - gcs_file.copy(sftp_target) - -Using in Tasks -^^^^^^^^^^^^^^ +Example +------- .. code-block:: python from airflow.sdk import ObjectStoragePath - from airflow.decorators import task - - - @task - def process_remote_file(remote_path: ObjectStoragePath) -> dict: - """Process a file from an SFTP server.""" - with remote_path.open("r") as f: - content = f.read() - - return { - "size": remote_path.stat().st_size, - "lines": len(content.splitlines()), - } - - - @task - def upload_results(local_path: ObjectStoragePath, remote_path: ObjectStoragePath): - """Upload processed results to SFTP server.""" - local_path.copy(remote_path) - -Storage Options ---------------- - -You can pass additional options to the underlying ``SSHFileSystem`` via storage options: - -.. code-block:: python - - path = ObjectStoragePath( - "sftp://my_sftp_conn@remote-server/data/file.txt", - storage_options={ - "connect_timeout": 30, # Connection timeout in seconds - }, - ) - -Storage options are merged with connection settings, with storage options taking precedence. - -Security Considerations ------------------------ - -* **Host key verification**: By default, host key verification is enabled. Only disable it - (``no_host_key_check: true``) in development environments or when you understand the security - implications. - -* **Key-based authentication**: Prefer key-based authentication over password authentication - for better security. - -* **Private key storage**: When using ``private_key`` in connection extras, ensure your Airflow - metadata database is properly secured, as the key content is stored there. - -Requirements ------------- - -The SFTP filesystem requires: - -* ``sshfs`` Python package (``>=2023.1.0``) -* Valid SSH/SFTP server access -* Appropriate authentication credentials -Cross-References ----------------- + path = ObjectStoragePath("sftp://my_conn@myserver/data/file.csv") -* :doc:`/connections/sftp` - SFTP connection configuration -* :doc:`/sensors/sftp_sensor` - SFTP file sensors -* :doc:`apache-airflow:core-concepts/objectstorage` - ObjectStoragePath documentation + # read + with path.open() as f: + data = f.read() -Reference ---------- + # write + with path.open("w") as f: + f.write("content") -For further information, see: + # list + for p in path.parent.iterdir(): + print(p.name) -* `sshfs Python package `__ -* `asyncssh documentation `__ -* `SFTP protocol specification `__ + # copy + path.copy(ObjectStoragePath("file:///tmp/local.csv")) From e9c5d9e690f1a55f8d32ce910a68fc31c4203f96 Mon Sep 17 00:00:00 2001 From: Anish Date: Sun, 18 Jan 2026 20:31:52 -0600 Subject: [PATCH 4/4] clean ups --- providers/sftp/docs/filesystems/sftp.rst | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/providers/sftp/docs/filesystems/sftp.rst b/providers/sftp/docs/filesystems/sftp.rst index 4617cc7b86721..5b1e402d6ceb3 100644 --- a/providers/sftp/docs/filesystems/sftp.rst +++ b/providers/sftp/docs/filesystems/sftp.rst @@ -24,7 +24,7 @@ Use ``ObjectStoragePath`` with SFTP/SSH servers via the `sshfs