Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 2 additions & 3 deletions airflow-core/src/airflow/models/taskinstance.py
Original file line number Diff line number Diff line change
Expand Up @@ -2247,9 +2247,8 @@ def duration_expression_update(
return query.values(
{
"end_date": end_date,
"duration": (
(func.strftime("%s", end_date) - func.strftime("%s", cls.start_date))
+ func.round((func.strftime("%f", end_date) - func.strftime("%f", cls.start_date)), 3)
"duration": func.round(
(func.julianday(end_date) - func.julianday(cls.start_date)) * 86400, 3
),
}
)
Expand Down
37 changes: 36 additions & 1 deletion airflow-core/tests/unit/models/test_taskinstance.py
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@
from opentelemetry import trace
from opentelemetry.sdk.trace import TracerProvider
from opentelemetry.trace.propagation.tracecontext import TraceContextTextMapPropagator
from sqlalchemy import delete, func, inspect as sa_inspect, select
from sqlalchemy import delete, func, inspect as sa_inspect, select, update
from sqlalchemy.exc import IntegrityError
from sqlalchemy.orm import load_only
from sqlalchemy.orm.attributes import set_committed_value
Expand Down Expand Up @@ -1803,6 +1803,41 @@ def test_set_duration_empty_dates(self):
ti.set_duration()
assert ti.duration is None

@pytest.mark.backend("sqlite")
@pytest.mark.parametrize(
("start_date", "end_date", "expected_duration"),
[
(
timezone.datetime(2026, 6, 7, 12, 0, 1, 200000),
timezone.datetime(2026, 6, 7, 12, 0, 3, 500000),
2.3,
),
(
timezone.datetime(2026, 6, 7, 12, 0, 59, 200000),
timezone.datetime(2026, 6, 7, 12, 1, 0, 500000),
1.3,
),
(
timezone.datetime(2026, 6, 7, 12),
timezone.datetime(2026, 6, 7, 13),
3600.0,
),
],
)
def test_duration_expression_update_sqlite(
self, create_task_instance, session, start_date, end_date, expected_duration
):
ti = create_task_instance()
ti.start_date = start_date
session.flush()

query = update(TI).where(TI.id == ti.id)
session.execute(TI.duration_expression_update(end_date, query, session.get_bind()))
ti.refresh_from_db(session=session)

assert ti.end_date == end_date
assert ti.duration == expected_duration

def test_outlet_asset_extra(self, dag_maker: DagMaker, session: Session):
from airflow.sdk.definitions.asset import Asset

Expand Down
Loading