From 5f6d16cfda208b257e97b85d6dc7d81cfa4a50fb Mon Sep 17 00:00:00 2001 From: Mathieu Monet Date: Thu, 4 Jun 2026 16:07:04 +0200 Subject: [PATCH 1/2] HDFS Asset URI: allow empty netloc for Hadoop fs.defaultFS MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit The hdfs asset URI sanitizer rejected hdfs:///path as missing a namenode host. Per RFC 3986 the authority component is optional; per Hadoop semantics an empty authority means 'resolve via fs.defaultFS from core-site.xml' — i.e. hdfs:///apps/x is the canonical form for jobs that must not hard-code a namenode. Relax sanitize_uri to require only a non-empty path, and add positive + negative parametrized tests covering the default-fs form and the corresponding OpenLineage conversion. --- .../providers/apache/hdfs/assets/hdfs.py | 2 -- .../unit/apache/hdfs/assets/test_hdfs.py | 29 +++++++++++++++++-- 2 files changed, 27 insertions(+), 4 deletions(-) diff --git a/providers/apache/hdfs/src/airflow/providers/apache/hdfs/assets/hdfs.py b/providers/apache/hdfs/src/airflow/providers/apache/hdfs/assets/hdfs.py index b21cd08852f31..d01adda3cdbbc 100644 --- a/providers/apache/hdfs/src/airflow/providers/apache/hdfs/assets/hdfs.py +++ b/providers/apache/hdfs/src/airflow/providers/apache/hdfs/assets/hdfs.py @@ -28,8 +28,6 @@ def sanitize_uri(uri: SplitResult) -> SplitResult: - if not uri.netloc: - raise ValueError("URI format hdfs:// must contain a namenode host") if not uri.path: raise ValueError("URI format hdfs:// must contain a path") return uri diff --git a/providers/apache/hdfs/tests/unit/apache/hdfs/assets/test_hdfs.py b/providers/apache/hdfs/tests/unit/apache/hdfs/assets/test_hdfs.py index a55e60fa07c9a..98fdbe3e3139a 100644 --- a/providers/apache/hdfs/tests/unit/apache/hdfs/assets/test_hdfs.py +++ b/providers/apache/hdfs/tests/unit/apache/hdfs/assets/test_hdfs.py @@ -42,6 +42,17 @@ "hdfs://namenode/data/file.csv", id="no-explicit-port", ), + # ``hdfs:///path`` (Hadoop ``fs.defaultFS``) accepted; ``urlunsplit`` collapses the empty authority to ``hdfs:/path``. + pytest.param( + "hdfs:///apps/myapp/data/bronze/raw/table.parquet", + "hdfs:/apps/myapp/data/bronze/raw/table.parquet", + id="default-fs-no-host", + ), + pytest.param( + "hdfs:///data/file.csv", + "hdfs:/data/file.csv", + id="default-fs-short-path", + ), ], ) def test_sanitize_uri_pass(original: str, normalized: str) -> None: @@ -54,12 +65,12 @@ def test_sanitize_uri_pass(original: str, normalized: str) -> None: "value", [ pytest.param("hdfs://", id="blank"), - pytest.param("hdfs:///path/to/file", id="no-host"), + pytest.param("hdfs://namenode:8020", id="no-path"), ], ) def test_sanitize_uri_fail(value: str) -> None: uri_i = urllib.parse.urlsplit(value) - with pytest.raises(ValueError, match="URI format hdfs:// must contain"): + with pytest.raises(ValueError, match="URI format hdfs:// must contain a path"): sanitize_uri(uri_i) @@ -90,3 +101,17 @@ def test_convert_asset_to_openlineage(expected_name, uri) -> None: ol_dataset = convert_asset_to_openlineage(asset=asset, lineage_context=None) assert ol_dataset.namespace == "hdfs://namenode:8020" assert ol_dataset.name == expected_name + + +@pytest.mark.parametrize( + ("expected_name", "uri"), + [ + pytest.param("apps/myapp/data.parquet", "hdfs:///apps/myapp/data.parquet", id="default-fs"), + pytest.param("data/file.csv", "hdfs:///data/file.csv", id="default-fs-short"), + ], +) +def test_convert_asset_to_openlineage_default_fs(expected_name, uri) -> None: + asset = Asset(uri=uri) + ol_dataset = convert_asset_to_openlineage(asset=asset, lineage_context=None) + assert ol_dataset.namespace == "hdfs://" + assert ol_dataset.name == expected_name From 495f0f33dab5e3abbffd3f73055fb604dea02077 Mon Sep 17 00:00:00 2001 From: Mathieu Monet Date: Sat, 13 Jun 2026 13:10:27 +0200 Subject: [PATCH 2/2] Allow HDFS assets to use hdfs:/// (fs.defaultFS) and preserve the form --- .../src/airflow/providers/apache/hdfs/assets/hdfs.py | 5 +++++ .../hdfs/tests/unit/apache/hdfs/assets/test_hdfs.py | 12 +++++++++--- 2 files changed, 14 insertions(+), 3 deletions(-) diff --git a/providers/apache/hdfs/src/airflow/providers/apache/hdfs/assets/hdfs.py b/providers/apache/hdfs/src/airflow/providers/apache/hdfs/assets/hdfs.py index d01adda3cdbbc..7bf0fb34ceb2d 100644 --- a/providers/apache/hdfs/src/airflow/providers/apache/hdfs/assets/hdfs.py +++ b/providers/apache/hdfs/src/airflow/providers/apache/hdfs/assets/hdfs.py @@ -17,6 +17,7 @@ from __future__ import annotations +import urllib.parse from typing import TYPE_CHECKING from airflow.providers.common.compat.assets import Asset @@ -26,6 +27,10 @@ from airflow.providers.common.compat.openlineage.facet import Dataset as OpenLineageDataset +# Preserve the empty-authority "hdfs:///path" (fs.defaultFS) form through urlunsplit, like "file". +if "hdfs" not in urllib.parse.uses_netloc: + urllib.parse.uses_netloc.append("hdfs") + def sanitize_uri(uri: SplitResult) -> SplitResult: if not uri.path: diff --git a/providers/apache/hdfs/tests/unit/apache/hdfs/assets/test_hdfs.py b/providers/apache/hdfs/tests/unit/apache/hdfs/assets/test_hdfs.py index 98fdbe3e3139a..daae2c5d4bf16 100644 --- a/providers/apache/hdfs/tests/unit/apache/hdfs/assets/test_hdfs.py +++ b/providers/apache/hdfs/tests/unit/apache/hdfs/assets/test_hdfs.py @@ -42,17 +42,23 @@ "hdfs://namenode/data/file.csv", id="no-explicit-port", ), - # ``hdfs:///path`` (Hadoop ``fs.defaultFS``) accepted; ``urlunsplit`` collapses the empty authority to ``hdfs:/path``. + # hdfs:///path (fs.defaultFS) preserves the empty authority. pytest.param( "hdfs:///apps/myapp/data/bronze/raw/table.parquet", - "hdfs:/apps/myapp/data/bronze/raw/table.parquet", + "hdfs:///apps/myapp/data/bronze/raw/table.parquet", id="default-fs-no-host", ), pytest.param( "hdfs:///data/file.csv", - "hdfs:/data/file.csv", + "hdfs:///data/file.csv", id="default-fs-short-path", ), + # Bare hdfs:/path canonicalizes to the empty-authority form. + pytest.param( + "hdfs:/data/file.csv", + "hdfs:///data/file.csv", + id="default-fs-single-slash", + ), ], ) def test_sanitize_uri_pass(original: str, normalized: str) -> None: