How to schedule an hourly job on a daily partitioned asset #14610
-
I have daily asset partitions and would like to refresh the current day's partition every hour. How can this be done without changing my asset partition scheme? |
Beta Was this translation helpful? Give feedback.
Replies: 3 comments 4 replies
-
Hey @yusufshalaby - something like this should work: from dagster import DailyPartitionsDefinition
daily_partitions_def = DailyPartitionsDefinition(start_date=..., end_offset=1)
@asset(partitions_def=daily_partitions_def)
def asset1():
...
@schedule(cron_schedule="@hourly", job=define_asset_job("asset1_job", selection=[asset1]))
def schedule1(context):
partition_date = context.scheduled_execution_time.replace(hour=0)
partition_key = partition_date.strftime(daily_partitions_def.fmt)
return RunRequest(partition_key=partition_key) Note the use of |
Beta Was this translation helpful? Give feedback.
-
Thanks! Slightly out of scope of the original question, but is there a way to pass a partition key range instead of just a partition key? I basically want a schedule with nightly refreshes of all partitions and hourly refreshes of the current day's partition. I can accomplish this right now with run_config_fn but it would be nice to incorporate the partitioning functionality. |
Beta Was this translation helpful? Give feedback.
-
I know this thread hasn't been active for awhile, but I'm having trouble with the suggested implementation, specifically at midnight. Ideally, at midnight on Is there a way to handle this problem where I'd like the full partition window to be 24 hours but instead it's showing at 0 hours? Any help would be appreciated! |
Beta Was this translation helpful? Give feedback.
Hey @yusufshalaby - something like this should work:
Note the use of
end_offset=1
. This allows there to be a partition for the current day. Without that, there's only partitions for days that have …