From 49489ba6c45235babe0bbba083365cac517e8e14 Mon Sep 17 00:00:00 2001 From: David Blain Date: Tue, 13 Jan 2026 15:39:40 +0100 Subject: [PATCH 01/38] refactor: Refactored HttpAsyncHook to easily support session based run operations --- .../src/airflow/providers/http/hooks/http.py | 298 ++++++++++++------ 1 file changed, 195 insertions(+), 103 deletions(-) diff --git a/providers/http/src/airflow/providers/http/hooks/http.py b/providers/http/src/airflow/providers/http/hooks/http.py index 815b7ffadc910..ec512273e5ba4 100644 --- a/providers/http/src/airflow/providers/http/hooks/http.py +++ b/providers/http/src/airflow/providers/http/hooks/http.py @@ -18,20 +18,22 @@ from __future__ import annotations import copy -from collections.abc import Callable -from typing import TYPE_CHECKING, Any, cast +from collections.abc import Awaitable, Callable +from contextlib import asynccontextmanager +from typing import TYPE_CHECKING, Any, cast, AsyncGenerator from urllib.parse import urlparse import aiohttp import tenacity from aiohttp import ClientResponseError -from asgiref.sync import sync_to_async +from pydantic import BaseModel from requests import PreparedRequest, Request, Response, Session from requests.auth import HTTPBasicAuth from requests.exceptions import ConnectionError, HTTPError from requests.models import DEFAULT_REDIRECT_LIMIT from requests_toolbelt.adapters.socket_options import TCPKeepAliveAdapter +from airflow.sdk.definitions._internal.logging_mixin import LoggingMixin from airflow.providers.common.compat.sdk import AirflowException, BaseHook from airflow.providers.http.exceptions import HttpErrorException, HttpMethodException @@ -399,43 +401,54 @@ def test_connection(self): return False, str(e) -class HttpAsyncHook(BaseHook): - """ - Interact with HTTP servers asynchronously. +class SessionConfig(BaseModel): + base_url: str + headers: dict[str, Any] | None = None + auth: aiohttp.BasicAuth | None = None - :param method: the API method to be called - :param http_conn_id: http connection id that has the base - API url i.e https://www.google.com/ and optional authentication credentials. Default - headers can also be specified in the Extra field in json format. - :param auth_type: The auth type for the service - """ - - conn_name_attr = "http_conn_id" - default_conn_name = "http_default" - conn_type = "http" - hook_name = "HTTP" +class AsyncHttpSession(LoggingMixin): def __init__( self, - method: str = "POST", - http_conn_id: str = default_conn_name, - auth_type: Any = aiohttp.BasicAuth, - retry_limit: int = 3, - retry_delay: float = 1.0, + hook: HttpAsyncHook, + request: Callable[..., Awaitable[ClientResponse]], + config: SessionConfig, ) -> None: - self.http_conn_id = http_conn_id - self.method = method.upper() - self.base_url: str = "" - self._retry_obj: Callable[..., Any] - self.auth_type: Any = auth_type - if retry_limit < 1: - raise ValueError("Retry limit must be greater than equal to 1") - self.retry_limit = retry_limit - self.retry_delay = retry_delay + super().__init__() + self._hook = hook + self._request = request + self.config = config + + @property + def http_conn_id(self) -> str: + return self._hook.http_conn_id + + @property + def base_url(self) -> str: + return self.config.base_url + + @property + def method(self) -> str: + return self._hook.method + + @property + def retry_limit(self) -> int: + return self._hook.retry_limit + + @property + def retry_delay(self) -> float: + return self._hook.retry_delay + + @property + def headers(self) -> dict[str, Any]: + return self.config.headers + + @property + def auth(self) -> aiohttp.BasicAuth | None: + return self.config.auth async def run( self, - session: aiohttp.ClientSession, endpoint: str | None = None, data: dict[str, Any] | str | None = None, json: dict[str, Any] | str | None = None, @@ -453,89 +466,37 @@ async def run( For example, ``run(json=obj)`` is passed as ``aiohttp.ClientSession().get(json=obj)``. """ - extra_options = extra_options or {} - - # headers may be passed through directly or in the "extra" field in the connection - # definition - _headers = {} - auth = None - - if self.http_conn_id: - conn = await sync_to_async(self.get_connection)(self.http_conn_id) - - if conn.host and "://" in conn.host: - self.base_url = conn.host - else: - # schema defaults to HTTP - schema = conn.schema if conn.schema else "http" - host = conn.host if conn.host else "" - self.base_url = schema + "://" + host - - if conn.port: - self.base_url += f":{conn.port}" - if conn.login: - auth = self.auth_type(conn.login, conn.password) - if conn.extra: - conn_extra_options, extra_options = _process_extra_options_from_connection( - conn=conn, extra_options=extra_options - ) - - try: - _headers.update(conn_extra_options) - except TypeError: - self.log.warning("Connection to %s has invalid extra field.", conn.host) - if headers: - _headers.update(headers) + from tenacity import AsyncRetrying, stop_after_attempt, wait_fixed + extra_options = extra_options or {} url = _url_from_endpoint(self.base_url, endpoint) + merged_headers = {**self.headers, **(headers or {})} - if self.method == "GET": - request_func = session.get - elif self.method == "POST": - request_func = session.post - elif self.method == "PATCH": - request_func = session.patch - elif self.method == "HEAD": - request_func = session.head - elif self.method == "PUT": - request_func = session.put - elif self.method == "DELETE": - request_func = session.delete - elif self.method == "OPTIONS": - request_func = session.options - else: - raise HttpMethodException(f"Unexpected HTTP Method: {self.method}") - - for attempt in range(1, 1 + self.retry_limit): - response = await request_func( + async def request_func() -> ClientResponse: + response = await self._request( url, params=data if self.method == "GET" else None, data=data if self.method in ("POST", "PUT", "PATCH") else None, json=json, - headers=_headers, - auth=auth, + headers=merged_headers, + auth=self.auth, **extra_options, ) - try: - response.raise_for_status() - except ClientResponseError as e: - self.log.warning( - "[Try %d of %d] Request to %s failed.", - attempt, - self.retry_limit, - url, - ) - if not self._retryable_error_async(e) or attempt == self.retry_limit: - self.log.exception("HTTP error with status: %s", e.status) - # In this case, the user probably made a mistake. - # Don't retry. - raise HttpErrorException(f"{e.status}:{e.message}") - else: - return response + response.raise_for_status() + return response + + async for attempt in AsyncRetrying( + stop=stop_after_attempt(self.retry_limit), + wait=wait_fixed(self.retry_delay), + reraise=True, + ): + with attempt: + return await request_func() raise NotImplementedError # should not reach this, but makes mypy happy - def _retryable_error_async(self, exception: ClientResponseError) -> bool: + @classmethod + def _retryable_error_async(cls, exception: ClientResponseError) -> bool: """ Determine whether an exception may successful on a subsequent attempt. @@ -553,3 +514,134 @@ def _retryable_error_async(self, exception: ClientResponseError) -> bool: # don't retry for payload Too Large return False return exception.status >= 500 + + +class HttpAsyncHook(BaseHook): + """ + Interact with HTTP servers asynchronously. + + :param method: the API method to be called + :param http_conn_id: http connection id that has the base + API url i.e https://www.google.com/ and optional authentication credentials. Default + headers can also be specified in the Extra field in json format. + :param auth_type: The auth type for the service + """ + + conn_name_attr = "http_conn_id" + default_conn_name = "http_default" + conn_type = "http" + hook_name = "HTTP" + + def __init__( + self, + method: str = "POST", + http_conn_id: str = default_conn_name, + auth_type: Any = aiohttp.BasicAuth, + retry_limit: int = 3, + retry_delay: float = 1.0, + ) -> None: + self.http_conn_id = http_conn_id + self.method = method.upper() + self.base_url: str = "" + self._retry_obj: Callable[..., Any] + self.auth_type: Any = auth_type + if retry_limit < 1: + raise ValueError("Retry limit must be greater or equal to 1") + self.retry_limit = retry_limit + self.retry_delay = retry_delay + self._config: SessionConfig | None = None + + def _get_request_func(self, session: aiohttp.ClientSession) -> Callable[..., Any]: + method = self.method + if method == "GET": + return session.get + if method == "POST": + return session.post + if method == "PATCH": + return session.patch + if method == "HEAD": + return session.head + if method == "PUT": + return session.put + if method == "DELETE": + return session.delete + if method == "OPTIONS": + return session.options + raise HttpMethodException(f"Unexpected HTTP Method: {method}") + + async def config(self) -> SessionConfig: + if not self._config: + from airflow.providers.common.compat.connection import get_async_connection + + base_url: str = self.base_url + auth: aiohttp.BasicAuth | None = None + headers: dict[str, Any] = {} + + if self.http_conn_id: + conn = await get_async_connection(conn_id=self.http_conn_id) + + if conn.host and "://" in conn.host: + base_url = conn.host + else: + schema = conn.schema or "http" + base_url = f"{schema}://{conn.host or ''}" + + if conn.port: + base_url += f":{conn.port}" + + if conn.login: + auth = self.auth_type(conn.login, conn.password) + + if conn.extra: + conn_extra_options, _ = _process_extra_options_from_connection( + conn=conn, extra_options={} + ) + headers.update(conn_extra_options) + + self._config = SessionConfig( + base_url=base_url, + headers=headers, + auth=auth, + ) + return self._config + + @asynccontextmanager + async def session(self) -> AsyncGenerator[AsyncHttpSession, None]: + """ + Create an AsyncHttpSession bound to a single aiohttp.ClientSession. + Airflow connection resolution happens exactly once here. + """ + async with aiohttp.ClientSession() as session: + request = self._get_request_func(session=session) + config = await self.config() + yield AsyncHttpSession(hook=self, request=request, config=config) + + async def run( + self, + session: aiohttp.ClientSession | None = None, + endpoint: str | None = None, + data: dict[str, Any] | str | None = None, + json: dict[str, Any] | str | None = None, + headers: dict[str, Any] | None = None, + extra_options: dict[str, Any] | None = None, + ) -> ClientResponse: + """ + Perform an asynchronous HTTP request call. + + :param session: aiohttp.ClientSession + :param endpoint: Endpoint to be called, i.e. ``resource/v1/query?``. + :param data: Payload to be uploaded or request parameters. + :param json: Payload to be uploaded as JSON. + :param headers: Additional headers to be passed through as a dict. + :param extra_options: Additional kwargs to pass when creating a request. + For example, ``run(json=obj)`` is passed as + ``aiohttp.ClientSession().get(json=obj)``. + """ + + if session is not None: + request = self._get_request_func(session=session) + config = await self.config() + return await AsyncHttpSession(hook=self, request=request, config=config).run(endpoint=endpoint, data=data, json=json, headers=headers, extra_options=extra_options) + + async with self.session() as http: + return await http.run(endpoint=endpoint, data=data, json=json, headers=headers, extra_options=extra_options) From d23a94df69e9fef44d8438b9a18ae714c18ee5c6 Mon Sep 17 00:00:00 2001 From: David Blain Date: Tue, 13 Jan 2026 19:10:38 +0100 Subject: [PATCH 02/38] fix: Fixed import of LoggingMixin --- providers/http/src/airflow/providers/http/hooks/http.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/providers/http/src/airflow/providers/http/hooks/http.py b/providers/http/src/airflow/providers/http/hooks/http.py index ec512273e5ba4..b2766d7560ab3 100644 --- a/providers/http/src/airflow/providers/http/hooks/http.py +++ b/providers/http/src/airflow/providers/http/hooks/http.py @@ -33,9 +33,9 @@ from requests.models import DEFAULT_REDIRECT_LIMIT from requests_toolbelt.adapters.socket_options import TCPKeepAliveAdapter -from airflow.sdk.definitions._internal.logging_mixin import LoggingMixin from airflow.providers.common.compat.sdk import AirflowException, BaseHook from airflow.providers.http.exceptions import HttpErrorException, HttpMethodException +from airflow.utils.log.logging_mixin import LoggingMixin if TYPE_CHECKING: from aiohttp.client_reqrep import ClientResponse From 27e30e9bcd74dea791955f7233b94d1bb289cc5c Mon Sep 17 00:00:00 2001 From: David Blain Date: Fri, 30 Jan 2026 20:13:10 +0100 Subject: [PATCH 03/38] refactor: LivyAsyncHook now reuses logic from HttpAsyncHook which is more DRY --- .../providers/apache/livy/hooks/livy.py | 108 +++--------------- .../src/airflow/providers/http/hooks/http.py | 44 +++---- 2 files changed, 38 insertions(+), 114 deletions(-) diff --git a/providers/apache/livy/src/airflow/providers/apache/livy/hooks/livy.py b/providers/apache/livy/src/airflow/providers/apache/livy/hooks/livy.py index 5844995cb0529..1ff2e6b9bafcc 100644 --- a/providers/apache/livy/src/airflow/providers/apache/livy/hooks/livy.py +++ b/providers/apache/livy/src/airflow/providers/apache/livy/hooks/livy.py @@ -29,7 +29,7 @@ from asgiref.sync import sync_to_async from airflow.providers.common.compat.sdk import AirflowException -from airflow.providers.http.hooks.http import HttpAsyncHook, HttpHook +from airflow.providers.http.hooks.http import HttpAsyncHook, HttpHook, _retryable_error_async if TYPE_CHECKING: from airflow.models import Connection @@ -502,97 +502,6 @@ def __init__( self.extra_options = extra_options or {} self.endpoint_prefix = sanitize_endpoint_prefix(endpoint_prefix) - async def _do_api_call_async( - self, - endpoint: str | None = None, - data: dict[str, Any] | str | None = None, - headers: dict[str, Any] | None = None, - extra_options: dict[str, Any] | None = None, - ) -> Any: - """ - Perform an asynchronous HTTP request call. - - :param endpoint: the endpoint to be called i.e. resource/v1/query? - :param data: payload to be uploaded or request parameters - :param headers: additional headers to be passed through as a dictionary - :param extra_options: Additional kwargs to pass when creating a request. - For example, ``run(json=obj)`` is passed as ``aiohttp.ClientSession().get(json=obj)`` - """ - extra_options = extra_options or {} - - # headers may be passed through directly or in the "extra" field in the connection - # definition - _headers = {} - auth = None - - if self.http_conn_id: - conn = await sync_to_async(self.get_connection)(self.http_conn_id) - - self.base_url = self._generate_base_url(conn) # type: ignore[arg-type] - if conn.login: - auth = self.auth_type(conn.login, conn.password) - if conn.extra: - try: - _headers.update(conn.extra_dejson) - except TypeError: - self.log.warning("Connection to %s has invalid extra field.", conn.host) - if headers: - _headers.update(headers) - - if self.base_url and not self.base_url.endswith("/") and endpoint and not endpoint.startswith("/"): - url = self.base_url + "/" + endpoint - else: - url = (self.base_url or "") + (endpoint or "") - - async with aiohttp.ClientSession() as session: - if self.method == "GET": - request_func = session.get - elif self.method == "POST": - request_func = session.post - elif self.method == "PATCH": - request_func = session.patch - else: - return {"Response": f"Unexpected HTTP Method: {self.method}", "status": "error"} - - for attempt_num in range(1, 1 + self.retry_limit): - response = await request_func( - url, - json=data if self.method in ("POST", "PATCH") else None, - params=data if self.method == "GET" else None, - headers=_headers or None, - auth=auth, - **extra_options, - ) - try: - response.raise_for_status() - return await response.json() - except ClientResponseError as e: - self.log.warning( - "[Try %d of %d] Request to %s failed.", - attempt_num, - self.retry_limit, - url, - ) - if not self._retryable_error_async(e) or attempt_num == self.retry_limit: - self.log.exception("HTTP error, status code: %s", e.status) - # In this case, the user probably made a mistake. - # Don't retry. - return {"Response": {e.message}, "Status Code": {e.status}, "status": "error"} - - await asyncio.sleep(self.retry_delay) - - def _generate_base_url(self, conn: Connection) -> str: - if conn.host and "://" in conn.host: - base_url: str = conn.host - else: - # schema defaults to HTTP - schema = conn.schema if conn.schema else "http" - host = conn.host if conn.host else "" - base_url = f"{schema}://{host}" - if conn.port: - base_url = f"{base_url}:{conn.port}" - return base_url - async def run_method( self, endpoint: str, @@ -615,7 +524,20 @@ async def run_method( back_method = self.method self.method = method try: - result = await self._do_api_call_async(endpoint, data, headers, self.extra_options) + endpoint = ( + f"{self.endpoint_prefix}/{endpoint}" + if self.endpoint_prefix and endpoint + else endpoint or self.endpoint_prefix + ) + + response = await self.run( + endpoint=endpoint, + data=data, + headers={**self._def_headers, **self.extra_headers, **(headers or {})}, + extra_options=self.extra_options, + ) + + result = await response.json() finally: self.method = back_method return {"status": "success", "response": result} diff --git a/providers/http/src/airflow/providers/http/hooks/http.py b/providers/http/src/airflow/providers/http/hooks/http.py index b2766d7560ab3..c203ffbd69490 100644 --- a/providers/http/src/airflow/providers/http/hooks/http.py +++ b/providers/http/src/airflow/providers/http/hooks/http.py @@ -32,6 +32,7 @@ from requests.exceptions import ConnectionError, HTTPError from requests.models import DEFAULT_REDIRECT_LIMIT from requests_toolbelt.adapters.socket_options import TCPKeepAliveAdapter +from tenacity import retry_if_exception from airflow.providers.common.compat.sdk import AirflowException, BaseHook from airflow.providers.http.exceptions import HttpErrorException, HttpMethodException @@ -97,6 +98,26 @@ def _process_extra_options_from_connection( return conn_extra_options, passed_extra_options +def _retryable_error_async(exception: ClientResponseError) -> bool: + """ + Determine whether an exception may successful on a subsequent attempt. + + It considers the following to be retryable: + - requests_exceptions.ConnectionError + - requests_exceptions.Timeout + - anything with a status code >= 500 + + Most retryable errors are covered by status code >= 500. + """ + if exception.status == 429: + # don't retry for too Many Requests + return False + if exception.status == 413: + # don't retry for payload Too Large + return False + return exception.status >= 500 + + class HttpHook(BaseHook): """ Interact with HTTP servers. @@ -440,7 +461,7 @@ def retry_delay(self) -> float: return self._hook.retry_delay @property - def headers(self) -> dict[str, Any]: + def headers(self) -> dict[str, Any] | None: return self.config.headers @property @@ -488,6 +509,7 @@ async def request_func() -> ClientResponse: async for attempt in AsyncRetrying( stop=stop_after_attempt(self.retry_limit), wait=wait_fixed(self.retry_delay), + retry=retry_if_exception(_retryable_error_async), reraise=True, ): with attempt: @@ -495,26 +517,6 @@ async def request_func() -> ClientResponse: raise NotImplementedError # should not reach this, but makes mypy happy - @classmethod - def _retryable_error_async(cls, exception: ClientResponseError) -> bool: - """ - Determine whether an exception may successful on a subsequent attempt. - - It considers the following to be retryable: - - requests_exceptions.ConnectionError - - requests_exceptions.Timeout - - anything with a status code >= 500 - - Most retryable errors are covered by status code >= 500. - """ - if exception.status == 429: - # don't retry for too Many Requests - return False - if exception.status == 413: - # don't retry for payload Too Large - return False - return exception.status >= 500 - class HttpAsyncHook(BaseHook): """ From d19c593c56f50723f1807a0b4623e24647bd27e3 Mon Sep 17 00:00:00 2001 From: David Blain Date: Fri, 30 Jan 2026 20:15:42 +0100 Subject: [PATCH 04/38] refactor: Reformatted HttpAsyncHook --- .../http/src/airflow/providers/http/hooks/http.py | 14 +++++++++----- 1 file changed, 9 insertions(+), 5 deletions(-) diff --git a/providers/http/src/airflow/providers/http/hooks/http.py b/providers/http/src/airflow/providers/http/hooks/http.py index c203ffbd69490..b6d9c6ac284cc 100644 --- a/providers/http/src/airflow/providers/http/hooks/http.py +++ b/providers/http/src/airflow/providers/http/hooks/http.py @@ -18,9 +18,9 @@ from __future__ import annotations import copy -from collections.abc import Awaitable, Callable +from collections.abc import AsyncGenerator, Awaitable, Callable from contextlib import asynccontextmanager -from typing import TYPE_CHECKING, Any, cast, AsyncGenerator +from typing import TYPE_CHECKING, Any, cast from urllib.parse import urlparse import aiohttp @@ -35,7 +35,7 @@ from tenacity import retry_if_exception from airflow.providers.common.compat.sdk import AirflowException, BaseHook -from airflow.providers.http.exceptions import HttpErrorException, HttpMethodException +from airflow.providers.http.exceptions import HttpMethodException from airflow.utils.log.logging_mixin import LoggingMixin if TYPE_CHECKING: @@ -643,7 +643,11 @@ async def run( if session is not None: request = self._get_request_func(session=session) config = await self.config() - return await AsyncHttpSession(hook=self, request=request, config=config).run(endpoint=endpoint, data=data, json=json, headers=headers, extra_options=extra_options) + return await AsyncHttpSession(hook=self, request=request, config=config).run( + endpoint=endpoint, data=data, json=json, headers=headers, extra_options=extra_options + ) async with self.session() as http: - return await http.run(endpoint=endpoint, data=data, json=json, headers=headers, extra_options=extra_options) + return await http.run( + endpoint=endpoint, data=data, json=json, headers=headers, extra_options=extra_options + ) From a2cecc686f0a1053eb44d38ca0b8d7169210684c Mon Sep 17 00:00:00 2001 From: David Blain Date: Mon, 2 Feb 2026 14:06:27 +0100 Subject: [PATCH 05/38] refactor: Fixed possible None types for merged_headers --- providers/http/src/airflow/providers/http/hooks/http.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/providers/http/src/airflow/providers/http/hooks/http.py b/providers/http/src/airflow/providers/http/hooks/http.py index 768869015cc67..6237ba8c629b2 100644 --- a/providers/http/src/airflow/providers/http/hooks/http.py +++ b/providers/http/src/airflow/providers/http/hooks/http.py @@ -492,7 +492,7 @@ async def run( extra_options = extra_options or {} url = _url_from_endpoint(self.base_url, endpoint) - merged_headers = {**self.headers, **(headers or {})} + merged_headers = {**(self.headers or {}), **(headers or {})} async def request_func() -> ClientResponse: response = await self._request( From c233e8e4008a179fce236f72c3aff3cdb6a6ab97 Mon Sep 17 00:00:00 2001 From: David Blain Date: Mon, 2 Feb 2026 14:07:34 +0100 Subject: [PATCH 06/38] refactor: Changed type of _retryable_error_async method --- providers/http/src/airflow/providers/http/hooks/http.py | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/providers/http/src/airflow/providers/http/hooks/http.py b/providers/http/src/airflow/providers/http/hooks/http.py index 6237ba8c629b2..2fa7417f80486 100644 --- a/providers/http/src/airflow/providers/http/hooks/http.py +++ b/providers/http/src/airflow/providers/http/hooks/http.py @@ -99,7 +99,7 @@ def _process_extra_options_from_connection( return conn_extra_options, passed_extra_options -def _retryable_error_async(exception: ClientResponseError) -> bool: +def _retryable_error_async(exception: BaseException) -> bool: """ Determine whether an exception may successful on a subsequent attempt. @@ -110,6 +110,8 @@ def _retryable_error_async(exception: ClientResponseError) -> bool: Most retryable errors are covered by status code >= 500. """ + if not isinstance(exception, ClientResponseError): + return False if exception.status == 429: # don't retry for too Many Requests return False From 15754afecad6e8bf7ec395da06eddfa70c367db2 Mon Sep 17 00:00:00 2001 From: David Blain Date: Mon, 2 Feb 2026 14:07:49 +0100 Subject: [PATCH 07/38] refactor: Removed unused import --- providers/http/src/airflow/providers/http/hooks/http.py | 1 - 1 file changed, 1 deletion(-) diff --git a/providers/http/src/airflow/providers/http/hooks/http.py b/providers/http/src/airflow/providers/http/hooks/http.py index 2fa7417f80486..0e7440cf795d6 100644 --- a/providers/http/src/airflow/providers/http/hooks/http.py +++ b/providers/http/src/airflow/providers/http/hooks/http.py @@ -34,7 +34,6 @@ from requests_toolbelt.adapters.socket_options import TCPKeepAliveAdapter from tenacity import retry_if_exception -from airflow.providers.common.compat.connection import get_async_connection from airflow.providers.common.compat.sdk import AirflowException, BaseHook from airflow.providers.http.exceptions import HttpMethodException from airflow.utils.log.logging_mixin import LoggingMixin From f9c503d4b6aecfbda636d25daf0d6fa59ddd3dd4 Mon Sep 17 00:00:00 2001 From: David Blain Date: Mon, 2 Feb 2026 14:38:10 +0100 Subject: [PATCH 08/38] refactor: Moved SessionConfig inside AsyncHttpSession --- .../src/airflow/providers/http/hooks/http.py | 17 ++++++++--------- 1 file changed, 8 insertions(+), 9 deletions(-) diff --git a/providers/http/src/airflow/providers/http/hooks/http.py b/providers/http/src/airflow/providers/http/hooks/http.py index 0e7440cf795d6..035499af192d9 100644 --- a/providers/http/src/airflow/providers/http/hooks/http.py +++ b/providers/http/src/airflow/providers/http/hooks/http.py @@ -424,13 +424,12 @@ def test_connection(self): return False, str(e) -class SessionConfig(BaseModel): - base_url: str - headers: dict[str, Any] | None = None - auth: aiohttp.BasicAuth | None = None - - class AsyncHttpSession(LoggingMixin): + class SessionConfig(BaseModel): + base_url: str + headers: dict[str, Any] | None = None + auth: aiohttp.BasicAuth | None = None + def __init__( self, hook: HttpAsyncHook, @@ -553,7 +552,7 @@ def __init__( raise ValueError("Retry limit must be greater or equal to 1") self.retry_limit = retry_limit self.retry_delay = retry_delay - self._config: SessionConfig | None = None + self._config: AsyncHttpSession.SessionConfig | None = None def _get_request_func(self, session: aiohttp.ClientSession) -> Callable[..., Any]: method = self.method @@ -573,7 +572,7 @@ def _get_request_func(self, session: aiohttp.ClientSession) -> Callable[..., Any return session.options raise HttpMethodException(f"Unexpected HTTP Method: {method}") - async def config(self) -> SessionConfig: + async def config(self) -> AsyncHttpSession.SessionConfig: if not self._config: from airflow.providers.common.compat.connection import get_async_connection @@ -602,7 +601,7 @@ async def config(self) -> SessionConfig: ) headers.update(conn_extra_options) - self._config = SessionConfig( + self._config = AsyncHttpSession.SessionConfig( base_url=base_url, headers=headers, auth=auth, From 7e382b4eae8d0f95964259f009cc3dbba04075be Mon Sep 17 00:00:00 2001 From: David Blain Date: Mon, 2 Feb 2026 15:12:25 +0100 Subject: [PATCH 09/38] refactor: Reformatted run method of HttpAsyncHook --- providers/http/src/airflow/providers/http/hooks/http.py | 1 - 1 file changed, 1 deletion(-) diff --git a/providers/http/src/airflow/providers/http/hooks/http.py b/providers/http/src/airflow/providers/http/hooks/http.py index 035499af192d9..99997533b24fb 100644 --- a/providers/http/src/airflow/providers/http/hooks/http.py +++ b/providers/http/src/airflow/providers/http/hooks/http.py @@ -640,7 +640,6 @@ async def run( For example, ``run(json=obj)`` is passed as ``aiohttp.ClientSession().get(json=obj)``. """ - if session is not None: request = self._get_request_func(session=session) config = await self.config() From 7a109d849c960ba00fc5437b939b026cebbbbd51 Mon Sep 17 00:00:00 2001 From: David Blain Date: Mon, 2 Feb 2026 15:13:48 +0100 Subject: [PATCH 10/38] refactor: Removed unused import from LivyHook module --- .../src/airflow/providers/apache/livy/hooks/livy.py | 11 ++--------- 1 file changed, 2 insertions(+), 9 deletions(-) diff --git a/providers/apache/livy/src/airflow/providers/apache/livy/hooks/livy.py b/providers/apache/livy/src/airflow/providers/apache/livy/hooks/livy.py index ec5818baaefb4..e50697045193e 100644 --- a/providers/apache/livy/src/airflow/providers/apache/livy/hooks/livy.py +++ b/providers/apache/livy/src/airflow/providers/apache/livy/hooks/livy.py @@ -16,23 +16,16 @@ # under the License. from __future__ import annotations -import asyncio import json import re from collections.abc import Sequence from enum import Enum -from typing import TYPE_CHECKING, Any +from typing import Any -import aiohttp import requests -from aiohttp import ClientResponseError -from airflow.providers.common.compat.connection import get_async_connection from airflow.providers.common.compat.sdk import AirflowException -from airflow.providers.http.hooks.http import HttpAsyncHook, HttpHook, _retryable_error_async - -if TYPE_CHECKING: - from airflow.models import Connection +from airflow.providers.http.hooks.http import HttpAsyncHook, HttpHook class BatchState(Enum): From c8a10b9f04ff078f073b38fb73223e86f2826c20 Mon Sep 17 00:00:00 2001 From: David Blain Date: Mon, 2 Feb 2026 15:15:07 +0100 Subject: [PATCH 11/38] Revert "refactor: Moved SessionConfig inside AsyncHttpSession" This reverts commit f9c503d4b6aecfbda636d25daf0d6fa59ddd3dd4. --- .../src/airflow/providers/http/hooks/http.py | 17 +++++++++-------- 1 file changed, 9 insertions(+), 8 deletions(-) diff --git a/providers/http/src/airflow/providers/http/hooks/http.py b/providers/http/src/airflow/providers/http/hooks/http.py index 99997533b24fb..ee0df41ab80ba 100644 --- a/providers/http/src/airflow/providers/http/hooks/http.py +++ b/providers/http/src/airflow/providers/http/hooks/http.py @@ -424,12 +424,13 @@ def test_connection(self): return False, str(e) -class AsyncHttpSession(LoggingMixin): - class SessionConfig(BaseModel): - base_url: str - headers: dict[str, Any] | None = None - auth: aiohttp.BasicAuth | None = None +class SessionConfig(BaseModel): + base_url: str + headers: dict[str, Any] | None = None + auth: aiohttp.BasicAuth | None = None + +class AsyncHttpSession(LoggingMixin): def __init__( self, hook: HttpAsyncHook, @@ -552,7 +553,7 @@ def __init__( raise ValueError("Retry limit must be greater or equal to 1") self.retry_limit = retry_limit self.retry_delay = retry_delay - self._config: AsyncHttpSession.SessionConfig | None = None + self._config: SessionConfig | None = None def _get_request_func(self, session: aiohttp.ClientSession) -> Callable[..., Any]: method = self.method @@ -572,7 +573,7 @@ def _get_request_func(self, session: aiohttp.ClientSession) -> Callable[..., Any return session.options raise HttpMethodException(f"Unexpected HTTP Method: {method}") - async def config(self) -> AsyncHttpSession.SessionConfig: + async def config(self) -> SessionConfig: if not self._config: from airflow.providers.common.compat.connection import get_async_connection @@ -601,7 +602,7 @@ async def config(self) -> AsyncHttpSession.SessionConfig: ) headers.update(conn_extra_options) - self._config = AsyncHttpSession.SessionConfig( + self._config = SessionConfig( base_url=base_url, headers=headers, auth=auth, From 491e3653141c807289aace05a5e1c6564576cc4e Mon Sep 17 00:00:00 2001 From: David Blain Date: Mon, 2 Feb 2026 15:34:47 +0100 Subject: [PATCH 12/38] refactor: Added docstring for retry_limit and retry_delay parameters --- providers/http/src/airflow/providers/http/hooks/http.py | 2 ++ 1 file changed, 2 insertions(+) diff --git a/providers/http/src/airflow/providers/http/hooks/http.py b/providers/http/src/airflow/providers/http/hooks/http.py index ee0df41ab80ba..90a1e3899b874 100644 --- a/providers/http/src/airflow/providers/http/hooks/http.py +++ b/providers/http/src/airflow/providers/http/hooks/http.py @@ -529,6 +529,8 @@ class HttpAsyncHook(BaseHook): API url i.e https://www.google.com/ and optional authentication credentials. Default headers can also be specified in the Extra field in json format. :param auth_type: The auth type for the service + :param retry_limit: Maximum number of times to retry this job if it fails (default is 3) + :param retry_delay: Delay between retry attempts (default is 1.0) """ conn_name_attr = "http_conn_id" From c77715e385ed47ffdbd1aa69a5375a21bed88ae4 Mon Sep 17 00:00:00 2001 From: David Blain Date: Mon, 2 Feb 2026 15:37:16 +0100 Subject: [PATCH 13/38] refactor: Reformatted docstring in _retryable_error_async method --- providers/http/src/airflow/providers/http/hooks/http.py | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/providers/http/src/airflow/providers/http/hooks/http.py b/providers/http/src/airflow/providers/http/hooks/http.py index 90a1e3899b874..7a4cc146f7b15 100644 --- a/providers/http/src/airflow/providers/http/hooks/http.py +++ b/providers/http/src/airflow/providers/http/hooks/http.py @@ -103,9 +103,9 @@ def _retryable_error_async(exception: BaseException) -> bool: Determine whether an exception may successful on a subsequent attempt. It considers the following to be retryable: - - requests_exceptions.ConnectionError - - requests_exceptions.Timeout - - anything with a status code >= 500 + - requests_exceptions.ConnectionError + - requests_exceptions.Timeout + - anything with a status code >= 500 Most retryable errors are covered by status code >= 500. """ From 53cb8b3a5f6e81044405e906b3eabfae467df130 Mon Sep 17 00:00:00 2001 From: David Blain Date: Mon, 2 Feb 2026 19:29:26 +0100 Subject: [PATCH 14/38] refactor: Added docstring for SessionConfig and AsyncHttpSession --- .../src/airflow/providers/http/hooks/http.py | 16 ++++++++++++++++ 1 file changed, 16 insertions(+) diff --git a/providers/http/src/airflow/providers/http/hooks/http.py b/providers/http/src/airflow/providers/http/hooks/http.py index 7a4cc146f7b15..855973ffd84fa 100644 --- a/providers/http/src/airflow/providers/http/hooks/http.py +++ b/providers/http/src/airflow/providers/http/hooks/http.py @@ -425,12 +425,27 @@ def test_connection(self): class SessionConfig(BaseModel): + """Configuration container for an asynchronous HTTP session.""" + base_url: str headers: dict[str, Any] | None = None auth: aiohttp.BasicAuth | None = None class AsyncHttpSession(LoggingMixin): + """ + Wrapper around an ``aiohttp.ClientSession`` providing a session bound HttpAsyncHook. + + This class binds an asynchronous HTTP client session to an ``HttpAsyncHook`` and applies connection + configuration, authentication, headers, and retry logic consistently across requests. A single + ``AsyncHttpSession`` instance is intended to be used for multiple HTTP calls within the same logical session. + + :param hook: The ``HttpAsyncHook`` instance that owns this session and provides connection-level behavior + such as retries and logging. + :param request: A callable used to perform the underlying HTTP request. This is typically a bound + ``aiohttp.ClientSession`` request method. + :param config: Resolved session configuration containing base URL, headers, and authentication settings. + """ def __init__( self, hook: HttpAsyncHook, @@ -615,6 +630,7 @@ async def config(self) -> SessionConfig: async def session(self) -> AsyncGenerator[AsyncHttpSession, None]: """ Create an AsyncHttpSession bound to a single aiohttp.ClientSession. + Airflow connection resolution happens exactly once here. """ async with aiohttp.ClientSession() as session: From 5eda002a0c1abfe8d53187a87479b6737ab6f12a Mon Sep 17 00:00:00 2001 From: David Blain Date: Tue, 3 Feb 2026 11:31:26 +0100 Subject: [PATCH 15/38] refactor: Added warning logging when run attempt fails --- .../http/src/airflow/providers/http/hooks/http.py | 13 +++++++++++-- 1 file changed, 11 insertions(+), 2 deletions(-) diff --git a/providers/http/src/airflow/providers/http/hooks/http.py b/providers/http/src/airflow/providers/http/hooks/http.py index 855973ffd84fa..74aa16bda3b8d 100644 --- a/providers/http/src/airflow/providers/http/hooks/http.py +++ b/providers/http/src/airflow/providers/http/hooks/http.py @@ -514,7 +514,7 @@ async def request_func() -> ClientResponse: response = await self._request( url, params=data if self.method == "GET" else None, - data=data if self.method in ("POST", "PUT", "PATCH") else None, + data=data if self.method in {"POST", "PUT", "PATCH"} else None, json=json, headers=merged_headers, auth=self.auth, @@ -530,7 +530,16 @@ async def request_func() -> ClientResponse: reraise=True, ): with attempt: - return await request_func() + try: + return await request_func() + except ClientResponseError as e: + self.log.warning( + "[Try %d of %d] Request to %s failed.", + attempt.retry_state.attempt_number, + self.retry_limit, + url, + ) + raise e raise NotImplementedError # should not reach this, but makes mypy happy From 10620f46f104027ddf133f842f9eb6e8f5aaa0bf Mon Sep 17 00:00:00 2001 From: David Blain Date: Tue, 3 Feb 2026 12:22:23 +0100 Subject: [PATCH 16/38] refactor: Refactored run_method of LivyAsyncHook --- .../providers/apache/livy/hooks/livy.py | 27 ++++++++++--------- 1 file changed, 14 insertions(+), 13 deletions(-) diff --git a/providers/apache/livy/src/airflow/providers/apache/livy/hooks/livy.py b/providers/apache/livy/src/airflow/providers/apache/livy/hooks/livy.py index e50697045193e..e4ba1f5f568c3 100644 --- a/providers/apache/livy/src/airflow/providers/apache/livy/hooks/livy.py +++ b/providers/apache/livy/src/airflow/providers/apache/livy/hooks/livy.py @@ -23,6 +23,7 @@ from typing import Any import requests +from aiohttp import ClientResponseError from airflow.providers.common.compat.sdk import AirflowException from airflow.providers.http.hooks.http import HttpAsyncHook, HttpHook @@ -498,7 +499,7 @@ def __init__( async def run_method( self, endpoint: str, - method: str = "GET", + method: str | None = None, data: Any | None = None, headers: dict[str, Any] | None = None, ) -> Any: @@ -511,18 +512,18 @@ async def run_method( :param headers: headers :return: http response """ - if method not in ("GET", "POST", "PUT", "DELETE", "HEAD"): + + method = method or self.method + if method not in {"GET", "PATCH", "POST", "PUT", "DELETE", "HEAD"}: return {"status": "error", "response": f"Invalid http method {method}"} - back_method = self.method - self.method = method - try: - endpoint = ( - f"{self.endpoint_prefix}/{endpoint}" - if self.endpoint_prefix and endpoint - else endpoint or self.endpoint_prefix - ) + endpoint = ( + f"{self.endpoint_prefix}/{endpoint}" + if self.endpoint_prefix and endpoint + else endpoint or self.endpoint_prefix + ) + try: response = await self.run( endpoint=endpoint, data=data, @@ -531,9 +532,9 @@ async def run_method( ) result = await response.json() - finally: - self.method = back_method - return {"status": "success", "response": result} + return {"status": "success", "response": result} + except ClientResponseError as e: + return {"Response": {e.message}, "Status Code": {e.status}, "status": "error"} async def get_batch_state(self, session_id: int | str) -> Any: """ From 8a1f6750e9bc1bd3e7719b0915ee02b156c2043c Mon Sep 17 00:00:00 2001 From: David Blain Date: Tue, 3 Feb 2026 12:22:44 +0100 Subject: [PATCH 17/38] refactor: Refactored unit tests for LivyAsyncHook --- .../tests/unit/apache/livy/hooks/test_livy.py | 153 ++++++------------ 1 file changed, 48 insertions(+), 105 deletions(-) diff --git a/providers/apache/livy/tests/unit/apache/livy/hooks/test_livy.py b/providers/apache/livy/tests/unit/apache/livy/hooks/test_livy.py index 3353d7969150a..70a28049737a7 100644 --- a/providers/apache/livy/tests/unit/apache/livy/hooks/test_livy.py +++ b/providers/apache/livy/tests/unit/apache/livy/hooks/test_livy.py @@ -24,6 +24,7 @@ import pytest import requests from aiohttp import ClientResponseError, RequestInfo +from requests import Response from requests.exceptions import RequestException from airflow.models import Connection @@ -592,159 +593,99 @@ async def test_dump_batch_logs_error(self, mock_get_batch_logs): assert log_dump == {"id": 1, "log": ["mock_log_1", "mock_log_2"]} @pytest.mark.asyncio - @mock.patch("airflow.providers.apache.livy.hooks.livy.LivyAsyncHook._do_api_call_async") - async def test_run_method_success(self, mock_do_api_call_async): + @mock.patch("airflow.providers.apache.livy.hooks.livy.LivyAsyncHook.run") + async def test_run_method_success(self, mock_run): """Asserts the run_method for success response.""" - mock_do_api_call_async.return_value = {"status": "error", "response": {"id": 1}} + response = MagicMock(spec=Response) + response.json = AsyncMock(return_value={"id": 1}) + mock_run.return_value = response hook = LivyAsyncHook(livy_conn_id=LIVY_CONN_ID) response = await hook.run_method("localhost", "GET") - assert response["status"] == "success" + assert response == {"status": "success", "response": {"id": 1}} @pytest.mark.asyncio - @mock.patch("airflow.providers.apache.livy.hooks.livy.LivyAsyncHook._do_api_call_async") - async def test_run_method_error(self, mock_do_api_call_async): + async def test_run_method_error(self): """Asserts the run_method for error response.""" - mock_do_api_call_async.return_value = {"status": "error", "response": {"id": 1}} hook = LivyAsyncHook(livy_conn_id=LIVY_CONN_ID) response = await hook.run_method("localhost", "abc") assert response == {"status": "error", "response": "Invalid http method abc"} @pytest.mark.asyncio - @mock.patch("airflow.providers.apache.livy.hooks.livy.aiohttp.ClientSession") - @mock.patch("airflow.providers.apache.livy.hooks.livy.get_async_connection") - async def test_do_api_call_async_post_method_with_success(self, mock_get_connection, mock_session): - """Asserts the _do_api_call_async for success response for POST method.""" - - async def mock_fun(arg1, arg2, arg3, arg4): - return {"status": "success"} - - mock_session.return_value.__aexit__.return_value = mock_fun + @mock.patch("airflow.providers.http.hooks.http.aiohttp.ClientSession") + async def test_run_post_method_with_success(self, mock_session): + """Asserts the run_method for success response for POST method.""" mock_session.return_value.__aenter__.return_value.post = AsyncMock() mock_session.return_value.__aenter__.return_value.post.return_value.json = AsyncMock( - return_value={"status": "success"} + return_value={"hello": "world"} ) - GET_RUN_ENDPOINT = "api/jobs/runs/get" hook = LivyAsyncHook(livy_conn_id=LIVY_CONN_ID) - hook.http_conn_id = mock_get_connection - hook.http_conn_id.host = "https://localhost" - hook.http_conn_id.login = "login" - hook.http_conn_id.password = "PASSWORD" - response = await hook._do_api_call_async(GET_RUN_ENDPOINT) - assert response == {"status": "success"} + response = await hook.run_method("api/jobs/runs/get") + assert response["status"] == "success" + assert response["response"] == {"hello": "world"} @pytest.mark.asyncio - @mock.patch("airflow.providers.apache.livy.hooks.livy.aiohttp.ClientSession") - @mock.patch("airflow.providers.apache.livy.hooks.livy.get_async_connection") - async def test_do_api_call_async_get_method_with_success(self, mock_get_connection, mock_session): - """Asserts the _do_api_call_async for GET method.""" - - async def mock_fun(arg1, arg2, arg3, arg4): - return {"status": "success"} - - mock_session.return_value.__aexit__.return_value = mock_fun + @mock.patch("airflow.providers.http.hooks.http.aiohttp.ClientSession") + async def test_run_get_method_with_success(self, mock_session): + """Asserts the run_method for GET method.""" mock_session.return_value.__aenter__.return_value.get = AsyncMock() mock_session.return_value.__aenter__.return_value.get.return_value.json = AsyncMock( - return_value={"status": "success"} + return_value={"hello": "world"} ) - GET_RUN_ENDPOINT = "api/jobs/runs/get" hook = LivyAsyncHook(livy_conn_id=LIVY_CONN_ID) hook.method = "GET" - hook.http_conn_id = mock_get_connection - hook.http_conn_id.host = "test.com" - hook.http_conn_id.login = "login" - hook.http_conn_id.password = "PASSWORD" - hook.http_conn_id.extra_dejson = "" - response = await hook._do_api_call_async(GET_RUN_ENDPOINT) - assert response == {"status": "success"} + response = await hook.run_method("api/jobs/runs/get") + assert response["status"] == "success" + assert response["response"] == {"hello": "world"} @pytest.mark.asyncio - @mock.patch("airflow.providers.apache.livy.hooks.livy.aiohttp.ClientSession") - @mock.patch("airflow.providers.apache.livy.hooks.livy.get_async_connection") - async def test_do_api_call_async_patch_method_with_success(self, mock_get_connection, mock_session): - """Asserts the _do_api_call_async for PATCH method.""" - - async def mock_fun(arg1, arg2, arg3, arg4): - return {"status": "success"} - - mock_session.return_value.__aexit__.return_value = mock_fun + @mock.patch("airflow.providers.http.hooks.http.aiohttp.ClientSession") + async def test_run_patch_method_with_success(self, mock_session): + """Asserts the run_method for PATCH method.""" mock_session.return_value.__aenter__.return_value.patch = AsyncMock() mock_session.return_value.__aenter__.return_value.patch.return_value.json = AsyncMock( - return_value={"status": "success"} + return_value={"hello": "world"} ) - GET_RUN_ENDPOINT = "api/jobs/runs/get" hook = LivyAsyncHook(livy_conn_id=LIVY_CONN_ID) hook.method = "PATCH" - hook.http_conn_id = mock_get_connection - hook.http_conn_id.host = "test.com" - hook.http_conn_id.login = "login" - hook.http_conn_id.password = "PASSWORD" - hook.http_conn_id.extra_dejson = "" - response = await hook._do_api_call_async(GET_RUN_ENDPOINT) - assert response == {"status": "success"} + response = await hook.run_method("api/jobs/runs/get") + assert response["status"] == "success" + assert response["response"] == {"hello": "world"} @pytest.mark.asyncio - @mock.patch("airflow.providers.apache.livy.hooks.livy.aiohttp.ClientSession") - @mock.patch("airflow.providers.apache.livy.hooks.livy.get_async_connection") - async def test_do_api_call_async_unexpected_method_error(self, mock_get_connection, mock_session): - """Asserts the _do_api_call_async for unexpected method error""" - GET_RUN_ENDPOINT = "api/jobs/runs/get" + async def test_run_unexpected_method_with_success(self): + """Asserts the run_method for unexpected method error""" hook = LivyAsyncHook(livy_conn_id=LIVY_CONN_ID) hook.method = "abc" - hook.http_conn_id = mock_get_connection - hook.http_conn_id.host = "test.com" - hook.http_conn_id.login = "login" - hook.http_conn_id.password = "PASSWORD" - hook.http_conn_id.extra_dejson = "" - response = await hook._do_api_call_async(endpoint=GET_RUN_ENDPOINT, headers={}) - assert response == {"Response": "Unexpected HTTP Method: abc", "status": "error"} + response = await hook.run_method(endpoint="api/jobs/runs/get", headers={}) + assert response == {"response": "Invalid http method abc", "status": "error"} @pytest.mark.asyncio - @mock.patch("airflow.providers.apache.livy.hooks.livy.aiohttp.ClientSession") - @mock.patch("airflow.providers.apache.livy.hooks.livy.get_async_connection") - async def test_do_api_call_async_with_type_error(self, mock_get_connection, mock_session): - """Asserts the _do_api_call_async for TypeError.""" + async def test_run_put_method_with_type_error(self): + """Asserts the run_method for TypeError.""" async def mock_fun(arg1, arg2, arg3, arg4): return {"random value"} - mock_session.return_value.__aexit__.return_value = mock_fun - mock_session.return_value.__aenter__.return_value.patch.return_value.json.return_value = {} hook = LivyAsyncHook(livy_conn_id=LIVY_CONN_ID) hook.method = "PATCH" - hook.retry_limit = 1 - hook.retry_delay = 1 - hook.http_conn_id = mock_get_connection with pytest.raises(TypeError): - await hook._do_api_call_async(endpoint="", data="test", headers=mock_fun, extra_options=mock_fun) + await hook.run_method(endpoint="api/jobs/runs/get", data="test", headers=mock_fun) @pytest.mark.asyncio - @mock.patch("airflow.providers.apache.livy.hooks.livy.aiohttp.ClientSession") - @mock.patch("airflow.providers.apache.livy.hooks.livy.get_async_connection") - async def test_do_api_call_async_with_client_response_error(self, mock_get_connection, mock_session): - """Asserts the _do_api_call_async for Client Response Error.""" + @mock.patch("airflow.providers.http.hooks.http.aiohttp.ClientSession") + async def test_run_method_with_client_response_error(self, mock_session): + """Asserts the run_method for Client Response Error.""" - async def mock_fun(arg1, arg2, arg3, arg4): - return {"random value"} - - mock_session.return_value.__aexit__.return_value = mock_fun - mock_session.return_value.__aenter__.return_value.patch = AsyncMock() - mock_session.return_value.__aenter__.return_value.patch.return_value.json.side_effect = ( - ClientResponseError( + mock_session.return_value.__aenter__.return_value.patch = AsyncMock( + side_effect=ClientResponseError( request_info=RequestInfo(url="example.com", method="PATCH", headers=multidict.CIMultiDict()), status=500, history=[], ) ) - GET_RUN_ENDPOINT = "" hook = LivyAsyncHook(livy_conn_id="livy_default") hook.method = "PATCH" - hook.base_url = "" - hook.http_conn_id = mock_get_connection - hook.http_conn_id.host = "test.com" - hook.http_conn_id.login = "login" - hook.http_conn_id.password = "PASSWORD" - hook.http_conn_id.extra_dejson = "" - response = await hook._do_api_call_async(GET_RUN_ENDPOINT) + response = await hook.run_method("") assert response["status"] == "error" @pytest.fixture @@ -764,7 +705,8 @@ def setup_livy_conn(self, create_connection_without_db): create_connection_without_db(Connection(conn_id="missing_host", conn_type="http", port=1234)) create_connection_without_db(Connection(conn_id="invalid_uri", uri="http://invalid_uri:4321")) - def test_build_get_hook(self, setup_livy_conn): + @pytest.mark.asyncio + async def test_build_get_hook(self, setup_livy_conn): connection_url_mapping = { # id, expected "default_port": "http://host", @@ -776,9 +718,10 @@ def test_build_get_hook(self, setup_livy_conn): for conn_id, expected in connection_url_mapping.items(): hook = LivyAsyncHook(livy_conn_id=conn_id) - response_conn: Connection = hook.get_connection(conn_id=conn_id) - assert isinstance(response_conn, Connection) - assert hook._generate_base_url(response_conn) == expected + async with hook.session() as session: + response_conn: Connection = hook.get_connection(conn_id=conn_id) + assert isinstance(response_conn, Connection) + assert session.base_url == expected def test_build_body(self): # minimal request From 0beab47ca5a5520186dce97c267af27ac458b8d5 Mon Sep 17 00:00:00 2001 From: David Blain Date: Tue, 3 Feb 2026 13:01:45 +0100 Subject: [PATCH 18/38] refactor: Reformatted AsyncHttpSession --- providers/http/src/airflow/providers/http/hooks/http.py | 1 + 1 file changed, 1 insertion(+) diff --git a/providers/http/src/airflow/providers/http/hooks/http.py b/providers/http/src/airflow/providers/http/hooks/http.py index 74aa16bda3b8d..0580c2fe96b64 100644 --- a/providers/http/src/airflow/providers/http/hooks/http.py +++ b/providers/http/src/airflow/providers/http/hooks/http.py @@ -446,6 +446,7 @@ class AsyncHttpSession(LoggingMixin): ``aiohttp.ClientSession`` request method. :param config: Resolved session configuration containing base URL, headers, and authentication settings. """ + def __init__( self, hook: HttpAsyncHook, From d6498509c3497ce14f0b99dbed49fd8f46f076a1 Mon Sep 17 00:00:00 2001 From: David Blain Date: Tue, 3 Feb 2026 13:02:08 +0100 Subject: [PATCH 19/38] refactor: Reformatted run_method of LivyAsyncHook --- .../apache/livy/src/airflow/providers/apache/livy/hooks/livy.py | 1 - 1 file changed, 1 deletion(-) diff --git a/providers/apache/livy/src/airflow/providers/apache/livy/hooks/livy.py b/providers/apache/livy/src/airflow/providers/apache/livy/hooks/livy.py index e4ba1f5f568c3..022b986a0e8c2 100644 --- a/providers/apache/livy/src/airflow/providers/apache/livy/hooks/livy.py +++ b/providers/apache/livy/src/airflow/providers/apache/livy/hooks/livy.py @@ -512,7 +512,6 @@ async def run_method( :param headers: headers :return: http response """ - method = method or self.method if method not in {"GET", "PATCH", "POST", "PUT", "DELETE", "HEAD"}: return {"status": "error", "response": f"Invalid http method {method}"} From bd93295fd3fa7ce85833ad806944fbe360f8aeb9 Mon Sep 17 00:00:00 2001 From: David Blain Date: Tue, 3 Feb 2026 13:04:26 +0100 Subject: [PATCH 20/38] refactor: Escape aiohttp.ClientSession in docstring of session contextmanager in HttpAsyncHook --- providers/http/src/airflow/providers/http/hooks/http.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/providers/http/src/airflow/providers/http/hooks/http.py b/providers/http/src/airflow/providers/http/hooks/http.py index 0580c2fe96b64..86e24333e3de9 100644 --- a/providers/http/src/airflow/providers/http/hooks/http.py +++ b/providers/http/src/airflow/providers/http/hooks/http.py @@ -639,7 +639,7 @@ async def config(self) -> SessionConfig: @asynccontextmanager async def session(self) -> AsyncGenerator[AsyncHttpSession, None]: """ - Create an AsyncHttpSession bound to a single aiohttp.ClientSession. + Create an AsyncHttpSession bound to a single ``aiohttp.ClientSession``. Airflow connection resolution happens exactly once here. """ From 232f0b2a9a6ad61b572b69fc425b158f1d6416a4 Mon Sep 17 00:00:00 2001 From: David Blain Date: Tue, 3 Feb 2026 13:30:31 +0100 Subject: [PATCH 21/38] refactor: Also take into extra_options from connection when building AsyncHttpSession --- .../src/airflow/providers/http/hooks/http.py | 38 ++++++++++++------- 1 file changed, 24 insertions(+), 14 deletions(-) diff --git a/providers/http/src/airflow/providers/http/hooks/http.py b/providers/http/src/airflow/providers/http/hooks/http.py index 86e24333e3de9..6fad4982ef420 100644 --- a/providers/http/src/airflow/providers/http/hooks/http.py +++ b/providers/http/src/airflow/providers/http/hooks/http.py @@ -35,7 +35,7 @@ from tenacity import retry_if_exception from airflow.providers.common.compat.sdk import AirflowException, BaseHook -from airflow.providers.http.exceptions import HttpMethodException +from airflow.providers.http.exceptions import HttpMethodException, HttpErrorException from airflow.utils.log.logging_mixin import LoggingMixin if TYPE_CHECKING: @@ -430,6 +430,7 @@ class SessionConfig(BaseModel): base_url: str headers: dict[str, Any] | None = None auth: aiohttp.BasicAuth | None = None + extra_options: dict[str, Any] | None = None class AsyncHttpSession(LoggingMixin): @@ -482,6 +483,10 @@ def retry_delay(self) -> float: def headers(self) -> dict[str, Any] | None: return self.config.headers + @property + def extra_options(self) -> dict[str, Any] | None: + return self.config.extra_options + @property def auth(self) -> aiohttp.BasicAuth | None: return self.config.auth @@ -507,9 +512,9 @@ async def run( """ from tenacity import AsyncRetrying, stop_after_attempt, wait_fixed - extra_options = extra_options or {} url = _url_from_endpoint(self.base_url, endpoint) merged_headers = {**(self.headers or {}), **(headers or {})} + extra_options = {**(self.extra_options or {}), **(extra_options or {})} async def request_func() -> ClientResponse: response = await self._request( @@ -607,6 +612,7 @@ async def config(self) -> SessionConfig: base_url: str = self.base_url auth: aiohttp.BasicAuth | None = None headers: dict[str, Any] = {} + extra_options: dict[str, Any] = {} if self.http_conn_id: conn = await get_async_connection(conn_id=self.http_conn_id) @@ -624,7 +630,7 @@ async def config(self) -> SessionConfig: auth = self.auth_type(conn.login, conn.password) if conn.extra: - conn_extra_options, _ = _process_extra_options_from_connection( + conn_extra_options, extra_options = _process_extra_options_from_connection( conn=conn, extra_options={} ) headers.update(conn_extra_options) @@ -633,6 +639,7 @@ async def config(self) -> SessionConfig: base_url=base_url, headers=headers, auth=auth, + extra_options=extra_options, ) return self._config @@ -669,14 +676,17 @@ async def run( For example, ``run(json=obj)`` is passed as ``aiohttp.ClientSession().get(json=obj)``. """ - if session is not None: - request = self._get_request_func(session=session) - config = await self.config() - return await AsyncHttpSession(hook=self, request=request, config=config).run( - endpoint=endpoint, data=data, json=json, headers=headers, extra_options=extra_options - ) - - async with self.session() as http: - return await http.run( - endpoint=endpoint, data=data, json=json, headers=headers, extra_options=extra_options - ) + try: + if session is not None: + request = self._get_request_func(session=session) + config = await self.config() + return await AsyncHttpSession(hook=self, request=request, config=config).run( + endpoint=endpoint, data=data, json=json, headers=headers, extra_options=extra_options + ) + + async with self.session() as http: + return await http.run( + endpoint=endpoint, data=data, json=json, headers=headers, extra_options=extra_options + ) + except ClientResponseError as e: + raise HttpErrorException(f"{e.status}:{e.message}") From d8918efae41c052c035cec649b96e3bd3728a2fc Mon Sep 17 00:00:00 2001 From: David Blain Date: Tue, 3 Feb 2026 13:30:56 +0100 Subject: [PATCH 22/38] refactor: Fixed mocking of test_run_method_success --- .../livy/tests/unit/apache/livy/hooks/test_livy.py | 11 ++++++----- 1 file changed, 6 insertions(+), 5 deletions(-) diff --git a/providers/apache/livy/tests/unit/apache/livy/hooks/test_livy.py b/providers/apache/livy/tests/unit/apache/livy/hooks/test_livy.py index 70a28049737a7..e595aa3f7bae6 100644 --- a/providers/apache/livy/tests/unit/apache/livy/hooks/test_livy.py +++ b/providers/apache/livy/tests/unit/apache/livy/hooks/test_livy.py @@ -593,12 +593,13 @@ async def test_dump_batch_logs_error(self, mock_get_batch_logs): assert log_dump == {"id": 1, "log": ["mock_log_1", "mock_log_2"]} @pytest.mark.asyncio - @mock.patch("airflow.providers.apache.livy.hooks.livy.LivyAsyncHook.run") - async def test_run_method_success(self, mock_run): + @mock.patch("airflow.providers.http.hooks.http.aiohttp.ClientSession") + async def test_run_method_success(self, mock_session): """Asserts the run_method for success response.""" - response = MagicMock(spec=Response) - response.json = AsyncMock(return_value={"id": 1}) - mock_run.return_value = response + mock_session.return_value.__aenter__.return_value.post = AsyncMock() + mock_session.return_value.__aenter__.return_value.post.return_value.json = AsyncMock( + return_value={"id": 1} + ) hook = LivyAsyncHook(livy_conn_id=LIVY_CONN_ID) response = await hook.run_method("localhost", "GET") assert response == {"status": "success", "response": {"id": 1}} From 068493fa5c9d303c9eb9fb48ad8236c488fc6652 Mon Sep 17 00:00:00 2001 From: David Blain Date: Tue, 3 Feb 2026 14:28:43 +0100 Subject: [PATCH 23/38] refactor: Removed unused imports --- providers/apache/livy/tests/unit/apache/livy/hooks/test_livy.py | 1 - 1 file changed, 1 deletion(-) diff --git a/providers/apache/livy/tests/unit/apache/livy/hooks/test_livy.py b/providers/apache/livy/tests/unit/apache/livy/hooks/test_livy.py index e595aa3f7bae6..fc99d9728c7fc 100644 --- a/providers/apache/livy/tests/unit/apache/livy/hooks/test_livy.py +++ b/providers/apache/livy/tests/unit/apache/livy/hooks/test_livy.py @@ -24,7 +24,6 @@ import pytest import requests from aiohttp import ClientResponseError, RequestInfo -from requests import Response from requests.exceptions import RequestException from airflow.models import Connection From 3ce90aa1341775368db9199717996dbaf1c42368 Mon Sep 17 00:00:00 2001 From: David Blain Date: Tue, 3 Feb 2026 14:29:23 +0100 Subject: [PATCH 24/38] refactor: Reorganized imports --- providers/http/src/airflow/providers/http/hooks/http.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/providers/http/src/airflow/providers/http/hooks/http.py b/providers/http/src/airflow/providers/http/hooks/http.py index 6fad4982ef420..a914b9dbcb9f5 100644 --- a/providers/http/src/airflow/providers/http/hooks/http.py +++ b/providers/http/src/airflow/providers/http/hooks/http.py @@ -35,7 +35,7 @@ from tenacity import retry_if_exception from airflow.providers.common.compat.sdk import AirflowException, BaseHook -from airflow.providers.http.exceptions import HttpMethodException, HttpErrorException +from airflow.providers.http.exceptions import HttpErrorException, HttpMethodException from airflow.utils.log.logging_mixin import LoggingMixin if TYPE_CHECKING: From 2280100f4f8a290697218450277d58be1dcfdeb1 Mon Sep 17 00:00:00 2001 From: David Blain Date: Tue, 3 Feb 2026 14:36:40 +0100 Subject: [PATCH 25/38] refactor: Run method of LivyAsyncHook must internally use session from HttpAsyncHook so it doesn't rely on the error handling of the HttpAsyncHook run method --- .../airflow/providers/apache/livy/hooks/livy.py | 17 +++++++++-------- 1 file changed, 9 insertions(+), 8 deletions(-) diff --git a/providers/apache/livy/src/airflow/providers/apache/livy/hooks/livy.py b/providers/apache/livy/src/airflow/providers/apache/livy/hooks/livy.py index 022b986a0e8c2..e9f8e94c74843 100644 --- a/providers/apache/livy/src/airflow/providers/apache/livy/hooks/livy.py +++ b/providers/apache/livy/src/airflow/providers/apache/livy/hooks/livy.py @@ -523,15 +523,16 @@ async def run_method( ) try: - response = await self.run( - endpoint=endpoint, - data=data, - headers={**self._def_headers, **self.extra_headers, **(headers or {})}, - extra_options=self.extra_options, - ) + async with self.session() as session: + response = await session.run( + endpoint=endpoint, + data=data, + headers={**self._def_headers, **self.extra_headers, **(headers or {})}, + extra_options=self.extra_options, + ) - result = await response.json() - return {"status": "success", "response": result} + result = await response.json() + return {"status": "success", "response": result} except ClientResponseError as e: return {"Response": {e.message}, "Status Code": {e.status}, "status": "error"} From ff2b4b1f809b25fbae860ed3cb7f2c97862aec56 Mon Sep 17 00:00:00 2001 From: David Blain Date: Tue, 3 Feb 2026 16:44:29 +0100 Subject: [PATCH 26/38] refactor: Escape reserved words in HttpAsyncHook --- providers/http/src/airflow/providers/http/hooks/http.py | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/providers/http/src/airflow/providers/http/hooks/http.py b/providers/http/src/airflow/providers/http/hooks/http.py index a914b9dbcb9f5..240bd24d06cc2 100644 --- a/providers/http/src/airflow/providers/http/hooks/http.py +++ b/providers/http/src/airflow/providers/http/hooks/http.py @@ -435,7 +435,7 @@ class SessionConfig(BaseModel): class AsyncHttpSession(LoggingMixin): """ - Wrapper around an ``aiohttp.ClientSession`` providing a session bound HttpAsyncHook. + Wrapper around an ``aiohttp.ClientSession`` providing a session bound ``HttpAsyncHook``. This class binds an asynchronous HTTP client session to an ``HttpAsyncHook`` and applies connection configuration, authentication, headers, and retry logic consistently across requests. A single @@ -646,7 +646,7 @@ async def config(self) -> SessionConfig: @asynccontextmanager async def session(self) -> AsyncGenerator[AsyncHttpSession, None]: """ - Create an AsyncHttpSession bound to a single ``aiohttp.ClientSession``. + Create an ``AsyncHttpSession`` bound to a single ``aiohttp.ClientSession``. Airflow connection resolution happens exactly once here. """ @@ -667,7 +667,7 @@ async def run( """ Perform an asynchronous HTTP request call. - :param session: aiohttp.ClientSession + :param session: ``aiohttp.ClientSession`` :param endpoint: Endpoint to be called, i.e. ``resource/v1/query?``. :param data: Payload to be uploaded or request parameters. :param json: Payload to be uploaded as JSON. From ffdae830023a240922cc2d73083f56dd651e8551 Mon Sep 17 00:00:00 2001 From: David Blain Date: Wed, 4 Feb 2026 08:20:03 +0100 Subject: [PATCH 27/38] refactor: Mock get_async_connection in TestLivyAsyncHook --- .../tests/unit/apache/livy/hooks/test_livy.py | 18 ++++++++++++------ 1 file changed, 12 insertions(+), 6 deletions(-) diff --git a/providers/apache/livy/tests/unit/apache/livy/hooks/test_livy.py b/providers/apache/livy/tests/unit/apache/livy/hooks/test_livy.py index fc99d9728c7fc..a73fb527666f4 100644 --- a/providers/apache/livy/tests/unit/apache/livy/hooks/test_livy.py +++ b/providers/apache/livy/tests/unit/apache/livy/hooks/test_livy.py @@ -612,7 +612,8 @@ async def test_run_method_error(self): @pytest.mark.asyncio @mock.patch("airflow.providers.http.hooks.http.aiohttp.ClientSession") - async def test_run_post_method_with_success(self, mock_session): + @mock.patch("airflow.providers.apache.livy.hooks.livy.get_async_connection") + async def test_run_post_method_with_success(self, mock_get_connection, mock_session): """Asserts the run_method for success response for POST method.""" mock_session.return_value.__aenter__.return_value.post = AsyncMock() mock_session.return_value.__aenter__.return_value.post.return_value.json = AsyncMock( @@ -625,7 +626,8 @@ async def test_run_post_method_with_success(self, mock_session): @pytest.mark.asyncio @mock.patch("airflow.providers.http.hooks.http.aiohttp.ClientSession") - async def test_run_get_method_with_success(self, mock_session): + @mock.patch("airflow.providers.apache.livy.hooks.livy.get_async_connection") + async def test_run_get_method_with_success(self, mock_get_connection, mock_session): """Asserts the run_method for GET method.""" mock_session.return_value.__aenter__.return_value.get = AsyncMock() mock_session.return_value.__aenter__.return_value.get.return_value.json = AsyncMock( @@ -639,7 +641,8 @@ async def test_run_get_method_with_success(self, mock_session): @pytest.mark.asyncio @mock.patch("airflow.providers.http.hooks.http.aiohttp.ClientSession") - async def test_run_patch_method_with_success(self, mock_session): + @mock.patch("airflow.providers.apache.livy.hooks.livy.get_async_connection") + async def test_run_patch_method_with_success(self, mock_get_connection, mock_session): """Asserts the run_method for PATCH method.""" mock_session.return_value.__aenter__.return_value.patch = AsyncMock() mock_session.return_value.__aenter__.return_value.patch.return_value.json = AsyncMock( @@ -652,7 +655,8 @@ async def test_run_patch_method_with_success(self, mock_session): assert response["response"] == {"hello": "world"} @pytest.mark.asyncio - async def test_run_unexpected_method_with_success(self): + @mock.patch("airflow.providers.apache.livy.hooks.livy.get_async_connection") + async def test_run_unexpected_method_with_success(self, mock_get_connection): """Asserts the run_method for unexpected method error""" hook = LivyAsyncHook(livy_conn_id=LIVY_CONN_ID) hook.method = "abc" @@ -660,7 +664,8 @@ async def test_run_unexpected_method_with_success(self): assert response == {"response": "Invalid http method abc", "status": "error"} @pytest.mark.asyncio - async def test_run_put_method_with_type_error(self): + @mock.patch("airflow.providers.apache.livy.hooks.livy.get_async_connection") + async def test_run_put_method_with_type_error(self, mock_get_connection): """Asserts the run_method for TypeError.""" async def mock_fun(arg1, arg2, arg3, arg4): @@ -673,7 +678,8 @@ async def mock_fun(arg1, arg2, arg3, arg4): @pytest.mark.asyncio @mock.patch("airflow.providers.http.hooks.http.aiohttp.ClientSession") - async def test_run_method_with_client_response_error(self, mock_session): + @mock.patch("airflow.providers.apache.livy.hooks.livy.get_async_connection") + async def test_run_method_with_client_response_error(self, mock_get_connection, mock_session): """Asserts the run_method for Client Response Error.""" mock_session.return_value.__aenter__.return_value.patch = AsyncMock( From 4af843de4bc6635d8f619c552bcd2a2ebb2652a5 Mon Sep 17 00:00:00 2001 From: David Blain Date: Wed, 4 Feb 2026 16:44:24 +0100 Subject: [PATCH 28/38] refactor: Mock get_async_connection in TestLivyAsyncHook should be patched on http hook module --- .../livy/tests/unit/apache/livy/hooks/test_livy.py | 12 ++++++------ 1 file changed, 6 insertions(+), 6 deletions(-) diff --git a/providers/apache/livy/tests/unit/apache/livy/hooks/test_livy.py b/providers/apache/livy/tests/unit/apache/livy/hooks/test_livy.py index a73fb527666f4..13d73828accee 100644 --- a/providers/apache/livy/tests/unit/apache/livy/hooks/test_livy.py +++ b/providers/apache/livy/tests/unit/apache/livy/hooks/test_livy.py @@ -612,7 +612,7 @@ async def test_run_method_error(self): @pytest.mark.asyncio @mock.patch("airflow.providers.http.hooks.http.aiohttp.ClientSession") - @mock.patch("airflow.providers.apache.livy.hooks.livy.get_async_connection") + @mock.patch("airflow.providers.http.hooks.http.get_async_connection") async def test_run_post_method_with_success(self, mock_get_connection, mock_session): """Asserts the run_method for success response for POST method.""" mock_session.return_value.__aenter__.return_value.post = AsyncMock() @@ -626,7 +626,7 @@ async def test_run_post_method_with_success(self, mock_get_connection, mock_sess @pytest.mark.asyncio @mock.patch("airflow.providers.http.hooks.http.aiohttp.ClientSession") - @mock.patch("airflow.providers.apache.livy.hooks.livy.get_async_connection") + @mock.patch("airflow.providers.http.hooks.http.get_async_connection") async def test_run_get_method_with_success(self, mock_get_connection, mock_session): """Asserts the run_method for GET method.""" mock_session.return_value.__aenter__.return_value.get = AsyncMock() @@ -641,7 +641,7 @@ async def test_run_get_method_with_success(self, mock_get_connection, mock_sessi @pytest.mark.asyncio @mock.patch("airflow.providers.http.hooks.http.aiohttp.ClientSession") - @mock.patch("airflow.providers.apache.livy.hooks.livy.get_async_connection") + @mock.patch("airflow.providers.http.hooks.http.get_async_connection") async def test_run_patch_method_with_success(self, mock_get_connection, mock_session): """Asserts the run_method for PATCH method.""" mock_session.return_value.__aenter__.return_value.patch = AsyncMock() @@ -655,7 +655,7 @@ async def test_run_patch_method_with_success(self, mock_get_connection, mock_ses assert response["response"] == {"hello": "world"} @pytest.mark.asyncio - @mock.patch("airflow.providers.apache.livy.hooks.livy.get_async_connection") + @mock.patch("airflow.providers.http.hooks.http.get_async_connection") async def test_run_unexpected_method_with_success(self, mock_get_connection): """Asserts the run_method for unexpected method error""" hook = LivyAsyncHook(livy_conn_id=LIVY_CONN_ID) @@ -664,7 +664,7 @@ async def test_run_unexpected_method_with_success(self, mock_get_connection): assert response == {"response": "Invalid http method abc", "status": "error"} @pytest.mark.asyncio - @mock.patch("airflow.providers.apache.livy.hooks.livy.get_async_connection") + @mock.patch("airflow.providers.http.hooks.http.get_async_connection") async def test_run_put_method_with_type_error(self, mock_get_connection): """Asserts the run_method for TypeError.""" @@ -678,7 +678,7 @@ async def mock_fun(arg1, arg2, arg3, arg4): @pytest.mark.asyncio @mock.patch("airflow.providers.http.hooks.http.aiohttp.ClientSession") - @mock.patch("airflow.providers.apache.livy.hooks.livy.get_async_connection") + @mock.patch("airflow.providers.http.hooks.http.get_async_connection") async def test_run_method_with_client_response_error(self, mock_get_connection, mock_session): """Asserts the run_method for Client Response Error.""" From 5d798eea1be6ed2965d600fb42fc74f358149f4f Mon Sep 17 00:00:00 2001 From: David Blain Date: Thu, 5 Feb 2026 09:46:58 +0100 Subject: [PATCH 29/38] refactor: Mock get_async_connection in TestLivyAsyncHook should be patched on http hook module --- .../livy/tests/unit/apache/livy/hooks/test_livy.py | 12 ++++++------ 1 file changed, 6 insertions(+), 6 deletions(-) diff --git a/providers/apache/livy/tests/unit/apache/livy/hooks/test_livy.py b/providers/apache/livy/tests/unit/apache/livy/hooks/test_livy.py index 13d73828accee..97f2aa76200ae 100644 --- a/providers/apache/livy/tests/unit/apache/livy/hooks/test_livy.py +++ b/providers/apache/livy/tests/unit/apache/livy/hooks/test_livy.py @@ -612,7 +612,7 @@ async def test_run_method_error(self): @pytest.mark.asyncio @mock.patch("airflow.providers.http.hooks.http.aiohttp.ClientSession") - @mock.patch("airflow.providers.http.hooks.http.get_async_connection") + @mock.patch("airflow.providers.common.compat.connection.get_async_connection") async def test_run_post_method_with_success(self, mock_get_connection, mock_session): """Asserts the run_method for success response for POST method.""" mock_session.return_value.__aenter__.return_value.post = AsyncMock() @@ -626,7 +626,7 @@ async def test_run_post_method_with_success(self, mock_get_connection, mock_sess @pytest.mark.asyncio @mock.patch("airflow.providers.http.hooks.http.aiohttp.ClientSession") - @mock.patch("airflow.providers.http.hooks.http.get_async_connection") + @mock.patch("airflow.providers.common.compat.connection.get_async_connection") async def test_run_get_method_with_success(self, mock_get_connection, mock_session): """Asserts the run_method for GET method.""" mock_session.return_value.__aenter__.return_value.get = AsyncMock() @@ -641,7 +641,7 @@ async def test_run_get_method_with_success(self, mock_get_connection, mock_sessi @pytest.mark.asyncio @mock.patch("airflow.providers.http.hooks.http.aiohttp.ClientSession") - @mock.patch("airflow.providers.http.hooks.http.get_async_connection") + @mock.patch("airflow.providers.common.compat.connection.get_async_connection") async def test_run_patch_method_with_success(self, mock_get_connection, mock_session): """Asserts the run_method for PATCH method.""" mock_session.return_value.__aenter__.return_value.patch = AsyncMock() @@ -655,7 +655,7 @@ async def test_run_patch_method_with_success(self, mock_get_connection, mock_ses assert response["response"] == {"hello": "world"} @pytest.mark.asyncio - @mock.patch("airflow.providers.http.hooks.http.get_async_connection") + @mock.patch("airflow.providers.common.compat.connection.get_async_connection") async def test_run_unexpected_method_with_success(self, mock_get_connection): """Asserts the run_method for unexpected method error""" hook = LivyAsyncHook(livy_conn_id=LIVY_CONN_ID) @@ -664,7 +664,7 @@ async def test_run_unexpected_method_with_success(self, mock_get_connection): assert response == {"response": "Invalid http method abc", "status": "error"} @pytest.mark.asyncio - @mock.patch("airflow.providers.http.hooks.http.get_async_connection") + @mock.patch("airflow.providers.common.compat.connection.get_async_connection") async def test_run_put_method_with_type_error(self, mock_get_connection): """Asserts the run_method for TypeError.""" @@ -678,7 +678,7 @@ async def mock_fun(arg1, arg2, arg3, arg4): @pytest.mark.asyncio @mock.patch("airflow.providers.http.hooks.http.aiohttp.ClientSession") - @mock.patch("airflow.providers.http.hooks.http.get_async_connection") + @mock.patch("airflow.providers.common.compat.connection.get_async_connection") async def test_run_method_with_client_response_error(self, mock_get_connection, mock_session): """Asserts the run_method for Client Response Error.""" From 235bf27ff08b578e2065c3ba5b582e0a77dc3bfa Mon Sep 17 00:00:00 2001 From: David Blain Date: Fri, 6 Feb 2026 19:30:57 +0100 Subject: [PATCH 30/38] refactor: Make sure get_async_connection is mocked with real Connection --- .../tests/unit/apache/livy/hooks/test_livy.py | 54 ++++++++++++++++--- 1 file changed, 48 insertions(+), 6 deletions(-) diff --git a/providers/apache/livy/tests/unit/apache/livy/hooks/test_livy.py b/providers/apache/livy/tests/unit/apache/livy/hooks/test_livy.py index 97f2aa76200ae..b28c9d11cf9bf 100644 --- a/providers/apache/livy/tests/unit/apache/livy/hooks/test_livy.py +++ b/providers/apache/livy/tests/unit/apache/livy/hooks/test_livy.py @@ -612,7 +612,14 @@ async def test_run_method_error(self): @pytest.mark.asyncio @mock.patch("airflow.providers.http.hooks.http.aiohttp.ClientSession") - @mock.patch("airflow.providers.common.compat.connection.get_async_connection") + @mock.patch("airflow.providers.common.compat.connection.get_async_connection", + return_value=Connection( + conn_id=LIVY_CONN_ID, + conn_type="http", + host="http://host", + port=80, + ), + ) async def test_run_post_method_with_success(self, mock_get_connection, mock_session): """Asserts the run_method for success response for POST method.""" mock_session.return_value.__aenter__.return_value.post = AsyncMock() @@ -626,7 +633,14 @@ async def test_run_post_method_with_success(self, mock_get_connection, mock_sess @pytest.mark.asyncio @mock.patch("airflow.providers.http.hooks.http.aiohttp.ClientSession") - @mock.patch("airflow.providers.common.compat.connection.get_async_connection") + @mock.patch("airflow.providers.common.compat.connection.get_async_connection", + return_value=Connection( + conn_id=LIVY_CONN_ID, + conn_type="http", + host="http://host", + port=80, + ), + ) async def test_run_get_method_with_success(self, mock_get_connection, mock_session): """Asserts the run_method for GET method.""" mock_session.return_value.__aenter__.return_value.get = AsyncMock() @@ -641,7 +655,14 @@ async def test_run_get_method_with_success(self, mock_get_connection, mock_sessi @pytest.mark.asyncio @mock.patch("airflow.providers.http.hooks.http.aiohttp.ClientSession") - @mock.patch("airflow.providers.common.compat.connection.get_async_connection") + @mock.patch("airflow.providers.common.compat.connection.get_async_connection", + return_value=Connection( + conn_id=LIVY_CONN_ID, + conn_type="http", + host="http://host", + port=80, + ), + ) async def test_run_patch_method_with_success(self, mock_get_connection, mock_session): """Asserts the run_method for PATCH method.""" mock_session.return_value.__aenter__.return_value.patch = AsyncMock() @@ -655,7 +676,14 @@ async def test_run_patch_method_with_success(self, mock_get_connection, mock_ses assert response["response"] == {"hello": "world"} @pytest.mark.asyncio - @mock.patch("airflow.providers.common.compat.connection.get_async_connection") + @mock.patch("airflow.providers.common.compat.connection.get_async_connection", + return_value=Connection( + conn_id=LIVY_CONN_ID, + conn_type="http", + host="http://host", + port=80, + ), + ) async def test_run_unexpected_method_with_success(self, mock_get_connection): """Asserts the run_method for unexpected method error""" hook = LivyAsyncHook(livy_conn_id=LIVY_CONN_ID) @@ -664,7 +692,14 @@ async def test_run_unexpected_method_with_success(self, mock_get_connection): assert response == {"response": "Invalid http method abc", "status": "error"} @pytest.mark.asyncio - @mock.patch("airflow.providers.common.compat.connection.get_async_connection") + @mock.patch("airflow.providers.common.compat.connection.get_async_connection", + return_value=Connection( + conn_id=LIVY_CONN_ID, + conn_type="http", + host="http://host", + port=80, + ), + ) async def test_run_put_method_with_type_error(self, mock_get_connection): """Asserts the run_method for TypeError.""" @@ -678,7 +713,14 @@ async def mock_fun(arg1, arg2, arg3, arg4): @pytest.mark.asyncio @mock.patch("airflow.providers.http.hooks.http.aiohttp.ClientSession") - @mock.patch("airflow.providers.common.compat.connection.get_async_connection") + @mock.patch("airflow.providers.common.compat.connection.get_async_connection", + return_value=Connection( + conn_id=LIVY_CONN_ID, + conn_type="http", + host="http://host", + port=80, + ), + ) async def test_run_method_with_client_response_error(self, mock_get_connection, mock_session): """Asserts the run_method for Client Response Error.""" From 9c41596e7074a62b60199a9f032a1b4b7c580f78 Mon Sep 17 00:00:00 2001 From: David Blain Date: Fri, 6 Feb 2026 21:44:54 +0100 Subject: [PATCH 31/38] refactor: Reformatted Livy unit test --- .../tests/unit/apache/livy/hooks/test_livy.py | 36 +++++++++++-------- 1 file changed, 21 insertions(+), 15 deletions(-) diff --git a/providers/apache/livy/tests/unit/apache/livy/hooks/test_livy.py b/providers/apache/livy/tests/unit/apache/livy/hooks/test_livy.py index b28c9d11cf9bf..dd46f583ff3fa 100644 --- a/providers/apache/livy/tests/unit/apache/livy/hooks/test_livy.py +++ b/providers/apache/livy/tests/unit/apache/livy/hooks/test_livy.py @@ -593,7 +593,16 @@ async def test_dump_batch_logs_error(self, mock_get_batch_logs): @pytest.mark.asyncio @mock.patch("airflow.providers.http.hooks.http.aiohttp.ClientSession") - async def test_run_method_success(self, mock_session): + @mock.patch( + "airflow.providers.common.compat.connection.get_async_connection", + return_value=Connection( + conn_id=LIVY_CONN_ID, + conn_type="http", + host="http://host", + port=80, + ), + ) + async def test_run_method_success(self, mock_get_connection, mock_session): """Asserts the run_method for success response.""" mock_session.return_value.__aenter__.return_value.post = AsyncMock() mock_session.return_value.__aenter__.return_value.post.return_value.json = AsyncMock( @@ -612,7 +621,8 @@ async def test_run_method_error(self): @pytest.mark.asyncio @mock.patch("airflow.providers.http.hooks.http.aiohttp.ClientSession") - @mock.patch("airflow.providers.common.compat.connection.get_async_connection", + @mock.patch( + "airflow.providers.common.compat.connection.get_async_connection", return_value=Connection( conn_id=LIVY_CONN_ID, conn_type="http", @@ -633,7 +643,8 @@ async def test_run_post_method_with_success(self, mock_get_connection, mock_sess @pytest.mark.asyncio @mock.patch("airflow.providers.http.hooks.http.aiohttp.ClientSession") - @mock.patch("airflow.providers.common.compat.connection.get_async_connection", + @mock.patch( + "airflow.providers.common.compat.connection.get_async_connection", return_value=Connection( conn_id=LIVY_CONN_ID, conn_type="http", @@ -655,7 +666,8 @@ async def test_run_get_method_with_success(self, mock_get_connection, mock_sessi @pytest.mark.asyncio @mock.patch("airflow.providers.http.hooks.http.aiohttp.ClientSession") - @mock.patch("airflow.providers.common.compat.connection.get_async_connection", + @mock.patch( + "airflow.providers.common.compat.connection.get_async_connection", return_value=Connection( conn_id=LIVY_CONN_ID, conn_type="http", @@ -676,7 +688,8 @@ async def test_run_patch_method_with_success(self, mock_get_connection, mock_ses assert response["response"] == {"hello": "world"} @pytest.mark.asyncio - @mock.patch("airflow.providers.common.compat.connection.get_async_connection", + @mock.patch( + "airflow.providers.common.compat.connection.get_async_connection", return_value=Connection( conn_id=LIVY_CONN_ID, conn_type="http", @@ -692,15 +705,7 @@ async def test_run_unexpected_method_with_success(self, mock_get_connection): assert response == {"response": "Invalid http method abc", "status": "error"} @pytest.mark.asyncio - @mock.patch("airflow.providers.common.compat.connection.get_async_connection", - return_value=Connection( - conn_id=LIVY_CONN_ID, - conn_type="http", - host="http://host", - port=80, - ), - ) - async def test_run_put_method_with_type_error(self, mock_get_connection): + async def test_run_put_method_with_type_error(self): """Asserts the run_method for TypeError.""" async def mock_fun(arg1, arg2, arg3, arg4): @@ -713,7 +718,8 @@ async def mock_fun(arg1, arg2, arg3, arg4): @pytest.mark.asyncio @mock.patch("airflow.providers.http.hooks.http.aiohttp.ClientSession") - @mock.patch("airflow.providers.common.compat.connection.get_async_connection", + @mock.patch( + "airflow.providers.common.compat.connection.get_async_connection", return_value=Connection( conn_id=LIVY_CONN_ID, conn_type="http", From 90e36dd557837c6268203a53123ebce783497a28 Mon Sep 17 00:00:00 2001 From: David Blain Date: Fri, 6 Feb 2026 23:37:49 +0100 Subject: [PATCH 32/38] refactor: Add get_async_connection mock in test_run_put_method_with_type_error --- .../livy/tests/unit/apache/livy/hooks/test_livy.py | 11 ++++++++++- 1 file changed, 10 insertions(+), 1 deletion(-) diff --git a/providers/apache/livy/tests/unit/apache/livy/hooks/test_livy.py b/providers/apache/livy/tests/unit/apache/livy/hooks/test_livy.py index dd46f583ff3fa..2ad0a9eb1ab49 100644 --- a/providers/apache/livy/tests/unit/apache/livy/hooks/test_livy.py +++ b/providers/apache/livy/tests/unit/apache/livy/hooks/test_livy.py @@ -705,7 +705,16 @@ async def test_run_unexpected_method_with_success(self, mock_get_connection): assert response == {"response": "Invalid http method abc", "status": "error"} @pytest.mark.asyncio - async def test_run_put_method_with_type_error(self): + @mock.patch( + "airflow.providers.common.compat.connection.get_async_connection", + return_value=Connection( + conn_id=LIVY_CONN_ID, + conn_type="http", + host="http://host", + port=80, + ), + ) + async def test_run_put_method_with_type_error(self, mock_get_connection): """Asserts the run_method for TypeError.""" async def mock_fun(arg1, arg2, arg3, arg4): From a7a87e29b6c2e14a99e814f60ae034b6d2e23912 Mon Sep 17 00:00:00 2001 From: David Blain Date: Wed, 11 Feb 2026 07:50:22 +0100 Subject: [PATCH 33/38] refactor: Make sure http provider dependency is set to next release when livy provider is release --- providers/apache/livy/pyproject.toml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/providers/apache/livy/pyproject.toml b/providers/apache/livy/pyproject.toml index 3000a45eb0530..a6740db55eb1e 100644 --- a/providers/apache/livy/pyproject.toml +++ b/providers/apache/livy/pyproject.toml @@ -59,7 +59,7 @@ requires-python = ">=3.10" # After you modify the dependencies, and rebuild your Breeze CI image with ``breeze ci-image build`` dependencies = [ "apache-airflow>=2.11.0", - "apache-airflow-providers-http>=5.1.0", + "apache-airflow-providers-http>=5.1.0", # use next version "apache-airflow-providers-common-compat>=1.12.0", "aiohttp>=3.9.2", "asgiref>=2.3.0", From 4fb143680571abda1ae0c75d22714b8fc8ae501e Mon Sep 17 00:00:00 2001 From: David Blain Date: Wed, 11 Feb 2026 07:56:09 +0100 Subject: [PATCH 34/38] refactor: Added TODO on asgiref dependency as I can probably be removed as it will be resolved transiently through common-compat provider --- providers/apache/livy/pyproject.toml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/providers/apache/livy/pyproject.toml b/providers/apache/livy/pyproject.toml index a6740db55eb1e..325e3d3649ca3 100644 --- a/providers/apache/livy/pyproject.toml +++ b/providers/apache/livy/pyproject.toml @@ -62,7 +62,7 @@ dependencies = [ "apache-airflow-providers-http>=5.1.0", # use next version "apache-airflow-providers-common-compat>=1.12.0", "aiohttp>=3.9.2", - "asgiref>=2.3.0", + "asgiref>=2.3.0", # TODO: I think this one can be removed? Already present in apache-airflow-providers-common-compat ] [dependency-groups] From 77d28b7ea136f442efdcd5dbfddf52efa1125d2b Mon Sep 17 00:00:00 2001 From: David Blain Date: Wed, 11 Feb 2026 09:41:25 +0100 Subject: [PATCH 35/38] refactor: Removed asgiref dependency in livy provider --- providers/apache/livy/pyproject.toml | 1 - 1 file changed, 1 deletion(-) diff --git a/providers/apache/livy/pyproject.toml b/providers/apache/livy/pyproject.toml index 325e3d3649ca3..9a013b15034b8 100644 --- a/providers/apache/livy/pyproject.toml +++ b/providers/apache/livy/pyproject.toml @@ -62,7 +62,6 @@ dependencies = [ "apache-airflow-providers-http>=5.1.0", # use next version "apache-airflow-providers-common-compat>=1.12.0", "aiohttp>=3.9.2", - "asgiref>=2.3.0", # TODO: I think this one can be removed? Already present in apache-airflow-providers-common-compat ] [dependency-groups] From 888e88d9ea3cb7ce9352e49d3b20769338dd92fb Mon Sep 17 00:00:00 2001 From: David Blain Date: Wed, 11 Feb 2026 11:32:22 +0100 Subject: [PATCH 36/38] refactor: Removed asgiref reference from docs --- providers/apache/livy/docs/index.rst | 1 - 1 file changed, 1 deletion(-) diff --git a/providers/apache/livy/docs/index.rst b/providers/apache/livy/docs/index.rst index 058669470b5c2..cc3d26534ed2d 100644 --- a/providers/apache/livy/docs/index.rst +++ b/providers/apache/livy/docs/index.rst @@ -103,7 +103,6 @@ PIP package Version required ``apache-airflow-providers-http`` ``>=5.1.0`` ``apache-airflow-providers-common-compat`` ``>=1.12.0`` ``aiohttp`` ``>=3.9.2`` -``asgiref`` ``>=2.3.0`` ========================================== ================== Cross provider package dependencies From 2a0d73c7b47e3aaa4c3ae354cbe075619dc49d9e Mon Sep 17 00:00:00 2001 From: David Blain Date: Wed, 4 Mar 2026 09:43:30 +0100 Subject: [PATCH 37/38] refactor: Fixed assertion of Connection type in test_build_get_hook of TestLivyAsyncHook --- .../apache/livy/tests/unit/apache/livy/hooks/test_livy.py | 6 ++++-- 1 file changed, 4 insertions(+), 2 deletions(-) diff --git a/providers/apache/livy/tests/unit/apache/livy/hooks/test_livy.py b/providers/apache/livy/tests/unit/apache/livy/hooks/test_livy.py index 2ad0a9eb1ab49..cba90b9d76864 100644 --- a/providers/apache/livy/tests/unit/apache/livy/hooks/test_livy.py +++ b/providers/apache/livy/tests/unit/apache/livy/hooks/test_livy.py @@ -770,6 +770,8 @@ def setup_livy_conn(self, create_connection_without_db): @pytest.mark.asyncio async def test_build_get_hook(self, setup_livy_conn): + from airflow.sdk.definitions.connection import Connection as SDKConnection + connection_url_mapping = { # id, expected "default_port": "http://host", @@ -782,8 +784,8 @@ async def test_build_get_hook(self, setup_livy_conn): for conn_id, expected in connection_url_mapping.items(): hook = LivyAsyncHook(livy_conn_id=conn_id) async with hook.session() as session: - response_conn: Connection = hook.get_connection(conn_id=conn_id) - assert isinstance(response_conn, Connection) + response_conn = hook.get_connection(conn_id=conn_id) + assert isinstance(response_conn, SDKConnection) assert session.base_url == expected def test_build_body(self): From 9f3f6d655fa4936c31cf084fae97fe523b2f8b24 Mon Sep 17 00:00:00 2001 From: David Blain Date: Wed, 4 Mar 2026 10:42:13 +0100 Subject: [PATCH 38/38] refactor: Don't need to assert connections anymore in test_build_get_hook of TestLivyAsyncHook --- .../apache/livy/tests/unit/apache/livy/hooks/test_livy.py | 4 ---- 1 file changed, 4 deletions(-) diff --git a/providers/apache/livy/tests/unit/apache/livy/hooks/test_livy.py b/providers/apache/livy/tests/unit/apache/livy/hooks/test_livy.py index cba90b9d76864..90bdcad8866a6 100644 --- a/providers/apache/livy/tests/unit/apache/livy/hooks/test_livy.py +++ b/providers/apache/livy/tests/unit/apache/livy/hooks/test_livy.py @@ -770,8 +770,6 @@ def setup_livy_conn(self, create_connection_without_db): @pytest.mark.asyncio async def test_build_get_hook(self, setup_livy_conn): - from airflow.sdk.definitions.connection import Connection as SDKConnection - connection_url_mapping = { # id, expected "default_port": "http://host", @@ -784,8 +782,6 @@ async def test_build_get_hook(self, setup_livy_conn): for conn_id, expected in connection_url_mapping.items(): hook = LivyAsyncHook(livy_conn_id=conn_id) async with hook.session() as session: - response_conn = hook.get_connection(conn_id=conn_id) - assert isinstance(response_conn, SDKConnection) assert session.base_url == expected def test_build_body(self):