Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion airflow-core/src/airflow/models/taskinstance.py
Original file line number Diff line number Diff line change
Expand Up @@ -622,7 +622,7 @@ def log_url(self) -> str:
base_url = conf.get("api", "base_url", fallback="http://localhost:8080/")
map_index = f"/mapped/{self.map_index}" if self.map_index >= 0 else ""
try_number = f"?try_number={self.try_number}" if self.try_number > 0 else ""
_log_uri = f"{base_url}dags/{self.dag_id}/runs/{run_id}/tasks/{self.task_id}{map_index}{try_number}"
_log_uri = f"{base_url.rstrip('/')}/dags/{self.dag_id}/runs/{run_id}/tasks/{self.task_id}{map_index}{try_number}"

return _log_uri

Expand Down
19 changes: 1 addition & 18 deletions task-sdk/src/airflow/sdk/execution_time/task_runner.py
Original file line number Diff line number Diff line change
Expand Up @@ -562,7 +562,7 @@ def log_url(self) -> str:
try_number = (
f"?try_number={try_number_value}" if try_number_value is not None and try_number_value > 0 else ""
)
_log_uri = f"{base_url}dags/{self.dag_id}/runs/{run_id}/tasks/{self.task_id}{map_index}{try_number}"
_log_uri = f"{base_url.rstrip('/')}/dags/{self.dag_id}/runs/{run_id}/tasks/{self.task_id}{map_index}{try_number}"
return _log_uri

@property
Expand Down Expand Up @@ -599,23 +599,6 @@ def _xcom_push_to_db(ti: RuntimeTaskInstance, key: str, value: Any) -> None:
)


def get_log_url_from_ti(ti: RuntimeTaskInstance) -> str:
from urllib.parse import quote

from airflow.configuration import conf

run_id = quote(ti.run_id)
base_url = conf.get("api", "base_url", fallback="http://localhost:8080/")
map_index_value = getattr(ti, "map_index", -1)
map_index = f"/mapped/{map_index_value}" if map_index_value is not None and map_index_value >= 0 else ""
try_number_value = getattr(ti, "try_number", 0)
try_number = (
f"?try_number={try_number_value}" if try_number_value is not None and try_number_value > 0 else ""
)
_log_uri = f"{base_url}dags/{ti.dag_id}/runs/{run_id}/tasks/{ti.task_id}{map_index}{try_number}"
return _log_uri


def parse(what: StartupDetails, log: Logger) -> RuntimeTaskInstance:
# TODO: Task-SDK:
# Using DagBag here is about 98% wrong, but it'll do for now
Expand Down
25 changes: 25 additions & 0 deletions task-sdk/tests/task_sdk/execution_time/test_task_runner.py
Original file line number Diff line number Diff line change
Expand Up @@ -2691,6 +2691,31 @@ class CustomOperator(BaseOperator):
assert state == expected_state
assert collected_results == expected_results

@pytest.mark.parametrize(
["base_url", "expected_url"],
[
("http://localhost:8080/", "http://localhost:8080/dags/test_dag/runs/test_run/tasks/test_task"),
("http://localhost:8080", "http://localhost:8080/dags/test_dag/runs/test_run/tasks/test_task"),
(
"https://airflow.example.com/",
"https://airflow.example.com/dags/test_dag/runs/test_run/tasks/test_task",
),
(
"https://airflow.example.com",
"https://airflow.example.com/dags/test_dag/runs/test_run/tasks/test_task",
),
],
ids=["localhost_with_slash", "localhost_no_slash", "domain_with_slash", "domain_no_slash"],
)
def test_runtime_task_instance_log_url_property(self, create_runtime_ti, base_url, expected_url):
"""Test that RuntimeTaskInstance.log_url property correctly handles various base_url formats."""
task = BaseOperator(task_id="test_task")
runtime_ti = create_runtime_ti(task=task, dag_id="test_dag", run_id="test_run", try_number=0)

with patch("airflow.configuration.conf.get", return_value=base_url):
log_url = runtime_ti.log_url
assert log_url == expected_url

def test_task_runner_on_failure_callback_context(self, create_runtime_ti):
"""Test that on_failure_callback context has end_date and duration."""
from airflow.exceptions import AirflowException
Expand Down