A BigFlow workflow is a series of jobs. Each job is a Python object which executes your data processing logic. It can execute anything that can be run from Python code, for example:
- Dataproc process
- Apache Beam process
- BigQuery query
- Any Python code
The simplest workflow you can create looks like this:
```python
import bigflow


class SimpleJob(bigflow.Job):
    id = 'simple_job'

    def execute(self, context: bigflow.JobContext):
        print('Running a simple job')


simple_workflow = bigflow.Workflow(
    workflow_id='simple_workflow',
    definition=[SimpleJob()],
)
```
You can run this workflow within a Python module:
```python
simple_workflow.run()
simple_workflow.run_job('simple_job')
```
Output:
```
Running a simple job
Running a simple job
```
Running workflows and jobs from a module is useful for debugging. In any other case, we recommend using BigFlow CLI.
A job is just an object with a unique `id` and the `execute` method.
The `id` parameter is a string that should be a valid Python variable name, for example: `'my_example_job'`, `'MY_EXAMPLE_JOB'`, `'job1234'`.
The Job `execute` method takes a single argument, `context`, of type `bigflow.JobContext`. It holds the execution timestamp and a reference to the workflow. You can find more information about `context` and about workflow scheduling options later in this chapter.
There are 3 additional parameters that a job can supply to Airflow: `retry_count`, `retry_pause_sec`, and `execution_timeout`. The `retry_count` parameter determines how many times a job will be retried in case of a failure. The `retry_pause_sec` parameter says how long the pause between retries should be, in seconds. The `execution_timeout` parameter says how long Airflow should wait for a job to finish. The default value for the `execution_timeout` parameter is 3 hours.
```python
import bigflow


class SimpleRetriableJob(bigflow.Job):

    def __init__(self, id):
        self.id = id
        self.retry_count = 20
        self.retry_pause_sec = 100

    def execute(self, context: bigflow.JobContext):
        print("execution runtime as `datetime` object", context.runtime)
        print("reference to workflow", context.workflow)
```
The `Workflow` class takes 2 main parameters: `workflow_id` and `definition`.

The `workflow_id` parameter is a string that should be a valid Python variable name. For example: `'my_example_workflow'`, `'MY_EXAMPLE_WORKFLOW'`, `'workflow1234'`.

The `Workflow` class arranges jobs into a DAG, through the `definition` parameter.
There are two ways of specifying the job arrangement. When your jobs are executed sequentially, simply pass them in a list of jobs:
```python
import datetime

import bigflow


class Job(bigflow.Job):

    def __init__(self, id):
        self.id = id

    def execute(self, context: bigflow.JobContext):
        print(f'Running job {self.id} at {context.runtime}')


example_workflow = bigflow.Workflow(
    workflow_id='example_workflow',
    definition=[Job('1'), Job('2')],
)

# pass an explicit runtime so the output below is reproducible
example_workflow.run(datetime.datetime(2020, 1, 1))
```
Output:
```
Running job 1 at 2020-01-01 00:00:00
Running job 2 at 2020-01-01 00:00:00
```
When some of your jobs are executed concurrently, pass them using the `Definition` object. It allows you to create a graph of jobs (DAG).
Let us say that we want to create the following DAG:

```
        /--job2--\
job1-->           -->job4
        \--job3--/
```
The implementation:
```python
import datetime

from bigflow import Workflow, Definition

# reuse the `Job` class defined in the previous example
job1, job2, job3, job4 = Job('1'), Job('2'), Job('3'), Job('4')

graph_workflow = Workflow(
    workflow_id='graph_workflow',
    definition=Definition({
        job1: [job2, job3],
        job2: [job4],
        job3: [job4],
    }),
)

# again, an explicit runtime matches the output shown below
graph_workflow.run(datetime.datetime(2020, 1, 1))
```
As you can see below, the `Workflow.run` method executes jobs one by one, in an order that respects their dependencies (job 1 first, then jobs 2 and 3, then job 4):

```
Running job 1 at 2020-01-01 00:00:00
Running job 2 at 2020-01-01 00:00:00
Running job 3 at 2020-01-01 00:00:00
Running job 4 at 2020-01-01 00:00:00
```
The `Workflow` class provides the `run` and `run_job` methods. When you run a single job through the `Workflow.run_job` method without providing the `runtime` parameter, the `Workflow` class passes the current date-time (local time) as the default.
```python
import datetime

from bigflow import Workflow, Job

simple_workflow = Workflow(
    workflow_id='simple_workflow',
    definition=[Job('1')],
)

# without an explicit runtime, the current local date-time is used
simple_workflow.run_job('1')
simple_workflow.run()

# with an explicit runtime
simple_workflow.run_job('1', datetime.datetime(year=1970, month=1, day=1))
simple_workflow.run(datetime.datetime(year=1970, month=1, day=1))
```
The `Workflow.run` method ignores job parameters like `retry_count`, `retry_pause_sec`, and `execution_timeout`. It executes a workflow in a sequential (non-parallel) way. It is not used by Airflow.
The most important parameter for a workflow is `runtime`. BigFlow workflows process data in batches, where a batch means all units of data having timestamps within a given period. The `runtime` parameter defines this period.

When a workflow is deployed on Airflow, the `runtime` parameter is taken from the Airflow `execution_date`. It is formatted as `YYYY-MM-DD hh-mm-ss`.
The `schedule_interval` parameter defines when a workflow should be run. It can be a cron expression or a "shortcut", for example: `'@daily'`, `'@hourly'`, `'@once'`, `'0 0 * * 0'`.
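For instance, the sketch below (the workflow id and job class are placeholders) schedules a workflow with a cron expression that fires every Sunday at midnight:

```python
import bigflow


class WeeklyReportJob(bigflow.Job):
    id = 'weekly_report_job'

    def execute(self, context: bigflow.JobContext):
        print(f'Building the weekly report for {context.runtime}')


weekly_workflow = bigflow.Workflow(
    workflow_id='weekly_workflow',
    schedule_interval='0 0 * * 0',  # every Sunday at 00:00
    definition=[WeeklyReportJob()],
)
```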
The `start_time_factory` parameter determines the first `runtime`. Even though you provide the `start-time` parameter for the `build` command, you might want to start different workflows at different times. The `start_time_factory` allows you to define a proper start time for each of your workflows, based on the provided `start-time`.

Let us say that you have two workflows running hourly. You want the first workflow to start in the next hour after the `start-time`, and the second workflow to start one hour before the `start-time`. So if you set the `start-time` to `2020-01-01 12:00:00`, the first workflow starts at `2020-01-01 13:00:00` and the second starts at `2020-01-01 11:00:00`.

To achieve that, you need the following factories:
```python
from datetime import datetime, timedelta


# Workflow 1
def next_hour(start_time: datetime) -> datetime:
    return start_time + timedelta(hours=1)


# Workflow 2
def prev_hour(start_time: datetime) -> datetime:
    return start_time - timedelta(hours=1)
```
The `start_time_factory` parameter should be a callable which takes a `start-time` as a parameter and returns a `datetime`. The returned `datetime` is used as the final start time for a workflow.

The default value of the `start_time_factory` parameter supports daily workflows. It looks like this:
```python
import datetime as dt


def daily_start_time(start_time: dt.datetime) -> dt.datetime:
    td = dt.timedelta(hours=24)
    return start_time.replace(hour=0, minute=0, second=0, microsecond=0) - td
```
This factory sets the processing start point to the day before the provided `start-time`. Let us say that you set the `start-time` parameter to `2020-01-02 00:00:00`. Then, the final start time is `2020-01-01 00:00:00`.
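To use a custom factory, pass it as the `start_time_factory` argument of a workflow. A minimal sketch, reusing the `next_hour` factory defined above (the workflow id is a placeholder):

```python
import bigflow

shifted_workflow = bigflow.Workflow(
    workflow_id='shifted_workflow',
    schedule_interval='@hourly',
    start_time_factory=next_hour,  # the first runtime is one hour after the provided start-time
    definition=[SimpleJob()],      # the SimpleJob class from the beginning of this chapter
)
```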
You can specify a different variable from which DAGs read the config name. By default, that variable is set to `env`, and this can be overridden with the `env_variable` option:
```python
example_workflow = Workflow(
    workflow_id='example_workflow',
    env_variable='prod_env',
    definition=[
        SomeJob(),
    ],
)
```
When you run a workflow daily, `runtime` means all data with timestamps within a given day.
For example:
```python
import datetime

import bigflow
from bigflow import Workflow


class DailyJob(bigflow.Job):
    id = 'daily_job'

    def execute(self, context):
        dt1 = context.runtime
        dt2 = dt1 + datetime.timedelta(days=1, seconds=-1)
        print(f'I should process data with timestamps from: {dt1} to {dt2}')


daily_workflow = Workflow(
    workflow_id='daily_workflow',
    schedule_interval='@daily',
    definition=[
        DailyJob(),
    ],
)

daily_workflow.run(datetime.datetime(2020, 1, 1))
```
Output:
```
I should process data with timestamps from: 2020-01-01 00:00:00 to 2020-01-01 23:59:59
```
When you run a workflow hourly, `runtime` means all data with timestamps within a given hour.
For example:
```python
import datetime

import bigflow
from bigflow.workflow import hourly_start_time


class HourlyJob(bigflow.Job):
    id = 'hourly_job'

    def execute(self, context):
        print(f'I should process data with timestamps from: {context.runtime} '
              f'to {context.runtime + datetime.timedelta(minutes=59, seconds=59)}')


hourly_workflow = bigflow.Workflow(
    workflow_id='hourly_workflow',
    schedule_interval='@hourly',
    start_time_factory=hourly_start_time,
    definition=[HourlyJob()],
)

hourly_workflow.run(datetime.datetime(2020, 1, 1, 10))
```
Output:
```
I should process data with timestamps from: 2020-01-01 10:00:00 to 2020-01-01 10:59:59
```
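The `hourly_start_time` factory used above comes from `bigflow.workflow`. By analogy with the default `daily_start_time` factory shown earlier, it presumably truncates the provided `start-time` to the full hour and moves it one hour back; a rough sketch of that assumed behaviour (not the library's actual code):

```python
import datetime as dt


def hourly_start_time_sketch(start_time: dt.datetime) -> dt.datetime:
    # assumption: start one full hour before the provided start-time
    return start_time.replace(minute=0, second=0, microsecond=0) - dt.timedelta(hours=1)
```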