From 77ebd6b878672076890ebed65e421c3a993edc57 Mon Sep 17 00:00:00 2001 From: Wei Lee Date: Tue, 9 Jun 2026 11:26:16 +0800 Subject: [PATCH 1/2] fix(scheduler): populate partition_date for temporal asset partitions MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Asset-triggered partitioned Dag runs now set partition_date when the partition is temporal — a temporal mapper directly, or composed in a RollupMapper (fan-in), FanOutMapper (fan-out), or ChainMapper. The anchor is localized with the mapper's own timezone so it lands on the same UTC instant as the cron partition path; non-temporal mappers leave it unset. Adds PartitionMapper.to_partition_date(downstream_key): temporal mappers decode the key to their window anchor and composites delegate to the child that owns the downstream key, so the scheduler resolves the date polymorphically instead of type-switching. When upstream mappers disagree on the instant for a key, partition_date is left unset and logged rather than guessed. --- airflow-core/newsfragments/68266.bugfix.rst | 1 + .../src/airflow/jobs/scheduler_job_runner.py | 73 ++++++++ .../src/airflow/partition_mappers/base.py | 19 ++ .../src/airflow/partition_mappers/chain.py | 9 +- .../src/airflow/partition_mappers/temporal.py | 12 ++ .../tests/unit/jobs/test_scheduler_job.py | 167 +++++++++++++++++- .../unit/partition_mappers/test_chain.py | 18 ++ .../unit/partition_mappers/test_temporal.py | 95 ++++++++++ 8 files changed, 391 insertions(+), 3 deletions(-) create mode 100644 airflow-core/newsfragments/68266.bugfix.rst diff --git a/airflow-core/newsfragments/68266.bugfix.rst b/airflow-core/newsfragments/68266.bugfix.rst new file mode 100644 index 0000000000000..0d41b654186c5 --- /dev/null +++ b/airflow-core/newsfragments/68266.bugfix.rst @@ -0,0 +1 @@ +Asset-triggered partitioned Dag runs now set ``partition_date`` when the consumer's partition mapper is temporal (directly, or wrapped in ``RollupMapper`` / ``FanOutMapper`` / ``ChainMapper``). Non-temporal mappers leave ``partition_date`` unset. diff --git a/airflow-core/src/airflow/jobs/scheduler_job_runner.py b/airflow-core/src/airflow/jobs/scheduler_job_runner.py index a6496665e8b0d..7987d79150dce 100644 --- a/airflow-core/src/airflow/jobs/scheduler_job_runner.py +++ b/airflow-core/src/airflow/jobs/scheduler_job_runner.py @@ -1953,6 +1953,68 @@ def _resolve_asset_partition_status( ) return False + def _resolve_partition_date( + self, + *, + timetable: PartitionedAssetTimetable, + asset_infos: Iterable[tuple[str, str]], + partition_key: str, + dag_id: str, + ) -> datetime | None: + """ + Return the temporal anchor (period-start datetime) for *partition_key*. + + Resolves the temporal anchor (period-start datetime) for *partition_key* + across *asset_infos* — the ``(name, uri)`` pairs of the upstream assets + that contributed to it. Each upstream mapper resolves the key via + :meth:`~airflow.partition_mappers.base.PartitionMapper.to_partition_date`: + temporal mappers decode the key, composite mappers delegate to their + child, and non-temporal mappers (e.g. + :class:`~airflow.partition_mappers.identity.IdentityMapper`) return ``None``. + + A partitioned consumer has a single partition identity, so every temporal + mapper feeding it must resolve the same key to the same instant. Anchors + are compared by instant (timezone-aware), so equivalent moments collapse + to one. When the temporal mappers agree, that anchor is returned; when + they disagree — a misconfiguration, e.g. assets mapping the same key under + different timezones — ``partition_date`` is left unset and a warning is + logged rather than silently picking one by scan order. Returns ``None`` if + no mapper is temporal. + + A failure in any mapper aborts the whole resolution and returns ``None`` + (logged) — anchors accumulated from earlier mappers are discarded rather + than used as a partial result, since a partial set could hide a conflict. + A broken mapper must not crash the scheduler tick. + """ + anchors: set[datetime] = set() + try: + for name, uri in asset_infos: + mapper = timetable.get_partition_mapper(name=name, uri=uri) + anchor = mapper.to_partition_date(partition_key) + if anchor is not None: + anchors.add(anchor) + except Exception: + self.log.exception( + "Failed to resolve partition_date for asset-triggered Dag run; partition_date will be None.", + dag_id=dag_id, + partition_key=partition_key, + ) + return None + + if not anchors: + return None + if len(anchors) > 1: + self.log.warning( + "Upstream partition mappers resolved conflicting partition_date values for the same " + "key; leaving partition_date unset. The consumer's assets likely use inconsistent " + "partition mappers.", + dag_id=dag_id, + partition_key=partition_key, + partition_dates=sorted(anchor.isoformat() for anchor in anchors), + ) + return None + return anchors.pop() + def _create_dagruns_for_partitioned_asset_dags(self, session: Session) -> set[str]: """ Create Dag runs for pending :class:`AssetPartitionDagRun` rows whose partition is satisfied. @@ -2137,6 +2199,16 @@ def _create_dagruns_for_partitioned_asset_dags(self, session: Session) -> set[st partition_dag_ids.add(apdr.target_dag_id) run_after = timezone.utcnow() + partition_date: datetime | None = None + if timetable.partitioned: + if TYPE_CHECKING: + assert isinstance(timetable, PartitionedAssetTimetable) + partition_date = self._resolve_partition_date( + timetable=timetable, + asset_infos=asset_info_per_apdr[apdr.id].values(), + partition_key=apdr.partition_key, + dag_id=apdr.target_dag_id, + ) dag_run = dag.create_dagrun( run_id=DagRun.generate_run_id( run_type=DagRunType.ASSET_TRIGGERED, logical_date=None, run_after=run_after @@ -2144,6 +2216,7 @@ def _create_dagruns_for_partitioned_asset_dags(self, session: Session) -> set[st logical_date=None, data_interval=None, partition_key=apdr.partition_key, + partition_date=partition_date, run_after=run_after, run_type=DagRunType.ASSET_TRIGGERED, triggered_by=DagRunTriggeredByType.ASSET, diff --git a/airflow-core/src/airflow/partition_mappers/base.py b/airflow-core/src/airflow/partition_mappers/base.py index ba4a3ee658e9f..ee70842cdcf4a 100644 --- a/airflow-core/src/airflow/partition_mappers/base.py +++ b/airflow-core/src/airflow/partition_mappers/base.py @@ -22,6 +22,7 @@ if TYPE_CHECKING: from collections.abc import Iterable + from datetime import datetime from airflow.partition_mappers.window import Window @@ -92,6 +93,19 @@ def encode_upstream(self, decoded: Any) -> str: """ return decoded + def to_partition_date(self, downstream_key: str) -> datetime | None: + """ + Return the temporal anchor (period-start datetime) for *downstream_key*. + + The scheduler stamps this on the asset-triggered Dag run as its + ``partition_date``. The base implementation returns ``None`` — a plain + partition key carries no temporal meaning. Temporal mappers override to + decode the key into its window anchor; composite mappers + (:class:`RollupMapper`, :class:`~airflow.partition_mappers.temporal.FanOutMapper`) + delegate to whichever child owns the downstream key's identity. + """ + return None + def serialize(self) -> dict[str, Any]: return {} @@ -138,6 +152,11 @@ def to_upstream(self, downstream_key: str) -> frozenset[str]: for expected_upstream in self.window.to_upstream(decoded) ) + def to_partition_date(self, downstream_key: str) -> datetime | None: + # The downstream key is in upstream_mapper's format (to_downstream delegates + # to it), so the anchor is the upstream_mapper's to resolve. + return self.upstream_mapper.to_partition_date(downstream_key) + def serialize(self) -> dict[str, Any]: from airflow.serialization.encoders import encode_partition_mapper, encode_window diff --git a/airflow-core/src/airflow/partition_mappers/chain.py b/airflow-core/src/airflow/partition_mappers/chain.py index 850517482cfaf..a4a2110c25628 100644 --- a/airflow-core/src/airflow/partition_mappers/chain.py +++ b/airflow-core/src/airflow/partition_mappers/chain.py @@ -18,10 +18,13 @@ from __future__ import annotations from collections.abc import Iterable -from typing import Any +from typing import TYPE_CHECKING, Any from airflow.partition_mappers.base import PartitionMapper +if TYPE_CHECKING: + from datetime import datetime + class ChainMapper(PartitionMapper): """Partition mapper that applies multiple mappers sequentially.""" @@ -60,6 +63,10 @@ def to_downstream(self, key: str) -> str | Iterable[str]: keys = next_keys return keys[0] if len(keys) == 1 else keys + def to_partition_date(self, downstream_key: str) -> datetime | None: + # The last mapper in the chain formats the final downstream key, so it owns the anchor. + return self.mappers[-1].to_partition_date(downstream_key) + def serialize(self) -> dict[str, Any]: from airflow.serialization.encoders import encode_partition_mapper diff --git a/airflow-core/src/airflow/partition_mappers/temporal.py b/airflow-core/src/airflow/partition_mappers/temporal.py index 4fc1ed45ed35c..622f0c6a556c1 100644 --- a/airflow-core/src/airflow/partition_mappers/temporal.py +++ b/airflow-core/src/airflow/partition_mappers/temporal.py @@ -198,6 +198,14 @@ def decode_downstream(self, downstream_key: str) -> datetime: """ return datetime.strptime(downstream_key, self.output_format) + def to_partition_date(self, downstream_key: str) -> datetime: + anchor = self.normalize(self.decode_downstream(downstream_key)) + # decode_downstream returns a naive datetime; localise it with the mapper's + # own timezone, mirroring to_downstream, so the stored instant is correct. + if anchor.tzinfo is None: + anchor = make_aware(anchor, self._timezone) + return anchor + def encode_upstream(self, dt: datetime) -> str: """ Format *dt* as an upstream partition key string. @@ -522,6 +530,10 @@ def to_downstream(self, key: str) -> Iterable[str]: coarse = self.upstream_mapper.decode_downstream(formatted) return [_format_with(self.downstream_mapper, item) for item in self.window.to_upstream(coarse)] + def to_partition_date(self, downstream_key: str) -> datetime | None: + # Fan-out keys are formatted by downstream_mapper, so it owns the anchor. + return self.downstream_mapper.to_partition_date(downstream_key) + def serialize(self) -> dict[str, Any]: from airflow.serialization.encoders import encode_partition_mapper, encode_window diff --git a/airflow-core/tests/unit/jobs/test_scheduler_job.py b/airflow-core/tests/unit/jobs/test_scheduler_job.py index d08628a51fbcd..6b9c6e343097f 100644 --- a/airflow-core/tests/unit/jobs/test_scheduler_job.py +++ b/airflow-core/tests/unit/jobs/test_scheduler_job.py @@ -93,8 +93,18 @@ PartitionMapper as CorePartitionMapper, RollupMapper as CoreRollupMapper, ) -from airflow.partition_mappers.temporal import StartOfHourMapper as CoreStartOfHourMapper -from airflow.partition_mappers.window import DayWindow as CoreDayWindow, HourWindow as CoreHourWindow +from airflow.partition_mappers.identity import IdentityMapper as CoreIdentityMapper +from airflow.partition_mappers.temporal import ( + FanOutMapper as CoreFanOutMapper, + StartOfDayMapper as CoreStartOfDayMapper, + StartOfHourMapper as CoreStartOfHourMapper, + StartOfWeekMapper as CoreStartOfWeekMapper, +) +from airflow.partition_mappers.window import ( + DayWindow as CoreDayWindow, + HourWindow as CoreHourWindow, + WeekWindow as CoreWeekWindow, +) from airflow.providers.standard.operators.bash import BashOperator from airflow.providers.standard.operators.empty import EmptyOperator from airflow.providers.standard.triggers.file import FileDeleteTrigger @@ -109,6 +119,7 @@ IdentityMapper, RollupMapper, SegmentWindow, + StartOfDayMapper, StartOfHourMapper, task, ) @@ -11712,3 +11723,155 @@ def test_reaper_ignores_terminal_states(self, scheduler_job_runner_for_connectio session.expire_all() assert session.get(ConnectionTestRequest, ct_success.id).state == ConnectionTestState.SUCCESS assert session.get(ConnectionTestRequest, ct_failed.id).state == ConnectionTestState.FAILED + + +@pytest.mark.need_serialized_dag +@pytest.mark.usefixtures("clear_asset_partition_rows") +@pytest.mark.parametrize( + ("sdk_mapper", "upstream_partition_key", "expected_downstream_key", "expected_partition_date"), + [ + ( + StartOfDayMapper(), + "2024-03-15T10:30:00", + "2024-03-15", + datetime.datetime(2024, 3, 15, 0, 0, 0, tzinfo=datetime.timezone.utc), + ), + ( + RollupMapper(upstream_mapper=StartOfHourMapper(), window=HourWindow()), + "2024-01-01T00:00:00", + "2024-01-01T00", + datetime.datetime(2024, 1, 1, 0, 0, 0, tzinfo=datetime.timezone.utc), + ), + ( + IdentityMapper(), + "key-abc", + "key-abc", + None, + ), + ], +) +def test_partition_date_populated_on_dagrun( + dag_maker: DagMaker, + session: Session, + sdk_mapper, + upstream_partition_key, + expected_downstream_key, + expected_partition_date, +): + """DagRun.partition_date is set correctly for temporal / rollup-of-temporal mappers.""" + asset_1 = Asset(name="asset-pd-test") + + with dag_maker( + dag_id="partition-date-consumer", + schedule=PartitionedAssetTimetable( + assets=asset_1, + default_partition_mapper=sdk_mapper, + ), + session=session, + ): + EmptyOperator(task_id="hi") + session.commit() + + runner = SchedulerJobRunner( + job=Job(job_type=SchedulerJobRunner.job_type), executors=[MockExecutor(do_update=False)] + ) + + apdr = _produce_and_register_asset_event( + dag_id="partition-date-producer", + asset=asset_1, + partition_key=upstream_partition_key, + session=session, + dag_maker=dag_maker, + expected_partition_key=expected_downstream_key, + ) + + # For the rollup case, send all 60 minute keys so the window is complete. + if isinstance(sdk_mapper, RollupMapper): + for minute in range(1, 60): + _produce_and_register_asset_event( + dag_id=f"partition-date-producer-{minute}", + asset=asset_1, + partition_key=f"2024-01-01T00:{minute:02d}:00", + session=session, + dag_maker=dag_maker, + expected_partition_key=expected_downstream_key, + ) + + runner._create_dagruns_for_partitioned_asset_dags(session=session) + session.refresh(apdr) + + assert apdr.created_dag_run_id is not None + dag_run = session.scalar(select(DagRun).where(DagRun.id == apdr.created_dag_run_id)) + assert dag_run is not None + assert dag_run.partition_date == expected_partition_date + + +def _make_runner() -> SchedulerJobRunner: + return SchedulerJobRunner( + job=Job(job_type=SchedulerJobRunner.job_type), executors=[MockExecutor(do_update=False)] + ) + + +@pytest.mark.parametrize( + ("mappers", "partition_key", "expected"), + [ + # Non-temporal mapper → no anchor. + pytest.param([CoreIdentityMapper()], "some-key", None, id="non-temporal-none"), + # StartOfDayMapper(NY): "2024-03-15" → NY midnight = 04:00 UTC (EDT, DST since 2024-03-10), + # localised with the mapper's own timezone rather than the global default. + pytest.param( + [CoreStartOfDayMapper(timezone="America/New_York")], + "2024-03-15", + datetime.datetime(2024, 3, 15, 4, 0, 0, tzinfo=datetime.timezone.utc), + id="non-utc-uses-mapper-timezone", + ), + # Key cannot be decoded by the mapper's format → caught → None (no raise). + pytest.param([CoreStartOfDayMapper()], "not-a-date", None, id="decode-failure-none"), + # FanOutMapper unwraps to its downstream_mapper (daily), which owns the per-day key. + pytest.param( + [CoreFanOutMapper(upstream_mapper=CoreStartOfWeekMapper(), window=CoreWeekWindow())], + "2024-01-16", + datetime.datetime(2024, 1, 16, 0, 0, 0, tzinfo=datetime.timezone.utc), + id="fanout-uses-downstream-mapper", + ), + # Two temporal mappers resolving the same instant → that single anchor. + pytest.param( + [CoreStartOfDayMapper(), CoreStartOfDayMapper()], + "2024-03-15", + datetime.datetime(2024, 3, 15, 0, 0, 0, tzinfo=datetime.timezone.utc), + id="agreeing-mappers-anchor", + ), + # Same key, UTC midnight (00:00Z) vs NY midnight (04:00Z) — distinct instants → None. + pytest.param( + [CoreStartOfDayMapper(timezone="UTC"), CoreStartOfDayMapper(timezone="America/New_York")], + "2024-03-15", + None, + id="conflicting-mappers-none", + ), + # Second mapper (hour format) raises on the day key → whole resolution aborts → None + # (the first mapper's anchor is discarded; all-or-nothing). + pytest.param( + [CoreStartOfDayMapper(), CoreStartOfHourMapper()], + "2024-03-15", + None, + id="one-failing-mapper-aborts", + ), + ], +) +def test_resolve_partition_date(mappers, partition_key, expected): + """_resolve_partition_date over mapper compositions: temporal / fan-out / agree / conflict / failure. + + The mappers are consumed one per upstream asset, so ``asset_infos`` is sized to ``mappers``. + """ + runner = _make_runner() + timetable = mock.MagicMock() + timetable.get_partition_mapper.side_effect = mappers + asset_infos = [(f"asset-{i}-name", f"asset-{i}-uri") for i in range(len(mappers))] + + result = runner._resolve_partition_date( + timetable=timetable, + asset_infos=asset_infos, + partition_key=partition_key, + dag_id="test-dag", + ) + assert result == expected diff --git a/airflow-core/tests/unit/partition_mappers/test_chain.py b/airflow-core/tests/unit/partition_mappers/test_chain.py index 79f888af75f33..528a176b1e0cc 100644 --- a/airflow-core/tests/unit/partition_mappers/test_chain.py +++ b/airflow-core/tests/unit/partition_mappers/test_chain.py @@ -17,6 +17,8 @@ from __future__ import annotations +from datetime import datetime, timezone + import pytest from airflow.partition_mappers.base import PartitionMapper @@ -40,6 +42,22 @@ def test_to_downstream(self): sm = ChainMapper(StartOfHourMapper(), StartOfDayMapper(input_format="%Y-%m-%dT%H")) assert sm.to_downstream("2024-01-15T10:30:00") == "2024-01-15" + @pytest.mark.parametrize( + ("chain", "downstream_key", "expected"), + [ + # Last mapper temporal → it owns the final downstream key, so it owns the anchor. + ( + ChainMapper(IdentityMapper(), StartOfDayMapper()), + "2024-03-15", + datetime(2024, 3, 15, 0, 0, 0, tzinfo=timezone.utc), + ), + # Last mapper non-temporal → no anchor. + (ChainMapper(StartOfDayMapper(), IdentityMapper()), "anything", None), + ], + ) + def test_to_partition_date_delegates_to_last_mapper(self, chain, downstream_key, expected): + assert chain.to_partition_date(downstream_key) == expected + def test_to_downstream_invalid_non_iterable_return(self): sm = ChainMapper(IdentityMapper(), _InvalidReturnMapper()) with pytest.raises(TypeError, match="must return a string or iterable of strings"): diff --git a/airflow-core/tests/unit/partition_mappers/test_temporal.py b/airflow-core/tests/unit/partition_mappers/test_temporal.py index 6ec84893506ae..ab2a79f52ce0d 100644 --- a/airflow-core/tests/unit/partition_mappers/test_temporal.py +++ b/airflow-core/tests/unit/partition_mappers/test_temporal.py @@ -16,11 +16,16 @@ # under the License. from __future__ import annotations +from datetime import datetime, timezone as dt_timezone + import pendulum import pytest from airflow import sdk +from airflow.partition_mappers.base import RollupMapper +from airflow.partition_mappers.identity import IdentityMapper from airflow.partition_mappers.temporal import ( + FanOutMapper, StartOfDayMapper, StartOfHourMapper, StartOfMonthMapper, @@ -30,6 +35,7 @@ _BaseTemporalMapper, _compile_output_format_regex, ) +from airflow.partition_mappers.window import HourWindow, WeekWindow from airflow.serialization.decoders import decode_partition_mapper from airflow.serialization.encoders import encode_partition_mapper @@ -352,3 +358,92 @@ def test_separator_between_default_placeholders_is_allowed(self): assert match is not None assert match.group("first") == "foo" assert match.group("last") == "bar" + + +class TestTemporalMapperDecodeNormalizeRoundTrip: + """ + ``normalize(decode_downstream(to_downstream(dt)))`` must equal the anchor + produced by ``normalize(dt)`` for every ``_BaseTemporalMapper`` subclass. + + This is the "Step 2" semantic guarantee: ``decode_downstream`` reconstructs + the period-start, and ``normalize`` is idempotent, so the composed call + used in ``_resolve_partition_date`` must not drift from the direct anchor. + """ + + SAMPLE_DT = datetime(2024, 3, 15, 10, 42, 35) + + @pytest.mark.parametrize( + "mapper", + [ + StartOfHourMapper(), + StartOfDayMapper(), + StartOfWeekMapper(), + StartOfMonthMapper(), + StartOfQuarterMapper(), + StartOfYearMapper(), + ], + ) + def test_round_trip_anchor_is_stable(self, mapper: _BaseTemporalMapper): + """``normalize(decode_downstream(to_downstream(dt)))`` == ``normalize(dt)``.""" + downstream_key = mapper.to_downstream(self.SAMPLE_DT.strftime(mapper.input_format)) + decoded = mapper.decode_downstream(downstream_key) + round_tripped = mapper.normalize(decoded) + direct_anchor = mapper.normalize(self.SAMPLE_DT) + assert round_tripped == direct_anchor, ( + f"{type(mapper).__name__}: round-trip anchor {round_tripped!r} " + f"differs from direct anchor {direct_anchor!r}" + ) + + @pytest.mark.parametrize( + ("mapper", "expected_aware"), + [ + # UTC mapper: UTC midnight stays at 00:00 UTC. + ( + StartOfDayMapper(timezone="UTC"), + datetime(2024, 3, 15, 0, 0, 0, tzinfo=dt_timezone.utc), + ), + # Non-UTC mapper: NY midnight (EDT = UTC-4) → 04:00 UTC. + ( + StartOfDayMapper(timezone="America/New_York"), + datetime(2024, 3, 15, 4, 0, 0, tzinfo=dt_timezone.utc), + ), + ], + ) + def test_to_partition_date_uses_mapper_timezone( + self, mapper: _BaseTemporalMapper, expected_aware: datetime + ): + """``to_partition_date`` localises the anchor with ``mapper._timezone``, not the global default.""" + downstream_key = mapper.to_downstream(self.SAMPLE_DT.strftime(mapper.input_format)) + aware = mapper.to_partition_date(downstream_key) + # Convert to UTC for a timezone-neutral comparison. + aware_utc = aware.astimezone(dt_timezone.utc) + assert aware_utc == expected_aware, ( + f"{type(mapper).__name__} (tz={mapper._timezone}): " + f"to_partition_date produced {aware_utc!r}, expected {expected_aware!r}" + ) + + +class TestToPartitionDateDelegation: + """Composite mappers delegate ``to_partition_date`` to the child that owns the downstream key.""" + + @pytest.mark.parametrize( + ("mapper", "downstream_key", "expected"), + [ + # RollupMapper (fan-in): downstream key is the upstream_mapper's format → it owns it. + ( + RollupMapper(upstream_mapper=StartOfHourMapper(), window=HourWindow()), + "2024-01-01T00", + datetime(2024, 1, 1, 0, 0, 0, tzinfo=dt_timezone.utc), + ), + # FanOutMapper (fan-out): downstream keys are the downstream_mapper's format → it owns them. + ( + FanOutMapper(upstream_mapper=StartOfWeekMapper(), window=WeekWindow()), + "2024-01-16", + datetime(2024, 1, 16, 0, 0, 0, tzinfo=dt_timezone.utc), + ), + # Non-temporal mapper → no anchor. + (IdentityMapper(), "anything", None), + ], + ) + def test_to_partition_date(self, mapper, downstream_key, expected): + assert mapper.to_partition_date(downstream_key) == expected From de416b5f3f7c9e7196604b84aa2aaa9e267acd23 Mon Sep 17 00:00:00 2001 From: Wei Lee Date: Wed, 10 Jun 2026 17:48:33 +0800 Subject: [PATCH 2/2] refactor(scheduler): type partition helpers at the Timetable protocol level The partitioned-asset Dag run path only calls the base `Timetable.get_partition_mapper` on its timetable, so its helpers were over-typed as `PartitionedAssetTimetable`. That over-narrow type was what forced the `if TYPE_CHECKING: assert isinstance(...)` boilerplate the reviewer flagged. Widen `_resolve_asset_partition_status` / `_resolve_partition_date` to accept `Timetable` and gate on the generic `timetable.partitioned` flag. This drops the assert boilerplate with no narrowing needed and keeps the path open to any partitioned timetable, matching the documented `partitioned`/`get_partition_mapper` protocol on the base class. --- .../src/airflow/jobs/scheduler_job_runner.py | 12 ++++-------- 1 file changed, 4 insertions(+), 8 deletions(-) diff --git a/airflow-core/src/airflow/jobs/scheduler_job_runner.py b/airflow-core/src/airflow/jobs/scheduler_job_runner.py index 7987d79150dce..a860810b74606 100644 --- a/airflow-core/src/airflow/jobs/scheduler_job_runner.py +++ b/airflow-core/src/airflow/jobs/scheduler_job_runner.py @@ -113,8 +113,8 @@ from airflow.serialization.definitions.assets import SerializedAssetUniqueKey from airflow.serialization.definitions.notset import NOTSET from airflow.ti_deps.dependencies_states import ACTIVE_STATES, EXECUTION_STATES -from airflow.timetables.base import compute_rollup_fingerprint -from airflow.timetables.simple import AssetTriggeredTimetable, PartitionedAssetTimetable +from airflow.timetables.base import Timetable, compute_rollup_fingerprint +from airflow.timetables.simple import AssetTriggeredTimetable from airflow.triggers.base import TriggerEvent from airflow.utils.event_scheduler import EventScheduler from airflow.utils.log.logging_mixin import LoggingMixin @@ -1916,7 +1916,7 @@ def _resolve_asset_partition_status( name: str, uri: str, apdr: AssetPartitionDagRun, - timetable: PartitionedAssetTimetable, + timetable: Timetable, actual_by_asset: dict[int, set[str]], ) -> bool: """ @@ -1956,7 +1956,7 @@ def _resolve_asset_partition_status( def _resolve_partition_date( self, *, - timetable: PartitionedAssetTimetable, + timetable: Timetable, asset_infos: Iterable[tuple[str, str]], partition_key: str, dag_id: str, @@ -2181,8 +2181,6 @@ def _create_dagruns_for_partitioned_asset_dags(self, session: Session) -> set[st for asset_id, (name, uri) in asset_info_per_apdr[apdr.id].items(): key = SerializedAssetUniqueKey(name=name, uri=uri) if timetable.partitioned: - if TYPE_CHECKING: - assert isinstance(timetable, PartitionedAssetTimetable) statuses[key] = self._resolve_asset_partition_status( session=session, asset_id=asset_id, @@ -2201,8 +2199,6 @@ def _create_dagruns_for_partitioned_asset_dags(self, session: Session) -> set[st run_after = timezone.utcnow() partition_date: datetime | None = None if timetable.partitioned: - if TYPE_CHECKING: - assert isinstance(timetable, PartitionedAssetTimetable) partition_date = self._resolve_partition_date( timetable=timetable, asset_infos=asset_info_per_apdr[apdr.id].values(),