
How to schedule an hourly job on a daily partitioned asset #14610

Answered by sryza
yusufshalaby asked this question in Q&A

Hey @yusufshalaby - something like this should work:

from dagster import (
    DailyPartitionsDefinition,
    RunRequest,
    asset,
    define_asset_job,
    schedule,
)

# end_offset=1 adds a partition for the current day
daily_partitions_def = DailyPartitionsDefinition(start_date=..., end_offset=1)

@asset(partitions_def=daily_partitions_def)
def asset1():
    ...

@schedule(cron_schedule="@hourly", job=define_asset_job("asset1_job", selection=[asset1]))
def schedule1(context):
    # Every hourly tick targets the partition of the day it falls in
    partition_date = context.scheduled_execution_time.replace(hour=0)
    partition_key = partition_date.strftime(daily_partitions_def.fmt)
    return RunRequest(partition_key=partition_key)
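
To see why every hourly tick lands on the same daily partition, here is a minimal stdlib-only sketch of the key derivation used in schedule1. It does not import dagster. The helper names daily_partition_key and keys_for_day are hypothetical, and the "%Y-%m-%d" format is an assumption about what daily_partitions_def.fmt resolves to by default.

```python
from datetime import datetime, timedelta

# Assumption: this matches the format of the daily partitions definition.
DAILY_FMT = "%Y-%m-%d"


def daily_partition_key(scheduled_time: datetime, fmt: str = DAILY_FMT) -> str:
    """Map a schedule tick time to the key of the day that contains it."""
    # Truncate to midnight, mirroring replace(hour=0) in the schedule body.
    partition_date = scheduled_time.replace(hour=0, minute=0, second=0, microsecond=0)
    return partition_date.strftime(fmt)


def keys_for_day(day_start: datetime) -> set:
    """Collect the partition keys produced by the 24 hourly ticks of one day."""
    ticks = [day_start + timedelta(hours=h) for h in range(24)]
    return {daily_partition_key(t) for t in ticks}


if __name__ == "__main__":
    print(daily_partition_key(datetime(2023, 6, 5, 13, 0)))
    print(keys_for_day(datetime(2023, 6, 5)))
```

All 24 ticks collapse to a single key, so each hourly run re-materializes the same daily partition until the date rolls over.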

Note the use of end_offset=1, which adds a partition for the current day. Without that, there are only partitions for days that have …
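
The effect of end_offset can be sketched with a simplified model. This is not Dagster's implementation: the function daily_partition_keys is hypothetical, it ignores timezones and negative offsets, and it assumes (as I read the Dagster docs) that with the default end_offset=0 the last partition is the most recent day that has fully ended.

```python
from datetime import datetime, timedelta


def daily_partition_keys(start: datetime, now: datetime, end_offset: int = 0) -> list:
    """Simplified model: list daily keys from start up to now.

    A day is included once it has fully ended; end_offset appends that
    many extra days after the last fully ended one.
    """
    keys = []
    day = start
    # Include every day whose end is not after the current time.
    while day + timedelta(days=1) <= now:
        keys.append(day.strftime("%Y-%m-%d"))
        day += timedelta(days=1)
    # Extend the set by end_offset additional days.
    for _ in range(end_offset):
        keys.append(day.strftime("%Y-%m-%d"))
        day += timedelta(days=1)
    return keys


if __name__ == "__main__":
    start = datetime(2023, 6, 1)
    now = datetime(2023, 6, 3, 15, 0)
    print(daily_partition_keys(start, now))                # no key for June 3
    print(daily_partition_keys(start, now, end_offset=1))  # includes June 3
```

Under this model, an hourly tick on June 3 would request a key that only exists when end_offset=1 is set.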

Answer selected by sryza
Category: Q&A
Labels: area: partitions, area: schedule
4 participants