Repetitions are a mechanism to allow your Envelope application to re-run batch steps according to some regularly assessed criteria. Although you can configure repetitions for any data step, steps are only repeated as part of a streaming job; that is, unless you have a streaming input somewhere in your configuration, no steps will be repeated.
The canonical use-case for repetitions is to re-load small reference datasets either in response to some external prompt (e.g. flag file, message on a topic, etc) or on a regular schedule (e.g. every 12 hours).
Although some repetition implementations are provided out of the box, they are completely
pluggable. In order to develop new repetitions simply implement the com.cloudera.labs.envelope.repetition.Repetition
interface, or extend the com.cloudera.labs.envelope.repetition.AbstractRepetition
class.
There are two implementations provided as part of core Envelope:
-
ScheduledRepetition
- run the step according to a defined schedule, e.g. every 2 hours. -
FlagFileRepetition
- run the step based on the presence of a file on HDFS
See the configuration guide for details of how to configure these repetitions.
Repetitions are associated with batch steps. When the step is created at the beginning of the
Envelope job any repetitions for the step are instantiated. Typically, repetition implementations
kick off a regular task as a scheduled thread to check periodically whether the repetition criteria
have been met. If the repetition decides it is time to repeat - it is triggered - it should submit the step to
a utility class Repetitions
which keeps track of all the steps that should be repeated. The
streaming job executing in Runner
will check in with Repetitions
on each micro-batch for
any steps that should be repeated. If there are Repetitions
returns the set of steps and
clears the state ready for new repetitions. Runner
takes the set of steps to be repeated and systematically
marks those steps and their descendants as not submitted. These batch steps and descendants are then
re-run in a subsequent step in the micro-batch.
Repetitions
maintains the current list of to-be-repeated steps as a set such that if the same
step is triggered for repetition multiple times between micro-batch runs it is re-run only
once. In addition, there is a configurable minimum interval between re-runs to prevent the
same step being re-run too regularly.
As well as keeping track of steps that need to be re-run, the Repetitions
class manages a central
ScheduledExecutorPool since that is such a common method of Repetition
instances periodically
checking whether they need to trigger a step re-run.
In addition, an AbstractRepetition
provides some helper methods to make it simple to submit a
step to be repeated.
As already mentioned, repetitions are configured on a step level. They only really make sense for
independent batch input steps, although this is not strictly enforced. One or more repetitions
can be configured for a step by adding a repetitions
section to the step configuration.
Here’s an example:
steps { repeater { input { type = "com.cloudera.labs.envelope.repetition.DummyBatchInput" numrows = 10 } repetitions { everyhour { type = "schedule" every = "1h" } adhoc { type = "flagfile" file = "hdfs:///data/symbols/_ready" } } } }
In this example, the "repeater" step has two configured repetition instances, one scheduled to re-run the step every hour and the other in response to the presence of a flag file on HDFS.
New repetition instances must implement the Repetition
API. The AbstractRepetition
class
has useful state and methods and it is recommended to extend this but this is not mandatory.
The AbstractRepetition
maintains state for the repetition name, the step it is attached to
as well as providing a handy repeatStep()
method for when the step needs to be - well - repeated.
In order to check periodically it is typical for Repetition
classes to implement the Runnable
interface
and schedule themselves for regular running in the central executor pool. Note that this may not
be required in every case.
The Repetition
interface only requires that implementers include a configure
method that
accepts three parameters: the step instance for this repetition, a name for the repetition and the configuration
sub-section for the repetition from the config file. The configure method should set up
any resources required to run the repetition, such as scheduled threads, handles to external
resources etc.
As an example, let’s look at how ScheduledRepetition
is implemented. It’s short enough to simply
paste the entire code here:
public class ScheduledRepetition extends AbstractRepetition implements Runnable { private static final Logger LOG = LoggerFactory.getLogger(ScheduledRepetition.class); private static final String FREQUENCY_CONFIG = "every"; @Override public void configure(BatchStep step, String name, Config config) { super.configure(step, name, config); ConfigUtils.assertConfig(config, FREQUENCY_CONFIG); Repetitions.get().submitRegularTask(this, config.getDuration(FREQUENCY_CONFIG, TimeUnit.MILLISECONDS)); } @Override public void run() { LOG.info("Triggering repetition [" + name + "] for step [" + step.getName() + "] at [" + System.currentTimeMillis() + "]"); repeatStep(); } }
Here we can see that the configure
method calls the same method on the parent class then
validates that all required configuration is present using the ConfigUtils
class. Finally,
the method submits the instance as a task to be run at a defined regular interval using the
central executor service in Repetitions
which is obtained using the static get()
method.
Although not used in this case, the submitRegularTask
method of Repetitions
returns the
task as a ScheduledFuture
in case the Repetition
instance wants to cancel itself in the
future. Note the use of the Typesafe Config getDuration
helper method which lets us express
durations and intervals in human readable terms such as "5m" or "1d".
The actual work of checking to see if a repetition should be triggered is implemented in the
run()
method (so it gets executed every time the regular task is executed). Here, the
implementation is super simple - since this step should be run at the rate specified in the
configuration we simply call repeatStep
and Runner
and Repetitions
do the rest.