Derivers are used in Envelope pipelines to derive new data. Derivers transform data that has already been brought into the pipeline in previous steps, whether straight from inputs or from the results of previous derivers. Derivers can operate over one or multiple steps at a time. Derivers operate identically in batch and streaming modes. As a mapping back to Spark, derivers create new DataFrames from existing DataFrames.
Envelope provides a number of derivers out of the box and also allows custom derivers to be specified.
The sql deriver is used to run SQL to derive new data in the pipeline. Queries submitted to the deriver are executed by Spark SQL. The results of the SQL query then become the data of the step that contains the deriver.
The query provided to the deriver can select from two types of sources:
- From previous steps by referencing the previous step name as the table name
- From the Hive metastore by referencing the Hive table name, optionally with a Hive metastore database name prefix (i.e. databasename.tablename)
A query can be provided to a SQL deriver instance in one of two ways:
-
The
query.literal
configuration is used to provide the SQL query directly in the pipeline configuration file. This is good for very short queries or for rapid development. -
The
query.file
configuration is used to provide the path to an HDFS file that contains the SQL query to run. This is good for long queries or for where the development of the query is done separately to the pipeline.
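For example, a minimal sketch of a step that retrieves its query from a file (the step name and HDFS path are illustrative):

...
traffic_summary {
  dependencies = [traffic]
  deriver {
    type = sql
    query.file = "hdfs:///queries/traffic_summary.sql"
  }
}
...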
Note that the SQL deriver should only be used to run SELECT queries that derive new data. To write data outside of the pipeline the step should additionally specify a planner and output.
Query parameters can be populated at runtime in two different ways:
- If the query is provided using query.literal then an environment variable reference can be concatenated with the query string.
- If the query is provided using either query.file or query.literal then a parameter string encoded in the form ${parameter_name} that is embedded within the query string itself will be replaced by the value of the deriver configuration parameter.parameter_name.
For example, where traffic was a previous step in the pipeline:
deriver {
  type = sql
  query.literal = """
      SELECT UNIX_TIMESTAMP() * ${timestamp_multiplier} as_of_time
      , ROUND(AVG(number_of_vehicles), """${DECIMAL_PLACES}""") avg_num_veh
      , MIN(number_of_vehicles) min_num_veh
      , MAX(number_of_vehicles) max_num_veh
      , MIN(measurement_time) first_meas_time
      , MAX(measurement_time) last_meas_time
      FROM traffic"""
  parameter {
    timestamp_multiplier = ${TIMESTAMP_MULTIPLIER}
  }
}
This shows both methods for populating parameters. In this example ${DECIMAL_PLACES} and ${TIMESTAMP_MULTIPLIER} are both populated from the environment variables DECIMAL_PLACES and TIMESTAMP_MULTIPLIER. The concatenation usage of ${DECIMAL_PLACES} does not require a parameter section entry, but only applies if the query is provided by query.literal. The embedded usage of ${timestamp_multiplier} does require the parameter section entry, but also applies if the query is provided by query.file.
The morphline deriver is used to run Morphline transformations over the records of a single dependency of the step, as defined by the step.name parameter.
The Morphline transformation is provided to the Envelope pipeline as a local file on the Spark executors. The local file is retrieved from the location in the morphline.file configuration. The local file can be provided to the Spark executors from spark-submit using the --files option.
The ID of the specific transformation within the Morphline file is specified with the morphline.id configuration.
The deriver requires the output schema of the Morphline transformation to be provided. The schema can be provided in a number of ways, as detailed in the Inputs Guide.
Each input row must correlate to a single output row, unless error.on.empty is set to false, in which case an empty output is permissible. Any fields that are not nullable must be specified in the output record.
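A minimal sketch of a step that runs a Morphline transformation over a dependency (the step names, file name, Morphline ID, and schema are illustrative; the Morphline commands themselves live in the referenced file):

...
parsed_events {
  dependencies = [raw_events]
  deriver {
    type = morphline
    step.name = raw_events
    morphline.file = "extract_fields.conf"
    morphline.id = extract_fields
    schema {
      type = flat
      field.names = [event_id, event_name]
      field.types = [int, string]
    }
  }
}
...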
The nest deriver is used to nest the data of one step within another by a common join key. This is useful for denormalizing a one-to-many relationship without repeating values on the one-cardinality side. This type of data modeling is known as a supernova schema.
To configure the deriver to nest a one-to-many relationship, specify:
- The one-cardinality step name in nest.into
- The many-cardinality step name in nest.from
- The join key field names in key.field.names
- The name of the nested field on the derivation in nested.field.name
Consider the following simple example where we have a customers table and an orders table (a one-to-many relationship because a customer can have many orders but an order can only belong to one customer) and we want to nest the orders for a customer on to the customer’s record so that we can query across the two data sets without the cost of joining the two at runtime.
customers:
customer_id | name
---|---
10000 | Jane
10001 | Joe
orders:
order_id | product_name | customer_id
---|---|---
1000 | Envelopes | 10000
1001 | Stamps | 10000
1002 | Pens | 10000
1003 | Paper | 10001
To nest the orders step into the customers step we could run a subsequent step with:
...
steps {
  customers {
    ...
  }
  orders {
    ...
  }
  customers_nested {
    dependencies = [customers, orders]
    deriver {
      type = nest
      nest.from = orders
      nest.into = customers
      key.field.names = [customer_id]
      nested.field.name = customer_orders
    }
    ...
  }
  ...
}
...
Which would produce the derived result:
customers_nested:
customer_id | name | customer_orders
---|---|---
10000 | Jane | [{1000, Envelopes, 10000}, {1001, Stamps, 10000}, {1002, Pens, 10000}]
10001 | Joe | [{1003, Paper, 10001}]
In Impala, if this was then written to a Parquet table, the data could be queried with syntax like:
SELECT c.name, COUNT(o.order_id) FROM customers_nested c, c.customer_orders o GROUP BY c.name;
For more information on querying nested tables using Impala, see the complex types documentation.
The passthrough deriver simply unions all of its dependencies together. All of the dependencies must have the same schema.
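For example, a minimal sketch that unions two identically-structured steps (the step names are illustrative):

...
all_events {
  dependencies = [events_batch, events_stream]
  deriver {
    type = passthrough
  }
}
...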
The pivot deriver is used to pivot (otherwise known as transpose) key-value-pair data so that the derivation has a column per unique key. This can be useful when the source data model of the pipeline defines attributes for an entity via key-value-pairs but the transformed data model of the pipeline should have distinct columns per attribute for simpler and more efficient analytics querying.
To configure the deriver to pivot a previous step that contains attributes for an entity defined as key-value-pairs, specify:
- The name of the previous step to pivot with step.name
- The fields that define the entity key with entity.key.field.names
- The field that defines the key of the key-value-pairs with pivot.key.field.name
- The field that defines the value of the key-value-pairs with pivot.value.field.name
- The method to use for retrieving the entity attributes with pivot.keys.source. For the deriver to dynamically find the distinct keys of the key-value-pairs use dynamic. To provide a static list of keys use static.
- The static list of keys with pivot.keys.list, when using the static method for retrieving pivot keys (see the sketch below).
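Where the pivot keys are known up front, a minimal sketch using the static method might look like the following (the step and field names are illustrative):

...
attributes_pivoted {
  dependencies = [attributes_kvp]
  deriver {
    type = pivot
    step.name = attributes_kvp
    entity.key.field.names = [entity_id]
    pivot.key.field.name = attr_key
    pivot.value.field.name = attr_value
    pivot.keys.source = static
    pivot.keys.list = [name, state, balance]
  }
}
...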
Consider the following simple example where we have a key-value-pairs step that captures the attributes of each customer with one record per attribute per customer, and we want to derive a pivoted (transposed) step that captures the same attributes of each customer but with one record per customer.
customers_kvp:
customer_id | key | value
---|---|---
10000 | name | Jane
10000 | state | NY
10000 | balance | 50000.0
10001 | name | Joe
10001 | state | CA
10001 | balance | 30000.0
To pivot the customers_kvp step we could run a subsequent step with:
...
steps {
  customers_kvp {
    ...
  }
  customers_pivoted {
    dependencies = [customers_kvp]
    deriver {
      type = pivot
      step.name = customers_kvp
      entity.key.field.names = [customer_id]
      pivot.key.field.name = key
      pivot.value.field.name = value
      pivot.keys.source = dynamic
    }
    ...
  }
  ...
}
...
Which would produce the derived result:
customers_pivoted:
customer_id | name | state | balance
---|---|---|---
10000 | Jane | NY | 50000.0
10001 | Joe | CA | 30000.0
The exclude deriver executes a LEFT ANTI JOIN on two designated dependencies on a set of common fields between the two. Commonly, this deriver is used for easy de-duplication within a pipeline.
The equivalent SQL statement would read:
SELECT Left.* FROM Left LEFT ANTI JOIN Right USING (field1, field2)
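For comparison, the same shape of operation can be sketched with the sql deriver described above (the step and field names are illustrative):

...
deduplicated {
  dependencies = [new_records, existing_records]
  deriver {
    type = sql
    query.literal = """
        SELECT new_records.*
        FROM new_records
        LEFT ANTI JOIN existing_records
        USING (field1, field2)"""
  }
}
...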
The select deriver includes or excludes a required set of columns in the output from an input dependency. The deriver takes either an include-fields list of columns to be included in the output or an exclude-fields list of columns to be removed from the output. The include-fields and exclude-fields lists cannot both be provided at the same time.
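For example, a minimal sketch that keeps only a subset of columns from a single dependency (the step and field names are illustrative):

...
customers_trimmed {
  dependencies = [customers]
  deriver {
    type = select
    include-fields = [customer_id, name]
  }
}
...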
The dq deriver can be used to perform data quality checks on a dataset using a set of user-defined rules. Rules can be applied at two scopes: dataset or row level. For dataset scope, the rules are evaluated against the dataset as a whole and the derived result is a dataset containing one row per rule indicating a pass or fail. The schema of the dataset is name: String, result: Boolean. For example, the result might be:
name | result
---|---
namecheck | true
agerange | false
Row level scope takes the list of rules and applies them to every row for the defined input dependency. The results of the checks are appended to the rows as a field of type map<string, boolean> called results by default. The results would look something like:
name | age | results
---|---|---
Ian | null | {"namenotnull":true,"agerange":false}
Webster | 21 | {"namenotnull":true,"agerange":true}
Envelope has a number of built-in rules (see below) but allows for custom user-defined rules via fully-qualified class name. See the config guide for specific configuration parameters.
The following row-level rules are provided:
- checknulls - check for null values in one or more fields in a row
- enum - check one or more fields against a list of allowed values (non-floating point numerics and strings)
- range - check that one or more numeric fields are between lower and upper bounds (inclusive)
- regex - check one or more string fields against an allowed pattern
The following rules are defined at the dataset scope:
- count - ensure the dataset has an expected count. The count may be either statically defined or loaded as a dependency from another step. If the latter, the dataset must contain a single row with a single field of type long.
- checkschema - ensure the dataset matches the schema. Currently only supports primitive types.
In addition, any defined row-level rule can be applied at the dataset scope. In this case, the deriver simply logically ANDs the individual results from each row check into a single boolean result for the rule.
If specifying multiple dependencies, the user must specify to which dependency the dataset-level rules should be applied using the dataset configuration parameter.
If using multiple dataset level checks on the same dataset it is recommended to employ the cache hint on the dependency containing the data to be checked.
An example configuration containing both dataset and row-level DQ derivers is as follows:
...
steps {
dqparams {
input {
type = filesystem
format = json
path = "hdfs:///tmp/dqparams"
}
}
mydata {
input {
type = filesystem
format = json
path = "hdfs:///tmp/data"
}
}
checkmydata {
dependencies = [mydata,dqparams]
deriver {
type = dq
scope = dataset
dataset = mydata
rules {
r1 {
type = count
expected.dependency = dqparams
}
r2 {
type = checkschema
schema {
type = flat
field.names = ["name", "address", "age"]
field.types = ["string", "string", "int"]
}
}
r3 {
// row-level rule being run in dataset scope
type = regex
fields = ["name"]
regex = "[a-zA-Z' ]{1,}"
}
r4 {
// row-level rule being run in dataset scope
type = enum
fields = ["name"]
values = ["Ian","Jeremy","Webster"]
fieldtype = string
case-sensitive = false
}
}
}
}
checkrows {
dependencies = [mydata]
deriver {
type = dq
scope = row
rules {
r1 {
type = checknulls
fields = [ "name", "address", "age" ]
}
r2 {
type = regex
fields = ["name"]
regex = "[a-zA-Z' ]{1,}"
}
r3 {
type = range
fields = ["age"]
fieldtype = "int"
range = [0,150]
ignore-nulls = true
}
}
}
}
}
...
Users wishing to specify custom rules can implement either the RowRule or DatasetRule interface. Row level rules should implement a check(Row row) method returning a boolean result. Dataset scope rules should implement a check(Dataset<Row> dataset, Map<String, Dataset<Row>> stepDependencies) method which returns a Dataset with a row per rule with the schema name: String, result: Boolean.
Row level rules are automatically wrapped in DatasetRowRuleWrapper when used in a dataset scope.
The custom rules may also implement the ProvidesAlias interface which allows an alias to be used instead of the fully-qualified class name in Envelope config files. The implementation must be placed in a META-INF/services/com.cloudera.labs.envelope.derive.dq.DatasetRule or META-INF/services/com.cloudera.labs.envelope.derive.dq.RowRule file on the classpath.
The distinct deriver simply returns the distinct rows from its dependency. If there is more than one dependency then the step configuration parameter must be used to identify which step the distinct operation should be applied to. This parameter is optional when the deriver has only one dependency.
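For example, a minimal sketch that applies the distinct operation to one of two dependencies (the step names are illustrative):

...
unique_events {
  dependencies = [events, reference_data]
  deriver {
    type = distinct
    step = events
  }
}
...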
The in-list deriver filters rows in a dataset by comparing one of the columns to a list of values. This list could be statically defined as a literal in the deriver’s configuration, or it could be dynamically generated from one of the steps in dependencies. In the latter case, the resulting list can be batched to manage its construction and filtering; the default batch size is 1000.
A step that selects from the airports dataset only the rows that have an airport_code for Newark (EWR) or Pittsburgh (PIT):
...
literal_list {
  dependencies = [airports, other_dependencies, ...]
  deriver {
    type = in-list
    step = airports
    field = airport_code
    values.literal = [EWR, PIT]
  }
}
...
Another example where the in-list deriver filters data from the airports dataset, but this time the rows are selected by airport_code via look-up in another dataset:
...
reference_list {
  dependencies = [airports, delays_top_ten, other_dependencies, ...]
  deriver {
    type = in-list
    step = airports
    field = airport_code
    values.reference.step = delays_top_ten
    values.reference.field = airport_fk
  }
}
...
Caution: In-list filtering using a reference step can be very slow if there are many values in the reference step. A standard SQL join might be more efficient if filtering over more than 1000 values.
The hash deriver can be used to append a new column that is populated with the MD5 hash of all the pre-existing fields. This can be useful for providing an (extremely likely) unique value for a row to assist with change data capture, and for the requirements of data vault modeling.
The MD5 hash is applied to the concatenation of all of the pre-existing values of a row. By default the concatenation will use an empty string delimiter and will replace nulls with the string '__NULL__'. The appended column will contain the 32 character hexadecimal string representation of the MD5 hash.
The name of the dependency step to be hashed can be specified with the step configuration. If this configuration is not provided then the step must only have one dependency.
The delimiter and null replacement strings can be overridden using the delimiter and null-string configurations.
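For example, a sketch that hashes a named dependency with an overridden delimiter and null replacement string (the values shown are illustrative):

...
hash_step_custom {
  dependencies = [previous_step]
  deriver {
    type = hash
    step = previous_step
    delimiter = "|"
    null-string = "<null>"
  }
}
...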
For the step previous_step:
col1 | col2 | col3 | col4 | col5
---|---|---|---|---
hello | 1 | NULL | -1.0 | true
And the default behavior of the deriver:
...
hash_step {
  dependencies = [previous_step]
  deriver {
    type = hash
  }
}
...
This data will be generated by hash_step:
col1 | col2 | col3 | col4 | col5 | hash
---|---|---|---|---|---
hello | 1 | NULL | -1.0 | true | 4891a9d87f8f46a5c8c19c3059864146
The latest deriver can be used to filter a dataset for only the latest record of each key. This can be useful if the dependency dataset has the history of versions of each key, but just the latest version of each key needs to be selected. The term 'latest' is defined by the record with the highest 'timestamp' value of the key, where that value can be of any data type.
Note: In general a valid dataset will not have more than one version of a key with the same timestamp. However, in the scenario where more than one version of a key has the same highest timestamp only one will be selected, but it is not defined which of those it will be.
The dependency step to run the deriver on is specified by the step configuration. The list of key field names is specified by the key-field-names configuration. The field name of the timestamp is specified by the timestamp-field configuration.
For the step previous_step:
key | value | ts
---|---|---
1001 | hello | 2018-10-19 14:30:12
1001 | world | 2018-10-20 05:19:46
1002 | beep | 2018-10-19 14:30:12
1002 | boop | 2018-10-20 05:19:46
And this configuration of the deriver:
...
latest_step {
  dependencies = [previous_step]
  deriver {
    type = latest
    key-field-names = [key]
    timestamp-field = ts
  }
}
...
This data will be generated by latest_step:
key | value | ts
---|---|---
1001 | world | 2018-10-20 05:19:46
1002 | boop | 2018-10-20 05:19:46
The translate deriver can be used to translate a single field value into multiple fields.
This is useful when a field contains a serialized record and where the serialized record’s fields need to be individually accessed.
When the deriver translates a field it removes the pre-translated field and appends the translated fields to the end of the row.
Rows that could not be translated (because the translator threw an exception) will be available as a separate step with the name of the step that the translate deriver is specified in, plus the string "_errored". For example, errored rows from a step named 'customers' will be found in the step 'customers_errored'.
The step configuration specifies the dependency step that will be used.
The field configuration specifies the name of the field that will be translated.
The translator configuration specifies the translator that will translate the field.
For the step previous_step:
col1 | col2 | col3
---|---|---
1000 | hello:world | false
And this configuration of the deriver:
...
translate_step {
  dependencies = [previous_step]
  deriver {
    type = translate
    field = col2
    translator {
      type = delimited
      delimiter = ":"
      schema {
        type = flat
        field.names = [col4, col5]
        field.types = [string, string]
      }
    }
  }
}
...
This data will be generated by translate_step:
col1 | col3 | col4 | col5
---|---|---|---
1000 | false | hello | world
The sparkml deriver can be used to execute Spark ML pipeline models over a previous step.
This can be useful for executing machine learning algorithms in an Envelope pipeline in a distributed fashion.
Spark ML supports many common machine learning algorithms.
The pipeline model should be created in advance using Spark ML and saved using the PipelineModel#save method. This deriver can then reference the saved pipeline model via the model-path configuration.
The step configuration can be used to specify which step to execute the model over; however, it is optional when the deriver step only contains one dependency.
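A minimal sketch of a step that applies a saved pipeline model to a dependency (the step names and model path are illustrative):

...
scored {
  dependencies = [features]
  deriver {
    type = sparkml
    step = features
    model-path = "hdfs:///models/churn_pipeline_model"
  }
}
...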
For an example of creating a Spark ML pipeline model see the Spark ML documentation.
The parse-json deriver can be used to parse JSON record strings into corresponding fields on the original record. The deriver uses Spark’s built-in JSON parsing functionality.
The step and field configurations are used to specify the dependency step and the field within that step that contains the JSON strings.
By default the deriver will place the top-level fields from the parsed JSON as fields on the original record, and remove the JSON string.
If there is a field name conflict with existing fields the deriver can instead place the parsed fields within a struct field on the original record.
This can be set by specifying the as-struct configuration to true, and the struct-field configuration to the name to be given to the struct field.
The schema of the JSON records must be specified using the schema configuration.
Spark’s JSON parser allows multiple options to change the default parsing behaviour.
Refer to the Spark documentation for available options.
These options can be set as configurations in the deriver with the option. configuration prefix.
For example, the configuration option.mode = FAILFAST will set the mode option to FAILFAST.
For the step previous_step:
col1 | col2 | col3
---|---|---
1000 | {"col4": 1, "col5": "hello"} | false
And this configuration of the deriver:
...
parse_json_step {
  dependencies = [previous_step]
  deriver {
    type = parse-json
    step = previous_step
    field = col2
    schema {
      type = flat
      field.names = [col4, col5]
      field.types = [int, string]
    }
  }
}
...
This data will be generated by parse_json_step:
col1 | col3 | col4 | col5
---|---|---|---
1000 | false | 1 | hello
In cases where Envelope does not provide a deriver that meets the requirements for a particular derivation, a custom deriver can be developed and provided instead.
Envelope is pluggable so that Envelope itself does not need to be modified. Instead a separate jar that only contains the deriver(s) for the pipeline can be created.
To create a new deriver, first start a new Java or Scala project that has a dependency on the Envelope version you are using. You do not need to include Envelope in the packaged jar.
For example, if you are using Maven:
<dependency>
  <groupId>com.cloudera.labs.envelope</groupId>
  <artifactId>envelope-core</artifactId>
  <version>**Envelope version being used here**</version>
  <scope>provided</scope>
</dependency>
With the configured project you can develop the deriver by adding a class that implements the Deriver interface.
The two methods in the interface are:
- configure to receive the configurations of the deriver section of the step. This can be used to retrieve any custom configurations required by the deriver.
- derive to run a derivation. The dependencies argument provides the name and Spark DataFrame for each of the dependencies of the step that contains the deriver. The return value is the DataFrame that represents the derivation. Access to the SparkSession object is available from the static method Contexts#getSparkSession.
To reference the deriver in your pipeline simply use the deriver’s fully qualified class name (or alias—see below) as the deriver type. For example:
...
deriver {
  type = com.yourcompany.envelope.deriver.CustomDeriver
  customproperty1 = ...
  ...
}
...
To use an alias in configuration files, Envelope needs to be able to find your class. First, your class will need to implement the ProvidesAlias interface. Next, place the implementation’s fully qualified class name in a META-INF/services/com.cloudera.labs.envelope.deriver.Deriver file on the class path - the usual method is to package the file with your JAR.
With the project compiled into a jar file the deriver can be submitted as part of the Envelope pipeline similarly to:
spark-submit --jars customderiver.jar envelope-*.jar pipeline.conf
Note: CDH5 uses spark2-submit instead of spark-submit for Spark 2 applications such as Envelope.
The jar file can contain multiple derivers, and other pluggable classes such as custom inputs, outputs, etc.
When developing a custom deriver keep in mind:
- Derivers are only for deriving new data, and should not lead to side effects outside of the deriver, such as writing to an output or changing external metadata.
- Derivers are often highly reusable, so avoid hard-coding values or field names into the deriver and have them be given at runtime through configuration instead.
- Derivers are usually most efficient when they operate only on the Dataset/DataFrame API. If possible avoid converting to the RDD API and then back again.
- You can look at the code of the provided derivers for hints as to how to structure your own deriver.
- There are utility classes in the .utils package that may already provide some of the functionality you need to put together your derivation logic.