Merged
76 commits
49489ba
refactor: Refactored HttpAsyncHook to easily support session based ru…
dabla Jan 13, 2026
493e237
Merge branch 'main' into feature/add-session-async-http-hook
dabla Jan 13, 2026
d23a94d
fix: Fixed import of LoggingMixin
dabla Jan 13, 2026
da1bfd6
Merge branch 'main' into feature/add-session-async-http-hook
dabla Jan 17, 2026
27e30e9
refactor: LivyAsyncHook now reuses logic from HttpAsyncHook which is …
dabla Jan 30, 2026
d19c593
refactor: Reformatted HttpAsyncHook
dabla Jan 30, 2026
9203bd7
Merge branch 'main' into feature/add-session-async-http-hook
dabla Jan 30, 2026
d833458
Merge branch 'main' into feature/add-session-async-http-hook
dabla Feb 1, 2026
a2cecc6
refactor: Fixed possible None types for merged_headers
dabla Feb 2, 2026
c233e8e
refactor: Changed type of _retryable_error_async method
dabla Feb 2, 2026
15754af
refactor: Removed unused import
dabla Feb 2, 2026
f9c503d
refactor: Moved SessionConfig inside AsyncHttpSession
dabla Feb 2, 2026
7e382b4
refactor: Reformatted run method of HttpAsyncHook
dabla Feb 2, 2026
7a109d8
refactor: Removed unused import from LivyHook module
dabla Feb 2, 2026
c8a10b9
Revert "refactor: Moved SessionConfig inside AsyncHttpSession"
dabla Feb 2, 2026
491e365
refactor: Added docstring for retry_limit and retry_delay parameters
dabla Feb 2, 2026
c77715e
refactor: Reformatted docstring in _retryable_error_async method
dabla Feb 2, 2026
53cb8b3
refactor: Added docstring for SessionConfig and AsyncHttpSession
dabla Feb 2, 2026
5eda002
refactor: Added warning logging when run attempt fails
dabla Feb 3, 2026
10620f4
refactor: Refactored run_method of LivyAsyncHook
dabla Feb 3, 2026
8a1f675
refactor: Refactored unit tests for LivyAsyncHook
dabla Feb 3, 2026
0beab47
refactor: Reformatted AsyncHttpSession
dabla Feb 3, 2026
d649850
refactor: Reformatted run_method of LivyAsyncHook
dabla Feb 3, 2026
bd93295
refactor: Escape aiohttp.ClientSession in docstring of session contex…
dabla Feb 3, 2026
232f0b2
refactor: Also take into extra_options from connection when building …
dabla Feb 3, 2026
d8918ef
refactor: Fixed mocking of test_run_method_success
dabla Feb 3, 2026
eb6ff25
Merge branch 'main' into feature/add-session-async-http-hook
dabla Feb 3, 2026
068493f
refactor: Removed unused imports
dabla Feb 3, 2026
3ce90aa
refactor: Reorganized imports
dabla Feb 3, 2026
2280100
refactor: Run method of LivyAsyncHook must internally use session fro…
dabla Feb 3, 2026
db2a036
Merge branch 'main' into feature/add-session-async-http-hook
dabla Feb 3, 2026
ed98651
Merge branch 'main' into feature/add-session-async-http-hook
dabla Feb 3, 2026
ff2b4b1
refactor: Escape reserved words in HttpAsyncHook
dabla Feb 3, 2026
6e96278
Merge branch 'main' into feature/add-session-async-http-hook
dabla Feb 3, 2026
ffdae83
refactor: Mock get_async_connection in TestLivyAsyncHook
dabla Feb 4, 2026
1719a39
Merge branch 'main' into feature/add-session-async-http-hook
dabla Feb 4, 2026
4af843d
refactor: Mock get_async_connection in TestLivyAsyncHook should be pa…
dabla Feb 4, 2026
2fe9900
Merge branch 'main' into feature/add-session-async-http-hook
dabla Feb 4, 2026
5d798ee
refactor: Mock get_async_connection in TestLivyAsyncHook should be pa…
Feb 5, 2026
235bf27
refactor: Make sure get_async_connection is mocked with real Connection
dabla Feb 6, 2026
e07ae93
Merge branch 'main' into feature/add-session-async-http-hook
dabla Feb 6, 2026
9c41596
refactor: Reformatted Livy unit test
dabla Feb 6, 2026
72e1a7a
Merge branch 'main' into feature/add-session-async-http-hook
dabla Feb 6, 2026
90e36dd
refactor: Add get_async_connection mock in test_run_put_method_with_t…
dabla Feb 6, 2026
db16958
Merge branch 'main' into feature/add-session-async-http-hook
dabla Feb 7, 2026
24c5129
Merge branch 'main' into feature/add-session-async-http-hook
dabla Feb 9, 2026
f6d5fb8
Merge branch 'main' into feature/add-session-async-http-hook
dabla Feb 10, 2026
5715206
Merge branch 'main' into feature/add-session-async-http-hook
dabla Feb 10, 2026
a7a87e2
refactor: Make sure http provider dependency is set to next release w…
dabla Feb 11, 2026
4fb1436
refactor: Added TODO on asgiref dependency as I can probably be remov…
dabla Feb 11, 2026
5a4bb97
Merge branch 'main' into feature/add-session-async-http-hook
dabla Feb 11, 2026
77d28b7
refactor: Removed asgiref dependency in livy provider
dabla Feb 11, 2026
117d7ae
Merge branch 'main' into feature/add-session-async-http-hook
dabla Feb 11, 2026
888e88d
refactor: Removed asgiref reference from docs
dabla Feb 11, 2026
adb9763
Merge branch 'main' into feature/add-session-async-http-hook
dabla Feb 11, 2026
eb06f26
Merge branch 'main' into feature/add-session-async-http-hook
dabla Feb 23, 2026
fee2c21
Merge branch 'main' into feature/add-session-async-http-hook
dabla Mar 3, 2026
ff4cb70
Merge branch 'main' into feature/add-session-async-http-hook
dabla Mar 3, 2026
9ce5aae
Merge branch 'main' into feature/add-session-async-http-hook
dabla Mar 3, 2026
91231be
Merge branch 'main' into feature/add-session-async-http-hook
dabla Mar 3, 2026
a06e5bf
Merge branch 'main' into feature/add-session-async-http-hook
dabla Mar 3, 2026
2a0d73c
refactor: Fixed assertion of Connection type in test_build_get_hook o…
dabla Mar 4, 2026
250bf74
Merge branch 'main' into feature/add-session-async-http-hook
dabla Mar 4, 2026
9f3f6d6
refactor: Don't need to assert connections anymore in test_build_get_…
dabla Mar 4, 2026
811dc40
Merge branch 'main' into feature/add-session-async-http-hook
dabla Mar 4, 2026
7c4cb3a
Merge branch 'main' into feature/add-session-async-http-hook
dabla Mar 4, 2026
8022953
Merge branch 'main' into feature/add-session-async-http-hook
dabla Mar 5, 2026
6f5ed82
Merge branch 'main' into feature/add-session-async-http-hook
dabla Mar 5, 2026
81f5197
Merge branch 'main' into feature/add-session-async-http-hook
dabla Mar 5, 2026
6260f73
Merge branch 'main' into feature/add-session-async-http-hook
dabla Mar 5, 2026
483fd04
Merge branch 'main' into feature/add-session-async-http-hook
dabla Mar 6, 2026
232f078
Merge branch 'main' into feature/add-session-async-http-hook
dabla Mar 10, 2026
d420050
Merge branch 'main' into feature/add-session-async-http-hook
dabla Mar 10, 2026
5cd28ac
Merge branch 'main' into feature/add-session-async-http-hook
dabla Mar 10, 2026
607f791
Merge branch 'main' into feature/add-session-async-http-hook
dabla Mar 10, 2026
70290a5
Merge branch 'main' into feature/add-session-async-http-hook
dabla Mar 14, 2026
1 change: 0 additions & 1 deletion providers/apache/livy/docs/index.rst
@@ -103,7 +103,6 @@ PIP package Version required
 ``apache-airflow-providers-http``           ``>=5.1.0``
 ``apache-airflow-providers-common-compat``  ``>=1.12.0``
 ``aiohttp``                                 ``>=3.9.2``
-``asgiref``                                 ``>=2.3.0``
 ==========================================  ==================

 Cross provider package dependencies
3 changes: 1 addition & 2 deletions providers/apache/livy/pyproject.toml
@@ -59,10 +59,9 @@ requires-python = ">=3.10"
 # After you modify the dependencies, and rebuild your Breeze CI image with ``breeze ci-image build``
 dependencies = [
     "apache-airflow>=2.11.0",
-    "apache-airflow-providers-http>=5.1.0",
+    "apache-airflow-providers-http>=5.1.0", # use next version
     "apache-airflow-providers-common-compat>=1.12.0",
     "aiohttp>=3.9.2",
-    "asgiref>=2.3.0",
 ]

 [dependency-groups]
128 changes: 22 additions & 106 deletions providers/apache/livy/src/airflow/providers/apache/livy/hooks/livy.py
@@ -16,24 +16,18 @@
 # under the License.
 from __future__ import annotations

-import asyncio
 import json
 import re
 from collections.abc import Sequence
 from enum import Enum
-from typing import TYPE_CHECKING, Any
+from typing import Any

-import aiohttp
 import requests
 from aiohttp import ClientResponseError

-from airflow.providers.common.compat.connection import get_async_connection
 from airflow.providers.common.compat.sdk import AirflowException
 from airflow.providers.http.hooks.http import HttpAsyncHook, HttpHook

-if TYPE_CHECKING:
-    from airflow.models import Connection
-

 class BatchState(Enum):
     """Batch session states."""
@@ -502,101 +496,10 @@ def __init__(
         self.extra_options = extra_options or {}
         self.endpoint_prefix = sanitize_endpoint_prefix(endpoint_prefix)

-    async def _do_api_call_async(
-        self,
-        endpoint: str | None = None,
-        data: dict[str, Any] | str | None = None,
-        headers: dict[str, Any] | None = None,
-        extra_options: dict[str, Any] | None = None,
-    ) -> Any:
-        """
-        Perform an asynchronous HTTP request call.
-
-        :param endpoint: the endpoint to be called i.e. resource/v1/query?
-        :param data: payload to be uploaded or request parameters
-        :param headers: additional headers to be passed through as a dictionary
-        :param extra_options: Additional kwargs to pass when creating a request.
-            For example, ``run(json=obj)`` is passed as ``aiohttp.ClientSession().get(json=obj)``
-        """
-        extra_options = extra_options or {}
-
-        # headers may be passed through directly or in the "extra" field in the connection
-        # definition
-        _headers = {}
-        auth = None
-
-        if self.http_conn_id:
-            conn = await get_async_connection(self.http_conn_id)
-
-            self.base_url = self._generate_base_url(conn) # type: ignore[arg-type]
-            if conn.login:
-                auth = self.auth_type(conn.login, conn.password)
-            if conn.extra:
-                try:
-                    _headers.update(conn.extra_dejson)
-                except TypeError:
-                    self.log.warning("Connection to %s has invalid extra field.", conn.host)
-        if headers:
-            _headers.update(headers)
-
-        if self.base_url and not self.base_url.endswith("/") and endpoint and not endpoint.startswith("/"):
-            url = self.base_url + "/" + endpoint
-        else:
-            url = (self.base_url or "") + (endpoint or "")
-
-        async with aiohttp.ClientSession() as session:
-            if self.method == "GET":
-                request_func = session.get
-            elif self.method == "POST":
-                request_func = session.post
-            elif self.method == "PATCH":
-                request_func = session.patch
-            else:
-                return {"Response": f"Unexpected HTTP Method: {self.method}", "status": "error"}
-
-            for attempt_num in range(1, 1 + self.retry_limit):
-                response = await request_func(
-                    url,
-                    json=data if self.method in ("POST", "PATCH") else None,
-                    params=data if self.method == "GET" else None,
-                    headers=_headers or None,
-                    auth=auth,
-                    **extra_options,
-                )
-                try:
-                    response.raise_for_status()
-                    return await response.json()
-                except ClientResponseError as e:
-                    self.log.warning(
-                        "[Try %d of %d] Request to %s failed.",
-                        attempt_num,
-                        self.retry_limit,
-                        url,
-                    )
-                    if not self._retryable_error_async(e) or attempt_num == self.retry_limit:
-                        self.log.exception("HTTP error, status code: %s", e.status)
-                        # In this case, the user probably made a mistake.
-                        # Don't retry.
-                        return {"Response": {e.message}, "Status Code": {e.status}, "status": "error"}
-
-                await asyncio.sleep(self.retry_delay)
-
-    def _generate_base_url(self, conn: Connection) -> str:
-        if conn.host and "://" in conn.host:
-            base_url: str = conn.host
-        else:
-            # schema defaults to HTTP
-            schema = conn.schema if conn.schema else "http"
-            host = conn.host if conn.host else ""
-            base_url = f"{schema}://{host}"
-        if conn.port:
-            base_url = f"{base_url}:{conn.port}"
-        return base_url
-
     async def run_method(
         self,
         endpoint: str,
-        method: str = "GET",
+        method: str | None = None,
         data: Any | None = None,
         headers: dict[str, Any] | None = None,
     ) -> Any:
@@ -609,16 +512,29 @@ async def run_method(
         :param headers: headers
         :return: http response
         """
-        if method not in ("GET", "POST", "PUT", "DELETE", "HEAD"):
+        method = method or self.method
+        if method not in {"GET", "PATCH", "POST", "PUT", "DELETE", "HEAD"}:
             return {"status": "error", "response": f"Invalid http method {method}"}

-        back_method = self.method
-        self.method = method
+        endpoint = (
+            f"{self.endpoint_prefix}/{endpoint}"
+            if self.endpoint_prefix and endpoint
+            else endpoint or self.endpoint_prefix
+        )
+
         try:
-            result = await self._do_api_call_async(endpoint, data, headers, self.extra_options)
-        finally:
-            self.method = back_method
-        return {"status": "success", "response": result}
+            async with self.session() as session:
+                response = await session.run(
+                    endpoint=endpoint,
+                    data=data,
+                    headers={**self._def_headers, **self.extra_headers, **(headers or {})},
+                    extra_options=self.extra_options,
+                )
+
+                result = await response.json()
+                return {"status": "success", "response": result}
+        except ClientResponseError as e:
+            return {"Response": {e.message}, "Status Code": {e.status}, "status": "error"}

     async def get_batch_state(self, session_id: int | str) -> Any:
         """
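
The endpoint and header handling in the new `run_method` can be looked at in isolation. The following is a minimal sketch, not the provider's code: `join_endpoint` and `merge_headers` are hypothetical helper names, and the prefix and header values in the usage lines are made up for illustration. It assumes the prefix has already been normalized (the diff calls `sanitize_endpoint_prefix`, whose output format is not shown here).

```python
from __future__ import annotations


def join_endpoint(endpoint_prefix: str | None, endpoint: str | None) -> str | None:
    """Mirror the conditional expression used for `endpoint` in run_method."""
    if endpoint_prefix and endpoint:
        return f"{endpoint_prefix}/{endpoint}"
    # Fall back to whichever of the two is non-empty.
    return endpoint or endpoint_prefix


def merge_headers(
    default_headers: dict[str, str],
    extra_headers: dict[str, str],
    headers: dict[str, str] | None = None,
) -> dict[str, str]:
    """Merge header mappings; later mappings override earlier ones."""
    return {**default_headers, **extra_headers, **(headers or {})}


if __name__ == "__main__":
    # Illustrative values only.
    print(join_endpoint("/livy", "batches"))
    print(merge_headers({"Accept": "application/json"}, {}, {"Accept": "text/plain"}))
```

Because dictionary unpacking keeps the last value for a repeated key, headers passed to a single call take precedence over the hook-level extras, which in turn take precedence over the defaults.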