Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 4 additions & 4 deletions airflow-core/docs/core-concepts/multi-team.rst
Original file line number Diff line number Diff line change
Expand Up @@ -486,7 +486,7 @@ is uppercase.
export AIRFLOW__TEAM_B___CELERY__BROKER_URL="redis://team-b-redis:6379/0"

# team_b's Celery result backend
export AIRFLOW__TEAM_B___CELERY__RESULT_BACKEND="db+postgresql://team-b-db/celery_results"
export AIRFLOW__TEAM_B___CELERY__RESULT_BACKEND="db+postgresql+psycopg2://team-b-db/celery_results"
Comment thread
ashb marked this conversation as resolved.

Via Config File
"""""""""""""""
Expand All @@ -499,17 +499,17 @@ name followed by an equals sign:
# Global celery settings (used by the global executor, NOT as a fallback for teams)
[celery]
broker_url = redis://default-redis:6379/0
result_backend = db+postgresql://default-db/celery_results
result_backend = db+postgresql+psycopg2://default-db/celery_results

# team_a overrides
[team_a=celery]
broker_url = redis://team-a-redis:6379/0
result_backend = db+postgresql://team-a-db/celery_results
result_backend = db+postgresql+psycopg2://team-a-db/celery_results

# team_b overrides
[team_b=celery]
broker_url = redis://team-b-redis:6379/0
result_backend = db+postgresql://team-b-db/celery_results
result_backend = db+postgresql+psycopg2://team-b-db/celery_results

Dag Bundle to Team Association
------------------------------
Expand Down
4 changes: 2 additions & 2 deletions airflow-core/docs/howto/set-up-database.rst
Original file line number Diff line number Diff line change
Expand Up @@ -184,7 +184,7 @@ in the Postgres documentation to learn more.

.. warning::

When you use SQLAlchemy 1.4.0+, you need to use ``postgresql://`` as the database in the ``sql_alchemy_conn``.
When you use SQLAlchemy 1.4.0+, you need to use ``postgresql+psycopg2://`` as the database in the ``sql_alchemy_conn``.
In the previous versions of SQLAlchemy it was possible to use ``postgres://``, but using it in
SQLAlchemy 1.4.0+ results in:

Expand Down Expand Up @@ -353,7 +353,7 @@ For instance, you can specify a database schema where Airflow will create its re

.. code-block:: bash

export AIRFLOW__DATABASE__SQL_ALCHEMY_CONN="postgresql://postgres@localhost:5432/my_database?options=-csearch_path%3Dairflow"
export AIRFLOW__DATABASE__SQL_ALCHEMY_CONN="postgresql+psycopg2://postgres@localhost:5432/my_database?options=-csearch_path%3Dairflow"
export AIRFLOW__DATABASE__SQL_ALCHEMY_SCHEMA="airflow"

Note the ``search_path`` at the end of the ``SQL_ALCHEMY_CONN`` database URL.
Expand Down
1 change: 1 addition & 0 deletions airflow-core/newsfragments/68314.improvement.rst
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
The default PostgreSQL connection string now uses an explicit ``postgresql+psycopg2://`` SQLAlchemy dialect instead of the bare ``postgresql://`` scheme. SQLAlchemy 2.1 changes the implicit default from psycopg2 to psycopg (v3), so bare URLs silently change driver across upgrades. If your ``sql_alchemy_conn`` or Celery ``result_backend`` uses ``postgresql://``, switch to ``postgresql+psycopg2://`` (or ``postgresql+psycopg://`` if you intend to use psycopg v3). Airflow will continue to auto-upgrade legacy ``postgres://`` schemes on startup.
7 changes: 4 additions & 3 deletions airflow-core/src/airflow/configuration.py
Original file line number Diff line number Diff line change
Expand Up @@ -372,12 +372,13 @@ def _upgrade_postgres_metastore_conn(self):
Upgrade SQL schemas.

As of SQLAlchemy 1.4, schemes `postgres+psycopg2` and `postgres`
must be replaced with `postgresql`.
must be replaced with `postgresql+psycopg2`. The bare `postgresql`
scheme is also upgraded to make the psycopg2 driver explicit.
"""
section, key = "database", "sql_alchemy_conn"
old_value = self.get(section, key, _extra_stacklevel=1)
bad_schemes = ["postgres+psycopg2", "postgres"]
good_scheme = "postgresql"
bad_schemes = ["postgres+psycopg2", "postgres", "postgresql"]
good_scheme = "postgresql+psycopg2"
parsed = urlsplit(old_value)
if parsed.scheme in bad_schemes:
warnings.warn(
Expand Down
39 changes: 37 additions & 2 deletions airflow-core/tests/unit/core/test_configuration.py
Original file line number Diff line number Diff line change
Expand Up @@ -800,7 +800,9 @@ def test_as_dict_respects_sensitive_cmds(self):
assert conf_maintain_cmds["database"]["sql_alchemy_conn"] == conf_conn

@mock.patch.dict(
"os.environ", {"AIRFLOW__DATABASE__SQL_ALCHEMY_CONN_CMD": "echo -n 'postgresql://'"}, clear=True
"os.environ",
{"AIRFLOW__DATABASE__SQL_ALCHEMY_CONN_CMD": "echo -n 'postgresql+psycopg2://'"},
clear=True,
)
def test_as_dict_respects_sensitive_cmds_from_env(self):
test_conf = copy.deepcopy(conf)
Expand All @@ -811,7 +813,7 @@ def test_as_dict_respects_sensitive_cmds_from_env(self):
assert "sql_alchemy_conn" in conf_materialize_cmds["database"]
assert "sql_alchemy_conn_cmd" not in conf_materialize_cmds["database"]

assert conf_materialize_cmds["database"]["sql_alchemy_conn"] == "postgresql://"
assert conf_materialize_cmds["database"]["sql_alchemy_conn"] == "postgresql+psycopg2://"

@skip_if_force_lowest_dependencies_marker
@conf_vars(
Expand Down Expand Up @@ -1118,6 +1120,39 @@ def test_write_handles_multiline_non_json_string(self):

assert expected_raw_output in content

@pytest.mark.parametrize(
("input_scheme", "expected_scheme"),
[
pytest.param(
"postgres://user:pass@host/db", "postgresql+psycopg2://user:pass@host/db", id="postgres"
),
pytest.param(
"postgres+psycopg2://user:pass@host/db",
"postgresql+psycopg2://user:pass@host/db",
id="postgres+psycopg2",
),
pytest.param(
"postgresql://user:pass@host/db",
"postgresql+psycopg2://user:pass@host/db",
id="postgresql-bare",
),
pytest.param(
"postgresql+psycopg2://user:pass@host/db",
"postgresql+psycopg2://user:pass@host/db",
id="postgresql+psycopg2-noop",
),
pytest.param("mysql://user:pass@host/db", "mysql://user:pass@host/db", id="mysql-untouched"),
],
)
@mock.patch.dict("os.environ", {}, clear=False)
def test_upgrade_postgres_metastore_conn(self, input_scheme, expected_scheme):
os.environ["AIRFLOW__DATABASE__SQL_ALCHEMY_CONN"] = input_scheme
test_conf = AirflowConfigParser()
with warnings.catch_warnings():
warnings.simplefilter("ignore", FutureWarning)
test_conf._upgrade_postgres_metastore_conn()
assert test_conf.get("database", "sql_alchemy_conn") == expected_scheme


@mock.patch.dict(
"os.environ",
Expand Down
4 changes: 2 additions & 2 deletions airflow-core/tests/unit/models/test_base.py
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@
@pytest.mark.parametrize(
("dsn", "expected", "extra"),
[
pytest.param("postgresql://host/the_database", {}, {}, id="postgres"),
pytest.param("postgresql+psycopg2://host/the_database", {}, {}, id="postgres"),
pytest.param("mysql://host/the_database", {"collation": "utf8mb3_bin"}, {}, id="mysql"),
pytest.param(
"mysql+pymysql://host/the_database", {"collation": "utf8mb3_bin"}, {}, id="mysql+pymysql"
Expand All @@ -40,7 +40,7 @@
id="mysql with explicit config",
),
pytest.param(
"postgresql://host/the_database",
"postgresql+psycopg2://host/the_database",
{"collation": "ascii"},
{("database", "sql_engine_collation_for_ids"): "ascii"},
id="postgres with explicit config",
Expand Down
2 changes: 1 addition & 1 deletion airflow-core/tests/unit/utils/test_db.py
Original file line number Diff line number Diff line change
Expand Up @@ -406,7 +406,7 @@ class TestAutocommitEngineForMySQL:
def test_non_mysql_database_is_noop(self, mocker):
"""Test that non-MySQL databases don't trigger any changes."""
# Mock settings to use PostgreSQL
mocker.patch.object(settings, "SQL_ALCHEMY_CONN", "postgresql://user:pass@localhost/db")
mocker.patch.object(settings, "SQL_ALCHEMY_CONN", "postgresql+psycopg2://user:pass@localhost/db")
mock_dispose = mocker.patch.object(settings, "dispose_orm")
mock_configure = mocker.patch.object(settings, "configure_orm")

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,4 +24,4 @@
# Has to be run in parallel with a task to create BigTable table
[core]
executor = LocalExecutor
sql_alchemy_conn = postgresql:///airflow/airflow.db
sql_alchemy_conn = postgresql+psycopg2:///airflow/airflow.db
2 changes: 1 addition & 1 deletion providers/celery/provider.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -236,7 +236,7 @@ config:
version_added: ~
type: string
sensitive: true
example: "db+postgresql://postgres:airflow@postgres/airflow"
example: "db+postgresql+psycopg2://postgres:airflow@postgres/airflow"
default: ~
result_backend_sqlalchemy_engine_options:
description: |
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -123,7 +123,7 @@ def get_provider_info():
"version_added": None,
"type": "string",
"sensitive": True,
"example": "db+postgresql://postgres:airflow@postgres/airflow",
"example": "db+postgresql+psycopg2://postgres:airflow@postgres/airflow",
"default": None,
},
"result_backend_sqlalchemy_engine_options": {
Expand Down
21 changes: 12 additions & 9 deletions task-sdk/tests/task_sdk/definitions/test_asset.py
Original file line number Diff line number Diff line change
Expand Up @@ -56,10 +56,10 @@
pytest.param("sqlite:///:memory:", "\n\t", True, id="sqlite-whitespace"),
pytest.param("sqlite:///:memory:", "a" * 1501, True, id="sqlite-too-long"),
pytest.param("sqlite:///:memory:", "😊", False, id="sqlite-non-ascii"),
pytest.param("postgresql://localhost/db", "", True, id="postgres-empty"),
pytest.param("postgresql://localhost/db", "\n\t", True, id="postgres-whitespace"),
pytest.param("postgresql://localhost/db", "a" * 1501, True, id="postgres-too-long"),
pytest.param("postgresql://localhost/db", "😊", False, id="postgres-non-ascii"),
pytest.param("postgresql+psycopg2://localhost/db", "", True, id="postgres-empty"),
pytest.param("postgresql+psycopg2://localhost/db", "\n\t", True, id="postgres-whitespace"),
pytest.param("postgresql+psycopg2://localhost/db", "a" * 1501, True, id="postgres-too-long"),
pytest.param("postgresql+psycopg2://localhost/db", "😊", False, id="postgres-non-ascii"),
],
)
def test_invalid_names(sql_conn_value, name, should_raise, monkeypatch):
Expand All @@ -84,13 +84,16 @@ def test_invalid_names(sql_conn_value, name, should_raise, monkeypatch):
pytest.param("sqlite:///:memory:", "a" * 1501, True, id="sqlite-too-long"),
pytest.param("sqlite:///:memory:", "airflow://xcom/dag/task", True, id="sqlite-reserved-scheme"),
pytest.param("sqlite:///:memory:", "😊", False, id="sqlite-non-ascii"),
pytest.param("postgresql://localhost/db", "", True, id="postgres-empty"),
pytest.param("postgresql://localhost/db", "\n\t", True, id="postgres-whitespace"),
pytest.param("postgresql://localhost/db", "a" * 1501, True, id="postgres-too-long"),
pytest.param("postgresql+psycopg2://localhost/db", "", True, id="postgres-empty"),
pytest.param("postgresql+psycopg2://localhost/db", "\n\t", True, id="postgres-whitespace"),
pytest.param("postgresql+psycopg2://localhost/db", "a" * 1501, True, id="postgres-too-long"),
pytest.param(
"postgresql://localhost/db", "airflow://xcom/dag/task", True, id="postgres-reserved-scheme"
"postgresql+psycopg2://localhost/db",
"airflow://xcom/dag/task",
True,
id="postgres-reserved-scheme",
),
pytest.param("postgresql://localhost/db", "😊", False, id="postgres-non-ascii"),
pytest.param("postgresql+psycopg2://localhost/db", "😊", False, id="postgres-non-ascii"),
],
)
def test_invalid_uris(sql_conn_value, uri, should_raise, monkeypatch):
Expand Down
Loading