Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 7 additions & 3 deletions airflow-core/src/airflow/api_fastapi/common/exceptions.py
Original file line number Diff line number Diff line change
Expand Up @@ -74,22 +74,26 @@ def exception_handler(self, request: Request, exc: IntegrityError):
for tb in traceback.format_tb(exc.__traceback__):
stacktrace += tb

log_message = f"Error with id {exception_id}\n{stacktrace}"
log_message = f"Error with id {exception_id}, statement: {exc.statement}\n{stacktrace}"
log.error(log_message)
if conf.get("api", "expose_stacktrace") == "True":
message = log_message
statement = str(exc.statement)
orig_error = str(exc.orig)
else:
message = (
"Serious error when handling your request. Check logs for more details - "
f"you will find it in api server when you look for ID {exception_id}"
)
statement = "hidden"
orig_error = "hidden"

raise HTTPException(
status_code=status.HTTP_409_CONFLICT,
detail={
"reason": "Unique constraint violation",
"statement": str(exc.statement),
"orig_error": str(exc.orig),
"statement": statement,
"orig_error": orig_error,
"message": message,
},
)
Expand Down
129 changes: 111 additions & 18 deletions airflow-core/tests/unit/api_fastapi/common/test_exceptions.py
Original file line number Diff line number Diff line change
Expand Up @@ -123,6 +123,67 @@ def teardown_method(self) -> None:
clear_db_runs()
clear_db_dags()

@pytest.mark.parametrize(
    ("table", "expected_exception"),
    [
        (
            table_name,
            HTTPException(
                status_code=status.HTTP_409_CONFLICT,
                detail={
                    "reason": "Unique constraint violation",
                    "statement": "hidden",
                    "orig_error": "hidden",
                    "message": MESSAGE,
                },
            ),
        )
        for table_name in ("Pool", "Variable")
    ],
)
@patch("airflow.api_fastapi.common.exceptions.get_random_string", return_value=MOCKED_ID)
@conf_vars({("api", "expose_stacktrace"): "False"})
@provide_session
def test_handle_single_column_unique_constraint_error_without_stacktrace(
    self,
    mock_get_random_string,
    session,
    table,
    expected_exception,
) -> None:
    """Verify the 409 response for a single-column unique violation with stacktraces disabled.

    The test inserts the same row twice into the table named by ``table`` so that
    ``session.commit()`` raises ``IntegrityError``, feeds that error to the unique
    constraint handler, and checks that the resulting ``HTTPException`` carries the
    status code and detail of ``expected_exception`` (``statement`` and
    ``orig_error`` both ``"hidden"``).
    """
    # Pool and Variable serve as the single-column examples; each factory builds one row.
    row_factories = {
        "Pool": lambda: Pool(pool=TEST_POOL, slots=1, description="test pool", include_deferred=False),
        "Variable": lambda: Variable(key=TEST_VARIABLE_KEY, val="test_val"),
    }
    make_row = row_factories.get(table)
    if make_row is not None:
        session.add(make_row())
        # SQLA2 batches several pending objects into one INSERT ... SELECT FROM VALUES statement;
        # flushing after the first add keeps the two inserts as individual statements.
        session.flush()
        session.add(make_row())

    with pytest.raises(IntegrityError) as integrity_excinfo:
        session.commit()

    with pytest.raises(HTTPException) as response_excinfo:
        self.unique_constraint_error_handler.exception_handler(None, integrity_excinfo.value)  # type: ignore

    raised = response_excinfo.value
    assert raised.status_code == expected_exception.status_code
    assert raised.detail == expected_exception.detail

@pytest.mark.parametrize(
"table, expected_exception",
generate_test_cases_parametrize(
Expand All @@ -135,7 +196,6 @@ def teardown_method(self) -> None:
"reason": "Unique constraint violation",
"statement": "INSERT INTO slot_pool (pool, slots, description, include_deferred, team_id) VALUES (?, ?, ?, ?, ?)",
"orig_error": "UNIQUE constraint failed: slot_pool.pool",
"message": MESSAGE,
},
),
HTTPException(
Expand All @@ -144,16 +204,14 @@ def teardown_method(self) -> None:
"reason": "Unique constraint violation",
"statement": "INSERT INTO slot_pool (pool, slots, description, include_deferred, team_id) VALUES (%s, %s, %s, %s, %s)",
"orig_error": "(1062, \"Duplicate entry 'test_pool' for key 'slot_pool.slot_pool_pool_uq'\")",
"message": MESSAGE,
},
),
HTTPException(
status_code=status.HTTP_409_CONFLICT,
detail={
"reason": "Unique constraint violation",
"statement": f"INSERT INTO slot_pool (pool, slots, description, include_deferred, team_id) VALUES (%(pool)s, %(slots)s, %(description)s, %(include_deferred)s, %(team_id)s{uuid_suffix}) RETURNING slot_pool.id",
"statement": "INSERT INTO slot_pool (pool, slots, description, include_deferred, team_id) VALUES (%(pool)s, %(slots)s, %(description)s, %(include_deferred)s, %(team_id)s) RETURNING slot_pool.id",
"orig_error": 'duplicate key value violates unique constraint "slot_pool_pool_uq"\nDETAIL: Key (pool)=(test_pool) already exists.\n',
"message": MESSAGE,
},
),
],
Expand All @@ -164,7 +222,6 @@ def teardown_method(self) -> None:
"reason": "Unique constraint violation",
"statement": 'INSERT INTO variable ("key", val, description, is_encrypted, team_id) VALUES (?, ?, ?, ?, ?)',
"orig_error": "UNIQUE constraint failed: variable.key",
"message": MESSAGE,
},
),
HTTPException(
Expand All @@ -173,26 +230,24 @@ def teardown_method(self) -> None:
"reason": "Unique constraint violation",
"statement": "INSERT INTO variable (`key`, val, description, is_encrypted, team_id) VALUES (%s, %s, %s, %s, %s)",
"orig_error": "(1062, \"Duplicate entry 'test_key' for key 'variable.variable_key_uq'\")",
"message": MESSAGE,
},
),
HTTPException(
status_code=status.HTTP_409_CONFLICT,
detail={
"reason": "Unique constraint violation",
"statement": f"INSERT INTO variable (key, val, description, is_encrypted, team_id) VALUES (%(key)s, %(val)s, %(description)s, %(is_encrypted)s, %(team_id)s{uuid_suffix}) RETURNING variable.id",
"statement": "INSERT INTO variable (key, val, description, is_encrypted, team_id) VALUES (%(key)s, %(val)s, %(description)s, %(is_encrypted)s, %(team_id)s) RETURNING variable.id",
"orig_error": 'duplicate key value violates unique constraint "variable_key_uq"\nDETAIL: Key (key)=(test_key) already exists.\n',
"message": MESSAGE,
},
),
],
],
),
)
@patch("airflow.api_fastapi.common.exceptions.get_random_string", return_value=MOCKED_ID)
@conf_vars({("api", "expose_stacktrace"): "False"})
@conf_vars({("api", "expose_stacktrace"): "True"})
@provide_session
def test_handle_single_column_unique_constraint_error(
def test_handle_single_column_unique_constraint_error_with_stacktrace(
self,
mock_get_random_string,
session,
Expand All @@ -218,11 +273,50 @@ def test_handle_single_column_unique_constraint_error(
with pytest.raises(HTTPException) as exeinfo_response_error:
self.unique_constraint_error_handler.exception_handler(None, exeinfo_integrity_error.value) # type: ignore

exeinfo_response_error.value.detail.pop("message", None) # type: ignore[attr-defined]
assert exeinfo_response_error.value.status_code == expected_exception.status_code
assert exeinfo_response_error.value.detail == expected_exception.detail

@patch("airflow.api_fastapi.common.exceptions.get_random_string", return_value=MOCKED_ID)
@conf_vars({("api", "expose_stacktrace"): "False"})
@provide_session
def test_handle_multiple_columns_unique_constraint_error_without_stacktrace(
    self,
    mock_get_random_string,
    session,
) -> None:
    """Verify the 409 response for a multi-column unique violation with stacktraces disabled.

    Two ``DagRun`` rows with the same ``dag_id`` and ``run_id`` are added so that
    ``session.commit()`` raises ``IntegrityError``. The handler is then expected to
    raise an ``HTTPException`` whose ``statement`` and ``orig_error`` are
    ``"hidden"`` and whose ``message`` equals ``MESSAGE``.
    """
    expected_exception = HTTPException(
        status_code=status.HTTP_409_CONFLICT,
        detail={
            "reason": "Unique constraint violation",
            "statement": "hidden",
            "orig_error": "hidden",
            "message": MESSAGE,
        },
    )
    session.add(
        DagRun(dag_id="test_dag_id", run_id="test_run_id", run_type="manual", state=DagRunState.RUNNING)
    )
    session.add(
        DagRun(dag_id="test_dag_id", run_id="test_run_id", run_type="manual", state=DagRunState.RUNNING)
    )
    with pytest.raises(IntegrityError) as exeinfo_integrity_error:
        session.commit()

    with pytest.raises(HTTPException) as exeinfo_response_error:
        self.unique_constraint_error_handler.exception_handler(None, exeinfo_integrity_error.value)  # type: ignore

    assert exeinfo_response_error.value.status_code == expected_exception.status_code
    # The expected detail holds only fixed strings ("hidden" placeholders and MESSAGE), so the
    # whole detail is compared for exact equality; no statement pattern matching is needed here.
    assert exeinfo_response_error.value.detail == expected_exception.detail

@pytest.mark.parametrize(
"table, expected_exception",
("table", "expected_exception"),
generate_test_cases_parametrize(
["DagRun"],
[
Expand All @@ -233,7 +327,6 @@ def test_handle_single_column_unique_constraint_error(
"reason": "Unique constraint violation",
"statement": "INSERT INTO dag_run (dag_id, queued_at, logical_date, start_date, end_date, state, run_id, creating_job_id, run_type, triggered_by, triggering_user_name, conf, data_interval_start, data_interval_end, run_after, last_scheduling_decision, log_template_id, updated_at, clear_number, backfill_id, bundle_version, scheduled_by_job_id, context_carrier, created_dag_version_id) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, (SELECT max(log_template.id) AS max_1 \nFROM log_template), ?, ?, ?, ?, ?, ?, ?)",
"orig_error": "UNIQUE constraint failed: dag_run.dag_id, dag_run.run_id",
"message": MESSAGE,
},
),
HTTPException(
Expand All @@ -242,7 +335,6 @@ def test_handle_single_column_unique_constraint_error(
"reason": "Unique constraint violation",
"statement": "INSERT INTO dag_run (dag_id, queued_at, logical_date, start_date, end_date, state, run_id, creating_job_id, run_type, triggered_by, triggering_user_name, conf, data_interval_start, data_interval_end, run_after, last_scheduling_decision, log_template_id, updated_at, clear_number, backfill_id, bundle_version, scheduled_by_job_id, context_carrier, created_dag_version_id) VALUES (%s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, (SELECT max(log_template.id) AS max_1 \nFROM log_template), %s, %s, %s, %s, %s, %s, %s)",
"orig_error": "(1062, \"Duplicate entry 'test_dag_id-test_run_id' for key 'dag_run.dag_run_dag_id_run_id_key'\")",
"message": MESSAGE,
},
),
HTTPException(
Expand All @@ -251,17 +343,16 @@ def test_handle_single_column_unique_constraint_error(
"reason": "Unique constraint violation",
"statement": "INSERT INTO dag_run (dag_id, queued_at, logical_date, start_date, end_date, state, run_id, creating_job_id, run_type, triggered_by, triggering_user_name, conf, data_interval_start, data_interval_end, run_after, last_scheduling_decision, log_template_id, updated_at, clear_number, backfill_id, bundle_version, scheduled_by_job_id, context_carrier, created_dag_version_id) VALUES (%(dag_id)s, %(queued_at)s, %(logical_date)s, %(start_date)s, %(end_date)s, %(state)s, %(run_id)s, %(creating_job_id)s, %(run_type)s, %(triggered_by)s, %(triggering_user_name)s, %(conf)s, %(data_interval_start)s, %(data_interval_end)s, %(run_after)s, %(last_scheduling_decision)s, (SELECT max(log_template.id) AS max_1 \nFROM log_template), %(updated_at)s, %(clear_number)s, %(backfill_id)s, %(bundle_version)s, %(scheduled_by_job_id)s, %(context_carrier)s, %(created_dag_version_id)s) RETURNING dag_run.id",
"orig_error": 'duplicate key value violates unique constraint "dag_run_dag_id_run_id_key"\nDETAIL: Key (dag_id, run_id)=(test_dag_id, test_run_id) already exists.\n',
"message": MESSAGE,
},
),
],
],
),
)
@patch("airflow.api_fastapi.common.exceptions.get_random_string", return_value=MOCKED_ID)
@conf_vars({("api", "expose_stacktrace"): "False"})
@conf_vars({("api", "expose_stacktrace"): "True"})
@provide_session
def test_handle_multiple_columns_unique_constraint_error(
def test_handle_multiple_columns_unique_constraint_error_with_stacktrace(
self,
mock_get_random_string,
session,
Expand All @@ -287,18 +378,20 @@ def test_handle_multiple_columns_unique_constraint_error(
self.unique_constraint_error_handler.exception_handler(None, exeinfo_integrity_error.value) # type: ignore

assert exeinfo_response_error.value.status_code == expected_exception.status_code
response_detail = exeinfo_response_error.value.detail
# Drop the message (which carries the stacktrace) from the response before comparing.
response_detail.pop("message", None) # type: ignore[attr-defined]
if SQLALCHEMY_V_1_4:
assert exeinfo_response_error.value.detail == expected_exception.detail
else:
# The SQL statement is an implementation detail, so we match on the statement pattern (contains
# the table name and is an INSERT) instead of insisting on an exact match.
response_detail = exeinfo_response_error.value.detail
expected_detail = expected_exception.detail
actual_statement = response_detail.pop("statement", None) # type: ignore[attr-defined]
expected_detail.pop("statement", None)

assert response_detail == expected_detail
assert "INSERT INTO dag_run" in actual_statement

assert exeinfo_response_error.value.detail == expected_exception.detail


Expand Down
Loading