Repetitions Guide

Repetitions are a mechanism to allow your Envelope application to re-run batch steps according to some regularly assessed criteria. Although you can configure repetitions for any data step, steps are only repeated as part of a streaming job; that is, unless you have a streaming input somewhere in your configuration, no steps will be repeated.

The canonical use case for repetitions is to reload small reference datasets, either in response to some external prompt (e.g. a flag file or a message on a topic) or on a regular schedule (e.g. every 12 hours).

Although some repetition implementations are provided out of the box, they are completely pluggable. To develop a new repetition, implement the com.cloudera.labs.envelope.repetition.Repetition interface or extend the com.cloudera.labs.envelope.repetition.AbstractRepetition class.

There are two implementations provided as part of core Envelope:

  • ScheduledRepetition - run the step according to a defined schedule, e.g. every 2 hours.

  • FlagFileRepetition - run the step when a flag file is present on HDFS.

See the configuration guide for details of how to configure these repetitions.

How repetitions work

Repetitions are associated with batch steps. When the step is created at the beginning of the Envelope job, any repetitions for the step are instantiated. Typically, a repetition implementation starts a regular task on a scheduled thread to check periodically whether the repetition criteria have been met. When the repetition decides it is time to repeat (that is, when it is triggered), it should submit the step to the utility class Repetitions, which keeps track of all the steps that should be repeated.

The streaming job executing in Runner checks in with Repetitions on each micro-batch for any steps that should be repeated. If there are any, Repetitions returns the set of steps and clears its state, ready for new repetitions. Runner takes the set of steps to be repeated and systematically marks those steps and their descendants as not submitted. These batch steps and their descendants are then re-run later in the micro-batch.
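The "mark the triggered steps and their descendants as not submitted" part of this flow can be sketched as a simple graph walk. This is only an illustration under stated assumptions: StepGraphSketch, addStep, markForRerun and isSubmitted are hypothetical names that do not come from Envelope, and steps are represented by plain strings rather than Envelope step objects.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Collections;
import java.util.Deque;
import java.util.HashMap;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;

// Hypothetical stand-in for a step graph; not Envelope's actual Runner code.
class StepGraphSketch {
  // step name -> names of the steps that depend on it directly
  private final Map<String, List<String>> dependents = new HashMap<>();
  // step name -> whether the step currently counts as submitted
  private final Map<String, Boolean> submitted = new HashMap<>();

  // Registers a step and pretends it has already been submitted once.
  void addStep(String name, String... dependencies) {
    submitted.put(name, true);
    dependents.putIfAbsent(name, new ArrayList<>());
    for (String dependency : dependencies) {
      dependents.computeIfAbsent(dependency, k -> new ArrayList<>()).add(name);
    }
  }

  // Marks each triggered step and all of its descendants as not submitted.
  Set<String> markForRerun(Set<String> triggered) {
    Set<String> reset = new LinkedHashSet<>();
    Deque<String> queue = new ArrayDeque<>(triggered);
    while (!queue.isEmpty()) {
      String step = queue.poll();
      if (reset.add(step)) { // visit each step only once
        submitted.put(step, false);
        queue.addAll(dependents.getOrDefault(step, Collections.emptyList()));
      }
    }
    return reset;
  }

  boolean isSubmitted(String name) {
    return submitted.getOrDefault(name, false);
  }

  public static void main(String[] args) {
    StepGraphSketch graph = new StepGraphSketch();
    graph.addStep("reference");
    graph.addStep("stream");
    graph.addStep("joined", "reference", "stream");
    graph.addStep("output", "joined");
    // prints [reference, joined, output]
    System.out.println(graph.markForRerun(Collections.singleton("reference")));
  }
}
```

In this sketch the unrelated "stream" step keeps its submitted state, so only the triggered step and the steps downstream of it would be picked up again.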

Repetitions maintains the current list of to-be-repeated steps as a set, so that a step triggered for repetition multiple times between micro-batch runs is re-run only once. In addition, there is a configurable minimum interval between re-runs to prevent the same step from being re-run too frequently.
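A minimal sketch of this bookkeeping is shown below, assuming steps are identified by name. PendingRepeatsSketch and its methods are hypothetical and are not the Repetitions API. The guide does not say where the minimum interval is enforced, so the sketch makes an assumption: it applies the interval when the pending set is drained, and it discards triggers that arrive too soon.

```java
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;

// Hypothetical sketch of set-based de-duplication plus a minimum re-run interval.
class PendingRepeatsSketch {
  private final long minIntervalMs;
  private final Set<String> pending = new HashSet<>();
  private final Map<String, Long> lastReturnedMs = new HashMap<>();

  PendingRepeatsSketch(long minIntervalMs) {
    this.minIntervalMs = minIntervalMs;
  }

  // Called by a repetition when it is triggered; duplicates collapse in the set.
  synchronized void trigger(String stepName) {
    pending.add(stepName);
  }

  // Called once per micro-batch: returns the steps that are due, then clears the state.
  // Assumption: a trigger that arrives inside the minimum interval is dropped.
  synchronized Set<String> drain(long nowMs) {
    Set<String> due = new HashSet<>();
    for (String step : pending) {
      Long last = lastReturnedMs.get(step);
      if (last == null || nowMs - last >= minIntervalMs) {
        due.add(step);
        lastReturnedMs.put(step, nowMs);
      }
    }
    pending.clear();
    return due;
  }

  public static void main(String[] args) {
    PendingRepeatsSketch pending = new PendingRepeatsSketch(1000);
    pending.trigger("reference");
    pending.trigger("reference");            // same step again: still one entry
    System.out.println(pending.drain(5000)); // prints [reference]
    pending.trigger("reference");
    System.out.println(pending.drain(5500)); // prints [] (only 500 ms since the last re-run)
  }
}
```

Passing the current time into drain keeps the sketch deterministic. A real implementation would more likely read the clock itself.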

As well as keeping track of steps that need to be re-run, the Repetitions class manages a central scheduled executor pool, since scheduling a periodic check is such a common way for Repetition instances to decide whether they need to trigger a step re-run.

In addition, AbstractRepetition provides helper methods that make it simple to submit a step to be repeated.
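To illustrate the idea of a shared scheduled pool, here is a small sketch built on the standard java.util.concurrent API. SharedSchedulerSketch is a hypothetical class. The pool size, and the choice to use the interval as the initial delay, are assumptions made for the example and are not taken from Envelope.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;

// Hypothetical sketch of a central pool that runs periodic checks for many tasks.
class SharedSchedulerSketch {
  // Assumption: a small fixed-size pool shared by all periodic tasks.
  private final ScheduledExecutorService pool = Executors.newScheduledThreadPool(2);

  // Runs the task every intervalMs milliseconds, first after one interval.
  // The returned handle lets the caller cancel the task later.
  ScheduledFuture<?> submitRegularTask(Runnable task, long intervalMs) {
    return pool.scheduleAtFixedRate(task, intervalMs, intervalMs, TimeUnit.MILLISECONDS);
  }

  void shutdown() {
    pool.shutdownNow();
  }

  public static void main(String[] args) throws InterruptedException {
    SharedSchedulerSketch scheduler = new SharedSchedulerSketch();
    CountDownLatch threeRuns = new CountDownLatch(3);
    ScheduledFuture<?> handle = scheduler.submitRegularTask(threeRuns::countDown, 50);
    threeRuns.await(5, TimeUnit.SECONDS); // wait until the task has run three times
    handle.cancel(false);                 // stop further runs of this task only
    scheduler.shutdown();
    System.out.println("cancelled: " + handle.isCancelled());
  }
}
```

Sharing one pool means each repetition does not need to create and manage its own thread, and the returned future gives a task a way to stop itself without shutting down the pool.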

Configuration of Repetitions

As already mentioned, repetitions are configured on a step level. They only really make sense for independent batch input steps, although this is not strictly enforced. One or more repetitions can be configured for a step by adding a repetitions section to the step configuration.

Here’s an example:

steps {
  repeater {
    input {
      type = "com.cloudera.labs.envelope.repetition.DummyBatchInput"
      numrows = 10
    }
    repetitions {
      everyhour {
        type = "schedule"
        every = "1h"
      }
      adhoc {
        type = "flagfile"
        file = "hdfs:///data/symbols/_ready"
      }
    }
  }
}

In this example, the "repeater" step has two configured repetition instances, one scheduled to re-run the step every hour and the other in response to the presence of a flag file on HDFS.

Developing a Repetition

New repetition instances must implement the Repetition API. Extending the AbstractRepetition class is recommended but not mandatory: it maintains state for the repetition name and the step it is attached to, and it provides a handy repeatStep() method for when the step needs to be repeated. To check their criteria periodically, Repetition classes typically implement the Runnable interface and schedule themselves for regular running in the central executor pool. Note that this may not be required in every case.

The Repetition interface only requires that implementers include a configure method that accepts three parameters: the step instance for this repetition, a name for the repetition and the configuration sub-section for the repetition from the config file. The configure method should set up any resources required to run the repetition, such as scheduled threads, handles to external resources etc.

As an example, let’s look at how ScheduledRepetition is implemented. It’s short enough to simply paste the entire code here:

public class ScheduledRepetition extends AbstractRepetition implements Runnable {

  private static final Logger LOG = LoggerFactory.getLogger(ScheduledRepetition.class);

  private static final String FREQUENCY_CONFIG = "every";

  @Override
  public void configure(BatchStep step, String name, Config config) {
    super.configure(step, name, config);
    ConfigUtils.assertConfig(config, FREQUENCY_CONFIG);
    Repetitions.get().submitRegularTask(this, config.getDuration(FREQUENCY_CONFIG, TimeUnit.MILLISECONDS));
  }

  @Override
  public void run() {
    LOG.info("Triggering repetition [" + name + "] for step [" +
        step.getName() + "] at [" + System.currentTimeMillis() + "]");
    repeatStep();
  }

}

Here we can see that the configure method calls the same method on the parent class and then validates that all required configuration is present using the ConfigUtils class. Finally, the method submits the instance as a task to be run at a defined regular interval using the central executor service in Repetitions, which is obtained using the static get() method. Although it is not used in this case, the submitRegularTask method of Repetitions returns the task as a ScheduledFuture in case the Repetition instance wants to cancel itself in the future. Note the use of the Typesafe Config getDuration helper method, which lets us express durations and intervals in human-readable terms such as "5m" or "1d".

The actual work of checking whether a repetition should be triggered is implemented in the run() method, so it is executed every time the regular task runs. Here the implementation is very simple: since this step should be run at the rate specified in the configuration, we simply call repeatStep(), and Runner and Repetitions do the rest.
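For contrast, a repetition whose run() method has to test a condition before triggering might look like the following sketch. It is not the FlagFileRepetition source: FlagCheckSketch is a hypothetical class, it checks a local path with java.nio rather than HDFS, a plain Runnable callback stands in for repeatStep(), and deleting the flag after triggering is an assumption made here so that the same flag does not fire repeatedly.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.file.Files;
import java.nio.file.Path;

// Hypothetical sketch of a condition-based periodic check.
class FlagCheckSketch implements Runnable {
  private final Path flagFile;
  private final Runnable onTrigger; // stand-in for a repeatStep() style helper

  FlagCheckSketch(Path flagFile, Runnable onTrigger) {
    this.flagFile = flagFile;
    this.onTrigger = onTrigger;
  }

  @Override
  public void run() {
    if (!Files.exists(flagFile)) {
      return; // criteria not met on this check
    }
    try {
      // Assumption: consume the flag so that it triggers only once.
      Files.deleteIfExists(flagFile);
    } catch (IOException e) {
      throw new UncheckedIOException(e);
    }
    onTrigger.run();
  }

  public static void main(String[] args) throws IOException {
    Path flag = Files.createTempDirectory("flag-sketch").resolve("_ready");
    FlagCheckSketch check = new FlagCheckSketch(flag, () -> System.out.println("repeat requested"));
    check.run();            // no flag yet: nothing happens
    Files.createFile(flag);
    check.run();            // flag present: callback fires and the flag is removed
    check.run();            // flag gone again: nothing happens
  }
}
```

Scheduled on a regular interval, a check like this would request a re-run only on the checks where the flag is found.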

Specifying an alias

The ProvidesAlias interface allows shorter aliases to be used in place of class names in Envelope config files. This functionality only works if the fully qualified class name of the implementation is listed in a META-INF/services/com.cloudera.labs.envelope.repetition.Repetition file on the classpath.