From caef38c59d66b26c93e4b7034f6020b08192906f Mon Sep 17 00:00:00 2001 From: suii2210 Date: Sat, 17 Jan 2026 13:47:12 +0000 Subject: [PATCH 1/7] Fix PowerBIHook to expand relative REST API URLs --- .../microsoft/azure/hooks/powerbi.py | 558 +++++++++--------- .../microsoft/azure/hooks/test_powerbi.py | 469 ++++++++------- 2 files changed, 540 insertions(+), 487 deletions(-) diff --git a/providers/microsoft/azure/src/airflow/providers/microsoft/azure/hooks/powerbi.py b/providers/microsoft/azure/src/airflow/providers/microsoft/azure/hooks/powerbi.py index 3c2d006795ed3..72775f622c091 100644 --- a/providers/microsoft/azure/src/airflow/providers/microsoft/azure/hooks/powerbi.py +++ b/providers/microsoft/azure/src/airflow/providers/microsoft/azure/hooks/powerbi.py @@ -1,272 +1,286 @@ -# Licensed to the Apache Software Foundation (ASF) under one -# or more contributor license agreements. See the NOTICE file -# distributed with this work for additional information -# regarding copyright ownership. The ASF licenses this file -# to you under the Apache License, Version 2.0 (the -# "License"); you may not use this file except in compliance -# with the License. You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, -# software distributed under the License is distributed on an -# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -# KIND, either express or implied. See the License for the -# specific language governing permissions and limitations -# under the License. - -from __future__ import annotations - -from enum import Enum -from typing import TYPE_CHECKING, Any - -from airflow.providers.common.compat.sdk import AirflowException -from airflow.providers.microsoft.azure.hooks.msgraph import KiotaRequestAdapterHook - -if TYPE_CHECKING: - from msgraph_core import APIVersion - - -class PowerBIDatasetRefreshFields(Enum): - """Power BI refresh dataset details.""" - - REQUEST_ID = "request_id" - STATUS = "status" - ERROR = "error" - - -class PowerBIDatasetRefreshStatus: - """Power BI refresh dataset statuses.""" - - IN_PROGRESS = "In Progress" - FAILED = "Failed" - COMPLETED = "Completed" - DISABLED = "Disabled" - - TERMINAL_STATUSES = {FAILED, COMPLETED} - FAILURE_STATUSES = {FAILED, DISABLED} - - -class PowerBIDatasetRefreshException(AirflowException): - """An exception that indicates a dataset refresh failed to complete.""" - - -class PowerBIWorkspaceListException(AirflowException): - """An exception that indicates a failure in getting the list of groups (workspaces).""" - - -class PowerBIDatasetListException(AirflowException): - """An exception that indicates a failure in getting the list of datasets.""" - - -class PowerBIHook(KiotaRequestAdapterHook): - """ - A async hook to interact with Power BI. - - :param conn_id: The connection Id to connect to PowerBI. - :param timeout: The HTTP timeout being used by the `KiotaRequestAdapter` (default is None). - When no timeout is specified or set to None then there is no HTTP timeout on each request. - :param proxies: A dict defining the HTTP proxies to be used (default is None). - :param api_version: The API version of the Microsoft Graph API to be used (default is v1). - You can pass an enum named APIVersion which has 2 possible members v1 and beta, - or you can pass a string as `v1.0` or `beta`. - """ - - conn_type: str = "powerbi" - conn_name_attr: str = "conn_id" - default_conn_name: str = "powerbi_default" - hook_name: str = "Power BI" - - def __init__( - self, - conn_id: str = default_conn_name, - proxies: dict | None = None, - timeout: float = 60 * 60 * 24 * 7, - api_version: APIVersion | str | None = None, - ): - super().__init__( - conn_id=conn_id, - proxies=proxies, - timeout=timeout, - host="https://api.powerbi.com", - scopes=["https://analysis.windows.net/powerbi/api/.default"], - api_version=api_version, - ) - - @classmethod - def get_connection_form_widgets(cls) -> dict[str, Any]: - """Return connection widgets to add to connection form.""" - from flask_appbuilder.fieldwidgets import BS3TextFieldWidget - from flask_babel import lazy_gettext - from wtforms import StringField - - return { - "tenant_id": StringField(lazy_gettext("Tenant ID"), widget=BS3TextFieldWidget()), - } - - @classmethod - def get_ui_field_behaviour(cls) -> dict[str, Any]: - """Return custom field behaviour.""" - return { - "hidden_fields": ["schema", "port", "host", "extra"], - "relabeling": { - "login": "Client ID", - "password": "Client Secret", - }, - } - - async def get_refresh_history( - self, - dataset_id: str, - group_id: str, - ) -> list[dict[str, str]]: - """ - Retrieve the refresh history of the specified dataset from the given group ID. - - :param dataset_id: The dataset ID. - :param group_id: The workspace ID. - - :return: Dictionary containing all the refresh histories of the dataset. - """ - try: - response = await self.run( - url="myorg/groups/{group_id}/datasets/{dataset_id}/refreshes", - path_parameters={ - "group_id": group_id, - "dataset_id": dataset_id, - }, - ) - - refresh_histories = response.get("value") - return [self.raw_to_refresh_details(refresh_history) for refresh_history in refresh_histories] - - except AirflowException: - raise PowerBIDatasetRefreshException("Failed to retrieve refresh history") - - @classmethod - def raw_to_refresh_details(cls, refresh_details: dict) -> dict[str, str]: - """ - Convert raw refresh details into a dictionary containing required fields. - - :param refresh_details: Raw object of refresh details. - """ - return { - PowerBIDatasetRefreshFields.REQUEST_ID.value: str(refresh_details.get("requestId")), - PowerBIDatasetRefreshFields.STATUS.value: ( - "In Progress" - if str(refresh_details.get("status")) == "Unknown" - else str(refresh_details.get("status")) - ), - PowerBIDatasetRefreshFields.ERROR.value: str(refresh_details.get("serviceExceptionJson")), - } - - async def get_refresh_details_by_refresh_id( - self, dataset_id: str, group_id: str, refresh_id: str - ) -> dict[str, str]: - """ - Get the refresh details of the given request Id. - - :param refresh_id: Request Id of the Dataset refresh. - """ - refresh_histories = await self.get_refresh_history(dataset_id=dataset_id, group_id=group_id) - - if len(refresh_histories) == 0: - raise PowerBIDatasetRefreshException( - f"Unable to fetch the details of dataset refresh with Request Id: {refresh_id}" - ) - - refresh_ids = [ - refresh_history.get(PowerBIDatasetRefreshFields.REQUEST_ID.value) - for refresh_history in refresh_histories - ] - - if refresh_id not in refresh_ids: - raise PowerBIDatasetRefreshException( - f"Unable to fetch the details of dataset refresh with Request Id: {refresh_id}" - ) - - refresh_details = refresh_histories[refresh_ids.index(refresh_id)] - - return refresh_details - - async def trigger_dataset_refresh( - self, *, dataset_id: str, group_id: str, request_body: dict[str, Any] | None = None - ) -> str: - """ - Triggers a refresh for the specified dataset from the given group id. - - :param dataset_id: The dataset id. - :param group_id: The workspace id. - :param request_body: Additional arguments to pass to the request body, as described in https://learn.microsoft.com/en-us/rest/api/power-bi/datasets/refresh-dataset-in-group#request-body. - - :return: Request id of the dataset refresh request. - """ - try: - response = await self.run( - url="myorg/groups/{group_id}/datasets/{dataset_id}/refreshes", - response_type=None, - method="POST", - path_parameters={ - "group_id": group_id, - "dataset_id": dataset_id, - }, - data=request_body, - ) - - request_id = response.get("requestid") - return request_id - except AirflowException: - raise PowerBIDatasetRefreshException("Failed to trigger dataset refresh.") - - async def get_workspace_list(self) -> list[str]: - """ - Triggers a request to get all available workspaces for the service principal. - - :return: List of workspace IDs. - """ - try: - response = await self.run(url="myorg/groups", method="GET") - - list_of_workspaces = response.get("value", []) - - return [ws["id"] for ws in list_of_workspaces if "id" in ws] - - except AirflowException: - raise PowerBIWorkspaceListException("Failed to get workspace ID list.") - - async def get_dataset_list(self, *, group_id: str) -> list[str]: - """ - Triggers a request to get all datasets within a group (workspace). - - :param group_id: Workspace ID. - - :return: List of dataset IDs. - """ - try: - response = await self.run(url=f"myorg/groups/{group_id}/datasets", method="GET") - - list_of_datasets = response.get("value", []) - - return [ds["id"] for ds in list_of_datasets if "id" in ds] - - except AirflowException: - raise PowerBIDatasetListException("Failed to get dataset ID list.") - - async def cancel_dataset_refresh(self, dataset_id: str, group_id: str, dataset_refresh_id: str) -> None: - """ - Cancel the dataset refresh. - - :param dataset_id: The dataset Id. - :param group_id: The workspace Id. - :param dataset_refresh_id: The dataset refresh Id. - """ - await self.run( - url="myorg/groups/{group_id}/datasets/{dataset_id}/refreshes/{dataset_refresh_id}", - response_type=None, - path_parameters={ - "group_id": group_id, - "dataset_id": dataset_id, - "dataset_refresh_id": dataset_refresh_id, - }, - method="DELETE", - ) +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. + +from __future__ import annotations + +from enum import Enum +from typing import TYPE_CHECKING, Any +from urllib.parse import urljoin +from airflow.providers.common.compat.sdk import AirflowException +from airflow.providers.microsoft.azure.hooks.msgraph import KiotaRequestAdapterHook +\ + + +if TYPE_CHECKING: + from msgraph_core import APIVersion + + +class PowerBIDatasetRefreshFields(Enum): + """Power BI refresh dataset details.""" + + REQUEST_ID = "request_id" + STATUS = "status" + ERROR = "error" + + +class PowerBIDatasetRefreshStatus: + """Power BI refresh dataset statuses.""" + + IN_PROGRESS = "In Progress" + FAILED = "Failed" + COMPLETED = "Completed" + DISABLED = "Disabled" + + TERMINAL_STATUSES = {FAILED, COMPLETED} + FAILURE_STATUSES = {FAILED, DISABLED} + + +class PowerBIDatasetRefreshException(AirflowException): + """An exception that indicates a dataset refresh failed to complete.""" + + +class PowerBIWorkspaceListException(AirflowException): + """An exception that indicates a failure in getting the list of groups (workspaces).""" + + +class PowerBIDatasetListException(AirflowException): + """An exception that indicates a failure in getting the list of datasets.""" + + +class PowerBIHook(KiotaRequestAdapterHook): + """ + A async hook to interact with Power BI. + + :param conn_id: The connection Id to connect to PowerBI. + :param timeout: The HTTP timeout being used by the `KiotaRequestAdapter` (default is None). + When no timeout is specified or set to None then there is no HTTP timeout on each request. + :param proxies: A dict defining the HTTP proxies to be used (default is None). + :param api_version: The API version of the Microsoft Graph API to be used (default is v1). + You can pass an enum named APIVersion which has 2 possible members v1 and beta, + or you can pass a string as `v1.0` or `beta`. + """ + + conn_type: str = "powerbi" + conn_name_attr: str = "conn_id" + default_conn_name: str = "powerbi_default" + hook_name: str = "Power BI" + POWERBI_API_BASE_URL = "https://api.powerbi.com/v1.0/" + + def __init__( + self, + conn_id: str = default_conn_name, + proxies: dict | None = None, + timeout: float = 60 * 60 * 24 * 7, + api_version: APIVersion | str | None = None, + ): + super().__init__( + conn_id=conn_id, + proxies=proxies, + timeout=timeout, + host="https://api.powerbi.com", + scopes=["https://analysis.windows.net/powerbi/api/.default"], + api_version=api_version, + ) + + async def run(self, url: str, **kwargs): + """ + Execute a Power BI REST API request. + + Ensures relative Power BI endpoints are expanded to full URLs. + """ + if not url.startswith(("http://", "https://")): + url = urljoin(self.POWERBI_API_BASE_URL, url) + + return await super().run(url=url, **kwargs) + + @classmethod + def get_connection_form_widgets(cls) -> dict[str, Any]: + """Return connection widgets to add to connection form.""" + from flask_appbuilder.fieldwidgets import BS3TextFieldWidget + from flask_babel import lazy_gettext + from wtforms import StringField + + return { + "tenant_id": StringField(lazy_gettext("Tenant ID"), widget=BS3TextFieldWidget()), + } + + @classmethod + def get_ui_field_behaviour(cls) -> dict[str, Any]: + """Return custom field behaviour.""" + return { + "hidden_fields": ["schema", "port", "host", "extra"], + "relabeling": { + "login": "Client ID", + "password": "Client Secret", + }, + } + + async def get_refresh_history( + self, + dataset_id: str, + group_id: str, + ) -> list[dict[str, str]]: + """ + Retrieve the refresh history of the specified dataset from the given group ID. + + :param dataset_id: The dataset ID. + :param group_id: The workspace ID. + + :return: Dictionary containing all the refresh histories of the dataset. + """ + try: + response = await self.run( + url="myorg/groups/{group_id}/datasets/{dataset_id}/refreshes", + path_parameters={ + "group_id": group_id, + "dataset_id": dataset_id, + }, + ) + + refresh_histories = response.get("value") + return [self.raw_to_refresh_details(refresh_history) for refresh_history in refresh_histories] + + except AirflowException: + raise PowerBIDatasetRefreshException("Failed to retrieve refresh history") + + @classmethod + def raw_to_refresh_details(cls, refresh_details: dict) -> dict[str, str]: + """ + Convert raw refresh details into a dictionary containing required fields. + + :param refresh_details: Raw object of refresh details. + """ + return { + PowerBIDatasetRefreshFields.REQUEST_ID.value: str(refresh_details.get("requestId")), + PowerBIDatasetRefreshFields.STATUS.value: ( + "In Progress" + if str(refresh_details.get("status")) == "Unknown" + else str(refresh_details.get("status")) + ), + PowerBIDatasetRefreshFields.ERROR.value: str(refresh_details.get("serviceExceptionJson")), + } + + async def get_refresh_details_by_refresh_id( + self, dataset_id: str, group_id: str, refresh_id: str + ) -> dict[str, str]: + """ + Get the refresh details of the given request Id. + + :param refresh_id: Request Id of the Dataset refresh. + """ + refresh_histories = await self.get_refresh_history(dataset_id=dataset_id, group_id=group_id) + + if len(refresh_histories) == 0: + raise PowerBIDatasetRefreshException( + f"Unable to fetch the details of dataset refresh with Request Id: {refresh_id}" + ) + + refresh_ids = [ + refresh_history.get(PowerBIDatasetRefreshFields.REQUEST_ID.value) + for refresh_history in refresh_histories + ] + + if refresh_id not in refresh_ids: + raise PowerBIDatasetRefreshException( + f"Unable to fetch the details of dataset refresh with Request Id: {refresh_id}" + ) + + refresh_details = refresh_histories[refresh_ids.index(refresh_id)] + + return refresh_details + + async def trigger_dataset_refresh( + self, *, dataset_id: str, group_id: str, request_body: dict[str, Any] | None = None + ) -> str: + """ + Triggers a refresh for the specified dataset from the given group id. + + :param dataset_id: The dataset id. + :param group_id: The workspace id. + :param request_body: Additional arguments to pass to the request body, as described in https://learn.microsoft.com/en-us/rest/api/power-bi/datasets/refresh-dataset-in-group#request-body. + + :return: Request id of the dataset refresh request. + """ + try: + response = await self.run( + url="myorg/groups/{group_id}/datasets/{dataset_id}/refreshes", + response_type=None, + method="POST", + path_parameters={ + "group_id": group_id, + "dataset_id": dataset_id, + }, + data=request_body, + ) + + request_id = response.get("requestid") + return request_id + except AirflowException: + raise PowerBIDatasetRefreshException("Failed to trigger dataset refresh.") + + async def get_workspace_list(self) -> list[str]: + """ + Triggers a request to get all available workspaces for the service principal. + + :return: List of workspace IDs. + """ + try: + response = await self.run(url="myorg/groups", method="GET") + + list_of_workspaces = response.get("value", []) + + return [ws["id"] for ws in list_of_workspaces if "id" in ws] + + except AirflowException: + raise PowerBIWorkspaceListException("Failed to get workspace ID list.") + + async def get_dataset_list(self, *, group_id: str) -> list[str]: + """ + Triggers a request to get all datasets within a group (workspace). + + :param group_id: Workspace ID. + + :return: List of dataset IDs. + """ + try: + response = await self.run(url=f"myorg/groups/{group_id}/datasets", method="GET") + + list_of_datasets = response.get("value", []) + + return [ds["id"] for ds in list_of_datasets if "id" in ds] + + except AirflowException: + raise PowerBIDatasetListException("Failed to get dataset ID list.") + + async def cancel_dataset_refresh(self, dataset_id: str, group_id: str, dataset_refresh_id: str) -> None: + """ + Cancel the dataset refresh. + + :param dataset_id: The dataset Id. + :param group_id: The workspace Id. + :param dataset_refresh_id: The dataset refresh Id. + """ + await self.run( + url="myorg/groups/{group_id}/datasets/{dataset_id}/refreshes/{dataset_refresh_id}", + response_type=None, + path_parameters={ + "group_id": group_id, + "dataset_id": dataset_id, + "dataset_refresh_id": dataset_refresh_id, + }, + method="DELETE", + ) diff --git a/providers/microsoft/azure/tests/unit/microsoft/azure/hooks/test_powerbi.py b/providers/microsoft/azure/tests/unit/microsoft/azure/hooks/test_powerbi.py index b0569a84b3509..58985d2173fe4 100644 --- a/providers/microsoft/azure/tests/unit/microsoft/azure/hooks/test_powerbi.py +++ b/providers/microsoft/azure/tests/unit/microsoft/azure/hooks/test_powerbi.py @@ -1,215 +1,254 @@ -# Licensed to the Apache Software Foundation (ASF) under one -# or more contributor license agreements. See the NOTICE file -# distributed with this work for additional information -# regarding copyright ownership. The ASF licenses this file -# to you under the Apache License, Version 2.0 (the -# "License"); you may not use this file except in compliance -# with the License. You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, -# software distributed under the License is distributed on an -# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -# KIND, either express or implied. See the License for the -# specific language governing permissions and limitations -# under the License. -from __future__ import annotations - -from unittest import mock - -import pytest - -from airflow.providers.common.compat.sdk import AirflowException -from airflow.providers.microsoft.azure.hooks.msgraph import KiotaRequestAdapterHook -from airflow.providers.microsoft.azure.hooks.powerbi import ( - PowerBIDatasetRefreshException, - PowerBIDatasetRefreshFields, - PowerBIDatasetRefreshStatus, - PowerBIHook, -) - -FORMATTED_RESPONSE = [ - # Completed refresh - { - PowerBIDatasetRefreshFields.REQUEST_ID.value: "5e2d9921-e91b-491f-b7e1-e7d8db49194c", - PowerBIDatasetRefreshFields.STATUS.value: PowerBIDatasetRefreshStatus.COMPLETED, - PowerBIDatasetRefreshFields.ERROR.value: "None", - }, - # In-progress refresh - { - PowerBIDatasetRefreshFields.REQUEST_ID.value: "6b6536c1-cfcb-4148-9c21-402c3f5241e4", - PowerBIDatasetRefreshFields.STATUS.value: PowerBIDatasetRefreshStatus.IN_PROGRESS, - PowerBIDatasetRefreshFields.ERROR.value: "None", - }, - # Failed refresh - { - PowerBIDatasetRefreshFields.REQUEST_ID.value: "11bf290a-346b-48b7-8973-c5df149337ff", - PowerBIDatasetRefreshFields.STATUS.value: PowerBIDatasetRefreshStatus.FAILED, - PowerBIDatasetRefreshFields.ERROR.value: '{"errorCode":"ModelRefreshFailed_CredentialsNotSpecified"}', - }, -] - -DEFAULT_CONNECTION_CLIENT_SECRET = "powerbi_conn_id" -GROUP_ID = "group_id" -DATASET_ID = "dataset_id" - - -class TestPowerBIHook: - @pytest.mark.asyncio - async def test_get_refresh_history(self, powerbi_hook): - response_data = {"value": [{"requestId": "1234", "status": "Completed", "serviceExceptionJson": ""}]} - - with mock.patch.object(KiotaRequestAdapterHook, "run", new_callable=mock.AsyncMock) as mock_run: - mock_run.return_value = response_data - result = await powerbi_hook.get_refresh_history(DATASET_ID, GROUP_ID) - - expected = [{"request_id": "1234", "status": "Completed", "error": ""}] - assert result == expected - - @pytest.mark.asyncio - async def test_get_refresh_history_airflow_exception(self, powerbi_hook): - """Test handling of AirflowException in get_refresh_history.""" - - with mock.patch.object(KiotaRequestAdapterHook, "run", new_callable=mock.AsyncMock) as mock_run: - mock_run.side_effect = AirflowException("Test exception") - - with pytest.raises(PowerBIDatasetRefreshException, match="Failed to retrieve refresh history"): - await powerbi_hook.get_refresh_history(DATASET_ID, GROUP_ID) - - @pytest.mark.parametrize( - ("input_data", "expected_output"), - [ - ( - {"requestId": "1234", "status": "Completed", "serviceExceptionJson": ""}, - { - PowerBIDatasetRefreshFields.REQUEST_ID.value: "1234", - PowerBIDatasetRefreshFields.STATUS.value: "Completed", - PowerBIDatasetRefreshFields.ERROR.value: "", - }, - ), - ( - {"requestId": "5678", "status": "Unknown", "serviceExceptionJson": "Some error"}, - { - PowerBIDatasetRefreshFields.REQUEST_ID.value: "5678", - PowerBIDatasetRefreshFields.STATUS.value: "In Progress", - PowerBIDatasetRefreshFields.ERROR.value: "Some error", - }, - ), - ( - {"requestId": None, "status": None, "serviceExceptionJson": None}, - { - PowerBIDatasetRefreshFields.REQUEST_ID.value: "None", - PowerBIDatasetRefreshFields.STATUS.value: "None", - PowerBIDatasetRefreshFields.ERROR.value: "None", - }, - ), - ( - {}, # Empty input dictionary - { - PowerBIDatasetRefreshFields.REQUEST_ID.value: "None", - PowerBIDatasetRefreshFields.STATUS.value: "None", - PowerBIDatasetRefreshFields.ERROR.value: "None", - }, - ), - ], - ) - def test_raw_to_refresh_details(self, input_data, expected_output): - """Test raw_to_refresh_details method.""" - result = PowerBIHook.raw_to_refresh_details(input_data) - assert result == expected_output - - @pytest.mark.asyncio - async def test_get_refresh_details_by_refresh_id(self, powerbi_hook): - # Mock the get_refresh_history method to return a list of refresh histories - refresh_histories = FORMATTED_RESPONSE - powerbi_hook.get_refresh_history = mock.AsyncMock(return_value=refresh_histories) - - # Call the function with a valid request ID - refresh_id = "5e2d9921-e91b-491f-b7e1-e7d8db49194c" - result = await powerbi_hook.get_refresh_details_by_refresh_id( - dataset_id=DATASET_ID, group_id=GROUP_ID, refresh_id=refresh_id - ) - - # Assert that the correct refresh details are returned - assert result == { - PowerBIDatasetRefreshFields.REQUEST_ID.value: "5e2d9921-e91b-491f-b7e1-e7d8db49194c", - PowerBIDatasetRefreshFields.STATUS.value: "Completed", - PowerBIDatasetRefreshFields.ERROR.value: "None", - } - - # Call the function with an invalid request ID - invalid_request_id = "invalid_request_id" - with pytest.raises(PowerBIDatasetRefreshException): - await powerbi_hook.get_refresh_details_by_refresh_id( - dataset_id=DATASET_ID, group_id=GROUP_ID, refresh_id=invalid_request_id - ) - - @pytest.mark.asyncio - async def test_get_refresh_details_by_refresh_id_empty_history(self, powerbi_hook): - """Test exception when refresh history is empty.""" - # Mock the get_refresh_history method to return an empty list - powerbi_hook.get_refresh_history = mock.AsyncMock(return_value=[]) - - # Call the function with a request ID - refresh_id = "any_request_id" - with pytest.raises( - PowerBIDatasetRefreshException, - match=f"Unable to fetch the details of dataset refresh with Request Id: {refresh_id}", - ): - await powerbi_hook.get_refresh_details_by_refresh_id( - dataset_id=DATASET_ID, group_id=GROUP_ID, refresh_id=refresh_id - ) - - @pytest.mark.asyncio - async def test_get_refresh_details_by_refresh_id_not_found(self, powerbi_hook): - """Test exception when the refresh ID is not found in the refresh history.""" - # Mock the get_refresh_history method to return a list of refresh histories without the specified ID - powerbi_hook.get_refresh_history = mock.AsyncMock(return_value=FORMATTED_RESPONSE) - - # Call the function with an invalid request ID - invalid_request_id = "invalid_request_id" - with pytest.raises( - PowerBIDatasetRefreshException, - match=f"Unable to fetch the details of dataset refresh with Request Id: {invalid_request_id}", - ): - await powerbi_hook.get_refresh_details_by_refresh_id( - dataset_id=DATASET_ID, group_id=GROUP_ID, refresh_id=invalid_request_id - ) - - @pytest.mark.asyncio - async def test_trigger_dataset_refresh_success(self, powerbi_hook): - response_data = {"requestid": "5e2d9921-e91b-491f-b7e1-e7d8db49194c"} - - with mock.patch.object(KiotaRequestAdapterHook, "run", new_callable=mock.AsyncMock) as mock_run: - mock_run.return_value = response_data - result = await powerbi_hook.trigger_dataset_refresh(dataset_id=DATASET_ID, group_id=GROUP_ID) - - assert result == "5e2d9921-e91b-491f-b7e1-e7d8db49194c" - - @pytest.mark.asyncio - async def test_trigger_dataset_refresh_failure(self, powerbi_hook): - """Test failure to trigger dataset refresh due to AirflowException.""" - with mock.patch.object(KiotaRequestAdapterHook, "run", new_callable=mock.AsyncMock) as mock_run: - mock_run.side_effect = AirflowException("Test exception") - - with pytest.raises(PowerBIDatasetRefreshException, match="Failed to trigger dataset refresh."): - await powerbi_hook.trigger_dataset_refresh(dataset_id=DATASET_ID, group_id=GROUP_ID) - - @pytest.mark.asyncio - async def test_cancel_dataset_refresh(self, powerbi_hook): - dataset_refresh_id = "5e2d9921-e91b-491f-b7e1-e7d8db49194c" - - with mock.patch.object(KiotaRequestAdapterHook, "run", new_callable=mock.AsyncMock) as mock_run: - await powerbi_hook.cancel_dataset_refresh(DATASET_ID, GROUP_ID, dataset_refresh_id) - - mock_run.assert_called_once_with( - url="myorg/groups/{group_id}/datasets/{dataset_id}/refreshes/{dataset_refresh_id}", - response_type=None, - path_parameters={ - "group_id": GROUP_ID, - "dataset_id": DATASET_ID, - "dataset_refresh_id": dataset_refresh_id, - }, - method="DELETE", - ) +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. +from __future__ import annotations + +from unittest import mock + +import pytest + +from airflow.providers.common.compat.sdk import AirflowException +from airflow.providers.microsoft.azure.hooks.msgraph import KiotaRequestAdapterHook +from airflow.providers.microsoft.azure.hooks.powerbi import ( + PowerBIDatasetRefreshException, + PowerBIDatasetRefreshFields, + PowerBIDatasetRefreshStatus, + PowerBIHook, +) + +FORMATTED_RESPONSE = [ + # Completed refresh + { + PowerBIDatasetRefreshFields.REQUEST_ID.value: "5e2d9921-e91b-491f-b7e1-e7d8db49194c", + PowerBIDatasetRefreshFields.STATUS.value: PowerBIDatasetRefreshStatus.COMPLETED, + PowerBIDatasetRefreshFields.ERROR.value: "None", + }, + # In-progress refresh + { + PowerBIDatasetRefreshFields.REQUEST_ID.value: "6b6536c1-cfcb-4148-9c21-402c3f5241e4", + PowerBIDatasetRefreshFields.STATUS.value: PowerBIDatasetRefreshStatus.IN_PROGRESS, + PowerBIDatasetRefreshFields.ERROR.value: "None", + }, + # Failed refresh + { + PowerBIDatasetRefreshFields.REQUEST_ID.value: "11bf290a-346b-48b7-8973-c5df149337ff", + PowerBIDatasetRefreshFields.STATUS.value: PowerBIDatasetRefreshStatus.FAILED, + PowerBIDatasetRefreshFields.ERROR.value: '{"errorCode":"ModelRefreshFailed_CredentialsNotSpecified"}', + }, +] + +DEFAULT_CONNECTION_CLIENT_SECRET = "powerbi_conn_id" +GROUP_ID = "group_id" +DATASET_ID = "dataset_id" + + +class TestPowerBIHook: + @pytest.mark.asyncio + async def test_get_refresh_history(self, powerbi_hook): + response_data = {"value": [{"requestId": "1234", "status": "Completed", "serviceExceptionJson": ""}]} + + with mock.patch.object(KiotaRequestAdapterHook, "run", new_callable=mock.AsyncMock) as mock_run: + mock_run.return_value = response_data + result = await powerbi_hook.get_refresh_history(DATASET_ID, GROUP_ID) + + expected = [{"request_id": "1234", "status": "Completed", "error": ""}] + assert result == expected + + @pytest.mark.asyncio + async def test_get_refresh_history_airflow_exception(self, powerbi_hook): + """Test handling of AirflowException in get_refresh_history.""" + + with mock.patch.object(KiotaRequestAdapterHook, "run", new_callable=mock.AsyncMock) as mock_run: + mock_run.side_effect = AirflowException("Test exception") + + with pytest.raises(PowerBIDatasetRefreshException, match="Failed to retrieve refresh history"): + await powerbi_hook.get_refresh_history(DATASET_ID, GROUP_ID) + + @pytest.mark.parametrize( + ("input_data", "expected_output"), + [ + ( + {"requestId": "1234", "status": "Completed", "serviceExceptionJson": ""}, + { + PowerBIDatasetRefreshFields.REQUEST_ID.value: "1234", + PowerBIDatasetRefreshFields.STATUS.value: "Completed", + PowerBIDatasetRefreshFields.ERROR.value: "", + }, + ), + ( + {"requestId": "5678", "status": "Unknown", "serviceExceptionJson": "Some error"}, + { + PowerBIDatasetRefreshFields.REQUEST_ID.value: "5678", + PowerBIDatasetRefreshFields.STATUS.value: "In Progress", + PowerBIDatasetRefreshFields.ERROR.value: "Some error", + }, + ), + ( + {"requestId": None, "status": None, "serviceExceptionJson": None}, + { + PowerBIDatasetRefreshFields.REQUEST_ID.value: "None", + PowerBIDatasetRefreshFields.STATUS.value: "None", + PowerBIDatasetRefreshFields.ERROR.value: "None", + }, + ), + ( + {}, # Empty input dictionary + { + PowerBIDatasetRefreshFields.REQUEST_ID.value: "None", + PowerBIDatasetRefreshFields.STATUS.value: "None", + PowerBIDatasetRefreshFields.ERROR.value: "None", + }, + ), + ], + ) + def test_raw_to_refresh_details(self, input_data, expected_output): + """Test raw_to_refresh_details method.""" + result = PowerBIHook.raw_to_refresh_details(input_data) + assert result == expected_output + + @pytest.mark.asyncio + async def test_get_refresh_details_by_refresh_id(self, powerbi_hook): + # Mock the get_refresh_history method to return a list of refresh histories + refresh_histories = FORMATTED_RESPONSE + powerbi_hook.get_refresh_history = mock.AsyncMock(return_value=refresh_histories) + + # Call the function with a valid request ID + refresh_id = "5e2d9921-e91b-491f-b7e1-e7d8db49194c" + result = await powerbi_hook.get_refresh_details_by_refresh_id( + dataset_id=DATASET_ID, group_id=GROUP_ID, refresh_id=refresh_id + ) + + # Assert that the correct refresh details are returned + assert result == { + PowerBIDatasetRefreshFields.REQUEST_ID.value: "5e2d9921-e91b-491f-b7e1-e7d8db49194c", + PowerBIDatasetRefreshFields.STATUS.value: "Completed", + PowerBIDatasetRefreshFields.ERROR.value: "None", + } + + # Call the function with an invalid request ID + invalid_request_id = "invalid_request_id" + with pytest.raises(PowerBIDatasetRefreshException): + await powerbi_hook.get_refresh_details_by_refresh_id( + dataset_id=DATASET_ID, group_id=GROUP_ID, refresh_id=invalid_request_id + ) + + @pytest.mark.asyncio + async def test_get_refresh_details_by_refresh_id_empty_history(self, powerbi_hook): + """Test exception when refresh history is empty.""" + # Mock the get_refresh_history method to return an empty list + powerbi_hook.get_refresh_history = mock.AsyncMock(return_value=[]) + + # Call the function with a request ID + refresh_id = "any_request_id" + with pytest.raises( + PowerBIDatasetRefreshException, + match=f"Unable to fetch the details of dataset refresh with Request Id: {refresh_id}", + ): + await powerbi_hook.get_refresh_details_by_refresh_id( + dataset_id=DATASET_ID, group_id=GROUP_ID, refresh_id=refresh_id + ) + + @pytest.mark.asyncio + async def test_get_refresh_details_by_refresh_id_not_found(self, powerbi_hook): + """Test exception when the refresh ID is not found in the refresh history.""" + # Mock the get_refresh_history method to return a list of refresh histories without the specified ID + powerbi_hook.get_refresh_history = mock.AsyncMock(return_value=FORMATTED_RESPONSE) + + # Call the function with an invalid request ID + invalid_request_id = "invalid_request_id" + with pytest.raises( + PowerBIDatasetRefreshException, + match=f"Unable to fetch the details of dataset refresh with Request Id: {invalid_request_id}", + ): + await powerbi_hook.get_refresh_details_by_refresh_id( + dataset_id=DATASET_ID, group_id=GROUP_ID, refresh_id=invalid_request_id + ) + + @pytest.mark.asyncio + async def test_trigger_dataset_refresh_success(self, powerbi_hook): + response_data = {"requestid": "5e2d9921-e91b-491f-b7e1-e7d8db49194c"} + + with mock.patch.object(KiotaRequestAdapterHook, "run", new_callable=mock.AsyncMock) as mock_run: + mock_run.return_value = response_data + result = await powerbi_hook.trigger_dataset_refresh(dataset_id=DATASET_ID, group_id=GROUP_ID) + + assert result == "5e2d9921-e91b-491f-b7e1-e7d8db49194c" + + @pytest.mark.asyncio + async def test_trigger_dataset_refresh_failure(self, powerbi_hook): + """Test failure to trigger dataset refresh due to AirflowException.""" + with mock.patch.object(KiotaRequestAdapterHook, "run", new_callable=mock.AsyncMock) as mock_run: + mock_run.side_effect = AirflowException("Test exception") + + with pytest.raises(PowerBIDatasetRefreshException, match="Failed to trigger dataset refresh."): + await powerbi_hook.trigger_dataset_refresh(dataset_id=DATASET_ID, group_id=GROUP_ID) + + @pytest.mark.asyncio + async def test_cancel_dataset_refresh(self, powerbi_hook): + dataset_refresh_id = "5e2d9921-e91b-491f-b7e1-e7d8db49194c" + + with mock.patch.object(KiotaRequestAdapterHook, "run", new_callable=mock.AsyncMock) as mock_run: + await powerbi_hook.cancel_dataset_refresh(DATASET_ID, GROUP_ID, dataset_refresh_id) + + mock_run.assert_called_once_with( + url="myorg/groups/{group_id}/datasets/{dataset_id}/refreshes/{dataset_refresh_id}", + response_type=None, + path_parameters={ + "group_id": GROUP_ID, + "dataset_id": DATASET_ID, + "dataset_refresh_id": dataset_refresh_id, + }, + method="DELETE", + ) + + @pytest.mark.asyncio + async def test_run_expands_relative_powerbi_url(self, powerbi_hook): + """Relative Power BI URLs should be expanded to full REST API URLs.""" + + with mock.patch.object( + KiotaRequestAdapterHook, + "run", + new_callable=mock.AsyncMock, + ) as mock_run: + mock_run.return_value = {} + + await powerbi_hook.run(url="myorg/groups") + + mock_run.assert_awaited_once() + called_url = mock_run.call_args.kwargs["url"] + + assert called_url == "https://api.powerbi.com/v1.0/myorg/groups" + + @pytest.mark.asyncio + async def test_run_does_not_modify_absolute_url(self, powerbi_hook): + """Absolute URLs should not be modified by PowerBIHook.run.""" + + absolute_url = "https://api.powerbi.com/v1.0/myorg/groups" + + with mock.patch.object( + KiotaRequestAdapterHook, + "run", + new_callable=mock.AsyncMock, + ) as mock_run: + mock_run.return_value = {} + + await powerbi_hook.run(url=absolute_url) + + mock_run.assert_awaited_once() + called_url = mock_run.call_args.kwargs["url"] + + assert called_url == absolute_url + From c4b3d6bfeb1590c81a4aeec83d3ee002754b3b33 Mon Sep 17 00:00:00 2001 From: Shruti Singh Date: Sat, 17 Jan 2026 19:44:07 +0530 Subject: [PATCH 2/7] Restore licensing information and formatting --- .../src/airflow/providers/microsoft/azure/hooks/powerbi.py | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/providers/microsoft/azure/src/airflow/providers/microsoft/azure/hooks/powerbi.py b/providers/microsoft/azure/src/airflow/providers/microsoft/azure/hooks/powerbi.py index 72775f622c091..3c4ebbf7a41cb 100644 --- a/providers/microsoft/azure/src/airflow/providers/microsoft/azure/hooks/powerbi.py +++ b/providers/microsoft/azure/src/airflow/providers/microsoft/azure/hooks/powerbi.py @@ -22,7 +22,7 @@ from urllib.parse import urljoin from airflow.providers.common.compat.sdk import AirflowException from airflow.providers.microsoft.azure.hooks.msgraph import KiotaRequestAdapterHook -\ + if TYPE_CHECKING: @@ -284,3 +284,4 @@ async def cancel_dataset_refresh(self, dataset_id: str, group_id: str, dataset_r }, method="DELETE", ) + From 056f33c299aa31175197137c1c52d90b459c1add Mon Sep 17 00:00:00 2001 From: Shruti Singh Date: Sat, 17 Jan 2026 19:45:36 +0530 Subject: [PATCH 3/7] Restore license and docstring comments in powerbi.py --- .../src/airflow/providers/microsoft/azure/hooks/powerbi.py | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/providers/microsoft/azure/src/airflow/providers/microsoft/azure/hooks/powerbi.py b/providers/microsoft/azure/src/airflow/providers/microsoft/azure/hooks/powerbi.py index 3c4ebbf7a41cb..dd032afe174e8 100644 --- a/providers/microsoft/azure/src/airflow/providers/microsoft/azure/hooks/powerbi.py +++ b/providers/microsoft/azure/src/airflow/providers/microsoft/azure/hooks/powerbi.py @@ -23,8 +23,6 @@ from airflow.providers.common.compat.sdk import AirflowException from airflow.providers.microsoft.azure.hooks.msgraph import KiotaRequestAdapterHook - - if TYPE_CHECKING: from msgraph_core import APIVersion @@ -285,3 +283,4 @@ async def cancel_dataset_refresh(self, dataset_id: str, group_id: str, dataset_r method="DELETE", ) + From 559e004a3b05c1530fee06a109ab4f2b17e399d3 Mon Sep 17 00:00:00 2001 From: Shruti Singh Date: Sat, 17 Jan 2026 19:46:53 +0530 Subject: [PATCH 4/7] Restore license and docstring comments in powerbi.py --- .../src/airflow/providers/microsoft/azure/hooks/powerbi.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/providers/microsoft/azure/src/airflow/providers/microsoft/azure/hooks/powerbi.py b/providers/microsoft/azure/src/airflow/providers/microsoft/azure/hooks/powerbi.py index dd032afe174e8..591e826d2ffca 100644 --- a/providers/microsoft/azure/src/airflow/providers/microsoft/azure/hooks/powerbi.py +++ b/providers/microsoft/azure/src/airflow/providers/microsoft/azure/hooks/powerbi.py @@ -25,8 +25,7 @@ if TYPE_CHECKING: from msgraph_core import APIVersion - - + class PowerBIDatasetRefreshFields(Enum): """Power BI refresh dataset details.""" @@ -284,3 +283,4 @@ async def cancel_dataset_refresh(self, dataset_id: str, group_id: str, dataset_r ) + From 220f7cfa2ce5c95d30140baad8e254379e811494 Mon Sep 17 00:00:00 2001 From: Shruti Singh Date: Sat, 17 Jan 2026 20:06:49 +0530 Subject: [PATCH 5/7] Refactor PowerBI hook with enhanced functionality --- .../microsoft/azure/hooks/powerbi.py | 32 +++++++++++++++---- 1 file changed, 26 insertions(+), 6 deletions(-) diff --git a/providers/microsoft/azure/src/airflow/providers/microsoft/azure/hooks/powerbi.py b/providers/microsoft/azure/src/airflow/providers/microsoft/azure/hooks/powerbi.py index 591e826d2ffca..117b5524e0348 100644 --- a/providers/microsoft/azure/src/airflow/providers/microsoft/azure/hooks/powerbi.py +++ b/providers/microsoft/azure/src/airflow/providers/microsoft/azure/hooks/powerbi.py @@ -22,6 +22,8 @@ from urllib.parse import urljoin from airflow.providers.common.compat.sdk import AirflowException from airflow.providers.microsoft.azure.hooks.msgraph import KiotaRequestAdapterHook +from io import BytesIO +from typing import Any if TYPE_CHECKING: from msgraph_core import APIVersion @@ -91,19 +93,36 @@ def __init__( host="https://api.powerbi.com", scopes=["https://analysis.windows.net/powerbi/api/.default"], api_version=api_version, - ) + ) - async def run(self, url: str, **kwargs): + async def run( + self, + url: str = "", + response_type: str | None = None, + path_parameters: dict[str, Any] | None = None, + method: str = "GET", + query_parameters: dict[str, Any] | None = None, + headers: dict[str, str] | None = None, + data: dict[str, Any] | str | BytesIO | None = None, + ): """ Execute a Power BI REST API request. - + Ensures relative Power BI endpoints are expanded to full URLs. """ if not url.startswith(("http://", "https://")): url = urljoin(self.POWERBI_API_BASE_URL, url) - - return await super().run(url=url, **kwargs) - + + return await super().run( + url=url, + response_type=response_type, + path_parameters=path_parameters, + method=method, + query_parameters=query_parameters, + headers=headers, + data=data, + ) + @classmethod def get_connection_form_widgets(cls) -> dict[str, Any]: """Return connection widgets to add to connection form.""" @@ -284,3 +303,4 @@ async def cancel_dataset_refresh(self, dataset_id: str, group_id: str, dataset_r + From 17210181bfbf2cf83d45e23009fe662f552ebb5f Mon Sep 17 00:00:00 2001 From: Shruti Singh Date: Sat, 17 Jan 2026 21:42:28 +0530 Subject: [PATCH 6/7] Remove license comments and clean up code --- .../src/airflow/providers/microsoft/azure/hooks/powerbi.py | 6 +----- 1 file changed, 1 insertion(+), 5 deletions(-) diff --git a/providers/microsoft/azure/src/airflow/providers/microsoft/azure/hooks/powerbi.py b/providers/microsoft/azure/src/airflow/providers/microsoft/azure/hooks/powerbi.py index 117b5524e0348..dd8b8fafd4163 100644 --- a/providers/microsoft/azure/src/airflow/providers/microsoft/azure/hooks/powerbi.py +++ b/providers/microsoft/azure/src/airflow/providers/microsoft/azure/hooks/powerbi.py @@ -35,7 +35,6 @@ class PowerBIDatasetRefreshFields(Enum): STATUS = "status" ERROR = "error" - class PowerBIDatasetRefreshStatus: """Power BI refresh dataset statuses.""" @@ -47,19 +46,15 @@ class PowerBIDatasetRefreshStatus: TERMINAL_STATUSES = {FAILED, COMPLETED} FAILURE_STATUSES = {FAILED, DISABLED} - class PowerBIDatasetRefreshException(AirflowException): """An exception that indicates a dataset refresh failed to complete.""" - class PowerBIWorkspaceListException(AirflowException): """An exception that indicates a failure in getting the list of groups (workspaces).""" - class PowerBIDatasetListException(AirflowException): """An exception that indicates a failure in getting the list of datasets.""" - class PowerBIHook(KiotaRequestAdapterHook): """ A async hook to interact with Power BI. @@ -304,3 +299,4 @@ async def cancel_dataset_refresh(self, dataset_id: str, group_id: str, dataset_r + From ed92342b8fc7b00824c9952b8879cd9c38372012 Mon Sep 17 00:00:00 2001 From: Shruti Singh Date: Sat, 17 Jan 2026 22:11:59 +0530 Subject: [PATCH 7/7] Remove license comments and clean up code --- .../src/airflow/providers/microsoft/azure/hooks/powerbi.py | 5 ----- 1 file changed, 5 deletions(-) diff --git a/providers/microsoft/azure/src/airflow/providers/microsoft/azure/hooks/powerbi.py b/providers/microsoft/azure/src/airflow/providers/microsoft/azure/hooks/powerbi.py index dd8b8fafd4163..620307cd013d2 100644 --- a/providers/microsoft/azure/src/airflow/providers/microsoft/azure/hooks/powerbi.py +++ b/providers/microsoft/azure/src/airflow/providers/microsoft/azure/hooks/powerbi.py @@ -295,8 +295,3 @@ async def cancel_dataset_refresh(self, dataset_id: str, group_id: str, dataset_r }, method="DELETE", ) - - - - -