I am running a flow that starts with an `S3KeySensor` to detect incoming files. It checks for files with a certain suffix. Once such a file is discovered, the first step is to change its suffix. The sensor is marked with `wait_for_downstream=True`, so that it is only checked again after the suffix change and doesn't trigger until a new file arrives.
This doesn't work: the sensor doesn't wait for the downstream task to finish. As a result, it triggers another rename attempt, which fails because by the time it runs, the earlier rename has already succeeded.
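The race can be modeled without Airflow at all. The sketch below is purely illustrative: an in-memory dict stands in for the S3 bucket, and the `.incoming`/`.processing` suffixes and the helper names (`sensor_poke`, `rename`, `simulate_overlapping_runs`) are hypothetical. Two sensor runs both see the same key before either rename has executed; the first rename succeeds and the second fails because its source key is gone.

```python
def sensor_poke(bucket: dict, suffix: str) -> list:
    """Return the keys a suffix-matching sensor would detect (hypothetical helper)."""
    return sorted(k for k in bucket if k.endswith(suffix))


def rename(bucket: dict, key: str, old_suffix: str, new_suffix: str) -> str:
    """Move `key` to a new suffix; raise KeyError if the source is already gone."""
    if key not in bucket:
        raise KeyError(f"source key {key!r} no longer exists")
    new_key = key[: -len(old_suffix)] + new_suffix
    bucket[new_key] = bucket.pop(key)
    return new_key


def simulate_overlapping_runs() -> tuple:
    bucket = {"data/file1.incoming": b"payload"}
    # Both runs poke before run 1's rename executes, which is what happens
    # when the sensor does not actually wait for the downstream task.
    seen_by_run1 = sensor_poke(bucket, ".incoming")
    seen_by_run2 = sensor_poke(bucket, ".incoming")
    first = rename(bucket, seen_by_run1[0], ".incoming", ".processing")
    try:
        rename(bucket, seen_by_run2[0], ".incoming", ".processing")
        second = "succeeded"
    except KeyError:
        second = "failed"
    return first, second


if __name__ == "__main__":
    print(simulate_overlapping_runs())
```

With a working `wait_for_downstream`, the second poke would only happen after the first rename, and it would then find no matching key.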
To investigate the problem, I changed lines 155-159 of `airflow/ti_deps/deps/prev_dagrun_dep.py` to:
```python
# There was a DAG run, but the task wasn't active back then.
if catchup and last_dagrun.execution_date < ti.task.start_date:
    self._push_past_deps_met_xcom_if_needed(ti, dep_context)
    dbg_reason = f"First instance of this task. Last dagrun {last_dagrun.execution_date!r} Task start {ti.task.start_date!r}"
    yield self._passing_status(reason=dbg_reason)
    return
```
(I used the less-than-perfectly-readable `!r` because I suspected the problem was related to time zones, or maybe even to naive-vs-aware datetime issues.)
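For context on why `!r` helps here, a small standard-library sketch: `str()` of a naive and a UTC-aware datetime differ only by a `+00:00` suffix that is easy to miss in a log, while `repr()` spells out `tzinfo` explicitly. Ordering a naive against an aware datetime raises `TypeError`. This uses plain `datetime` objects only; the objects Airflow passes around may have a different repr. The helper `safe_compare` is hypothetical.

```python
from datetime import datetime, timezone

naive = datetime(2024, 6, 1, 12, 0)                       # no tzinfo
aware = datetime(2024, 6, 1, 12, 0, tzinfo=timezone.utc)  # UTC-aware


def safe_compare(a: datetime, b: datetime):
    """Return a < b, or a short error string if the two cannot be ordered."""
    try:
        return a < b
    except TypeError as exc:  # raised when mixing naive and aware datetimes
        return f"TypeError: {exc}"


if __name__ == "__main__":
    print(str(naive), "|", str(aware))  # 2024-06-01 12:00:00 | 2024-06-01 12:00:00+00:00
    print(repr(naive))  # datetime.datetime(2024, 6, 1, 12, 0)
    print(repr(aware))  # datetime.datetime(2024, 6, 1, 12, 0, tzinfo=datetime.timezone.utc)
    print(safe_compare(naive, aware))
```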
The modified code produced the following output in the scheduler log (relevant lines only, via grep):
Explanation: one of the checks relevant to `wait_for_downstream` is whether this task even existed at the time of the last DAG run. If it didn't, it obviously had no dependencies that need checking. That check compares the `start_date` on the task (not the task instance) to the `execution_date` of the last DAG run, on the assumption that the task's `start_date` records when the task was put in place.
But the log shows that `start_date` on the task is updated continuously. In fact, it matches the scheduled time of the current DAG run, which is (at least in my case) almost always later than the `execution_date` of the previous run. So the "task didn't exist yet" branch is taken on every run, and the downstream check is skipped.
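The effect can be reproduced with plain datetimes. This sketch copies only the condition quoted from `prev_dagrun_dep.py` (`catchup and last_dagrun.execution_date < ti.task.start_date`), with `catchup` assumed to be `True`; the hourly schedule, the fixed start date, and the helper names are hypothetical. With a fixed task `start_date` the condition is false for every simulated run, so the previous run's downstream state would be checked. If `start_date` instead tracks the current run's scheduled time, as the log suggests, the condition is true on every run and the check is skipped.

```python
from datetime import datetime, timedelta, timezone


def skips_past_checks(catchup: bool, last_run_date: datetime, task_start_date: datetime) -> bool:
    """Same condition as the quoted branch: True means the dep passes immediately
    as 'first instance of this task', so wait_for_downstream is never evaluated."""
    return catchup and last_run_date < task_start_date


def simulate(runs: int, interval: timedelta, drifting: bool) -> list:
    origin = datetime(2024, 6, 1, tzinfo=timezone.utc)
    fixed_start = origin  # hypothetical: the start_date the task was defined with
    results = []
    for i in range(1, runs + 1):
        previous_run = origin + (i - 1) * interval
        current_run = origin + i * interval
        # drifting=True models the observed behaviour: task.start_date == current run's time
        task_start = current_run if drifting else fixed_start
        results.append(skips_past_checks(True, previous_run, task_start))
    return results


if __name__ == "__main__":
    print(simulate(3, timedelta(hours=1), drifting=False))
    print(simulate(3, timedelta(hours=1), drifting=True))
```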
The drifting task `start_date` is also visible in the web UI: if you open the details of a task instance after some later runs have occurred and click "More Details", you can see an instance start date that is earlier than the task start date.
Am I missing something fundamental, or is it a bug?
(The code modifications etc. were done on 2.9.1, but AFAICT 2.9.2 shows exactly the same behavior.)