Envelope provides the ability for a pipeline to make a decision that will determine which steps will be run. This is achieved by including a decision step that, when itself is run, decides which subsequent steps of the pipeline to remove and which ones to keep.
A decision step makes a decision that returns a true or false result. The if-true-steps
configuration of a decision step specifies which of its dependent steps will be kept if the result is true. The remaining dependent steps will be kept if the result is false.
A decision step can make a decision using one of three methods, which are outlined with examples in the section below.
The literal
decision method takes the true or false result directly from the configuration of the decision step. This method would be useful if the result is provided by a parameter, which in turn can be populated by a spark-submit
argument or an environment variable.
In this self-contained example the value of the ${result}
parameter will determine whether run_if_true
and run_after_run_if_true
, or run_if_false
and run_after_run_if_false
, are run:
application.name = Decision step by literal steps { decide { type = decision if-true-steps = [run_if_true] method = literal result = ${result} } run_if_true { dependencies = [decide] deriver { type = sql query.literal = "SELECT true" } print.data.enabled = true } run_after_run_if_true { dependencies = [run_if_true] deriver { type = sql query.literal = "SELECT 'No, really, it was true!'" } print.data.enabled = true } run_if_false { dependencies = [decide] deriver { type = sql query.literal = "SELECT false" } print.data.enabled = true } run_after_run_if_false { dependencies = [run_if_false] deriver { type = sql query.literal = "SELECT 'No, really, it was false!'" } print.data.enabled = true } }
This pipeline could be run with ${result}
populated by using an argument after the configuration file:
spark-submit envelope-*.jar pipeline.conf result=true
Note
|
CDH5 uses spark2-submit instead of spark-submit for Spark 2 applications such as Envelope.
|
The step_by_key
decision method takes the result from the data of a previous step, where the result is looked up in that data by a specific key. This method would be useful for making decisions on data quality results that provide a true or false result for each dataset-scoped check.
The data of the step must contain only two columns: first a string (the key), and second a boolean (the result).
In this self-contained example the corresponding value of the test1
key in the generate
step will determine whether run_if_true
and run_after_run_if_true
, or run_if_false
and run_after_run_if_false
, are run:
application.name = Decision step by step by key steps { generate { deriver { type = sql query.literal = "SELECT 'test1', true UNION ALL SELECT 'test2', false" } } decide { dependencies = [generate] type = decision if.true.steps = [run_if_true] decision.method = step_by_key step = generate key = test1 } run_if_true { dependencies = [decide] deriver { type = sql query.literal = "SELECT true" } print.data.enabled = true } run_after_true { dependencies = [run_if_true] deriver { type = sql query.literal = "SELECT 'No, really, it was true!'" } print.data.enabled = true } run_if_false { dependencies = [decide] deriver { type = sql query.literal = "SELECT false" } print.data.enabled = true } run_after_false { dependencies = [run_if_false] deriver { type = sql query.literal = "SELECT 'No, really, it was false!'" } print.data.enabled = true } }
The step_by_value
decision method takes the result from the single boolean value of a previous step. This method would be useful when a previous step has a deriver that aggregates into a single result.
The data of the step must contain a single boolean column and only a single row.
In this self-contained example the sole value of aggregate
step will determine whether run_if_true
and run_after_run_if_true
, or run_if_false
and run_after_run_if_false
, are run:
application.name = Decision step by step by value steps { generate { deriver { type = sql query.literal = "SELECT 'test1' AS key, true AS result UNION ALL SELECT 'test2' AS key, false AS result" } } aggregate { deriver { type = sql query.literal = "SELECT MIN(result) = true AS result FROM generate" } } decide { dependencies = [aggregate] type = decision if.true.steps = [run_if_true] decision.method = step_by_key step = generate key = test1 } run_if_true { dependencies = [decide] deriver { type = sql query.literal = "SELECT true" } print.data.enabled = true } run_after_true { dependencies = [run_if_true] deriver { type = sql query.literal = "SELECT 'No, really, it was true!'" } print.data.enabled = true } run_if_false { dependencies = [decide] deriver { type = sql query.literal = "SELECT false" } print.data.enabled = true } run_after_false { dependencies = [run_if_false] deriver { type = sql query.literal = "SELECT 'No, really, it was false!'" } print.data.enabled = true } }