Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@

from __future__ import annotations

import urllib.parse
from typing import TYPE_CHECKING

from airflow.providers.common.compat.assets import Asset
Expand All @@ -26,10 +27,12 @@

from airflow.providers.common.compat.openlineage.facet import Dataset as OpenLineageDataset

# Preserve the empty-authority "hdfs:///path" (fs.defaultFS) form through urlunsplit, like "file".
if "hdfs" not in urllib.parse.uses_netloc:
urllib.parse.uses_netloc.append("hdfs")
Comment on lines +30 to +32

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I don’t think it’s a good idea to do this; uses_netloc is undocumented and should be considered private.

When is this needed?

@stegololz stegololz Jun 13, 2026

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I can drop the line or implement a similar behaviour somewhere else, it is not mandatory.

The SDK normalizes asset URIs through urlunsplit here:

if (normalizer := _get_uri_normalizer(normalized_scheme)) is not None:
parsed = normalizer(parsed)
return urllib.parse.urlunsplit(parsed)

That makes hdfs:///apps/x normalize to hdfs:/apps/x. I'd like to keep the canonical hdfs:/// form, which is why the if is there.

Caveat: as long as I stay on the provider, I won't be able to make a distinction between hdfs:/path and hdfs:///path (both end up as hdfs:///path), because the normalizer only sees the parsed result where both already have an empty netloc. To keep them distinct I'd have to also modify the Task SDK.

What would be the preference here?



def sanitize_uri(uri: SplitResult) -> SplitResult:
if not uri.netloc:
raise ValueError("URI format hdfs:// must contain a namenode host")
if not uri.path:
raise ValueError("URI format hdfs:// must contain a path")
return uri
Expand Down
35 changes: 33 additions & 2 deletions providers/apache/hdfs/tests/unit/apache/hdfs/assets/test_hdfs.py
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,23 @@
"hdfs://namenode/data/file.csv",
id="no-explicit-port",
),
# hdfs:///path (fs.defaultFS) preserves the empty authority.
pytest.param(
"hdfs:///apps/myapp/data/bronze/raw/table.parquet",
"hdfs:///apps/myapp/data/bronze/raw/table.parquet",
id="default-fs-no-host",
),
pytest.param(
"hdfs:///data/file.csv",
"hdfs:///data/file.csv",
id="default-fs-short-path",
),
# Bare hdfs:/path canonicalizes to the empty-authority form.
pytest.param(
"hdfs:/data/file.csv",
"hdfs:///data/file.csv",
id="default-fs-single-slash",
),
],
)
def test_sanitize_uri_pass(original: str, normalized: str) -> None:
Expand All @@ -54,12 +71,12 @@ def test_sanitize_uri_pass(original: str, normalized: str) -> None:
"value",
[
pytest.param("hdfs://", id="blank"),
pytest.param("hdfs:///path/to/file", id="no-host"),
pytest.param("hdfs://namenode:8020", id="no-path"),
],
)
def test_sanitize_uri_fail(value: str) -> None:
uri_i = urllib.parse.urlsplit(value)
with pytest.raises(ValueError, match="URI format hdfs:// must contain"):
with pytest.raises(ValueError, match="URI format hdfs:// must contain a path"):
sanitize_uri(uri_i)


Expand Down Expand Up @@ -90,3 +107,17 @@ def test_convert_asset_to_openlineage(expected_name, uri) -> None:
ol_dataset = convert_asset_to_openlineage(asset=asset, lineage_context=None)
assert ol_dataset.namespace == "hdfs://namenode:8020"
assert ol_dataset.name == expected_name


@pytest.mark.parametrize(
("expected_name", "uri"),
[
pytest.param("apps/myapp/data.parquet", "hdfs:///apps/myapp/data.parquet", id="default-fs"),
pytest.param("data/file.csv", "hdfs:///data/file.csv", id="default-fs-short"),
],
)
def test_convert_asset_to_openlineage_default_fs(expected_name, uri) -> None:
asset = Asset(uri=uri)
ol_dataset = convert_asset_to_openlineage(asset=asset, lineage_context=None)
assert ol_dataset.namespace == "hdfs://"
assert ol_dataset.name == expected_name
Loading