From 2e3a709defbf9cbbd04776b14615b6e6a5883ec9 Mon Sep 17 00:00:00 2001 From: Ramachandra Nalam Date: Sat, 6 Jun 2026 14:50:33 -0700 Subject: [PATCH 1/3] Make RollupMapper calendar windows DST-aware (#68004) --- .../docs/authoring-and-scheduling/assets.rst | 15 +-- airflow-core/newsfragments/68004.bugfix.rst | 9 ++ .../example_dags/example_asset_partition.py | 4 +- .../src/airflow/partition_mappers/base.py | 6 +- .../src/airflow/partition_mappers/temporal.py | 5 + .../src/airflow/partition_mappers/window.py | 96 ++++++++++++------- .../unit/partition_mappers/test_window.py | 79 ++++++++++----- .../definitions/partition_mappers/window.py | 9 +- 8 files changed, 152 insertions(+), 71 deletions(-) create mode 100644 airflow-core/newsfragments/68004.bugfix.rst diff --git a/airflow-core/docs/authoring-and-scheduling/assets.rst b/airflow-core/docs/authoring-and-scheduling/assets.rst index 7bf609e9d9834..4b14a98954b0e 100644 --- a/airflow-core/docs/authoring-and-scheduling/assets.rst +++ b/airflow-core/docs/authoring-and-scheduling/assets.rst @@ -726,12 +726,15 @@ error is a type mismatch: an identity-decoding mapper's ``expected_decoded_type` ``datetime``; ``RollupMapper`` detects the mismatch at construction time and raises before the Dag is scheduled. -``DayWindow`` always enumerates twenty-four hourly steps. With an upstream mapper -configured for a local timezone that observes daylight-saving time, the spring-forward -day has only twenty-three real hours (one window member never has a matching event, -so the run is held indefinitely) and the fall-back day has twenty-five (the repeated -hour is dropped). Use a UTC-based upstream mapper for any rollup that crosses a DST -boundary; see the ``DayWindow`` class docstring for the full discussion. +``DayWindow`` is DST-aware. With a UTC (or naive) upstream mapper a day is always +twenty-four hours. With an upstream mapper configured for a local timezone that observes +daylight-saving time, ``DayWindow`` enumerates the *real* local hours of the calendar +day — twenty-three on the spring-forward day and twenty-five on the fall-back day — so +the expected key set matches the hours that actually occur and a spring-forward rollup is +no longer held forever waiting for a key (e.g. ``2024-03-10T02``) that never arrives. On +a fall-back day the local clock 01:00 occurs twice; unless the upstream mapper's +``input_format`` carries the UTC offset (``%z``), the two hours share a single key (the +rollup still does not hang). See the ``DayWindow`` class docstring for details. Setting partition keys at runtime ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~ diff --git a/airflow-core/newsfragments/68004.bugfix.rst b/airflow-core/newsfragments/68004.bugfix.rst new file mode 100644 index 0000000000000..7dfa0c1747270 --- /dev/null +++ b/airflow-core/newsfragments/68004.bugfix.rst @@ -0,0 +1,9 @@ +Make ``RollupMapper`` calendar windows DST-aware + +``DayWindow`` no longer assumes every day has 24 hours. When paired with a local-timezone upstream +mapper it now enumerates the real local hours of the calendar day, yielding 23 expected upstream +keys on a spring-forward day and 25 on a fall-back day instead of a fixed 24. This fixes +asset-partitioned rollups being held forever on spring-forward days (an expected hourly key that +never occurs is no longer in the required set). With a UTC (or naive) upstream mapper the behavior +is unchanged (24 hours). On a fall-back day, the repeated local hour collapses to a single key +unless the upstream mapper's ``input_format`` carries the UTC offset (``%z``). diff --git a/airflow-core/src/airflow/example_dags/example_asset_partition.py b/airflow-core/src/airflow/example_dags/example_asset_partition.py index 3995702df6b21..3de21557c6ac5 100644 --- a/airflow-core/src/airflow/example_dags/example_asset_partition.py +++ b/airflow-core/src/airflow/example_dags/example_asset_partition.py @@ -312,7 +312,9 @@ def multi_region_player_stats(self, outlet_events): ``StartOfDayMapper`` normalizes each upstream hourly timestamp (``%Y-%m-%dT%H:%M:%S``) to its day-start (``%Y-%m-%d``); ``DayWindow`` declares the downstream run needs all 24 hourly partitions before firing. Publishes ``daily_team_a`` so the - monthly rollup below can consume it. + monthly rollup below can consume it. (This mapper uses the default UTC timezone, so a + day is always 24 hours; with a local-timezone mapper ``DayWindow`` is DST-aware and + expects 23 or 25 partitions on DST-transition days.) """ @task(outlets=[daily_team_a]) diff --git a/airflow-core/src/airflow/partition_mappers/base.py b/airflow-core/src/airflow/partition_mappers/base.py index ba4a3ee658e9f..c6120b33b05e5 100644 --- a/airflow-core/src/airflow/partition_mappers/base.py +++ b/airflow-core/src/airflow/partition_mappers/base.py @@ -133,9 +133,13 @@ def to_downstream(self, key: str) -> str | Iterable[str]: def to_upstream(self, downstream_key: str) -> frozenset[str]: """Return the complete set of upstream partition keys required for *downstream_key*.""" decoded = self.upstream_mapper.decode_downstream(downstream_key) + # Hand the upstream mapper's timezone to the window so calendar windows (DayWindow) can + # enumerate the real local hours across DST. Non-temporal mappers have no tzinfo -> None, + # in which case the window falls back to its fixed-count (UTC) behaviour. + tz = getattr(self.upstream_mapper, "tzinfo", None) return frozenset( self.upstream_mapper.encode_upstream(expected_upstream) - for expected_upstream in self.window.to_upstream(decoded) + for expected_upstream in self.window.to_upstream(decoded, tz) ) def serialize(self) -> dict[str, Any]: diff --git a/airflow-core/src/airflow/partition_mappers/temporal.py b/airflow-core/src/airflow/partition_mappers/temporal.py index b45ca9da3a4a2..6f114c8ae78a1 100644 --- a/airflow-core/src/airflow/partition_mappers/temporal.py +++ b/airflow-core/src/airflow/partition_mappers/temporal.py @@ -167,6 +167,11 @@ def __init__( timezone = parse_timezone(timezone) self._timezone = timezone + @property + def tzinfo(self) -> Timezone | FixedTimezone: + """Timezone this mapper localizes keys in (used by ``RollupMapper`` for DST-aware windows).""" + return self._timezone + def to_downstream(self, key: str) -> str: dt = datetime.strptime(key, self.input_format) if dt.tzinfo is None: diff --git a/airflow-core/src/airflow/partition_mappers/window.py b/airflow-core/src/airflow/partition_mappers/window.py index f0c92d458d525..71782d823ccdc 100644 --- a/airflow-core/src/airflow/partition_mappers/window.py +++ b/airflow-core/src/airflow/partition_mappers/window.py @@ -20,8 +20,11 @@ from datetime import datetime, timedelta from typing import TYPE_CHECKING, Any, ClassVar +from airflow._shared.timezones.timezone import make_aware, utc + if TYPE_CHECKING: from collections.abc import Iterable + from datetime import tzinfo def _require_day_one(dt: datetime, window_cls: type) -> None: @@ -89,8 +92,14 @@ class Window(ABC): expected_decoded_type: ClassVar[type] = str @abstractmethod - def to_upstream(self, decoded_downstream: Any) -> Iterable[Any]: - """Yield each decoded upstream item composing *decoded_downstream*.""" + def to_upstream(self, decoded_downstream: Any, tz: tzinfo | None = None) -> Iterable[Any]: + """ + Yield each decoded upstream item composing *decoded_downstream*. + + *tz* is the paired upstream mapper's timezone (``None`` for naive / non-temporal + mappers). Windows whose member count depends on the local calendar (currently only + :class:`DayWindow`, across DST transitions) use it; others ignore it. + """ def serialize(self) -> dict[str, Any]: return {} @@ -105,45 +114,59 @@ class HourWindow(Window): expected_decoded_type: ClassVar[type] = datetime - def to_upstream(self, period_start: datetime) -> Iterable[datetime]: + def to_upstream(self, period_start: datetime, tz: tzinfo | None = None) -> Iterable[datetime]: + # Minute steps within an hour are unaffected by DST (offsets shift on hour boundaries). return (period_start + timedelta(minutes=i) for i in range(60)) class DayWindow(Window): """ - Twenty-four consecutive hourly period-starts making up one day. - - Arithmetic is done on naive datetime steps so the 24-hour stride is - unambiguous across DST transitions; the upstream mapper handles timezone - awareness when it encodes each upstream member back to a key string. - - .. warning:: **DST edge cases with local-timezone upstream mappers** - - ``DayWindow`` always yields exactly 24 steps regardless of the local - calendar date. When the upstream mapper uses a local timezone - (e.g. ``StartOfDayMapper(timezone="America/New_York")``), DST gaps - and folds can cause a mismatch: - - - **Spring-forward (clock skips ahead)**: the local day has fewer than - 24 real hours. One naive step falls in the gap (e.g. 02:00 ET on - spring-forward day does not exist), so the upstream mapper encodes it - to the *next* local hour. That key (e.g. ``"2024-03-10T03"``) does - not match any upstream event — the rollup window can never be fully - satisfied. - - **Fall-back (clock repeats)**: the local day has 25 real hours, but - ``DayWindow`` only enumerates 24 steps. The extra hour's upstream - events are never included in the expected set, so those events do not - contribute to any rollup. - - **Mitigation**: use UTC ``input_format`` (e.g. ``%Y-%m-%dT%H%z``) and - ensure upstream producers emit UTC partition keys so local-clock - ambiguity never arises. + The hourly period-starts making up one calendar day. + + With a UTC (or naive) upstream mapper a day is always 24 hours, so the + window yields the canonical 24 steps. When the paired upstream mapper uses + a local timezone (e.g. ``StartOfDayMapper(timezone="America/New_York")``) + the window is DST-aware: it enumerates the *real* local hours of that + calendar day by stepping through the period in UTC, so it yields 23 members + on a spring-forward day and 25 on a fall-back day instead of a fixed 24. + This keeps the expected upstream key set matched to the hours that actually + occur, so a spring-forward rollup is no longer held forever waiting for a + key (e.g. ``"2024-03-10T02"``) that never arrives. + + .. note:: + + On a fall-back day the local clock 01:00 occurs twice. Both instants + map to the same wall-clock key unless the mapper's ``input_format`` + carries the UTC offset (``%z``), so without ``%z`` the two hours + collapse to a single expected key (the window still does not hang; the + duplicate hour's events simply share one key). """ expected_decoded_type: ClassVar[type] = datetime - def to_upstream(self, period_start: datetime) -> Iterable[datetime]: - return (period_start + timedelta(hours=i) for i in range(24)) + def to_upstream(self, period_start: datetime, tz: tzinfo | None = None) -> Iterable[datetime]: + if tz is None: + # UTC / naive upstream mapper: a day is always 24 unambiguous hourly steps. + return (period_start + timedelta(hours=i) for i in range(24)) + return self._local_hours(period_start, tz) + + @staticmethod + def _local_hours(period_start: datetime, tz: tzinfo) -> list[datetime]: + """ + Enumerate the real local hour-starts of *period_start*'s calendar day in *tz*. + + Steps in UTC (where ``+ timedelta`` is unambiguous, there being no DST) from the day + start to the next day start, converting each instant back to *tz*. This yields 23 + members on spring-forward days, 25 on fall-back days, and 24 otherwise. + """ + start_utc = make_aware(period_start, tz).astimezone(utc) + end_utc = make_aware(period_start + timedelta(days=1), tz).astimezone(utc) + hours: list[datetime] = [] + current = start_utc + while current < end_utc: + hours.append(current.astimezone(tz)) + current += timedelta(hours=1) + return hours class WeekWindow(Window): @@ -151,7 +174,8 @@ class WeekWindow(Window): expected_decoded_type: ClassVar[type] = datetime - def to_upstream(self, period_start: datetime) -> Iterable[datetime]: + def to_upstream(self, period_start: datetime, tz: tzinfo | None = None) -> Iterable[datetime]: + # Day-grain steps are DST-safe (a calendar day is one key regardless of its length). return (period_start + timedelta(days=i) for i in range(7)) @@ -167,7 +191,7 @@ class MonthWindow(Window): expected_decoded_type: ClassVar[type] = datetime - def to_upstream(self, period_start: datetime) -> Iterable[datetime]: + def to_upstream(self, period_start: datetime, tz: tzinfo | None = None) -> Iterable[datetime]: _require_day_one(period_start, type(self)) next_month = period_start.month % 12 + 1 next_year = period_start.year + (1 if period_start.month == 12 else 0) @@ -181,7 +205,7 @@ class QuarterWindow(Window): expected_decoded_type: ClassVar[type] = datetime - def to_upstream(self, period_start: datetime) -> Iterable[datetime]: + def to_upstream(self, period_start: datetime, tz: tzinfo | None = None) -> Iterable[datetime]: _require_day_one(period_start, type(self)) return (_shift_months(period_start, i) for i in range(3)) @@ -191,6 +215,6 @@ class YearWindow(Window): expected_decoded_type: ClassVar[type] = datetime - def to_upstream(self, period_start: datetime) -> Iterable[datetime]: + def to_upstream(self, period_start: datetime, tz: tzinfo | None = None) -> Iterable[datetime]: _require_day_one(period_start, type(self)) return (_shift_months(period_start, i) for i in range(12)) diff --git a/airflow-core/tests/unit/partition_mappers/test_window.py b/airflow-core/tests/unit/partition_mappers/test_window.py index f32b9423579b9..c053a5c96cb4f 100644 --- a/airflow-core/tests/unit/partition_mappers/test_window.py +++ b/airflow-core/tests/unit/partition_mappers/test_window.py @@ -52,45 +52,61 @@ def test_yields_24_hourly_period_starts(self): members = list(DayWindow().to_upstream(period_start)) assert members == [datetime(2024, 6, 10, h) for h in range(24)] - def test_day_window_yields_24_naive_steps_regardless_of_dst(self): - """DayWindow always yields exactly 24 naive steps; it does not know about DST.""" - # 2024-03-10: US Eastern spring-forward (clocks skip 02:00 → 03:00). + def test_day_window_tz_none_yields_24_naive_steps(self): + """With no timezone (UTC / naive upstream mapper) DayWindow always yields 24 naive steps.""" + # 2024-03-10 / 2024-11-03 are US Eastern DST days, but with tz=None the window is + # timezone-agnostic and yields a fixed 24 naive hourly steps. spring_forward = datetime(2024, 3, 10) assert list(DayWindow().to_upstream(spring_forward)) == [datetime(2024, 3, 10, h) for h in range(24)] - # 2024-11-03: US Eastern fall-back (clocks repeat 01:00 → 01:00). fall_back = datetime(2024, 11, 3) assert list(DayWindow().to_upstream(fall_back)) == [datetime(2024, 11, 3, h) for h in range(24)] - @pytest.mark.xfail( - reason=( - "DayWindow with a local-timezone upstream mapper cannot satisfy the rollup on a " - "spring-forward day: the 02:00 ET gap causes one naive step to encode to " - "03:00 ET, which upstream producers never emit. This is an accepted limitation " - "of naive 24-hour stepping — the documented mitigation is to use UTC input_format " - "so local-clock ambiguity never arises. See DayWindow's docstring." - ), - strict=True, - ) - def test_day_window_rollup_under_yields_on_spring_forward_with_local_tz(self): + def test_day_window_spring_forward_local_tz_yields_23_keys(self): """ - Rollup with a local-timezone upstream mapper is unsatisfiable on spring-forward days. - - DayWindow generates 24 naive steps. StartOfDayMapper(timezone="America/New_York") - encodes each step into a local-time key. On 2024-03-10 (spring-forward), 02:00 - local time does not exist — the encoder produces "2024-03-10T03" for *two* - consecutive steps. One expected upstream key ("2024-03-10T02") is never emitted - by any real producer, so to_upstream returns a frozenset of 23 *distinct* keys. - The rollup window requires all 24 and therefore can never be satisfied. + On a spring-forward day a local-timezone rollup expects the 23 real local hours. + + 2024-03-10 US Eastern skips 02:00 → 03:00, so the day has 23 hours. DayWindow is DST-aware + and yields those 23 (no ``2024-03-10T02`` gap key), so the rollup can actually be satisfied + instead of waiting forever for a key no producer emits. """ mapper = RollupMapper( upstream_mapper=StartOfDayMapper(timezone="America/New_York", input_format="%Y-%m-%dT%H"), window=DayWindow(), ) upstream_keys = mapper.to_upstream("2024-03-10") - # A correctly functioning rollup would expect exactly 24 distinct upstream keys. - # On spring-forward this assertion fails because only 23 distinct keys are produced. - assert len(upstream_keys) == 24 + assert len(upstream_keys) == 23 + assert "2024-03-10T02" not in upstream_keys + + def test_day_window_fall_back_local_tz_does_not_hang(self): + """ + On a fall-back day the local clock 01:00 repeats. + + Without ``%z`` in the upstream format the two 01:00 hours share a single key, so the window + yields 24 distinct keys (not 25). Crucially none of them is unsatisfiable, so the rollup no + longer hangs (full 25-key coverage would require ``%z`` in ``input_format``). + """ + mapper = RollupMapper( + upstream_mapper=StartOfDayMapper(timezone="America/New_York", input_format="%Y-%m-%dT%H"), + window=DayWindow(), + ) + assert len(mapper.to_upstream("2024-11-03")) == 24 + + def test_day_window_non_dst_local_day_yields_24_keys(self): + """A local-timezone day with no DST transition still yields exactly 24 keys.""" + mapper = RollupMapper( + upstream_mapper=StartOfDayMapper(timezone="America/New_York", input_format="%Y-%m-%dT%H"), + window=DayWindow(), + ) + assert len(mapper.to_upstream("2024-06-10")) == 24 + + def test_day_window_utc_yields_24_keys_on_dst_day(self): + """A UTC upstream mapper is unaffected by local DST: 24 keys even on a spring-forward date.""" + mapper = RollupMapper( + upstream_mapper=StartOfDayMapper(timezone="UTC", input_format="%Y-%m-%dT%H"), + window=DayWindow(), + ) + assert len(mapper.to_upstream("2024-03-10")) == 24 class TestWeekWindow: @@ -285,6 +301,17 @@ def test_serialize_round_trip(self): "2024-06-10", id="day", ), + pytest.param( + # The window carries no tz state - the timezone lives on the mapper and is + # serialized there, so the restored rollup must still reproduce the DST-correct + # 23-key set on a spring-forward day. + lambda: StartOfDayMapper( + timezone="America/New_York", input_format="%Y-%m-%dT%H", output_format="%Y-%m-%d" + ), + DayWindow(), + "2024-03-10", + id="day-local-tz-dst", + ), pytest.param( lambda: StartOfQuarterMapper(input_format="%Y-%m"), QuarterWindow(), diff --git a/task-sdk/src/airflow/sdk/definitions/partition_mappers/window.py b/task-sdk/src/airflow/sdk/definitions/partition_mappers/window.py index c7dacad19948d..3c45b80d0fffb 100644 --- a/task-sdk/src/airflow/sdk/definitions/partition_mappers/window.py +++ b/task-sdk/src/airflow/sdk/definitions/partition_mappers/window.py @@ -58,7 +58,14 @@ class HourWindow(Window): class DayWindow(Window): - """Twenty-four consecutive hourly keys making up one day.""" + """ + The hourly keys making up one calendar day. + + Twenty-four with a UTC/naive upstream mapper; DST-aware with a local-timezone upstream + mapper (twenty-three on the spring-forward day, twenty-five on the fall-back day). The + DST-aware enumeration runs in the scheduler (airflow-core); this Task SDK definition only + declares the window for Dag authoring and serialization. + """ expected_decoded_type: ClassVar[type] = datetime From 98106c61f8d6ca7a0e52e853291bdd11df9c9c43 Mon Sep 17 00:00:00 2001 From: Ramachandra Nalam Date: Sat, 6 Jun 2026 17:53:13 -0700 Subject: [PATCH 2/3] Rename newsfragment to match PR number (#68150) The check-newsfragment-pr-number CI check requires the newsfragment file to be named after the PR number, not the issue number. --- airflow-core/newsfragments/{68004.bugfix.rst => 68150.bugfix.rst} | 0 1 file changed, 0 insertions(+), 0 deletions(-) rename airflow-core/newsfragments/{68004.bugfix.rst => 68150.bugfix.rst} (100%) diff --git a/airflow-core/newsfragments/68004.bugfix.rst b/airflow-core/newsfragments/68150.bugfix.rst similarity index 100% rename from airflow-core/newsfragments/68004.bugfix.rst rename to airflow-core/newsfragments/68150.bugfix.rst From 57774f6799264643af3d072be85d618721878016 Mon Sep 17 00:00:00 2001 From: Ramachandra Nalam Date: Sat, 6 Jun 2026 18:42:35 -0700 Subject: [PATCH 3/3] Make the newsfragment a single line The check-newsfragments-are-valid hook requires non-significant newsfragments to be a single line. --- airflow-core/newsfragments/68150.bugfix.rst | 10 +--------- 1 file changed, 1 insertion(+), 9 deletions(-) diff --git a/airflow-core/newsfragments/68150.bugfix.rst b/airflow-core/newsfragments/68150.bugfix.rst index 7dfa0c1747270..e1f532c42ed31 100644 --- a/airflow-core/newsfragments/68150.bugfix.rst +++ b/airflow-core/newsfragments/68150.bugfix.rst @@ -1,9 +1 @@ -Make ``RollupMapper`` calendar windows DST-aware - -``DayWindow`` no longer assumes every day has 24 hours. When paired with a local-timezone upstream -mapper it now enumerates the real local hours of the calendar day, yielding 23 expected upstream -keys on a spring-forward day and 25 on a fall-back day instead of a fixed 24. This fixes -asset-partitioned rollups being held forever on spring-forward days (an expected hourly key that -never occurs is no longer in the required set). With a UTC (or naive) upstream mapper the behavior -is unchanged (24 hours). On a fall-back day, the repeated local hour collapses to a single key -unless the upstream mapper's ``input_format`` carries the UTC offset (``%z``). +Make ``RollupMapper`` calendar windows DST-aware so a local-timezone calendar-day rollup is no longer held forever on a spring-forward day; ``DayWindow`` now enumerates the real local hours of the day (23 on a spring-forward day, 25 on a fall-back day) instead of a fixed 24, while a UTC or naive upstream mapper is unchanged.