Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -78,6 +78,13 @@
log = logging.getLogger(__name__)

RESOURCE_ID_ATTRIBUTE_NAME = "resource_id"
KEYCLOAK_RESOURCE_NOT_FOUND_ERROR = "resource not found:"


def _is_missing_keycloak_resource_response(status_code: int, text: Any) -> bool:
return status_code == 500 and isinstance(text, str) and KEYCLOAK_RESOURCE_NOT_FOUND_ERROR in text.lower()
Comment thread
vincbeck marked this conversation as resolved.


TEAM_SCOPED_RESOURCES = frozenset(
{
KeycloakResource.CONNECTION,
Expand Down Expand Up @@ -434,6 +441,9 @@ def _is_authorized(
raise AirflowException(
f"Request not recognized by Keycloak. {error.get('error')}. {error.get('error_description')}"
)
if _is_missing_keycloak_resource_response(resp.status_code, resp.text):
log.warning("Keycloak authorization resource is missing; denying access. Response: %s", resp.text)
return False
raise AirflowException(f"Unexpected error: {resp.status_code} - {resp.text}")

def filter_authorized_dag_ids(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -386,6 +386,19 @@ def test_is_authorized_failure(self, function, auth_manager, user):

assert "Unexpected error" in str(e.value)

def test_is_authorized_missing_keycloak_resource(self, auth_manager, user, caplog):
resp = Mock()
resp.status_code = 500
resp.text = "resource not found: Dag:team-a"
auth_manager.http_session.post = Mock(return_value=resp)
caplog.set_level("WARNING", logger="airflow.providers.keycloak.auth_manager.keycloak_auth_manager")

result = auth_manager.is_authorized_dag(method="GET", details=DagDetails(id="dag_0"), user=user)

assert result is False
assert "Keycloak authorization resource is missing; denying access" in caplog.text
assert "resource not found: Dag:team-a" in caplog.text

@pytest.mark.parametrize(
"function",
[
Expand Down Expand Up @@ -652,6 +665,30 @@ def test_filter_authorized_dag_ids_team_match(self, mock_is_authorized, auth_man
mock_is_authorized.assert_called_once()
assert result == {"dag-a"}

@pytest.mark.skipif(not AIRFLOW_V_3_2_PLUS, reason="team_name not supported before Airflow 3.2.0")
def test_filter_authorized_dag_ids_missing_keycloak_resource(self, auth_manager_multi_team, user, caplog):
def post_response(*_, data, **__):
claims = json.loads(base64.b64decode(data["claim_token"]).decode())
dag_id = claims[RESOURCE_ID_ATTRIBUTE_NAME][0]
response = Mock()
if dag_id == "dag-missing":
response.status_code = 500
response.text = "resource not found: Dag:team-a"
else:
response.status_code = 200
return response

auth_manager_multi_team.http_session.post = Mock(side_effect=post_response)
caplog.set_level("WARNING", logger="airflow.providers.keycloak.auth_manager.keycloak_auth_manager")

result = auth_manager_multi_team.filter_authorized_dag_ids(
dag_ids={"dag-allowed", "dag-missing"}, user=user, team_name="team-a"
)

assert result == {"dag-allowed"}
assert auth_manager_multi_team.http_session.post.call_count == 2
assert "Keycloak authorization resource is missing; denying access" in caplog.text

@pytest.mark.skipif(not AIRFLOW_V_3_2_PLUS, reason="team_name not supported before Airflow 3.2.0")
@patch.object(KeycloakAuthManager, "is_authorized_pool", return_value=False)
def test_filter_authorized_pools_no_team_returns_empty(
Expand Down
Loading