Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
15 changes: 9 additions & 6 deletions airflow-core/docs/authoring-and-scheduling/assets.rst
Original file line number Diff line number Diff line change
Expand Up @@ -726,12 +726,15 @@ error is a type mismatch: an identity-decoding mapper's ``expected_decoded_type`
``datetime``; ``RollupMapper`` detects the mismatch at construction time and
raises before the Dag is scheduled.

``DayWindow`` always enumerates twenty-four hourly steps. With an upstream mapper
configured for a local timezone that observes daylight-saving time, the spring-forward
day has only twenty-three real hours (one window member never has a matching event,
so the run is held indefinitely) and the fall-back day has twenty-five (the repeated
hour is dropped). Use a UTC-based upstream mapper for any rollup that crosses a DST
boundary; see the ``DayWindow`` class docstring for the full discussion.
``DayWindow`` is DST-aware. With a UTC (or naive) upstream mapper a day is always
twenty-four hours. With an upstream mapper configured for a local timezone that observes
daylight-saving time, ``DayWindow`` enumerates the *real* local hours of the calendar
day — twenty-three on the spring-forward day and twenty-five on the fall-back day — so
the expected key set matches the hours that actually occur and a spring-forward rollup is
no longer held forever waiting for a key (e.g. ``2024-03-10T02``) that never arrives. On
a fall-back day the local clock hour 01:00 occurs twice; unless the upstream mapper's
``input_format`` carries the UTC offset (``%z``), both occurrences encode to the same key,
so the expected key set contains twenty-four distinct keys rather than twenty-five (the
rollup still does not hang). See the ``DayWindow`` class docstring for details.

Setting partition keys at runtime
~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
Expand Down
1 change: 1 addition & 0 deletions airflow-core/newsfragments/68150.bugfix.rst
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
Make ``RollupMapper`` calendar windows DST-aware so a local-timezone calendar-day rollup is no longer held forever on a spring-forward day; ``DayWindow`` now enumerates the real local hours of the day (23 on a spring-forward day, 25 on a fall-back day) instead of a fixed 24, while a UTC or naive upstream mapper is unchanged.
Original file line number Diff line number Diff line change
Expand Up @@ -312,7 +312,9 @@ def multi_region_player_stats(self, outlet_events):
``StartOfDayMapper`` normalizes each upstream hourly timestamp (``%Y-%m-%dT%H:%M:%S``)
to its day-start (``%Y-%m-%d``); ``DayWindow`` declares the downstream run needs
all 24 hourly partitions before firing. Publishes ``daily_team_a`` so the
monthly rollup below can consume it.
monthly rollup below can consume it. (This mapper uses the default UTC timezone, so a
day is always 24 hours; with a local-timezone mapper ``DayWindow`` is DST-aware and
expects 23 or 25 partitions on DST-transition days.)
"""

@task(outlets=[daily_team_a])
Expand Down
6 changes: 5 additions & 1 deletion airflow-core/src/airflow/partition_mappers/base.py
Original file line number Diff line number Diff line change
Expand Up @@ -133,9 +133,13 @@ def to_downstream(self, key: str) -> str | Iterable[str]:
def to_upstream(self, downstream_key: str) -> frozenset[str]:
    """
    Return the complete set of upstream partition keys required for *downstream_key*.

    The key is decoded by the upstream mapper, expanded into its members by the window,
    and every member is encoded back into an upstream key string.

    The upstream mapper's ``tzinfo`` attribute (``None`` when the mapper does not define
    one) is handed to the window so calendar windows can enumerate the real local hours
    across DST. It is only forwarded when the window's ``to_upstream`` can take a second
    positional argument, so a ``Window`` subclass written against the earlier one-argument
    signature keeps working instead of raising ``TypeError``.
    """
    import inspect

    decoded = self.upstream_mapper.decode_downstream(downstream_key)
    tz = getattr(self.upstream_mapper, "tzinfo", None)

    expand = self.window.to_upstream
    try:
        kinds = [param.kind for param in inspect.signature(expand).parameters.values()]
    except (TypeError, ValueError):
        # No introspectable signature: keep the two-argument call unchanged.
        accepts_tz = True
    else:
        positional_slots = sum(
            kind in (inspect.Parameter.POSITIONAL_ONLY, inspect.Parameter.POSITIONAL_OR_KEYWORD)
            for kind in kinds
        )
        accepts_tz = positional_slots >= 2 or inspect.Parameter.VAR_POSITIONAL in kinds

    members = expand(decoded, tz) if accepts_tz else expand(decoded)
    return frozenset(self.upstream_mapper.encode_upstream(member) for member in members)

def serialize(self) -> dict[str, Any]:
Expand Down
5 changes: 5 additions & 0 deletions airflow-core/src/airflow/partition_mappers/temporal.py
Original file line number Diff line number Diff line change
Expand Up @@ -167,6 +167,11 @@ def __init__(
timezone = parse_timezone(timezone)
self._timezone = timezone

@property
def tzinfo(self) -> Timezone | FixedTimezone:
    """
    Timezone this mapper localizes keys in (used by ``RollupMapper`` for DST-aware windows).

    Read-only view of the value ``__init__`` stores in ``self._timezone``.
    """
    return self._timezone

def to_downstream(self, key: str) -> str:
dt = datetime.strptime(key, self.input_format)
if dt.tzinfo is None:
Expand Down
96 changes: 60 additions & 36 deletions airflow-core/src/airflow/partition_mappers/window.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,8 +20,11 @@
from datetime import datetime, timedelta
from typing import TYPE_CHECKING, Any, ClassVar

from airflow._shared.timezones.timezone import make_aware, utc

if TYPE_CHECKING:
from collections.abc import Iterable
from datetime import tzinfo


def _require_day_one(dt: datetime, window_cls: type) -> None:
Expand Down Expand Up @@ -89,8 +92,14 @@ class Window(ABC):
expected_decoded_type: ClassVar[type] = str

@abstractmethod
def to_upstream(self, decoded_downstream: Any) -> Iterable[Any]:
"""Yield each decoded upstream item composing *decoded_downstream*."""
def to_upstream(self, decoded_downstream: Any, tz: tzinfo | None = None) -> Iterable[Any]:
"""
Yield each decoded upstream item composing *decoded_downstream*.

*tz* is the paired upstream mapper's timezone (``None`` for naive / non-temporal
mappers). Windows whose member count depends on the local calendar (currently only
:class:`DayWindow`, across DST transitions) use it; others ignore it.
"""

def serialize(self) -> dict[str, Any]:
return {}
Expand All @@ -105,53 +114,68 @@ class HourWindow(Window):

expected_decoded_type: ClassVar[type] = datetime

def to_upstream(self, period_start: datetime) -> Iterable[datetime]:
def to_upstream(self, period_start: datetime, tz: tzinfo | None = None) -> Iterable[datetime]:
# Minute steps within an hour are unaffected by DST (offsets shift on hour boundaries).
return (period_start + timedelta(minutes=i) for i in range(60))


class DayWindow(Window):
"""
Twenty-four consecutive hourly period-starts making up one day.

Arithmetic is done on naive datetime steps so the 24-hour stride is
unambiguous across DST transitions; the upstream mapper handles timezone
awareness when it encodes each upstream member back to a key string.

.. warning:: **DST edge cases with local-timezone upstream mappers**

``DayWindow`` always yields exactly 24 steps regardless of the local
calendar date. When the upstream mapper uses a local timezone
(e.g. ``StartOfDayMapper(timezone="America/New_York")``), DST gaps
and folds can cause a mismatch:

- **Spring-forward (clock skips ahead)**: the local day has fewer than
24 real hours. One naive step falls in the gap (e.g. 02:00 ET on
spring-forward day does not exist), so the upstream mapper encodes it
to the *next* local hour. That key (e.g. ``"2024-03-10T03"``) does
not match any upstream event — the rollup window can never be fully
satisfied.
- **Fall-back (clock repeats)**: the local day has 25 real hours, but
``DayWindow`` only enumerates 24 steps. The extra hour's upstream
events are never included in the expected set, so those events do not
contribute to any rollup.

**Mitigation**: use UTC ``input_format`` (e.g. ``%Y-%m-%dT%H%z``) and
ensure upstream producers emit UTC partition keys so local-clock
ambiguity never arises.
The hourly period-starts making up one calendar day.

With a UTC (or naive) upstream mapper a day is always 24 hours, so the
window yields the canonical 24 steps. When the paired upstream mapper uses
a local timezone (e.g. ``StartOfDayMapper(timezone="America/New_York")``)
the window is DST-aware: it enumerates the *real* local hours of that
calendar day by stepping through the period in UTC, so it yields 23 members
on a spring-forward day and 25 on a fall-back day instead of a fixed 24.
This keeps the expected upstream key set matched to the hours that actually
occur, so a spring-forward rollup is no longer held forever waiting for a
key (e.g. ``"2024-03-10T02"``) that never arrives.

.. note::

On a fall-back day the local clock 01:00 occurs twice. Both instants
map to the same wall-clock key unless the mapper's ``input_format``
carries the UTC offset (``%z``), so without ``%z`` the two hours
collapse to a single expected key (the window still does not hang; the
duplicate hour's events simply share one key).
"""

expected_decoded_type: ClassVar[type] = datetime

def to_upstream(self, period_start: datetime) -> Iterable[datetime]:
return (period_start + timedelta(hours=i) for i in range(24))
def to_upstream(self, period_start: datetime, tz: tzinfo | None = None) -> Iterable[datetime]:
    """
    Return the hourly members of the day that begins at *period_start*.

    When *tz* is given the members come from ``_local_hours``. When it is ``None`` the
    result is a lazy sequence of 24 datetimes spaced one hour apart, starting at
    *period_start*.
    """
    if tz is not None:
        return self._local_hours(period_start, tz)
    # No timezone supplied: plain fixed-stride arithmetic, 24 steps of one hour each.
    one_hour = timedelta(hours=1)
    return (period_start + step * one_hour for step in range(24))

@staticmethod
def _local_hours(period_start: datetime, tz: tzinfo) -> list[datetime]:
    """
    Enumerate the real local hour-starts of *period_start*'s calendar day in *tz*.

    The day start and the following day start are localized to *tz* with ``make_aware``
    and converted to UTC. The half-open UTC interval between them is then walked in
    one-hour steps (``+ timedelta`` is unambiguous in UTC, there being no DST) and each
    instant is converted back to *tz*. This yields 23 members on spring-forward days,
    25 on fall-back days, and 24 otherwise.

    :param period_start: Start of the local calendar day. Normally naive; an aware value
        is first reduced to its wall-clock time in *tz*.
    :param tz: Timezone the calendar day is measured in.
    :return: The day's hourly instants expressed in *tz*, in chronological order.
    """
    if period_start.utcoffset() is not None:
        # NOTE(review): make_aware appears to reject already-aware datetimes - confirm
        # against the shared timezone helper. Dropping to wall-clock time in *tz* keeps
        # aware input working, as it did with the plain ``+ timedelta`` stepping.
        period_start = period_start.astimezone(tz).replace(tzinfo=None)

    start_utc = make_aware(period_start, tz).astimezone(utc)
    end_utc = make_aware(period_start + timedelta(days=1), tz).astimezone(utc)

    hours: list[datetime] = []
    current = start_utc
    while current < end_utc:
        hours.append(current.astimezone(tz))
        current += timedelta(hours=1)
    return hours


class WeekWindow(Window):
"""Seven consecutive daily period-starts making up one week."""

expected_decoded_type: ClassVar[type] = datetime

def to_upstream(self, period_start: datetime) -> Iterable[datetime]:
def to_upstream(self, period_start: datetime, tz: tzinfo | None = None) -> Iterable[datetime]:
# Day-grain steps are DST-safe (a calendar day is one key regardless of its length).
return (period_start + timedelta(days=i) for i in range(7))


Expand All @@ -167,7 +191,7 @@ class MonthWindow(Window):

expected_decoded_type: ClassVar[type] = datetime

def to_upstream(self, period_start: datetime) -> Iterable[datetime]:
def to_upstream(self, period_start: datetime, tz: tzinfo | None = None) -> Iterable[datetime]:
_require_day_one(period_start, type(self))
next_month = period_start.month % 12 + 1
next_year = period_start.year + (1 if period_start.month == 12 else 0)
Expand All @@ -181,7 +205,7 @@ class QuarterWindow(Window):

expected_decoded_type: ClassVar[type] = datetime

def to_upstream(self, period_start: datetime) -> Iterable[datetime]:
def to_upstream(self, period_start: datetime, tz: tzinfo | None = None) -> Iterable[datetime]:
_require_day_one(period_start, type(self))
return (_shift_months(period_start, i) for i in range(3))

Expand All @@ -191,6 +215,6 @@ class YearWindow(Window):

expected_decoded_type: ClassVar[type] = datetime

def to_upstream(self, period_start: datetime) -> Iterable[datetime]:
def to_upstream(self, period_start: datetime, tz: tzinfo | None = None) -> Iterable[datetime]:
_require_day_one(period_start, type(self))
return (_shift_months(period_start, i) for i in range(12))
79 changes: 53 additions & 26 deletions airflow-core/tests/unit/partition_mappers/test_window.py
Original file line number Diff line number Diff line change
Expand Up @@ -52,45 +52,61 @@ def test_yields_24_hourly_period_starts(self):
members = list(DayWindow().to_upstream(period_start))
assert members == [datetime(2024, 6, 10, h) for h in range(24)]

def test_day_window_yields_24_naive_steps_regardless_of_dst(self):
"""DayWindow always yields exactly 24 naive steps; it does not know about DST."""
# 2024-03-10: US Eastern spring-forward (clocks skip 02:00 → 03:00).
def test_day_window_tz_none_yields_24_naive_steps(self):
    """Called without a timezone, DayWindow yields 24 naive hourly steps for any date."""
    # 2024-03-10 (spring-forward) and 2024-11-03 (fall-back) are US Eastern DST days.
    # With no tz passed the window is timezone-agnostic, so both still produce a fixed
    # run of 24 naive hourly steps.
    for day in (datetime(2024, 3, 10), datetime(2024, 11, 3)):
        expected = [day.replace(hour=h) for h in range(24)]
        assert list(DayWindow().to_upstream(day)) == expected

@pytest.mark.xfail(
reason=(
"DayWindow with a local-timezone upstream mapper cannot satisfy the rollup on a "
"spring-forward day: the 02:00 ET gap causes one naive step to encode to "
"03:00 ET, which upstream producers never emit. This is an accepted limitation "
"of naive 24-hour stepping — the documented mitigation is to use UTC input_format "
"so local-clock ambiguity never arises. See DayWindow's docstring."
),
strict=True,
)
def test_day_window_rollup_under_yields_on_spring_forward_with_local_tz(self):
def test_day_window_spring_forward_local_tz_yields_23_keys(self):
"""
Rollup with a local-timezone upstream mapper is unsatisfiable on spring-forward days.

DayWindow generates 24 naive steps. StartOfDayMapper(timezone="America/New_York")
encodes each step into a local-time key. On 2024-03-10 (spring-forward), 02:00
local time does not exist — the encoder produces "2024-03-10T03" for *two*
consecutive steps. One expected upstream key ("2024-03-10T02") is never emitted
by any real producer, so to_upstream returns a frozenset of 23 *distinct* keys.
The rollup window requires all 24 and therefore can never be satisfied.
On a spring-forward day a local-timezone rollup expects the 23 real local hours.

2024-03-10 US Eastern skips 02:00 → 03:00, so the day has 23 hours. DayWindow is DST-aware
and yields those 23 (no ``2024-03-10T02`` gap key), so the rollup can actually be satisfied
instead of waiting forever for a key no producer emits.
"""
mapper = RollupMapper(
upstream_mapper=StartOfDayMapper(timezone="America/New_York", input_format="%Y-%m-%dT%H"),
window=DayWindow(),
)
upstream_keys = mapper.to_upstream("2024-03-10")
# A correctly functioning rollup would expect exactly 24 distinct upstream keys.
# On spring-forward this assertion fails because only 23 distinct keys are produced.
assert len(upstream_keys) == 24
assert len(upstream_keys) == 23
assert "2024-03-10T02" not in upstream_keys

def test_day_window_fall_back_local_tz_does_not_hang(self):
    """
    On a fall-back day the local clock 01:00 repeats.

    Without ``%z`` in the upstream format the two 01:00 hours share a single key, so the window
    yields 24 distinct keys (not 25). Crucially none of them is unsatisfiable, so the rollup no
    longer hangs (full 25-key coverage would require ``%z`` in ``input_format``).
    """
    mapper = RollupMapper(
        upstream_mapper=StartOfDayMapper(timezone="America/New_York", input_format="%Y-%m-%dT%H"),
        window=DayWindow(),
    )
    upstream_keys = mapper.to_upstream("2024-11-03")
    assert len(upstream_keys) == 24
    # A bare count would still pass if one key pointed at an hour no producer emits, so
    # pin the keys themselves: exactly the wall-clock hours 00..23 of the local day.
    assert upstream_keys == frozenset(f"2024-11-03T{hour:02d}" for hour in range(24))

def test_day_window_non_dst_local_day_yields_24_keys(self):
    """On a local-timezone day without a DST transition the rollup expects exactly 24 keys."""
    eastern_hourly = StartOfDayMapper(timezone="America/New_York", input_format="%Y-%m-%dT%H")
    rollup = RollupMapper(upstream_mapper=eastern_hourly, window=DayWindow())
    upstream_keys = rollup.to_upstream("2024-06-10")
    assert len(upstream_keys) == 24

def test_day_window_utc_yields_24_keys_on_dst_day(self):
    """Local DST does not affect a UTC upstream mapper: a spring-forward date still has 24 keys."""
    utc_hourly = StartOfDayMapper(timezone="UTC", input_format="%Y-%m-%dT%H")
    rollup = RollupMapper(upstream_mapper=utc_hourly, window=DayWindow())
    upstream_keys = rollup.to_upstream("2024-03-10")
    assert len(upstream_keys) == 24


class TestWeekWindow:
Expand Down Expand Up @@ -285,6 +301,17 @@ def test_serialize_round_trip(self):
"2024-06-10",
id="day",
),
pytest.param(
# The window carries no tz state - the timezone lives on the mapper and is
# serialized there, so the restored rollup must still reproduce the DST-correct
# 23-key set on a spring-forward day.
lambda: StartOfDayMapper(
timezone="America/New_York", input_format="%Y-%m-%dT%H", output_format="%Y-%m-%d"
),
DayWindow(),
"2024-03-10",
id="day-local-tz-dst",
),
pytest.param(
lambda: StartOfQuarterMapper(input_format="%Y-%m"),
QuarterWindow(),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -58,7 +58,14 @@ class HourWindow(Window):


class DayWindow(Window):
"""Twenty-four consecutive hourly keys making up one day."""
"""
The hourly keys making up one calendar day.

Twenty-four with a UTC/naive upstream mapper; DST-aware with a local-timezone upstream
mapper (twenty-three on the spring-forward day, twenty-five on the fall-back day). The
DST-aware enumeration runs in the scheduler (airflow-core); this Task SDK definition only
declares the window for Dag authoring and serialization.
"""

expected_decoded_type: ClassVar[type] = datetime

Expand Down
Loading