Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions tests/models/test_trigger.py
Original file line number Diff line number Diff line change
Expand Up @@ -99,6 +99,7 @@ def test_clean_unused(session, create_task_instance):
assert session.query(Trigger).one().id == trigger1.id


@pytest.mark.skip_if_database_isolation_mode # Does not work in db isolation mode
def test_submit_event(session, create_task_instance):
"""
Tests that events submitted to a trigger re-wake their dependent
Expand Down Expand Up @@ -126,6 +127,7 @@ def test_submit_event(session, create_task_instance):
assert updated_task_instance.next_kwargs == {"event": 42, "cheesecake": True}


@pytest.mark.skip_if_database_isolation_mode # Does not work in db isolation mode
def test_submit_failure(session, create_task_instance):
"""
Tests that failures submitted to a trigger fail their dependent
Expand All @@ -150,6 +152,7 @@ def test_submit_failure(session, create_task_instance):
assert updated_task_instance.next_method == "__fail__"


@pytest.mark.skip_if_database_isolation_mode # Does not work in db isolation mode
@pytest.mark.parametrize(
"event_cls, expected",
[
Expand Down Expand Up @@ -300,6 +303,7 @@ def test_assign_unassigned(session, create_task_instance):
)


@pytest.mark.skip_if_database_isolation_mode # Does not work in db isolation mode
def test_get_sorted_triggers_same_priority_weight(session, create_task_instance):
"""
Tests that triggers are sorted by the creation_date if they have the same priority.
Expand Down Expand Up @@ -350,6 +354,7 @@ def test_get_sorted_triggers_same_priority_weight(session, create_task_instance)
assert trigger_ids_query == [(1,), (2,)]


@pytest.mark.skip_if_database_isolation_mode # Does not work in db isolation mode
def test_get_sorted_triggers_different_priority_weights(session, create_task_instance):
"""
Tests that triggers are sorted by the priority_weight.
Expand Down
1 change: 1 addition & 0 deletions tests/models/test_xcom_arg.py
Original file line number Diff line number Diff line change
Expand Up @@ -184,6 +184,7 @@ def push_xcom_value(key, value, **context):
dag.run()


@pytest.mark.skip_if_database_isolation_mode # Does not work in db isolation mode
@pytest.mark.parametrize(
"fillvalue, expected_results",
[
Expand Down
8 changes: 8 additions & 0 deletions tests/models/test_xcom_arg_map.py
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@
pytestmark = pytest.mark.db_test


@pytest.mark.skip_if_database_isolation_mode # Does not work in db isolation mode
def test_xcom_map(dag_maker, session):
results = set()
with dag_maker(session=session) as dag:
Expand Down Expand Up @@ -64,6 +65,7 @@ def pull(value):
assert results == {"aa", "bb", "cc"}


@pytest.mark.skip_if_database_isolation_mode # Does not work in db isolation mode
def test_xcom_map_transform_to_none(dag_maker, session):
results = set()

Expand Down Expand Up @@ -98,6 +100,7 @@ def c_to_none(v):
assert results == {"a", "b", None}


@pytest.mark.skip_if_database_isolation_mode # Does not work in db isolation mode
def test_xcom_convert_to_kwargs_fails_task(dag_maker, session):
results = set()

Expand Down Expand Up @@ -145,6 +148,7 @@ def c_to_none(v):
]


@pytest.mark.skip_if_database_isolation_mode # Does not work in db isolation mode
def test_xcom_map_error_fails_task(dag_maker, session):
with dag_maker(session=session) as dag:

Expand Down Expand Up @@ -241,6 +245,7 @@ def test_task_map_variant():
assert task_map.variant == TaskMapVariant.DICT


@pytest.mark.skip_if_database_isolation_mode # Does not work in db isolation mode
def test_xcom_map_raise_to_skip(dag_maker, session):
result = None

Expand Down Expand Up @@ -285,6 +290,7 @@ def skip_c(v):
assert result == ["a", "b"]


@pytest.mark.skip_if_database_isolation_mode # Does not work in db isolation mode
def test_xcom_map_nest(dag_maker, session):
results = set()

Expand Down Expand Up @@ -318,6 +324,7 @@ def pull(value):
assert results == {"aa", "bb", "cc"}


@pytest.mark.skip_if_database_isolation_mode # Does not work in db isolation mode
def test_xcom_map_zip_nest(dag_maker, session):
results = set()

Expand Down Expand Up @@ -364,6 +371,7 @@ def convert_zipped(zipped):
assert results == {"aa", "bbbb", "cccccc", "dddddddd"}


@pytest.mark.skip_if_database_isolation_mode # Does not work in db isolation mode
def test_xcom_concat(dag_maker, session):
from airflow.models.xcom_arg import _ConcatResult

Expand Down
12 changes: 6 additions & 6 deletions tests/utils/test_task_handler_with_custom_formatter.py
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,6 @@
import pytest

from airflow.config_templates.airflow_local_settings import DEFAULT_LOGGING_CONFIG
from airflow.models.dag import DAG
from airflow.models.taskinstance import TaskInstance
from airflow.operators.empty import EmptyOperator
from airflow.utils.log.logging_mixin import set_context
Expand Down Expand Up @@ -59,11 +58,11 @@ def custom_task_log_handler_config():


@pytest.fixture
def task_instance():
dag = DAG(DAG_ID, start_date=DEFAULT_DATE)
task = EmptyOperator(task_id=TASK_ID, dag=dag)
dagrun = dag.create_dagrun(
DagRunState.RUNNING,
def task_instance(dag_maker):
with dag_maker(DAG_ID, start_date=DEFAULT_DATE, serialized=True) as dag:
task = EmptyOperator(task_id=TASK_ID)
dagrun = dag_maker.create_dagrun(
state=DagRunState.RUNNING,
execution_date=DEFAULT_DATE,
run_type=DagRunType.MANUAL,
data_interval=dag.timetable.infer_manual_data_interval(run_after=DEFAULT_DATE),
Expand Down Expand Up @@ -103,6 +102,7 @@ def test_custom_formatter_default_format(task_instance):
assert_prefix_once(task_instance, "")


@pytest.mark.skip_if_database_isolation_mode # Does not work in db isolation mode
@conf_vars({("logging", "task_log_prefix_template"): "{{ ti.dag_id }}-{{ ti.task_id }}"})
def test_custom_formatter_custom_format_not_affected_by_config(task_instance):
"""Certifies that the prefix is only added once, even after repeated calls"""
Expand Down