From 341d163b35f5e209b3bdbf6c3f7a4bc102aac6b9 Mon Sep 17 00:00:00 2001 From: Pranay Kumar Karvi Date: Sat, 14 Mar 2026 15:46:01 +0530 Subject: [PATCH 1/5] fix: replace savepoint-per-DAG with per-DAG transaction in migration 0101 downgrade to prevent PostgreSQL lock table exhaustion --- ...101_3_2_0_ui_improvements_for_deadlines.py | 98 +++++++++---------- 1 file changed, 48 insertions(+), 50 deletions(-) diff --git a/airflow-core/src/airflow/migrations/versions/0101_3_2_0_ui_improvements_for_deadlines.py b/airflow-core/src/airflow/migrations/versions/0101_3_2_0_ui_improvements_for_deadlines.py index 92bfc558d4726..583a5688a6482 100644 --- a/airflow-core/src/airflow/migrations/versions/0101_3_2_0_ui_improvements_for_deadlines.py +++ b/airflow-core/src/airflow/migrations/versions/0101_3_2_0_ui_improvements_for_deadlines.py @@ -589,6 +589,7 @@ def migrate_deadline_alert_data_back_to_serialized_dag() -> None: last_dag_id = "" conn = op.get_bind() + engine = conn.engine dialect = conn.dialect.name # Count all dags - we'll filter in the loop for those with deadline data @@ -643,60 +644,57 @@ def migrate_deadline_alert_data_back_to_serialized_dag() -> None: processed_dags.append(dag_id) last_dag_id = dag_id - # Create a savepoint for this Dag to allow rollback on error. - savepoint = conn.begin_nested() - try: - dag_data = get_dag_data(data, data_compressed) - deadline_uuids = dag_data[DAG_KEY][DEADLINE_KEY] - - if not isinstance(deadline_uuids, list) or not deadline_uuids: - continue - - if not all(isinstance(uuid_val, str) for uuid_val in deadline_uuids): - log.warning("Dag has non-string deadline values, skipping", dag_id=dag_id) - continue - - dags_with_deadlines.add(dag_id) - restored_deadline_objects = [] - - alert_result = conn.execute( - sa.text(""" - SELECT reference, interval, callback_def - FROM deadline_alert - WHERE serialized_dag_id = :serialized_dag_id - """), - {"serialized_dag_id": serialized_dag_id}, - ).fetchall() - - if not alert_result: - dags_with_errors[dag_id].append( - f"Could not find deadline_alert for serialized_dag {serialized_dag_id}" - ) - continue - - for alert in alert_result: - deadline_object = { - Encoding.TYPE: ENCODING_TYPE, - Encoding.VAR: { - REFERENCE_KEY: alert.reference, - INTERVAL_KEY: float(alert.interval), - CALLBACK_KEY: alert.callback_def, - }, - } - restored_deadline_objects.append(deadline_object) - restored_alerts_count += 1 - - # Replace the UUID array with the restored objects. - if restored_deadline_objects: - update_dag_deadline_field(conn, serialized_dag_id, restored_deadline_objects, dialect) + with engine.begin() as dag_conn: + dag_data = get_dag_data(data, data_compressed) + deadline_uuids = dag_data[DAG_KEY][DEADLINE_KEY] - # Commit the savepoint if everything succeeded for this Dag. - savepoint.commit() + if not isinstance(deadline_uuids, list) or not deadline_uuids: + continue + + if not all(isinstance(uuid_val, str) for uuid_val in deadline_uuids): + log.warning("Dag has non-string deadline values, skipping", dag_id=dag_id) + continue + + dags_with_deadlines.add(dag_id) + restored_deadline_objects = [] + + alert_result = dag_conn.execute( + sa.text(""" + SELECT reference, interval, callback_def + FROM deadline_alert + WHERE serialized_dag_id = :serialized_dag_id + """), + {"serialized_dag_id": serialized_dag_id}, + ).fetchall() + + if not alert_result: + dags_with_errors[dag_id].append( + f"Could not find deadline_alert for serialized_dag {serialized_dag_id}" + ) + continue + + for alert in alert_result: + deadline_object = { + Encoding.TYPE: ENCODING_TYPE, + Encoding.VAR: { + REFERENCE_KEY: alert.reference, + INTERVAL_KEY: float(alert.interval), + CALLBACK_KEY: alert.callback_def, + }, + } + restored_deadline_objects.append(deadline_object) + restored_alerts_count += 1 + + # Replace the UUID array with the restored objects. + if restored_deadline_objects: + update_dag_deadline_field( + dag_conn, serialized_dag_id, restored_deadline_objects, dialect + ) except Exception as e: - dags_with_errors[dag_id].append(f"Could not restore deadline: {e}") - savepoint.rollback() + log.error("Could not restore deadline for dag %s: %s", dag_id, e) + dags_with_errors[dag_id].append(str(e)) log.info("Batch complete", batch_num=batch_num, total_batches=total_batches) From bee8e9db5ff7b511be0d9e31d1879dd841960b51 Mon Sep 17 00:00:00 2001 From: Pranay Kumar Karvi Date: Sun, 15 Mar 2026 15:33:03 +0530 Subject: [PATCH 2/5] fix: address review feedback - fix upgrade path, move validation before transaction, restore error prefix, handle SQLite separately --- ...101_3_2_0_ui_improvements_for_deadlines.py | 332 +++++++++++------- 1 file changed, 207 insertions(+), 125 deletions(-) diff --git a/airflow-core/src/airflow/migrations/versions/0101_3_2_0_ui_improvements_for_deadlines.py b/airflow-core/src/airflow/migrations/versions/0101_3_2_0_ui_improvements_for_deadlines.py index 583a5688a6482..5cfd99e9930d2 100644 --- a/airflow-core/src/airflow/migrations/versions/0101_3_2_0_ui_improvements_for_deadlines.py +++ b/airflow-core/src/airflow/migrations/versions/0101_3_2_0_ui_improvements_for_deadlines.py @@ -374,6 +374,7 @@ def migrate_existing_deadline_alert_data_from_serialized_dag() -> None: last_dag_id = "" conn = op.get_bind() + engine = conn.engine dialect = conn.dialect.name total_dags = conn.execute( @@ -427,99 +428,107 @@ def migrate_existing_deadline_alert_data_from_serialized_dag() -> None: processed_dags.append(dag_id) last_dag_id = dag_id - # Create a savepoint for this Dag to allow rollback on error. - savepoint = conn.begin_nested() - + # Validation that does not need a DB connection. try: dag_data = get_dag_data(data, data_compressed) + except (json.JSONDecodeError, KeyError, TypeError) as e: + dags_with_errors[dag_id].append(f"Could not process serialized Dag: {e}") + continue - if dag_deadline := dag_data[DAG_KEY][DEADLINE_KEY]: - dags_with_deadlines.add(dag_id) - deadline_alerts = dag_deadline if isinstance(dag_deadline, list) else [dag_deadline] - - migrated_alert_ids = [] - - for serialized_alert in deadline_alerts: - if isinstance(serialized_alert, dict): - try: - alert_data = serialized_alert.get(Encoding.VAR, serialized_alert) - - if not DEADLINE_ALERT_REQUIRED_FIELDS.issubset(alert_data): - dags_with_errors[dag_id].append( - f"Invalid DeadlineAlert structure: {serialized_alert}" - ) - continue - - reference_data = json.dumps(alert_data[REFERENCE_KEY], sort_keys=True) - interval_data = float(alert_data.get(INTERVAL_KEY)) - callback_data = json.dumps(alert_data[CALLBACK_KEY], sort_keys=True) - deadline_alert_id = str(uuid6.uuid7()) - - conn.execute( - sa.text(""" - INSERT INTO deadline_alert ( - id, - created_at, - serialized_dag_id, - reference, - interval, - callback_def, - name, - description) - VALUES ( - :id, - :created_at, - :serialized_dag_id, - :reference, - :interval, - :callback_def, - NULL, - NULL) - """), - { - "id": deadline_alert_id, - "created_at": created_at or timezone.utcnow(), - "serialized_dag_id": serialized_dag_id, - "reference": reference_data, - "interval": interval_data, - "callback_def": callback_data, - }, + dag_deadline = ( + dag_data.get(DAG_KEY, {}).get(DEADLINE_KEY) + if isinstance(dag_data.get(DAG_KEY), dict) + else None + ) + if not dag_deadline: + continue + + dags_with_deadlines.add(dag_id) + deadline_alerts = dag_deadline if isinstance(dag_deadline, list) else [dag_deadline] + + def _migrate_dag_deadlines(dag_conn: Connection) -> list[str]: + migrated_alert_ids = [] + for serialized_alert in deadline_alerts: + if isinstance(serialized_alert, dict): + try: + alert_data = serialized_alert.get(Encoding.VAR, serialized_alert) + + if not DEADLINE_ALERT_REQUIRED_FIELDS.issubset(alert_data): + dags_with_errors[dag_id].append( + f"Invalid DeadlineAlert structure: {serialized_alert}" ) + continue - if not validate_written_data( - conn, deadline_alert_id, reference_data, interval_data, callback_data - ): - dags_with_errors[dag_id].append( - f"Invalid DeadlineAlert data: {serialized_alert}" - ) - continue - - migrated_alert_ids.append(deadline_alert_id) - migrated_alerts_count += 1 - - conn.execute( - sa.text(""" - UPDATE deadline - SET deadline_alert_id = :alert_id - WHERE dagrun_id IN ( - SELECT dr.id - FROM dag_run dr - JOIN serialized_dag sd ON dr.dag_id = sd.dag_id - WHERE sd.id = :serialized_dag_id) - AND deadline_alert_id IS NULL - """), - {"alert_id": deadline_alert_id, "serialized_dag_id": serialized_dag_id}, + reference_data = json.dumps(alert_data[REFERENCE_KEY], sort_keys=True) + interval_data = float(alert_data.get(INTERVAL_KEY)) + callback_data = json.dumps(alert_data[CALLBACK_KEY], sort_keys=True) + deadline_alert_id = str(uuid6.uuid7()) + + dag_conn.execute( + sa.text(""" + INSERT INTO deadline_alert ( + id, + created_at, + serialized_dag_id, + reference, + interval, + callback_def, + name, + description) + VALUES ( + :id, + :created_at, + :serialized_dag_id, + :reference, + :interval, + :callback_def, + NULL, + NULL) + """), + { + "id": deadline_alert_id, + "created_at": created_at or timezone.utcnow(), + "serialized_dag_id": serialized_dag_id, + "reference": reference_data, + "interval": interval_data, + "callback_def": callback_data, + }, + ) + + if not validate_written_data( + dag_conn, deadline_alert_id, reference_data, interval_data, callback_data + ): + dags_with_errors[dag_id].append( + f"Invalid DeadlineAlert data: {serialized_alert}" ) - except Exception as e: - dags_with_errors[dag_id].append(f"Failed to process {serialized_alert}: {e}") continue + migrated_alert_ids.append(deadline_alert_id) + dag_conn.execute( + sa.text(""" + UPDATE deadline + SET deadline_alert_id = :alert_id + WHERE dagrun_id IN ( + SELECT dr.id + FROM dag_run dr + JOIN serialized_dag sd ON dr.dag_id = sd.dag_id + WHERE sd.id = :serialized_dag_id) + AND deadline_alert_id IS NULL + """), + {"alert_id": deadline_alert_id, "serialized_dag_id": serialized_dag_id}, + ) + except Exception as e: + dags_with_errors[dag_id].append(f"Failed to process {serialized_alert}: {e}") + continue + return migrated_alert_ids + + if dialect == "sqlite": + savepoint = conn.begin_nested() + try: + migrated_alert_ids = _migrate_dag_deadlines(conn) if migrated_alert_ids: uuid_strings = [str(uuid_id) for uuid_id in migrated_alert_ids] update_dag_deadline_field(conn, serialized_dag_id, uuid_strings, dialect) - - # Recalculate and update the dag_hash after modifying the deadline data to ensure - # it matches what write_dag() will compute later and avoid re-serialization. updated_result = conn.execute( sa.text( "SELECT data, data_compressed " @@ -528,14 +537,11 @@ def migrate_existing_deadline_alert_data_from_serialized_dag() -> None: ), {"serialized_dag_id": serialized_dag_id}, ).fetchone() - if updated_result: updated_dag_data = get_dag_data( updated_result.data, updated_result.data_compressed ) - # Import here to avoid a circular dependency issue new_hash = hash_dag(updated_dag_data) - conn.execute( sa.text( "UPDATE serialized_dag " @@ -544,13 +550,43 @@ def migrate_existing_deadline_alert_data_from_serialized_dag() -> None: ), {"new_hash": new_hash, "serialized_dag_id": serialized_dag_id}, ) - - # Commit the savepoint if everything succeeded for this Dag. - savepoint.commit() - - except (json.JSONDecodeError, KeyError, TypeError) as e: - dags_with_errors[dag_id].append(f"Could not process serialized Dag: {e}") - savepoint.rollback() + migrated_alerts_count += len(migrated_alert_ids) + savepoint.commit() + except Exception as e: + savepoint.rollback() + dags_with_errors[dag_id].append(f"Could not migrate deadline: {e}") + else: + try: + with engine.begin() as dag_conn: + migrated_alert_ids = _migrate_dag_deadlines(dag_conn) + if migrated_alert_ids: + migrated_alerts_count += len(migrated_alert_ids) + uuid_strings = [str(uuid_id) for uuid_id in migrated_alert_ids] + update_dag_deadline_field(dag_conn, serialized_dag_id, uuid_strings, dialect) + updated_result = dag_conn.execute( + sa.text( + "SELECT data, data_compressed " + "FROM serialized_dag " + "WHERE id = :serialized_dag_id" + ), + {"serialized_dag_id": serialized_dag_id}, + ).fetchone() + if updated_result: + updated_dag_data = get_dag_data( + updated_result.data, updated_result.data_compressed + ) + new_hash = hash_dag(updated_dag_data) + dag_conn.execute( + sa.text( + "UPDATE serialized_dag " + "SET dag_hash = :new_hash " + "WHERE id = :serialized_dag_id" + ), + {"new_hash": new_hash, "serialized_dag_id": serialized_dag_id}, + ) + except Exception as e: + log.error("Could not migrate deadline for dag %s: %s", dag_id, e) + dags_with_errors[dag_id].append(f"Could not migrate deadline: {e}") log.info("Batch complete", batch_num=batch_num, total_batches=total_batches) @@ -644,22 +680,30 @@ def migrate_deadline_alert_data_back_to_serialized_dag() -> None: processed_dags.append(dag_id) last_dag_id = dag_id + # Validation that does not need a DB connection. try: - with engine.begin() as dag_conn: - dag_data = get_dag_data(data, data_compressed) - deadline_uuids = dag_data[DAG_KEY][DEADLINE_KEY] + dag_data = get_dag_data(data, data_compressed) + except (json.JSONDecodeError, KeyError, TypeError): + continue + deadline_uuids = ( + dag_data.get(DAG_KEY, {}).get(DEADLINE_KEY) + if isinstance(dag_data.get(DAG_KEY), dict) + else None + ) - if not isinstance(deadline_uuids, list) or not deadline_uuids: - continue + if not isinstance(deadline_uuids, list) or not deadline_uuids: + continue - if not all(isinstance(uuid_val, str) for uuid_val in deadline_uuids): - log.warning("Dag has non-string deadline values, skipping", dag_id=dag_id) - continue + if not all(isinstance(uuid_val, str) for uuid_val in deadline_uuids): + log.warning("Dag has non-string deadline values, skipping", dag_id=dag_id) + continue - dags_with_deadlines.add(dag_id) - restored_deadline_objects = [] + dags_with_deadlines.add(dag_id) - alert_result = dag_conn.execute( + if dialect == "sqlite": + savepoint = conn.begin_nested() + try: + alert_result = conn.execute( sa.text(""" SELECT reference, interval, callback_def FROM deadline_alert @@ -672,29 +716,67 @@ def migrate_deadline_alert_data_back_to_serialized_dag() -> None: dags_with_errors[dag_id].append( f"Could not find deadline_alert for serialized_dag {serialized_dag_id}" ) - continue - - for alert in alert_result: - deadline_object = { - Encoding.TYPE: ENCODING_TYPE, - Encoding.VAR: { - REFERENCE_KEY: alert.reference, - INTERVAL_KEY: float(alert.interval), - CALLBACK_KEY: alert.callback_def, - }, - } - restored_deadline_objects.append(deadline_object) - restored_alerts_count += 1 - - # Replace the UUID array with the restored objects. - if restored_deadline_objects: - update_dag_deadline_field( - dag_conn, serialized_dag_id, restored_deadline_objects, dialect - ) + savepoint.rollback() + else: + restored_deadline_objects = [] + for alert in alert_result: + deadline_object = { + Encoding.TYPE: ENCODING_TYPE, + Encoding.VAR: { + REFERENCE_KEY: alert.reference, + INTERVAL_KEY: float(alert.interval), + CALLBACK_KEY: alert.callback_def, + }, + } + restored_deadline_objects.append(deadline_object) + restored_alerts_count += 1 + if restored_deadline_objects: + update_dag_deadline_field( + conn, serialized_dag_id, restored_deadline_objects, dialect + ) + savepoint.commit() + except Exception as e: + savepoint.rollback() + log.error("Could not restore deadline for dag %s: %s", dag_id, e) + dags_with_errors[dag_id].append(f"Could not restore deadline: {e}") + else: + try: + with engine.begin() as dag_conn: + alert_result = dag_conn.execute( + sa.text(""" + SELECT reference, interval, callback_def + FROM deadline_alert + WHERE serialized_dag_id = :serialized_dag_id + """), + {"serialized_dag_id": serialized_dag_id}, + ).fetchall() - except Exception as e: - log.error("Could not restore deadline for dag %s: %s", dag_id, e) - dags_with_errors[dag_id].append(str(e)) + if not alert_result: + dags_with_errors[dag_id].append( + f"Could not find deadline_alert for serialized_dag {serialized_dag_id}" + ) + continue + + restored_deadline_objects = [] + for alert in alert_result: + deadline_object = { + Encoding.TYPE: ENCODING_TYPE, + Encoding.VAR: { + REFERENCE_KEY: alert.reference, + INTERVAL_KEY: float(alert.interval), + CALLBACK_KEY: alert.callback_def, + }, + } + restored_deadline_objects.append(deadline_object) + restored_alerts_count += 1 + + if restored_deadline_objects: + update_dag_deadline_field( + dag_conn, serialized_dag_id, restored_deadline_objects, dialect + ) + except Exception as e: + log.error("Could not restore deadline for dag %s: %s", dag_id, e) + dags_with_errors[dag_id].append(f"Could not restore deadline: {e}") log.info("Batch complete", batch_num=batch_num, total_batches=total_batches) From 0dd1f9cdaec8f458824db3a830a4f4ae2d7bc720 Mon Sep 17 00:00:00 2001 From: Tzu-ping Chung Date: Wed, 18 Mar 2026 16:07:28 +0800 Subject: [PATCH 3/5] Simplify duplicate engine-specific session handling --- ...101_3_2_0_ui_improvements_for_deadlines.py | 277 +++++++----------- 1 file changed, 110 insertions(+), 167 deletions(-) diff --git a/airflow-core/src/airflow/migrations/versions/0101_3_2_0_ui_improvements_for_deadlines.py b/airflow-core/src/airflow/migrations/versions/0101_3_2_0_ui_improvements_for_deadlines.py index 8bc1e9416ca49..3b59add58f36a 100644 --- a/airflow-core/src/airflow/migrations/versions/0101_3_2_0_ui_improvements_for_deadlines.py +++ b/airflow-core/src/airflow/migrations/versions/0101_3_2_0_ui_improvements_for_deadlines.py @@ -30,6 +30,7 @@ from __future__ import annotations +import contextlib import json import zlib from collections import defaultdict @@ -363,6 +364,34 @@ def _sort_serialized_dag_dict(serialized_dag: Any): return serialized_dag +@contextlib.contextmanager +def _begin_nested_transaction(conn): + """ + Create an nested transaction. + + On SQLite, this calls ``conn.begin_nested()`` and commit/rollback manually. + Anywhere else, the ``conn.engine.begin()`` context manager is used. + + On additional feature is the inner code can use ``gen.send()`` to set a + truthy value to explicitly tell the session to rollback even if no error + was raised. + """ + if conn.dialect.name != "sqlite": + with conn.engine.begin(): + yield + return + try: + savepoint = conn.begin_nested() + rollback = yield + except Exception: + savepoint.rollback() + raise + if rollback: + savepoint.rollback() + else: + savepoint.commit() + + def migrate_existing_deadline_alert_data_from_serialized_dag() -> None: """Extract DeadlineAlert data from serialized Dag data and populate deadline_alert table.""" if context.is_offline_mode(): @@ -383,7 +412,6 @@ def migrate_existing_deadline_alert_data_from_serialized_dag() -> None: last_dag_id = "" conn = op.get_bind() - engine = conn.engine dialect = conn.dialect.name total_dags = conn.execute( @@ -437,102 +465,92 @@ def migrate_existing_deadline_alert_data_from_serialized_dag() -> None: # Validation that does not need a DB connection. try: - dag_data = get_dag_data(data, data_compressed) + dag_deadline = get_dag_data(data, data_compressed)[DAG_KEY].get(DEADLINE_KEY) except (json.JSONDecodeError, KeyError, TypeError) as e: dags_with_errors[dag_id].append(f"Could not process serialized Dag: {e}") continue - - dag_deadline = ( - dag_data.get(DAG_KEY, {}).get(DEADLINE_KEY) - if isinstance(dag_data.get(DAG_KEY), dict) - else None - ) if not dag_deadline: continue dags_with_deadlines.add(dag_id) deadline_alerts = dag_deadline if isinstance(dag_deadline, list) else [dag_deadline] - def _migrate_dag_deadlines(dag_conn: Connection) -> list[str]: - migrated_alert_ids = [] + def _migrate_dag_deadlines(dag_conn: Connection) -> Iterable[str]: for serialized_alert in deadline_alerts: - if isinstance(serialized_alert, dict): - try: - alert_data = serialized_alert.get(Encoding.VAR, serialized_alert) - - if not DEADLINE_ALERT_REQUIRED_FIELDS.issubset(alert_data): - dags_with_errors[dag_id].append( - f"Invalid DeadlineAlert structure: {serialized_alert}" - ) - continue - - reference_data = json.dumps(alert_data[REFERENCE_KEY], sort_keys=True) - interval_data = float(alert_data.get(INTERVAL_KEY)) - callback_data = json.dumps(alert_data[CALLBACK_KEY], sort_keys=True) - deadline_alert_id = str(uuid6.uuid7()) - - dag_conn.execute( - sa.text(""" - INSERT INTO deadline_alert ( - id, - created_at, - serialized_dag_id, - reference, - interval, - callback_def, - name, - description) - VALUES ( - :id, - :created_at, - :serialized_dag_id, - :reference, - :interval, - :callback_def, - NULL, - NULL) - """), - { - "id": deadline_alert_id, - "created_at": created_at or timezone.utcnow(), - "serialized_dag_id": serialized_dag_id, - "reference": reference_data, - "interval": interval_data, - "callback_def": callback_data, - }, - ) + if not isinstance(serialized_alert, dict): + continue + try: + alert_data = serialized_alert.get(Encoding.VAR, serialized_alert) - if not validate_written_data( - dag_conn, deadline_alert_id, reference_data, interval_data, callback_data - ): - dags_with_errors[dag_id].append( - f"Invalid DeadlineAlert data: {serialized_alert}" - ) - continue - - migrated_alert_ids.append(deadline_alert_id) - dag_conn.execute( - sa.text(""" - UPDATE deadline - SET deadline_alert_id = :alert_id - WHERE dagrun_id IN ( - SELECT dr.id - FROM dag_run dr - JOIN serialized_dag sd ON dr.dag_id = sd.dag_id - WHERE sd.id = :serialized_dag_id) - AND deadline_alert_id IS NULL - """), - {"alert_id": deadline_alert_id, "serialized_dag_id": serialized_dag_id}, + if not DEADLINE_ALERT_REQUIRED_FIELDS.issubset(alert_data): + dags_with_errors[dag_id].append( + f"Invalid DeadlineAlert structure: {serialized_alert}" ) - except Exception as e: - dags_with_errors[dag_id].append(f"Failed to process {serialized_alert}: {e}") continue - return migrated_alert_ids - if dialect == "sqlite": - savepoint = conn.begin_nested() - try: - migrated_alert_ids = _migrate_dag_deadlines(conn) + reference_data = json.dumps(alert_data[REFERENCE_KEY], sort_keys=True) + interval_data = float(alert_data.get(INTERVAL_KEY)) + callback_data = json.dumps(alert_data[CALLBACK_KEY], sort_keys=True) + deadline_alert_id = str(uuid6.uuid7()) + + dag_conn.execute( + sa.text(""" + INSERT INTO deadline_alert ( + id, + created_at, + serialized_dag_id, + reference, + interval, + callback_def, + name, + description) + VALUES ( + :id, + :created_at, + :serialized_dag_id, + :reference, + :interval, + :callback_def, + NULL, + NULL) + """), + { + "id": deadline_alert_id, + "created_at": created_at or timezone.utcnow(), + "serialized_dag_id": serialized_dag_id, + "reference": reference_data, + "interval": interval_data, + "callback_def": callback_data, + }, + ) + + if not validate_written_data( + dag_conn, deadline_alert_id, reference_data, interval_data, callback_data + ): + dags_with_errors[dag_id].append(f"Invalid DeadlineAlert data: {serialized_alert}") + continue + + yield deadline_alert_id + dag_conn.execute( + sa.text(""" + UPDATE deadline + SET deadline_alert_id = :alert_id + WHERE dagrun_id IN ( + SELECT dr.id + FROM dag_run dr + JOIN serialized_dag sd ON dr.dag_id = sd.dag_id + WHERE sd.id = :serialized_dag_id) + AND deadline_alert_id IS NULL + """), + {"alert_id": deadline_alert_id, "serialized_dag_id": serialized_dag_id}, + ) + except Exception as e: + dags_with_errors[dag_id].append(f"Failed to process {serialized_alert}: {e}") + continue + + try: + with _begin_nested_transaction(conn): + migrated_alert_ids = list(_migrate_dag_deadlines(conn)) if migrated_alert_ids: uuid_strings = [str(uuid_id) for uuid_id in migrated_alert_ids] update_dag_deadline_field(conn, serialized_dag_id, uuid_strings, dialect) @@ -558,42 +576,9 @@ def _migrate_dag_deadlines(dag_conn: Connection) -> list[str]: {"new_hash": new_hash, "serialized_dag_id": serialized_dag_id}, ) migrated_alerts_count += len(migrated_alert_ids) - savepoint.commit() - except Exception as e: - savepoint.rollback() - dags_with_errors[dag_id].append(f"Could not migrate deadline: {e}") - else: - try: - with engine.begin() as dag_conn: - migrated_alert_ids = _migrate_dag_deadlines(dag_conn) - if migrated_alert_ids: - migrated_alerts_count += len(migrated_alert_ids) - uuid_strings = [str(uuid_id) for uuid_id in migrated_alert_ids] - update_dag_deadline_field(dag_conn, serialized_dag_id, uuid_strings, dialect) - updated_result = dag_conn.execute( - sa.text( - "SELECT data, data_compressed " - "FROM serialized_dag " - "WHERE id = :serialized_dag_id" - ), - {"serialized_dag_id": serialized_dag_id}, - ).fetchone() - if updated_result: - updated_dag_data = get_dag_data( - updated_result.data, updated_result.data_compressed - ) - new_hash = hash_dag(updated_dag_data) - dag_conn.execute( - sa.text( - "UPDATE serialized_dag " - "SET dag_hash = :new_hash " - "WHERE id = :serialized_dag_id" - ), - {"new_hash": new_hash, "serialized_dag_id": serialized_dag_id}, - ) - except Exception as e: - log.error("Could not migrate deadline for dag %s: %s", dag_id, e) - dags_with_errors[dag_id].append(f"Could not migrate deadline: {e}") + except (json.JSONDecodeError, KeyError, TypeError) as e: + log.exception("Could not migrate deadline for dag %s", dag_id) + dags_with_errors[dag_id].append(f"Could not migrate deadline: {e}") log.info("Batch complete", batch_num=batch_num, total_batches=total_batches) @@ -632,7 +617,6 @@ def migrate_deadline_alert_data_back_to_serialized_dag() -> None: last_dag_id = "" conn = op.get_bind() - engine = conn.engine dialect = conn.dialect.name # Count all dags - we'll filter in the loop for those with deadline data @@ -707,9 +691,8 @@ def migrate_deadline_alert_data_back_to_serialized_dag() -> None: dags_with_deadlines.add(dag_id) - if dialect == "sqlite": - savepoint = conn.begin_nested() - try: + try: + with (ctx := _begin_nested_transaction(conn)): alert_result = conn.execute( sa.select( deadline_alert_table.c.reference, @@ -725,7 +708,7 @@ def migrate_deadline_alert_data_back_to_serialized_dag() -> None: dags_with_errors[dag_id].append( f"Could not find deadline_alert for serialized_dag {serialized_dag_id}" ) - savepoint.rollback() + ctx.gen.send(True) # Explicit rollback. else: restored_deadline_objects = [] for alert in alert_result: @@ -743,49 +726,9 @@ def migrate_deadline_alert_data_back_to_serialized_dag() -> None: update_dag_deadline_field( conn, serialized_dag_id, restored_deadline_objects, dialect ) - savepoint.commit() - except Exception as e: - savepoint.rollback() - log.error("Could not restore deadline for dag %s: %s", dag_id, e) - dags_with_errors[dag_id].append(f"Could not restore deadline: {e}") - else: - try: - with engine.begin() as dag_conn: - alert_result = dag_conn.execute( - sa.text(""" - SELECT reference, interval, callback_def - FROM deadline_alert - WHERE serialized_dag_id = :serialized_dag_id - """), - {"serialized_dag_id": serialized_dag_id}, - ).fetchall() - - if not alert_result: - dags_with_errors[dag_id].append( - f"Could not find deadline_alert for serialized_dag {serialized_dag_id}" - ) - continue - - restored_deadline_objects = [] - for alert in alert_result: - deadline_object = { - Encoding.TYPE: ENCODING_TYPE, - Encoding.VAR: { - REFERENCE_KEY: alert.reference, - INTERVAL_KEY: float(alert.interval), - CALLBACK_KEY: alert.callback_def, - }, - } - restored_deadline_objects.append(deadline_object) - restored_alerts_count += 1 - - if restored_deadline_objects: - update_dag_deadline_field( - dag_conn, serialized_dag_id, restored_deadline_objects, dialect - ) - except Exception as e: - log.error("Could not restore deadline for dag %s: %s", dag_id, e) - dags_with_errors[dag_id].append(f"Could not restore deadline: {e}") + except Exception as e: + log.exception("Could not restore deadline for dag %s", dag_id) + dags_with_errors[dag_id].append(f"Could not restore deadline: {e}") log.info("Batch complete", batch_num=batch_num, total_batches=total_batches) From 3b33325c768881eb93394a12f1f8cf3aa37082fd Mon Sep 17 00:00:00 2001 From: Pranay Kumar Karvi Date: Thu, 19 Mar 2026 12:03:00 +0530 Subject: [PATCH 4/5] fix: yield new connection from _begin_nested_transaction and update callers to use it --- ...101_3_2_0_ui_improvements_for_deadlines.py | 68 +++++++++---------- 1 file changed, 34 insertions(+), 34 deletions(-) diff --git a/airflow-core/src/airflow/migrations/versions/0101_3_2_0_ui_improvements_for_deadlines.py b/airflow-core/src/airflow/migrations/versions/0101_3_2_0_ui_improvements_for_deadlines.py index 3b59add58f36a..8a8f151413060 100644 --- a/airflow-core/src/airflow/migrations/versions/0101_3_2_0_ui_improvements_for_deadlines.py +++ b/airflow-core/src/airflow/migrations/versions/0101_3_2_0_ui_improvements_for_deadlines.py @@ -367,22 +367,22 @@ def _sort_serialized_dag_dict(serialized_dag: Any): @contextlib.contextmanager def _begin_nested_transaction(conn): """ - Create an nested transaction. + Create a nested transaction. - On SQLite, this calls ``conn.begin_nested()`` and commit/rollback manually. - Anywhere else, the ``conn.engine.begin()`` context manager is used. - - On additional feature is the inner code can use ``gen.send()`` to set a + On SQLite, uses ``conn.begin_nested()`` with commit/rollback. + On other backends, opens a new connection via ``conn.engine.begin()`` + and yields it so callers use the new connection for writes. + An additional feature is the inner code can use ``gen.send()`` to set a truthy value to explicitly tell the session to rollback even if no error - was raised. + was raised (SQLite only). """ if conn.dialect.name != "sqlite": - with conn.engine.begin(): - yield + with conn.engine.begin() as new_conn: + yield new_conn return try: savepoint = conn.begin_nested() - rollback = yield + rollback = yield conn except Exception: savepoint.rollback() raise @@ -549,12 +549,12 @@ def _migrate_dag_deadlines(dag_conn: Connection) -> Iterable[str]: continue try: - with _begin_nested_transaction(conn): - migrated_alert_ids = list(_migrate_dag_deadlines(conn)) + with _begin_nested_transaction(conn) as dag_conn: + migrated_alert_ids = list(_migrate_dag_deadlines(dag_conn)) if migrated_alert_ids: uuid_strings = [str(uuid_id) for uuid_id in migrated_alert_ids] - update_dag_deadline_field(conn, serialized_dag_id, uuid_strings, dialect) - updated_result = conn.execute( + update_dag_deadline_field(dag_conn, serialized_dag_id, uuid_strings, dialect) + updated_result = dag_conn.execute( sa.text( "SELECT data, data_compressed " "FROM serialized_dag " @@ -567,7 +567,7 @@ def _migrate_dag_deadlines(dag_conn: Connection) -> Iterable[str]: updated_result.data, updated_result.data_compressed ) new_hash = hash_dag(updated_dag_data) - conn.execute( + dag_conn.execute( sa.text( "UPDATE serialized_dag " "SET dag_hash = :new_hash " @@ -692,8 +692,8 @@ def migrate_deadline_alert_data_back_to_serialized_dag() -> None: dags_with_deadlines.add(dag_id) try: - with (ctx := _begin_nested_transaction(conn)): - alert_result = conn.execute( + with _begin_nested_transaction(conn) as dag_conn: + alert_result = dag_conn.execute( sa.select( deadline_alert_table.c.reference, deadline_alert_table.c.interval, @@ -708,24 +708,24 @@ def migrate_deadline_alert_data_back_to_serialized_dag() -> None: dags_with_errors[dag_id].append( f"Could not find deadline_alert for serialized_dag {serialized_dag_id}" ) - ctx.gen.send(True) # Explicit rollback. - else: - restored_deadline_objects = [] - for alert in alert_result: - deadline_object = { - Encoding.TYPE: ENCODING_TYPE, - Encoding.VAR: { - REFERENCE_KEY: alert.reference, - INTERVAL_KEY: float(alert.interval), - CALLBACK_KEY: alert.callback_def, - }, - } - restored_deadline_objects.append(deadline_object) - restored_alerts_count += 1 - if restored_deadline_objects: - update_dag_deadline_field( - conn, serialized_dag_id, restored_deadline_objects, dialect - ) + continue + + restored_deadline_objects = [] + for alert in alert_result: + deadline_object = { + Encoding.TYPE: ENCODING_TYPE, + Encoding.VAR: { + REFERENCE_KEY: alert.reference, + INTERVAL_KEY: float(alert.interval), + CALLBACK_KEY: alert.callback_def, + }, + } + restored_deadline_objects.append(deadline_object) + restored_alerts_count += 1 + if restored_deadline_objects: + update_dag_deadline_field( + dag_conn, serialized_dag_id, restored_deadline_objects, dialect + ) except Exception as e: log.exception("Could not restore deadline for dag %s", dag_id) dags_with_errors[dag_id].append(f"Could not restore deadline: {e}") From 5874d8229e911ff72fe91146115b9650f8ad47be Mon Sep 17 00:00:00 2001 From: Tzu-ping Chung Date: Thu, 19 Mar 2026 15:24:23 +0800 Subject: [PATCH 5/5] Remove sending a flag to rollback --- .../0101_3_2_0_ui_improvements_for_deadlines.py | 10 ++-------- 1 file changed, 2 insertions(+), 8 deletions(-) diff --git a/airflow-core/src/airflow/migrations/versions/0101_3_2_0_ui_improvements_for_deadlines.py b/airflow-core/src/airflow/migrations/versions/0101_3_2_0_ui_improvements_for_deadlines.py index 8a8f151413060..0233f48c9a9e6 100644 --- a/airflow-core/src/airflow/migrations/versions/0101_3_2_0_ui_improvements_for_deadlines.py +++ b/airflow-core/src/airflow/migrations/versions/0101_3_2_0_ui_improvements_for_deadlines.py @@ -372,9 +372,6 @@ def _begin_nested_transaction(conn): On SQLite, uses ``conn.begin_nested()`` with commit/rollback. On other backends, opens a new connection via ``conn.engine.begin()`` and yields it so callers use the new connection for writes. - An additional feature is the inner code can use ``gen.send()`` to set a - truthy value to explicitly tell the session to rollback even if no error - was raised (SQLite only). """ if conn.dialect.name != "sqlite": with conn.engine.begin() as new_conn: @@ -382,14 +379,11 @@ def _begin_nested_transaction(conn): return try: savepoint = conn.begin_nested() - rollback = yield conn + yield conn except Exception: savepoint.rollback() raise - if rollback: - savepoint.rollback() - else: - savepoint.commit() + savepoint.commit() def migrate_existing_deadline_alert_data_from_serialized_dag() -> None: