Replies: 6 comments 11 replies
-
Thanks for opening your first issue here! Be sure to follow the issue template! If you are willing to raise a PR to address this issue, please do so; no need to wait for approval.
-
Is this not currently possible?

Outlet DAG:

```python
from pendulum import datetime

from airflow.decorators import dag, task
from airflow.datasets import Dataset


@dag(
    schedule=None,
    start_date=datetime(2023, 1, 1),
    catchup=False,
    default_args={
        "retries": 2,  # If a task fails, it will retry 2 times.
    },
    tags=["example"],  # If set, this tag is shown in the DAG view of the Airflow UI.
)
def outlet():
    @task(outlets=[Dataset("outlet")])
    def write_xcom(ti=None):
        ti.xcom_push(key="outlet_xcom", value="xyz")

    write_xcom()


outlet()
```

Inlet DAG:

```python
from pendulum import datetime

from airflow.decorators import dag, task
from airflow.datasets import Dataset


@dag(
    schedule=[Dataset("outlet")],
    start_date=datetime(2023, 1, 1),
    catchup=False,
    default_args={
        "retries": 2,  # If a task fails, it will retry 2 times.
    },
    tags=["example"],
)
def inlet():
    @task()
    def read_xcom(ti=None):
        xcoms = ti.xcom_pull(
            dag_id="outlet",
            task_ids="write_xcom",
            key="outlet_xcom",
            include_prior_dates=True,
        )
        print(f"xcoms: {xcoms}")

    read_xcom()


inlet()
```
-
I never tested this myself, but the code looks good. If this really is the way it works, then I still see high demand for adding this to the docs as an example, and maybe the example dataset DAGs shipped with Airflow should include it in their code :-D
-
Well… this does seem like a reasonable approach, but it tightly couples the two DAGs. Some colleagues are looking to version their DAGs by essentially adjusting the name of the file during deployment, and using the approach you suggested would not allow decoupling/loose-coupling of the producer and consumer DAG through the dataset.

But it DOES make me think that maybe the feature would be to carry the "triggering information" along with dataset triggers. If the consumer DAG (inlet, in your case) could be told the name of the DAG and/or dataset (if multiple) that triggered it, then it could ask for XCom information by dag_id.

However, it makes me wonder: what if the producer ran multiple times before the downstream got its trigger? Not sure how often that would be the case, but would the above solution allow multiple separate triggers to carry their own XCom information related to their work? Conceptually, you want to get an event about each dataset write describing what happened, and I feel like using XCom has a chance of race conditions or skipped events: if you go read it, does it just always get the "last value"?
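To make the "last value" concern concrete, here is a toy, non-Airflow sketch. `XcomStore` and its methods are invented for illustration; the point is that a pull keyed only by dag/task/key sees whatever was pushed last, so intermediate producer runs are effectively invisible:

```python
# Toy model (NOT Airflow API): pushes for the same (dag_id, task_id, key)
# slot overwrite each other, so a consumer that pulls later sees only the
# most recent value, not one value per producer run.

class XcomStore:
    def __init__(self):
        self._store = {}  # (dag_id, task_id, key) -> value

    def push(self, dag_id, task_id, key, value):
        # Each new push replaces the previous value for this slot.
        self._store[(dag_id, task_id, key)] = value

    def pull(self, dag_id, task_id, key):
        return self._store.get((dag_id, task_id, key))


store = XcomStore()

# The producer ("outlet") runs twice before the consumer is triggered.
store.push("outlet", "write_xcom", "outlet_xcom", "run-1-output")
store.push("outlet", "write_xcom", "outlet_xcom", "run-2-output")

# The consumer ("inlet") only ever sees the latest value; the first
# run's output is gone, i.e. that event was effectively skipped.
print(store.pull("outlet", "write_xcom", "outlet_xcom"))  # run-2-output
```

Under this model there is no race in the strict sense, but any producer run that is overwritten before the consumer reads is silently lost.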
-
I just tried this. The outlet task pushes its XCom, but I get varying results across several repeats. So this workaround mechanism does not seem very predictable as an "event" mechanism to tie two DAGs together via a dataset + XCom.
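One plausible explanation for the varying results: with dataset scheduling, multiple producer updates that land before the consumer actually runs get coalesced into a single consumer run. A toy sketch of that coalescing (all names here are invented, not Airflow API):

```python
# Toy model (NOT Airflow code): producer updates to a dataset accumulate
# in a pending queue, and the consumer gets ONE run that covers however
# many updates are pending, draining the queue in a single shot.

class DatasetQueue:
    def __init__(self):
        self.pending = []

    def producer_update(self, payload):
        self.pending.append(payload)

    def maybe_trigger_consumer(self):
        # A single consumer run is created for ALL pending updates.
        if self.pending:
            events, self.pending = self.pending, []
            return events
        return None  # nothing pending -> no new consumer run


q = DatasetQueue()
q.producer_update("2023-01-01")
q.producer_update("2023-01-02")  # second update lands before the consumer ran

run_events = q.maybe_trigger_consumer()
print(run_events)  # both updates collapsed into one consumer run
print(q.maybe_trigger_consumer())  # no further run until the next update
```

Depending on how the timing falls, the consumer's XCom pull can therefore see one value, the latest of several, or none that match the run it "expected", which would explain the unpredictable repeats.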
-
I just tried another tack, but found another shortcoming when setting up a consumer DAG.

I mean… since this is now a discussion, I really just want to learn/decide what a good, efficient, connected mechanism/pattern would be for having daily batch jobs as DAGs that can have multiple downstream things connected by some fixed/constant thing (a la dataset string), but that also only do "their work", i.e. the work related to the upstream job that wrote the dataset/outlet.
-
Description
Provide a mechanism to pass data (XCom?) so that downstream DAGs could know more context about how/why/when/by-what they were triggered.
Use case/motivation
In order to avoid writing monolithic DAGs, it would seem useful to have separate DAGs focused on discrete input and output transforms, which would also allow them to be retried/rescheduled as needed. One could imagine daily batch processing composed of several DAGs, using the dataset mechanism as a way to trigger them efficiently. However, it seems that no information comes along with a dataset passed in each DAG's "schedule". If several days of daily tasks are (re-)scheduled, the outlet of a dataset would not be able to communicate to downstream DAGs which "datestamp" they should process.
As of now the dataset is just a string and, when loosely coupling a producer/consumer via the Dataset, there is no way to communicate specific information about the producer's exact output. There also doesn't appear to be a way to mix and match scheduling based on a dataset as well as `@daily`, e.g., so there's no way to connect a particular day's producer DAG with a consumer DAG.
If a task could query its lineage and specifically get data / XCom information from the DAG/task/Dataset that triggered it, then it could take efficient actions based on the previous task's specific output location (i.e. its datestamp directory if that's the convention, but it could be anything, really, if a general way of passing/receiving data were provided).
Related issues
No response