diff --git a/providers/sftp/docs/filesystems/index.rst b/providers/sftp/docs/filesystems/index.rst new file mode 100644 index 0000000000000..eb0036177a3fa --- /dev/null +++ b/providers/sftp/docs/filesystems/index.rst @@ -0,0 +1,26 @@ + .. Licensed to the Apache Software Foundation (ASF) under one + or more contributor license agreements. See the NOTICE file + distributed with this work for additional information + regarding copyright ownership. The ASF licenses this file + to you under the Apache License, Version 2.0 (the + "License"); you may not use this file except in compliance + with the License. You may obtain a copy of the License at + + .. http://www.apache.org/licenses/LICENSE-2.0 + + .. Unless required by applicable law or agreed to in writing, + software distributed under the License is distributed on an + "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + KIND, either express or implied. See the License for the + specific language governing permissions and limitations + under the License. + +Filesystems +=========== + +.. toctree:: + :maxdepth: 1 + :caption: Filesystem Providers + :glob: + + * diff --git a/providers/sftp/docs/filesystems/sftp.rst b/providers/sftp/docs/filesystems/sftp.rst new file mode 100644 index 0000000000000..5b1e402d6ceb3 --- /dev/null +++ b/providers/sftp/docs/filesystems/sftp.rst @@ -0,0 +1,63 @@ + .. Licensed to the Apache Software Foundation (ASF) under one + or more contributor license agreements. See the NOTICE file + distributed with this work for additional information + regarding copyright ownership. The ASF licenses this file + to you under the Apache License, Version 2.0 (the + "License"); you may not use this file except in compliance + with the License. You may obtain a copy of the License at + + .. http://www.apache.org/licenses/LICENSE-2.0 + + .. Unless required by applicable law or agreed to in writing, + software distributed under the License is distributed on an + "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + KIND, either express or implied. See the License for the + specific language governing permissions and limitations + under the License. + +SFTP Filesystem +=============== + +Use ``ObjectStoragePath`` with SFTP/SSH servers via the `sshfs `__ library. + +.. code-block:: bash + + pip install apache-airflow-providers-sftp[sshfs] + +URL format: ``sftp://connection_id@hostname/path/to/file`` (also supports ``ssh://``). + +Configuration +------------- + +Uses the standard SFTP connection. The following extras are supported: + +* ``key_file`` - path to private key file +* ``private_key`` - private key content (PEM format) +* ``private_key_passphrase`` - passphrase for encrypted keys +* ``no_host_key_check`` - set to ``true`` to skip host key verification + +See :doc:`/connections/sftp` for details. + +Example +------- + +.. code-block:: python + + from airflow.sdk import ObjectStoragePath + + path = ObjectStoragePath("sftp://my_conn@myserver/data/file.csv") + + # read + with path.open() as f: + data = f.read() + + # write + with path.open("w") as f: + f.write("content") + + # list + for p in path.parent.iterdir(): + print(p.name) + + # copy + path.copy(ObjectStoragePath("file:///tmp/local.csv")) diff --git a/providers/sftp/docs/index.rst b/providers/sftp/docs/index.rst index 3b139178161ec..9c41b3f7cce0e 100644 --- a/providers/sftp/docs/index.rst +++ b/providers/sftp/docs/index.rst @@ -29,6 +29,13 @@ Changelog Security +.. toctree:: + :hidden: + :maxdepth: 1 + :caption: Guides + + Filesystems + .. toctree:: :hidden: :maxdepth: 1 diff --git a/providers/sftp/provider.yaml b/providers/sftp/provider.yaml index 47ecfc6044378..ceaa67f702198 100644 --- a/providers/sftp/provider.yaml +++ b/providers/sftp/provider.yaml @@ -123,3 +123,6 @@ triggers: - integration-name: SSH File Transfer Protocol (SFTP) python-modules: - airflow.providers.sftp.triggers.sftp + +filesystems: + - airflow.providers.sftp.fs.sftp diff --git a/providers/sftp/pyproject.toml b/providers/sftp/pyproject.toml index 4d05ea7ed5c72..d562e123d6680 100644 --- a/providers/sftp/pyproject.toml +++ b/providers/sftp/pyproject.toml @@ -72,6 +72,9 @@ dependencies = [ "openlineage" = [ "apache-airflow-providers-openlineage" ] +"sshfs" = [ + "sshfs>=2023.1.0", +] [dependency-groups] dev = [ diff --git a/providers/sftp/src/airflow/providers/sftp/fs/__init__.py b/providers/sftp/src/airflow/providers/sftp/fs/__init__.py new file mode 100644 index 0000000000000..13a83393a9124 --- /dev/null +++ b/providers/sftp/src/airflow/providers/sftp/fs/__init__.py @@ -0,0 +1,16 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. diff --git a/providers/sftp/src/airflow/providers/sftp/fs/sftp.py b/providers/sftp/src/airflow/providers/sftp/fs/sftp.py new file mode 100644 index 0000000000000..6a915db67d931 --- /dev/null +++ b/providers/sftp/src/airflow/providers/sftp/fs/sftp.py @@ -0,0 +1,65 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. +from __future__ import annotations + +from typing import TYPE_CHECKING, Any + +from airflow.sdk.bases.hook import BaseHook + +if TYPE_CHECKING: + from fsspec import AbstractFileSystem + +schemes = ["sftp", "ssh"] + + +def get_fs(conn_id: str | None, storage_options: dict[str, Any] | None = None) -> AbstractFileSystem: + try: + from sshfs import SSHFileSystem + except ImportError: + raise ImportError( + "Airflow FS SFTP/SSH protocol requires the sshfs library. " + "Install with: pip install apache-airflow-providers-sftp[sshfs]" + ) + + if conn_id is None: + return SSHFileSystem(**(storage_options or {})) + + conn = BaseHook.get_connection(conn_id) + extras = conn.extra_dejson + + options: dict[str, Any] = { + "host": conn.host, + "port": conn.port or 22, + "username": conn.login, + } + + if conn.password: + options["password"] = conn.password + + if key_file := extras.get("key_file"): + options["client_keys"] = [key_file] + + if private_key := extras.get("private_key"): + options["client_keys"] = [private_key] + if passphrase := extras.get("private_key_passphrase"): + options["passphrase"] = passphrase + + if str(extras.get("no_host_key_check", "")).lower() == "true": + options["known_hosts"] = None + + options.update(storage_options or {}) + return SSHFileSystem(**options) diff --git a/providers/sftp/src/airflow/providers/sftp/get_provider_info.py b/providers/sftp/src/airflow/providers/sftp/get_provider_info.py index 09b5746802637..87ce10a7af529 100644 --- a/providers/sftp/src/airflow/providers/sftp/get_provider_info.py +++ b/providers/sftp/src/airflow/providers/sftp/get_provider_info.py @@ -71,4 +71,5 @@ def get_provider_info(): "python-modules": ["airflow.providers.sftp.triggers.sftp"], } ], + "filesystems": ["airflow.providers.sftp.fs.sftp"], } diff --git a/providers/sftp/tests/unit/sftp/fs/__init__.py b/providers/sftp/tests/unit/sftp/fs/__init__.py new file mode 100644 index 0000000000000..13a83393a9124 --- /dev/null +++ b/providers/sftp/tests/unit/sftp/fs/__init__.py @@ -0,0 +1,16 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. diff --git a/providers/sftp/tests/unit/sftp/fs/test_sftp.py b/providers/sftp/tests/unit/sftp/fs/test_sftp.py new file mode 100644 index 0000000000000..7f8da2bb3e495 --- /dev/null +++ b/providers/sftp/tests/unit/sftp/fs/test_sftp.py @@ -0,0 +1,222 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. +from __future__ import annotations + +from unittest.mock import patch + +import pytest + +pytest.importorskip("sshfs") + +TEST_CONN_ID = "sftp_test_conn" + + +@pytest.fixture(scope="module", autouse=True) +def _setup_connections(): + with pytest.MonkeyPatch.context() as mp_ctx: + mp_ctx.setenv( + f"AIRFLOW_CONN_{TEST_CONN_ID}".upper(), + "sftp://testuser:testpass@testhost:2222", + ) + yield + + +class TestSftpFilesystem: + def test_schemes(self): + from airflow.providers.sftp.fs.sftp import schemes + + assert "sftp" in schemes + assert "ssh" in schemes + + @patch("sshfs.SSHFileSystem", autospec=True) + def test_get_fs_with_connection(self, mock_sshfs): + from airflow.providers.sftp.fs.sftp import get_fs + + get_fs(conn_id=TEST_CONN_ID) + + mock_sshfs.assert_called_once() + call_kwargs = mock_sshfs.call_args.kwargs + assert call_kwargs["host"] == "testhost" + assert call_kwargs["port"] == 2222 + assert call_kwargs["username"] == "testuser" + assert call_kwargs["password"] == "testpass" + + @patch("sshfs.SSHFileSystem", autospec=True) + def test_get_fs_without_connection(self, mock_sshfs): + from airflow.providers.sftp.fs.sftp import get_fs + + # When conn_id is None, storage_options are passed directly to SSHFileSystem + storage_options = {"host": "manual-host", "username": "manual-user"} + get_fs(conn_id=None, storage_options=storage_options) + + mock_sshfs.assert_called_once() + call_kwargs = mock_sshfs.call_args.kwargs + assert call_kwargs["host"] == "manual-host" + assert call_kwargs["username"] == "manual-user" + + @patch("sshfs.SSHFileSystem", autospec=True) + def test_get_fs_storage_options_merge(self, mock_sshfs): + from airflow.providers.sftp.fs.sftp import get_fs + + storage_options = {"custom_option": "custom_value"} + get_fs(conn_id=TEST_CONN_ID, storage_options=storage_options) + + mock_sshfs.assert_called_once() + call_kwargs = mock_sshfs.call_args.kwargs + assert call_kwargs["custom_option"] == "custom_value" + assert call_kwargs["host"] == "testhost" + + @patch("sshfs.SSHFileSystem", autospec=True) + def test_get_fs_storage_options_override(self, mock_sshfs): + from airflow.providers.sftp.fs.sftp import get_fs + + storage_options = {"port": 3333} + get_fs(conn_id=TEST_CONN_ID, storage_options=storage_options) + + mock_sshfs.assert_called_once() + call_kwargs = mock_sshfs.call_args.kwargs + assert call_kwargs["port"] == 3333 + + @patch("sshfs.SSHFileSystem", autospec=True) + def test_get_fs_with_key_file(self, mock_sshfs): + from airflow.providers.sftp.fs.sftp import get_fs + + with pytest.MonkeyPatch.context() as mp_ctx: + mp_ctx.setenv( + "AIRFLOW_CONN_SFTP_KEY_FILE", + "sftp://testuser@testhost?key_file=%2Fpath%2Fto%2Fkey", + ) + + get_fs(conn_id="sftp_key_file") + + mock_sshfs.assert_called_once() + call_kwargs = mock_sshfs.call_args.kwargs + assert call_kwargs["client_keys"] == ["/path/to/key"] + + @patch("sshfs.SSHFileSystem", autospec=True) + def test_get_fs_with_private_key(self, mock_sshfs): + from airflow.providers.sftp.fs.sftp import get_fs + + with pytest.MonkeyPatch.context() as mp_ctx: + mp_ctx.setenv( + "AIRFLOW_CONN_SFTP_PRIVATE_KEY", + "sftp://testuser@testhost?private_key=PRIVATE_KEY_CONTENT&private_key_passphrase=secret", + ) + + get_fs(conn_id="sftp_private_key") + + mock_sshfs.assert_called_once() + call_kwargs = mock_sshfs.call_args.kwargs + assert call_kwargs["client_keys"] == ["PRIVATE_KEY_CONTENT"] + assert call_kwargs["passphrase"] == "secret" + + @patch("sshfs.SSHFileSystem", autospec=True) + def test_get_fs_with_private_key_no_passphrase(self, mock_sshfs): + from airflow.providers.sftp.fs.sftp import get_fs + + with pytest.MonkeyPatch.context() as mp_ctx: + mp_ctx.setenv( + "AIRFLOW_CONN_SFTP_PRIVATE_KEY_NO_PASS", + "sftp://testuser@testhost?private_key=PRIVATE_KEY_CONTENT", + ) + + get_fs(conn_id="sftp_private_key_no_pass") + + mock_sshfs.assert_called_once() + call_kwargs = mock_sshfs.call_args.kwargs + assert call_kwargs["client_keys"] == ["PRIVATE_KEY_CONTENT"] + assert "passphrase" not in call_kwargs + + @patch("sshfs.SSHFileSystem", autospec=True) + def test_get_fs_with_no_host_key_check(self, mock_sshfs): + from airflow.providers.sftp.fs.sftp import get_fs + + with pytest.MonkeyPatch.context() as mp_ctx: + mp_ctx.setenv( + "AIRFLOW_CONN_SFTP_NO_HOST_CHECK", + "sftp://testuser@testhost?no_host_key_check=true", + ) + + get_fs(conn_id="sftp_no_host_check") + + mock_sshfs.assert_called_once() + call_kwargs = mock_sshfs.call_args.kwargs + assert call_kwargs["known_hosts"] is None + + @patch("sshfs.SSHFileSystem", autospec=True) + def test_get_fs_default_port(self, mock_sshfs): + from airflow.providers.sftp.fs.sftp import get_fs + + with pytest.MonkeyPatch.context() as mp_ctx: + mp_ctx.setenv( + "AIRFLOW_CONN_SFTP_DEFAULT_PORT", + "sftp://testuser@testhost", + ) + + get_fs(conn_id="sftp_default_port") + + mock_sshfs.assert_called_once() + call_kwargs = mock_sshfs.call_args.kwargs + assert call_kwargs["port"] == 22 + + @patch("sshfs.SSHFileSystem", autospec=True) + def test_get_fs_without_password(self, mock_sshfs): + from airflow.providers.sftp.fs.sftp import get_fs + + with pytest.MonkeyPatch.context() as mp_ctx: + mp_ctx.setenv( + "AIRFLOW_CONN_SFTP_NO_PASSWORD", + "sftp://testuser@testhost", + ) + + get_fs(conn_id="sftp_no_password") + + mock_sshfs.assert_called_once() + call_kwargs = mock_sshfs.call_args.kwargs + assert "password" not in call_kwargs + + @patch("sshfs.SSHFileSystem", autospec=True) + def test_get_fs_host_key_check_enabled_by_default(self, mock_sshfs): + from airflow.providers.sftp.fs.sftp import get_fs + + with pytest.MonkeyPatch.context() as mp_ctx: + mp_ctx.setenv( + "AIRFLOW_CONN_SFTP_HOST_CHECK_DEFAULT", + "sftp://testuser@testhost", + ) + + get_fs(conn_id="sftp_host_check_default") + + mock_sshfs.assert_called_once() + call_kwargs = mock_sshfs.call_args.kwargs + assert "known_hosts" not in call_kwargs + + @patch("sshfs.SSHFileSystem", autospec=True) + def test_get_fs_host_key_check_explicit_false(self, mock_sshfs): + from airflow.providers.sftp.fs.sftp import get_fs + + with pytest.MonkeyPatch.context() as mp_ctx: + mp_ctx.setenv( + "AIRFLOW_CONN_SFTP_HOST_CHECK_FALSE", + "sftp://testuser@testhost?no_host_key_check=false", + ) + + get_fs(conn_id="sftp_host_check_false") + + mock_sshfs.assert_called_once() + call_kwargs = mock_sshfs.call_args.kwargs + assert "known_hosts" not in call_kwargs