diff --git a/providers/apache/hdfs/src/airflow/providers/apache/hdfs/assets/hdfs.py b/providers/apache/hdfs/src/airflow/providers/apache/hdfs/assets/hdfs.py index b21cd08852f31..7bf0fb34ceb2d 100644 --- a/providers/apache/hdfs/src/airflow/providers/apache/hdfs/assets/hdfs.py +++ b/providers/apache/hdfs/src/airflow/providers/apache/hdfs/assets/hdfs.py @@ -17,6 +17,7 @@ from __future__ import annotations +import urllib.parse from typing import TYPE_CHECKING from airflow.providers.common.compat.assets import Asset @@ -26,10 +27,12 @@ from airflow.providers.common.compat.openlineage.facet import Dataset as OpenLineageDataset +# Preserve the empty-authority "hdfs:///path" (fs.defaultFS) form through urlunsplit, like "file". +if "hdfs" not in urllib.parse.uses_netloc: + urllib.parse.uses_netloc.append("hdfs") + def sanitize_uri(uri: SplitResult) -> SplitResult: - if not uri.netloc: - raise ValueError("URI format hdfs:// must contain a namenode host") if not uri.path: raise ValueError("URI format hdfs:// must contain a path") return uri diff --git a/providers/apache/hdfs/tests/unit/apache/hdfs/assets/test_hdfs.py b/providers/apache/hdfs/tests/unit/apache/hdfs/assets/test_hdfs.py index a55e60fa07c9a..daae2c5d4bf16 100644 --- a/providers/apache/hdfs/tests/unit/apache/hdfs/assets/test_hdfs.py +++ b/providers/apache/hdfs/tests/unit/apache/hdfs/assets/test_hdfs.py @@ -42,6 +42,23 @@ "hdfs://namenode/data/file.csv", id="no-explicit-port", ), + # hdfs:///path (fs.defaultFS) preserves the empty authority. + pytest.param( + "hdfs:///apps/myapp/data/bronze/raw/table.parquet", + "hdfs:///apps/myapp/data/bronze/raw/table.parquet", + id="default-fs-no-host", + ), + pytest.param( + "hdfs:///data/file.csv", + "hdfs:///data/file.csv", + id="default-fs-short-path", + ), + # Bare hdfs:/path canonicalizes to the empty-authority form. + pytest.param( + "hdfs:/data/file.csv", + "hdfs:///data/file.csv", + id="default-fs-single-slash", + ), ], ) def test_sanitize_uri_pass(original: str, normalized: str) -> None: @@ -54,12 +71,12 @@ def test_sanitize_uri_pass(original: str, normalized: str) -> None: "value", [ pytest.param("hdfs://", id="blank"), - pytest.param("hdfs:///path/to/file", id="no-host"), + pytest.param("hdfs://namenode:8020", id="no-path"), ], ) def test_sanitize_uri_fail(value: str) -> None: uri_i = urllib.parse.urlsplit(value) - with pytest.raises(ValueError, match="URI format hdfs:// must contain"): + with pytest.raises(ValueError, match="URI format hdfs:// must contain a path"): sanitize_uri(uri_i) @@ -90,3 +107,17 @@ def test_convert_asset_to_openlineage(expected_name, uri) -> None: ol_dataset = convert_asset_to_openlineage(asset=asset, lineage_context=None) assert ol_dataset.namespace == "hdfs://namenode:8020" assert ol_dataset.name == expected_name + + +@pytest.mark.parametrize( + ("expected_name", "uri"), + [ + pytest.param("apps/myapp/data.parquet", "hdfs:///apps/myapp/data.parquet", id="default-fs"), + pytest.param("data/file.csv", "hdfs:///data/file.csv", id="default-fs-short"), + ], +) +def test_convert_asset_to_openlineage_default_fs(expected_name, uri) -> None: + asset = Asset(uri=uri) + ol_dataset = convert_asset_to_openlineage(asset=asset, lineage_context=None) + assert ol_dataset.namespace == "hdfs://" + assert ol_dataset.name == expected_name