diff --git a/airflow-core/docs/core-concepts/multi-team.rst b/airflow-core/docs/core-concepts/multi-team.rst index 2dbed2e587952..6a9d8a2ec63e7 100644 --- a/airflow-core/docs/core-concepts/multi-team.rst +++ b/airflow-core/docs/core-concepts/multi-team.rst @@ -486,7 +486,7 @@ is uppercase. export AIRFLOW__TEAM_B___CELERY__BROKER_URL="redis://team-b-redis:6379/0" # team_b's Celery result backend - export AIRFLOW__TEAM_B___CELERY__RESULT_BACKEND="db+postgresql://team-b-db/celery_results" + export AIRFLOW__TEAM_B___CELERY__RESULT_BACKEND="db+postgresql+psycopg2://team-b-db/celery_results" Via Config File """"""""""""""" @@ -499,17 +499,17 @@ name followed by an equals sign: # Global celery settings (used by the global executor, NOT as a fallback for teams) [celery] broker_url = redis://default-redis:6379/0 - result_backend = db+postgresql://default-db/celery_results + result_backend = db+postgresql+psycopg2://default-db/celery_results # team_a overrides [team_a=celery] broker_url = redis://team-a-redis:6379/0 - result_backend = db+postgresql://team-a-db/celery_results + result_backend = db+postgresql+psycopg2://team-a-db/celery_results # team_b overrides [team_b=celery] broker_url = redis://team-b-redis:6379/0 - result_backend = db+postgresql://team-b-db/celery_results + result_backend = db+postgresql+psycopg2://team-b-db/celery_results Dag Bundle to Team Association ------------------------------ diff --git a/airflow-core/docs/howto/set-up-database.rst b/airflow-core/docs/howto/set-up-database.rst index b9c8f1aae01f0..714fe252fc0d9 100644 --- a/airflow-core/docs/howto/set-up-database.rst +++ b/airflow-core/docs/howto/set-up-database.rst @@ -184,7 +184,7 @@ in the Postgres documentation to learn more. .. warning:: - When you use SQLAlchemy 1.4.0+, you need to use ``postgresql://`` as the database in the ``sql_alchemy_conn``. + When you use SQLAlchemy 1.4.0+, you need to use ``postgresql+psycopg2://`` as the database URL scheme in the ``sql_alchemy_conn``. 
In the previous versions of SQLAlchemy it was possible to use ``postgres://``, but using it in SQLAlchemy 1.4.0+ results in: @@ -353,7 +353,7 @@ For instance, you can specify a database schema where Airflow will create its re .. code-block:: bash - export AIRFLOW__DATABASE__SQL_ALCHEMY_CONN="postgresql://postgres@localhost:5432/my_database?options=-csearch_path%3Dairflow" + export AIRFLOW__DATABASE__SQL_ALCHEMY_CONN="postgresql+psycopg2://postgres@localhost:5432/my_database?options=-csearch_path%3Dairflow" export AIRFLOW__DATABASE__SQL_ALCHEMY_SCHEMA="airflow" Note the ``search_path`` at the end of the ``SQL_ALCHEMY_CONN`` database URL. diff --git a/airflow-core/newsfragments/68314.improvement.rst b/airflow-core/newsfragments/68314.improvement.rst new file mode 100644 index 0000000000000..02f9338679056 --- /dev/null +++ b/airflow-core/newsfragments/68314.improvement.rst @@ -0,0 +1 @@ +The default PostgreSQL connection string now uses an explicit ``postgresql+psycopg2://`` SQLAlchemy dialect instead of the bare ``postgresql://`` scheme. SQLAlchemy 2.1 changes the implicit default from psycopg2 to psycopg (v3), so bare URLs silently change driver across upgrades. If your ``sql_alchemy_conn`` or Celery ``result_backend`` uses ``postgresql://``, switch to ``postgresql+psycopg2://`` (or ``postgresql+psycopg://`` if you intend to use psycopg v3). Until you do, Airflow rewrites legacy ``postgres://`` and bare ``postgresql://`` schemes in ``sql_alchemy_conn`` to ``postgresql+psycopg2://`` on startup and emits a warning. diff --git a/airflow-core/src/airflow/configuration.py b/airflow-core/src/airflow/configuration.py index 7790a252af37a..d43eb715d320f 100644 --- a/airflow-core/src/airflow/configuration.py +++ b/airflow-core/src/airflow/configuration.py @@ -372,12 +372,13 @@ def _upgrade_postgres_metastore_conn(self): Upgrade SQL schemas. As of SQLAlchemy 1.4, schemes `postgres+psycopg2` and `postgres` - must be replaced with `postgresql`. + must be replaced with `postgresql+psycopg2`. 
The bare `postgresql` + scheme is also upgraded to make the psycopg2 driver explicit. """ section, key = "database", "sql_alchemy_conn" old_value = self.get(section, key, _extra_stacklevel=1) - bad_schemes = ["postgres+psycopg2", "postgres"] - good_scheme = "postgresql" + bad_schemes = ["postgres+psycopg2", "postgres", "postgresql"] + good_scheme = "postgresql+psycopg2" parsed = urlsplit(old_value) if parsed.scheme in bad_schemes: warnings.warn( diff --git a/airflow-core/tests/unit/core/test_configuration.py b/airflow-core/tests/unit/core/test_configuration.py index abc42c26433dd..fc44c398e8e84 100644 --- a/airflow-core/tests/unit/core/test_configuration.py +++ b/airflow-core/tests/unit/core/test_configuration.py @@ -800,7 +800,9 @@ def test_as_dict_respects_sensitive_cmds(self): assert conf_maintain_cmds["database"]["sql_alchemy_conn"] == conf_conn @mock.patch.dict( - "os.environ", {"AIRFLOW__DATABASE__SQL_ALCHEMY_CONN_CMD": "echo -n 'postgresql://'"}, clear=True + "os.environ", + {"AIRFLOW__DATABASE__SQL_ALCHEMY_CONN_CMD": "echo -n 'postgresql+psycopg2://'"}, + clear=True, ) def test_as_dict_respects_sensitive_cmds_from_env(self): test_conf = copy.deepcopy(conf) @@ -811,7 +813,7 @@ def test_as_dict_respects_sensitive_cmds_from_env(self): assert "sql_alchemy_conn" in conf_materialize_cmds["database"] assert "sql_alchemy_conn_cmd" not in conf_materialize_cmds["database"] - assert conf_materialize_cmds["database"]["sql_alchemy_conn"] == "postgresql://" + assert conf_materialize_cmds["database"]["sql_alchemy_conn"] == "postgresql+psycopg2://" @skip_if_force_lowest_dependencies_marker @conf_vars( @@ -1118,6 +1120,39 @@ def test_write_handles_multiline_non_json_string(self): assert expected_raw_output in content + @pytest.mark.parametrize( + ("input_scheme", "expected_scheme"), + [ + pytest.param( + "postgres://user:pass@host/db", "postgresql+psycopg2://user:pass@host/db", id="postgres" + ), + pytest.param( + "postgres+psycopg2://user:pass@host/db", + 
"postgresql+psycopg2://user:pass@host/db", + id="postgres+psycopg2", + ), + pytest.param( + "postgresql://user:pass@host/db", + "postgresql+psycopg2://user:pass@host/db", + id="postgresql-bare", + ), + pytest.param( + "postgresql+psycopg2://user:pass@host/db", + "postgresql+psycopg2://user:pass@host/db", + id="postgresql+psycopg2-noop", + ), + pytest.param("mysql://user:pass@host/db", "mysql://user:pass@host/db", id="mysql-untouched"), + ], + ) + @mock.patch.dict("os.environ", {}, clear=False) + def test_upgrade_postgres_metastore_conn(self, input_scheme, expected_scheme): + os.environ["AIRFLOW__DATABASE__SQL_ALCHEMY_CONN"] = input_scheme + test_conf = AirflowConfigParser() + with warnings.catch_warnings(): + warnings.simplefilter("ignore", FutureWarning) + test_conf._upgrade_postgres_metastore_conn() + assert test_conf.get("database", "sql_alchemy_conn") == expected_scheme + @mock.patch.dict( "os.environ", diff --git a/airflow-core/tests/unit/models/test_base.py b/airflow-core/tests/unit/models/test_base.py index b41fb5d0cc2a7..51fd430dbab23 100644 --- a/airflow-core/tests/unit/models/test_base.py +++ b/airflow-core/tests/unit/models/test_base.py @@ -28,7 +28,7 @@ @pytest.mark.parametrize( ("dsn", "expected", "extra"), [ - pytest.param("postgresql://host/the_database", {}, {}, id="postgres"), + pytest.param("postgresql+psycopg2://host/the_database", {}, {}, id="postgres"), pytest.param("mysql://host/the_database", {"collation": "utf8mb3_bin"}, {}, id="mysql"), pytest.param( "mysql+pymysql://host/the_database", {"collation": "utf8mb3_bin"}, {}, id="mysql+pymysql" @@ -40,7 +40,7 @@ id="mysql with explicit config", ), pytest.param( - "postgresql://host/the_database", + "postgresql+psycopg2://host/the_database", {"collation": "ascii"}, {("database", "sql_engine_collation_for_ids"): "ascii"}, id="postgres with explicit config", diff --git a/airflow-core/tests/unit/utils/test_db.py b/airflow-core/tests/unit/utils/test_db.py index d2f13e27e9e70..e54cc6bf4bb15 100644 --- 
a/airflow-core/tests/unit/utils/test_db.py +++ b/airflow-core/tests/unit/utils/test_db.py @@ -406,7 +406,7 @@ class TestAutocommitEngineForMySQL: def test_non_mysql_database_is_noop(self, mocker): """Test that non-MySQL databases don't trigger any changes.""" # Mock settings to use PostgreSQL - mocker.patch.object(settings, "SQL_ALCHEMY_CONN", "postgresql://user:pass@localhost/db") + mocker.patch.object(settings, "SQL_ALCHEMY_CONN", "postgresql+psycopg2://user:pass@localhost/db") mock_dispose = mocker.patch.object(settings, "dispose_orm") mock_configure = mocker.patch.object(settings, "configure_orm") diff --git a/devel-common/src/tests_common/test_utils/operators/postgres_local_executor.cfg b/devel-common/src/tests_common/test_utils/operators/postgres_local_executor.cfg index 379c9b6a92721..4a71c05244b76 100644 --- a/devel-common/src/tests_common/test_utils/operators/postgres_local_executor.cfg +++ b/devel-common/src/tests_common/test_utils/operators/postgres_local_executor.cfg @@ -24,4 +24,4 @@ # Has to be run in parallel with a task to create BigTable table [core] executor = LocalExecutor -sql_alchemy_conn = postgresql:///airflow/airflow.db +sql_alchemy_conn = postgresql+psycopg2:///airflow/airflow.db diff --git a/providers/celery/provider.yaml b/providers/celery/provider.yaml index 29d92b74642c5..95412bd3fa608 100644 --- a/providers/celery/provider.yaml +++ b/providers/celery/provider.yaml @@ -236,7 +236,7 @@ config: version_added: ~ type: string sensitive: true - example: "db+postgresql://postgres:airflow@postgres/airflow" + example: "db+postgresql+psycopg2://postgres:airflow@postgres/airflow" default: ~ result_backend_sqlalchemy_engine_options: description: | diff --git a/providers/celery/src/airflow/providers/celery/get_provider_info.py b/providers/celery/src/airflow/providers/celery/get_provider_info.py index b1a663e3495cb..1d266d12a9983 100644 --- a/providers/celery/src/airflow/providers/celery/get_provider_info.py +++ 
b/providers/celery/src/airflow/providers/celery/get_provider_info.py @@ -123,7 +123,7 @@ def get_provider_info(): "version_added": None, "type": "string", "sensitive": True, - "example": "db+postgresql://postgres:airflow@postgres/airflow", + "example": "db+postgresql+psycopg2://postgres:airflow@postgres/airflow", "default": None, }, "result_backend_sqlalchemy_engine_options": { diff --git a/task-sdk/tests/task_sdk/definitions/test_asset.py b/task-sdk/tests/task_sdk/definitions/test_asset.py index 9b1f08b6d8ba2..38c9cd2064f28 100644 --- a/task-sdk/tests/task_sdk/definitions/test_asset.py +++ b/task-sdk/tests/task_sdk/definitions/test_asset.py @@ -56,10 +56,10 @@ pytest.param("sqlite:///:memory:", "\n\t", True, id="sqlite-whitespace"), pytest.param("sqlite:///:memory:", "a" * 1501, True, id="sqlite-too-long"), pytest.param("sqlite:///:memory:", "😊", False, id="sqlite-non-ascii"), - pytest.param("postgresql://localhost/db", "", True, id="postgres-empty"), - pytest.param("postgresql://localhost/db", "\n\t", True, id="postgres-whitespace"), - pytest.param("postgresql://localhost/db", "a" * 1501, True, id="postgres-too-long"), - pytest.param("postgresql://localhost/db", "😊", False, id="postgres-non-ascii"), + pytest.param("postgresql+psycopg2://localhost/db", "", True, id="postgres-empty"), + pytest.param("postgresql+psycopg2://localhost/db", "\n\t", True, id="postgres-whitespace"), + pytest.param("postgresql+psycopg2://localhost/db", "a" * 1501, True, id="postgres-too-long"), + pytest.param("postgresql+psycopg2://localhost/db", "😊", False, id="postgres-non-ascii"), ], ) def test_invalid_names(sql_conn_value, name, should_raise, monkeypatch): @@ -84,13 +84,16 @@ def test_invalid_names(sql_conn_value, name, should_raise, monkeypatch): pytest.param("sqlite:///:memory:", "a" * 1501, True, id="sqlite-too-long"), pytest.param("sqlite:///:memory:", "airflow://xcom/dag/task", True, id="sqlite-reserved-scheme"), pytest.param("sqlite:///:memory:", "😊", False, 
id="sqlite-non-ascii"), - pytest.param("postgresql://localhost/db", "", True, id="postgres-empty"), - pytest.param("postgresql://localhost/db", "\n\t", True, id="postgres-whitespace"), - pytest.param("postgresql://localhost/db", "a" * 1501, True, id="postgres-too-long"), + pytest.param("postgresql+psycopg2://localhost/db", "", True, id="postgres-empty"), + pytest.param("postgresql+psycopg2://localhost/db", "\n\t", True, id="postgres-whitespace"), + pytest.param("postgresql+psycopg2://localhost/db", "a" * 1501, True, id="postgres-too-long"), pytest.param( - "postgresql://localhost/db", "airflow://xcom/dag/task", True, id="postgres-reserved-scheme" + "postgresql+psycopg2://localhost/db", + "airflow://xcom/dag/task", + True, + id="postgres-reserved-scheme", ), - pytest.param("postgresql://localhost/db", "😊", False, id="postgres-non-ascii"), + pytest.param("postgresql+psycopg2://localhost/db", "😊", False, id="postgres-non-ascii"), ], ) def test_invalid_uris(sql_conn_value, uri, should_raise, monkeypatch):