Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions airflow-core/newsfragments/68266.bugfix.rst
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
Asset-triggered partitioned Dag runs now set ``partition_date`` when the consumer's partition mapper is temporal (directly, or wrapped in ``RollupMapper`` / ``FanOutMapper`` / ``ChainMapper``). Non-temporal mappers leave ``partition_date`` unset.
79 changes: 74 additions & 5 deletions airflow-core/src/airflow/jobs/scheduler_job_runner.py
Original file line number Diff line number Diff line change
Expand Up @@ -113,8 +113,8 @@
from airflow.serialization.definitions.assets import SerializedAssetUniqueKey
from airflow.serialization.definitions.notset import NOTSET
from airflow.ti_deps.dependencies_states import ACTIVE_STATES, EXECUTION_STATES
from airflow.timetables.base import compute_rollup_fingerprint
from airflow.timetables.simple import AssetTriggeredTimetable, PartitionedAssetTimetable
from airflow.timetables.base import Timetable, compute_rollup_fingerprint
from airflow.timetables.simple import AssetTriggeredTimetable
from airflow.triggers.base import TriggerEvent
from airflow.utils.event_scheduler import EventScheduler
from airflow.utils.log.logging_mixin import LoggingMixin
Expand Down Expand Up @@ -1916,7 +1916,7 @@ def _resolve_asset_partition_status(
name: str,
uri: str,
apdr: AssetPartitionDagRun,
timetable: PartitionedAssetTimetable,
timetable: Timetable,
actual_by_asset: dict[int, set[str]],
) -> bool:
"""
Expand Down Expand Up @@ -1953,6 +1953,68 @@ def _resolve_asset_partition_status(
)
return False

def _resolve_partition_date(
    self,
    *,
    timetable: Timetable,
    asset_infos: Iterable[tuple[str, str]],
    partition_key: str,
    dag_id: str,
) -> datetime | None:
    """
    Return the temporal anchor (period-start datetime) for *partition_key*.

    For every ``(name, uri)`` pair in *asset_infos* — the upstream assets that
    contributed to the partition — the mapper registered on *timetable* is asked
    to resolve the key via
    :meth:`~airflow.partition_mappers.base.PartitionMapper.to_partition_date`.
    Mappers that return ``None`` (the base, non-temporal behaviour) are ignored;
    every other result is collected into a set, so equal datetimes collapse to a
    single entry (aware datetimes denoting the same instant compare equal).

    Outcomes:

    * no mapper produced an anchor: ``None``;
    * all anchors are equal: that anchor;
    * anchors differ (a misconfiguration, e.g. assets mapping the same key under
      different timezones): a warning is logged and ``None`` is returned rather
      than picking one by scan order;
    * any mapper lookup or resolution raises: the error is logged together with
      the asset being resolved and ``None`` is returned. Anchors gathered from
      earlier assets are discarded, since a partial set could hide a conflict.
      The exception is never propagated, so a broken mapper cannot crash the
      scheduler tick.

    :param timetable: Timetable providing ``get_partition_mapper(name=..., uri=...)``.
    :param asset_infos: ``(name, uri)`` pairs of the contributing upstream assets.
    :param partition_key: Downstream partition key to resolve.
    :param dag_id: Target Dag id; used for log context only.
    :return: The agreed anchor, or ``None`` as described above.
    """
    anchors: set[datetime] = set()
    # Remember which asset is being resolved so a failure can be attributed to it.
    current_asset: tuple[str, str] | None = None
    try:
        for current_asset in asset_infos:
            name, uri = current_asset
            mapper = timetable.get_partition_mapper(name=name, uri=uri)
            anchor = mapper.to_partition_date(partition_key)
            if anchor is not None:
                anchors.add(anchor)
    except Exception:
        # Deliberately broad: mappers may be user-supplied and must not take the scheduler down.
        self.log.exception(
            "Failed to resolve partition_date for asset-triggered Dag run; partition_date will be None.",
            dag_id=dag_id,
            partition_key=partition_key,
            asset=current_asset,
        )
        return None

    if not anchors:
        return None
    if len(anchors) > 1:
        self.log.warning(
            "Upstream partition mappers resolved conflicting partition_date values for the same "
            "key; leaving partition_date unset. The consumer's assets likely use inconsistent "
            "partition mappers.",
            dag_id=dag_id,
            partition_key=partition_key,
            partition_dates=sorted(anchor.isoformat() for anchor in anchors),
        )
        return None
    return anchors.pop()

def _create_dagruns_for_partitioned_asset_dags(self, session: Session) -> set[str]:
"""
Create Dag runs for pending :class:`AssetPartitionDagRun` rows whose partition is satisfied.
Expand Down Expand Up @@ -2119,8 +2181,6 @@ def _create_dagruns_for_partitioned_asset_dags(self, session: Session) -> set[st
for asset_id, (name, uri) in asset_info_per_apdr[apdr.id].items():
key = SerializedAssetUniqueKey(name=name, uri=uri)
if timetable.partitioned:
if TYPE_CHECKING:
assert isinstance(timetable, PartitionedAssetTimetable)
statuses[key] = self._resolve_asset_partition_status(
session=session,
asset_id=asset_id,
Expand All @@ -2137,13 +2197,22 @@ def _create_dagruns_for_partitioned_asset_dags(self, session: Session) -> set[st

partition_dag_ids.add(apdr.target_dag_id)
run_after = timezone.utcnow()
partition_date: datetime | None = None
if timetable.partitioned:
partition_date = self._resolve_partition_date(
timetable=timetable,
asset_infos=asset_info_per_apdr[apdr.id].values(),
partition_key=apdr.partition_key,
dag_id=apdr.target_dag_id,
)
dag_run = dag.create_dagrun(
run_id=DagRun.generate_run_id(
run_type=DagRunType.ASSET_TRIGGERED, logical_date=None, run_after=run_after
),
logical_date=None,
data_interval=None,
partition_key=apdr.partition_key,
partition_date=partition_date,
run_after=run_after,
run_type=DagRunType.ASSET_TRIGGERED,
triggered_by=DagRunTriggeredByType.ASSET,
Expand Down
19 changes: 19 additions & 0 deletions airflow-core/src/airflow/partition_mappers/base.py
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@

if TYPE_CHECKING:
from collections.abc import Iterable
from datetime import datetime

from airflow.partition_mappers.window import Window

Expand Down Expand Up @@ -92,6 +93,19 @@ def encode_upstream(self, decoded: Any) -> str:
"""
return decoded

def to_partition_date(self, downstream_key: str) -> datetime | None:
"""
Return the temporal anchor (period-start datetime) for *downstream_key*.

The scheduler stamps this on the asset-triggered Dag run as its
``partition_date``. The base implementation returns ``None`` — a plain
partition key carries no temporal meaning. Temporal mappers override to
decode the key into its window anchor; composite mappers
(:class:`RollupMapper`, :class:`~airflow.partition_mappers.temporal.FanOutMapper`)
delegate to whichever child owns the downstream key's identity.
"""
return None

def serialize(self) -> dict[str, Any]:
return {}

Expand Down Expand Up @@ -138,6 +152,11 @@ def to_upstream(self, downstream_key: str) -> frozenset[str]:
for expected_upstream in self.window.to_upstream(decoded)
)

def to_partition_date(self, downstream_key: str) -> datetime | None:
# The downstream key is in upstream_mapper's format (to_downstream delegates
# to it), so the anchor is the upstream_mapper's to resolve.
return self.upstream_mapper.to_partition_date(downstream_key)

def serialize(self) -> dict[str, Any]:
from airflow.serialization.encoders import encode_partition_mapper, encode_window

Expand Down
9 changes: 8 additions & 1 deletion airflow-core/src/airflow/partition_mappers/chain.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,10 +18,13 @@
from __future__ import annotations

from collections.abc import Iterable
from typing import Any
from typing import TYPE_CHECKING, Any

from airflow.partition_mappers.base import PartitionMapper

if TYPE_CHECKING:
from datetime import datetime


class ChainMapper(PartitionMapper):
"""Partition mapper that applies multiple mappers sequentially."""
Expand Down Expand Up @@ -60,6 +63,10 @@ def to_downstream(self, key: str) -> str | Iterable[str]:
keys = next_keys
return keys[0] if len(keys) == 1 else keys

def to_partition_date(self, downstream_key: str) -> datetime | None:
# The last mapper in the chain formats the final downstream key, so it owns the anchor.
return self.mappers[-1].to_partition_date(downstream_key)

def serialize(self) -> dict[str, Any]:
from airflow.serialization.encoders import encode_partition_mapper

Expand Down
12 changes: 12 additions & 0 deletions airflow-core/src/airflow/partition_mappers/temporal.py
Original file line number Diff line number Diff line change
Expand Up @@ -198,6 +198,14 @@ def decode_downstream(self, downstream_key: str) -> datetime:
"""
return datetime.strptime(downstream_key, self.output_format)

def to_partition_date(self, downstream_key: str) -> datetime:
    """
    Decode *downstream_key* into this mapper's anchor datetime.

    The key is parsed with :meth:`decode_downstream` and passed through
    :meth:`normalize`. If the result carries no ``tzinfo`` it is made aware
    using the mapper's own timezone before being returned.
    """
    decoded = self.decode_downstream(downstream_key)
    anchor = self.normalize(decoded)
    if anchor.tzinfo is not None:
        return anchor
    # decode_downstream yields a naive datetime; attach the mapper's own timezone
    # (as to_downstream does) so the stored instant is correct.
    return make_aware(anchor, self._timezone)

def encode_upstream(self, dt: datetime) -> str:
"""
Format *dt* as an upstream partition key string.
Expand Down Expand Up @@ -522,6 +530,10 @@ def to_downstream(self, key: str) -> Iterable[str]:
coarse = self.upstream_mapper.decode_downstream(formatted)
return [_format_with(self.downstream_mapper, item) for item in self.window.to_upstream(coarse)]

def to_partition_date(self, downstream_key: str) -> datetime | None:
# Fan-out keys are formatted by downstream_mapper, so it owns the anchor.
return self.downstream_mapper.to_partition_date(downstream_key)

def serialize(self) -> dict[str, Any]:
from airflow.serialization.encoders import encode_partition_mapper, encode_window

Expand Down
167 changes: 165 additions & 2 deletions airflow-core/tests/unit/jobs/test_scheduler_job.py
Original file line number Diff line number Diff line change
Expand Up @@ -93,8 +93,18 @@
PartitionMapper as CorePartitionMapper,
RollupMapper as CoreRollupMapper,
)
from airflow.partition_mappers.temporal import StartOfHourMapper as CoreStartOfHourMapper
from airflow.partition_mappers.window import DayWindow as CoreDayWindow, HourWindow as CoreHourWindow
from airflow.partition_mappers.identity import IdentityMapper as CoreIdentityMapper
from airflow.partition_mappers.temporal import (
FanOutMapper as CoreFanOutMapper,
StartOfDayMapper as CoreStartOfDayMapper,
StartOfHourMapper as CoreStartOfHourMapper,
StartOfWeekMapper as CoreStartOfWeekMapper,
)
from airflow.partition_mappers.window import (
DayWindow as CoreDayWindow,
HourWindow as CoreHourWindow,
WeekWindow as CoreWeekWindow,
)
from airflow.providers.standard.operators.bash import BashOperator
from airflow.providers.standard.operators.empty import EmptyOperator
from airflow.providers.standard.triggers.file import FileDeleteTrigger
Expand All @@ -109,6 +119,7 @@
IdentityMapper,
RollupMapper,
SegmentWindow,
StartOfDayMapper,
StartOfHourMapper,
task,
)
Expand Down Expand Up @@ -11712,3 +11723,155 @@ def test_reaper_ignores_terminal_states(self, scheduler_job_runner_for_connectio
session.expire_all()
assert session.get(ConnectionTestRequest, ct_success.id).state == ConnectionTestState.SUCCESS
assert session.get(ConnectionTestRequest, ct_failed.id).state == ConnectionTestState.FAILED


@pytest.mark.need_serialized_dag
@pytest.mark.usefixtures("clear_asset_partition_rows")
@pytest.mark.parametrize(
    ("sdk_mapper", "upstream_partition_key", "expected_downstream_key", "expected_partition_date"),
    [
        # Temporal mapper: the day key resolves to midnight UTC of that day.
        (
            StartOfDayMapper(),
            "2024-03-15T10:30:00",
            "2024-03-15",
            datetime.datetime(2024, 3, 15, 0, 0, 0, tzinfo=datetime.timezone.utc),
        ),
        # Rollup wrapping a temporal mapper: the hour key resolves to the start of the hour.
        (
            RollupMapper(upstream_mapper=StartOfHourMapper(), window=HourWindow()),
            "2024-01-01T00:00:00",
            "2024-01-01T00",
            datetime.datetime(2024, 1, 1, 0, 0, 0, tzinfo=datetime.timezone.utc),
        ),
        # Non-temporal mapper: partition_date stays unset.
        (
            IdentityMapper(),
            "key-abc",
            "key-abc",
            None,
        ),
    ],
)
def test_partition_date_populated_on_dagrun(
    dag_maker: DagMaker,
    session: Session,
    sdk_mapper,
    upstream_partition_key,
    expected_downstream_key,
    expected_partition_date,
):
    """The created Dag run carries the mapper-derived ``partition_date`` (``None`` for non-temporal mappers)."""
    source_asset = Asset(name="asset-pd-test")
    consumer_timetable = PartitionedAssetTimetable(
        assets=source_asset,
        default_partition_mapper=sdk_mapper,
    )

    with dag_maker(
        dag_id="partition-date-consumer",
        schedule=consumer_timetable,
        session=session,
    ):
        EmptyOperator(task_id="hi")
    session.commit()

    scheduler = SchedulerJobRunner(
        job=Job(job_type=SchedulerJobRunner.job_type), executors=[MockExecutor(do_update=False)]
    )

    pending = _produce_and_register_asset_event(
        dag_id="partition-date-producer",
        asset=source_asset,
        partition_key=upstream_partition_key,
        session=session,
        dag_maker=dag_maker,
        expected_partition_key=expected_downstream_key,
    )

    # The rollup window is only complete once all 60 minute keys of the hour have
    # arrived; minute 0 was produced above, so emit minutes 1 through 59 here.
    if isinstance(sdk_mapper, RollupMapper):
        remaining_minute_keys = [f"2024-01-01T00:{minute:02d}:00" for minute in range(1, 60)]
        for minute, minute_key in enumerate(remaining_minute_keys, start=1):
            _produce_and_register_asset_event(
                dag_id=f"partition-date-producer-{minute}",
                asset=source_asset,
                partition_key=minute_key,
                session=session,
                dag_maker=dag_maker,
                expected_partition_key=expected_downstream_key,
            )

    scheduler._create_dagruns_for_partitioned_asset_dags(session=session)
    session.refresh(pending)

    assert pending.created_dag_run_id is not None
    created_run = session.scalar(select(DagRun).where(DagRun.id == pending.created_dag_run_id))
    assert created_run is not None
    assert created_run.partition_date == expected_partition_date


def _make_runner() -> SchedulerJobRunner:
    """Build a ``SchedulerJobRunner`` from a fresh ``Job`` and a single ``MockExecutor(do_update=False)``."""
    job = Job(job_type=SchedulerJobRunner.job_type)
    executor = MockExecutor(do_update=False)
    return SchedulerJobRunner(job=job, executors=[executor])


@pytest.mark.parametrize(
    ("mappers", "partition_key", "expected"),
    [
        # A mapper with no temporal meaning yields no anchor.
        pytest.param([CoreIdentityMapper()], "some-key", None, id="non-temporal-none"),
        # "2024-03-15" under StartOfDayMapper(NY) is New York midnight, i.e. 04:00 UTC
        # (EDT; DST started 2024-03-10). The mapper's own timezone is used, not the
        # global default.
        pytest.param(
            [CoreStartOfDayMapper(timezone="America/New_York")],
            "2024-03-15",
            datetime.datetime(2024, 3, 15, 4, 0, 0, tzinfo=datetime.timezone.utc),
            id="non-utc-uses-mapper-timezone",
        ),
        # A key the mapper's format cannot decode is caught and reported as None, not raised.
        pytest.param([CoreStartOfDayMapper()], "not-a-date", None, id="decode-failure-none"),
        # FanOutMapper defers to its (daily) downstream_mapper, which owns the per-day key.
        pytest.param(
            [CoreFanOutMapper(upstream_mapper=CoreStartOfWeekMapper(), window=CoreWeekWindow())],
            "2024-01-16",
            datetime.datetime(2024, 1, 16, 0, 0, 0, tzinfo=datetime.timezone.utc),
            id="fanout-uses-downstream-mapper",
        ),
        # Two temporal mappers that agree on the instant produce that one anchor.
        pytest.param(
            [CoreStartOfDayMapper(), CoreStartOfDayMapper()],
            "2024-03-15",
            datetime.datetime(2024, 3, 15, 0, 0, 0, tzinfo=datetime.timezone.utc),
            id="agreeing-mappers-anchor",
        ),
        # The same key as UTC midnight (00:00Z) and NY midnight (04:00Z) gives two
        # different instants, so the result is None.
        pytest.param(
            [CoreStartOfDayMapper(timezone="UTC"), CoreStartOfDayMapper(timezone="America/New_York")],
            "2024-03-15",
            None,
            id="conflicting-mappers-none",
        ),
        # The hour-format mapper raises on a day key, aborting the whole resolution:
        # the first mapper's anchor is thrown away (all-or-nothing) and None is returned.
        pytest.param(
            [CoreStartOfDayMapper(), CoreStartOfHourMapper()],
            "2024-03-15",
            None,
            id="one-failing-mapper-aborts",
        ),
    ],
)
def test_resolve_partition_date(mappers, partition_key, expected):
    """Exercise ``_resolve_partition_date`` across temporal, fan-out, agreeing, conflicting and failing mappers.

    One mapper is handed out per upstream asset, so ``asset_infos`` has exactly
    as many entries as ``mappers``.
    """
    # Successive get_partition_mapper calls return the parametrized mappers in order.
    fake_timetable = mock.MagicMock()
    fake_timetable.get_partition_mapper.side_effect = mappers
    upstream_assets = [(f"asset-{index}-name", f"asset-{index}-uri") for index, _ in enumerate(mappers)]

    resolved = _make_runner()._resolve_partition_date(
        timetable=fake_timetable,
        asset_infos=upstream_assets,
        partition_key=partition_key,
        dag_id="test-dag",
    )

    assert resolved == expected
Loading
Loading