Derivers Guide

Derivers are used in Envelope pipelines to derive new data. Derivers transform data that has already been brought into the pipeline in previous steps, whether straight from inputs or from the results of previous derivers. Derivers can operate over one or multiple steps at a time. Derivers operate identically in batch and streaming modes. As a mapping back to Spark, derivers create new DataFrames from existing DataFrames.

Envelope provides a number of derivers out of the box and also allows custom derivers to be specified.

Provided derivers

SQL

The sql deriver is used to run SQL to derive new data in the pipeline. Queries submitted to the deriver are executed by Spark SQL. The result of the SQL query then becomes the data of the step that contains the deriver.

The query provided to the deriver can select from two types of sources:

  • From previous steps by referencing the previous step name as the table name

  • From the Hive metastore by referencing the Hive table name, and optionally with a Hive metastore database name prefix (i.e. databasename.tablename)

A query can be provided to a SQL deriver instance in one of two ways:

  • The query.literal configuration is used to provide the SQL query directly in the pipeline configuration file. This is good for very short queries or for rapid development.

  • The query.file configuration is used to provide the path to an HDFS file that contains the SQL query to run. This is good for long queries or where the development of the query is done separately from the pipeline.

Note that the SQL deriver should only be used to run SELECT queries that derive new data. To write data outside of the pipeline the step should additionally specify a planner and output.

Parameters

Query parameters can be populated at runtime in two different ways:

  • If the query is provided using query.literal then an environment variable reference can be concatenated with the query string.

  • If the query is provided using either query.file or query.literal then a parameter string encoded in the form ${parameter_name} that is embedded within the query string itself will be replaced by the value of the deriver configuration parameter.parameter_name.

Example

For example, where traffic was a previous step in the pipeline:

deriver {
  type = sql
  query.literal = """
    SELECT
      UNIX_TIMESTAMP() * ${timestamp_multiplier} as_of_time
    , ROUND(AVG(number_of_vehicles), """${DECIMAL_PLACES}""") avg_num_veh
    , MIN(number_of_vehicles) min_num_veh
    , MAX(number_of_vehicles) max_num_veh
    , MIN(measurement_time) first_meas_time
    , MAX(measurement_time) last_meas_time
    FROM traffic"""
  parameter {
    timestamp_multiplier = ${TIMESTAMP_MULTIPLIER}
  }
}

This shows both methods for populating parameters. In this example ${DECIMAL_PLACES} and ${TIMESTAMP_MULTIPLIER} are both populated from the environment variables DECIMAL_PLACES and TIMESTAMP_MULTIPLIER. The concatenation usage of ${DECIMAL_PLACES} does not require the parameter section entry, but only applies if the query is provided by query.literal. The embedded usage of ${timestamp_multiplier} does require the parameter section entry, but will also apply if the query is provided by query.file.

Morphline

The morphline deriver is used to run Morphline transformations over the records of a single dependency of the step, identified by the step.name parameter.

The Morphline transformation is provided to the Envelope pipeline as a local file on the Spark executors. The location of the local file is specified in the morphline.file configuration. The local file can be provided to the Spark executors from spark-submit using the --files option.

The ID of the specific transformation within the Morphline file is specified with the morphline.id configuration.

The deriver requires the output schema of the Morphline transformation to be provided. The schema can be provided in a number of ways, as detailed in the Inputs Guide.

Each input row must correlate to a single output row, unless error.on.empty is set to false, in which case an empty output is permissible. Any fields that are not nullable must be specified in the output record.
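
As an illustrative sketch, a morphline deriver step could look like the following, where the step, Morphline file, Morphline ID, and schema are all hypothetical, and the schema block follows the flat schema form used elsewhere in this guide:

...
  cleansed {
    dependencies = [raw_events]
    deriver {
      type = morphline
      step.name = raw_events
      morphline.file = "cleanse.conf"
      morphline.id = cleanse
      schema {
        type = flat
        field.names = [id, name]
        field.types = [int, string]
      }
    }
  }
...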

Nest

The nest deriver is used to nest the data of one step within another by a common join key. This is useful for denormalizing a one-to-many relationship without repeating values on the one-cardinality side. This type of data modeling is known as a supernova schema.

To configure the deriver to nest a one-to-many relationship, specify:

  • The one-cardinality step name in nest.into

  • The many-cardinality step name in nest.from

  • The join key field names in key.field.names

  • The name of the nested field on the derivation in nested.field.name

Example

Consider the following simple example where we have a customers table and an orders table (a one-to-many relationship, because a customer can have many orders but an order can only belong to one customer), and we want to nest the orders for a customer onto the customer’s record so that we can query across the two data sets without the cost of joining them at runtime.

customers:

customer_id  name
10000        Jane
10001        Joe

orders:

order_id  product_name  customer_id
1000      Envelopes     10000
1001      Stamps        10000
1002      Pens          10000
1003      Paper         10001

To nest the orders step into the customers step we could run a subsequent step with:

...
steps {
  customers {
    ...
  }
  orders {
    ...
  }
  customers_nested {
    dependencies = [customers, orders]
    deriver {
      type = nest
      nest.from = orders
      nest.into = customers
      key.field.names = [customer_id]
      nested.field.name = customer_orders
    }
    ...
  }
  ...
 }
...

Which would produce the derived result:

customers_nested:

customer_id  name  customer_orders (order_id, product_name, customer_id)
10000        Jane  [{1000, Envelopes, 10000}, {1001, Stamps, 10000}, {1002, Pens, 10000}]
10001        Joe   [{1003, Paper, 10001}]

If this was then written to a Parquet table, the data could be queried in Impala with syntax like:

SELECT c.name, COUNT(o.order_id) FROM customers_nested c, c.customer_orders o GROUP BY c.name;

For more information on querying nested tables using Impala, see the complex types documentation.

Passthrough

The passthrough deriver simply unions all of its dependencies together. All of the dependencies must have the same schema.
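
For example, a step that unions two previous steps with identical schemas could be configured like this (the step names are illustrative):

...
  all_events {
    dependencies = [events_2017, events_2018]
    deriver {
      type = passthrough
    }
  }
...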

Pivot

The pivot deriver is used to pivot (otherwise known as transpose) key-value-pair data where the derivation has a column per unique key. This can be useful when the source data model of the pipeline defines attributes for an entity via key-value-pairs but the transformed data model of the pipeline should have distinct columns per attribute for simpler and more efficient analytics querying.

To configure the deriver to pivot a previous step that contains attributes for an entity defined as key-value-pairs, specify:

  • The name of the previous step to pivot with step.name

  • The fields that define the entity key with entity.key.field.names

  • The field that defines the key of the key-value-pairs with pivot.key.field.name

  • The field that defines the value of the key-value-pairs with pivot.value.field.name

  • The method to use for retrieving the entity attributes with pivot.keys.source. For the deriver to dynamically find the distinct keys of the key-value-pairs use dynamic. To provide a static list of keys use static.

  • The static list of keys with pivot.keys.list, when using the static method for retrieving pivot keys.

Example

Consider the following simple example where we have a key-value-pairs step that captures the attributes of each customer with one record per attribute per customer, and we want to derive a pivoted (transposed) step that captures the same attributes of each customer but with one record per customer.

customers_kvp:

customer_id  key      value
10000        name     Jane
10000        state    NY
10000        balance  50000.0
10001        name     Joe
10001        state    CA
10001        balance  30000.0

To pivot the customers_kvp step we could run a subsequent step with:

...
steps {
  customers_kvp {
    ...
  }
  customers_pivoted {
    dependencies = [customers_kvp]
    deriver {
      type = pivot
      step.name = customers_kvp
      entity.key.field.names = [customer_id]
      pivot.key.field.name = key
      pivot.value.field.name = value
      pivot.keys.source = dynamic
    }
    ...
  }
  ...
 }
...

Which would produce the derived result:

customers_pivoted:

customer_id  name  state  balance
10000        Jane  NY     50000.0
10001        Joe   CA     30000.0

Exclude

The exclude deriver executes a LEFT ANTI JOIN between two designated dependencies on a set of fields common to both. This deriver is commonly used for easy de-duplication within a pipeline.

The equivalent SQL statement would read:

SELECT Left.* FROM Left LEFT ANTI JOIN Right USING (field1, field2)
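
The same result can also be expressed with the sql deriver described above, for example (Left and Right standing in for two previous step names, and field1 and field2 for the common fields):

deriver {
  type = sql
  query.literal = """
    SELECT Left.* FROM Left
    LEFT ANTI JOIN Right USING (field1, field2)"""
}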

Select

The select deriver includes or excludes a set of columns from an input dependency in its output. The deriver takes either an include-fields list of the columns to be included in the output or an exclude-fields list of the columns to be removed from the output. The include-fields and exclude-fields lists cannot both be provided at the same time.
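
For example, a step that keeps only two columns from a single dependency could be sketched as follows, assuming the deriver type alias is select and using illustrative step and field names:

...
  customer_names {
    dependencies = [customers]
    deriver {
      type = select
      include-fields = [customer_id, name]
    }
  }
...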

Data Quality

The dq deriver can be used to perform data quality checks on a dataset using a set of user-defined rules. Rules can be applied at two scopes: at dataset or row level. For dataset scope, the rules are evaluated against the dataset as a whole and the derived result is a dataset containing one row per rule indicating a pass or fail. The schema of the dataset is name: String, result: Boolean. For example, the result might be:

name       result
namecheck  true
agerange   false

Row level scope takes the list of rules and applies them to every row for the defined input dependency. The results of the checks are appended to the rows as a field of type map<string, boolean> called results by default. The results would look something like:

name     age   results
Ian      null  {"namenotnull":true,"agerange":false}
Webster  21    {"namenotnull":true,"agerange":true}

Envelope has a number of built-in rules (see below) but allows for custom user-defined rules via fully-qualified class name. See the config guide for specific configuration parameters.

Row Scope Rules

The following row-level rules are provided:

  • checknulls - check for null values in one or more fields in a row

  • enum - check one or more fields against a list of allowed values (non-floating point numerics and strings)

  • range - check that one or more numeric fields are between lower and upper bounds (inclusive)

  • regex - check one or more string fields against an allowed pattern

Dataset Scope Rules

The following rules are defined at the dataset scope:

  • count - ensure the dataset has an expected count. The count may either be statically defined or loaded as a dependency from another step. If the latter, the dataset must contain a single row with a single field of type long.

  • checkschema - ensure the dataset matches a specified schema. Currently only supports primitive types.

In addition, any defined row-level rule can be applied at the dataset scope. In this case, the deriver simply logically ANDs the individual results from each row check into a single boolean result for the rule.

If specifying multiple dependencies, the user must specify to which dependency the dataset-level rules should be applied using the dataset configuration parameter.

If using multiple dataset level checks on the same dataset it is recommended to employ the cache hint on the dependency containing the data to be checked.

Example Configuration

An example configuration containing both dataset and row-level DQ derivers is as follows:

...

steps {
  dqparams {
    input {
      type = filesystem
      format = json
      path = "hdfs:///tmp/dqparams"
    }
  }

  mydata {
    input {
      type = filesystem
      format = json
      path = "hdfs:///tmp/data"
    }
  }

  checkmydata {
    dependencies = [mydata,dqparams]
    deriver {
      type = dq
      scope = dataset
      dataset = mydata
      rules {
        r1 {
          type = count
          expected.dependency = dqparams
        }
        r2 {
          type = checkschema
          schema {
            type = flat
            field.names = ["name", "address", "age"]
            field.types = ["string", "string", "int"]
          }
        }
        r3 {
          // row-level rule being run in dataset scope
          type = regex
          fields = ["name"]
          regex = "[a-zA-Z' ]{1,}"
        }
        r4 {
          // row-level rule being run in dataset scope
          type = enum
          fields = ["name"]
          values = ["Ian","Jeremy","Webster"]
          fieldtype = string
          case-sensitive = false
        }
      }
    }
  }

  checkrows {
    dependencies = [mydata]
    deriver {
      type = dq
      scope = row
      rules {
        r1 {
          type = checknulls
          fields = [ "name", "address", "age" ]
        }
        r2 {
          type = regex
          fields = ["name"]
          regex = "[a-zA-Z' ]{1,}"
        }
        r3 {
          type = range
          fields = ["age"]
          fieldtype = "int"
          range = [0,150]
          ignore-nulls = true
        }
      }
    }
  }
}
...

Developing Custom Rules

Users wishing to specify custom rules can implement either the RowRule or DatasetRule interface. Row level rules should implement a check(Row row) method returning a boolean result. Dataset scope rules should implement a check(Dataset<Row> dataset, Map<String, Dataset<Row>> stepDependencies) method which returns a Dataset with a row per rule with the schema name: String, result: Boolean. Row level rules are automatically wrapped in DatasetRowRuleWrapper when used in a dataset scope.

The custom rules may also implement the ProvidesAlias interface which allows an alias to be used instead of the fully-qualified class name in Envelope config files. The implementation must be placed in a META-INF/services/com.cloudera.labs.envelope.derive.dq.DatasetRule or META-INF/services/com.cloudera.labs.envelope.derive.dq.RowRule file on the classpath.

Distinct

The distinct deriver simply returns the distinct rows from its dependency. If there is more than one dependency then the step configuration parameter must be used to identify which step the distinct operation should be applied to. This parameter is optional when the deriver has only one dependency.

Example Configuration

A step with multiple dependencies that deduplicates rows in mydata dataset:

  ...
  dedup {
    dependencies = [some_dependency, mydata, other_dependency, one_more_dependency]
    deriver {
      type = distinct
      step = mydata
    }
  }
  ...

In-List

The in-list deriver filters rows in a dataset by comparing one of the columns to a list of values. This list could be statically defined as a literal in the deriver’s configuration, or it could be dynamically generated from one of the steps in dependencies. In the latter case, the resulting list can be batched to manage its construction and filtering; the default batch size is 1000.

Example Configurations

A step that selects from the airports dataset only the rows whose airport_code is Newark (EWR) or Pittsburgh (PIT):

  ...
  literal_list {
    dependencies = [airports, other_dependencies, ...]
    deriver {
      type = in-list
      step = airports
      field = airport_code
      values.literal = [EWR, PIT]
    }
  }
  ...

Another example where the in-list deriver filters data from the airports dataset, but this time the rows are selected by airport_code via a look-up in another dataset:

  ...
  reference_list {
    dependencies = [airports, delays_top_ten, other_dependencies, ...]
    deriver {
      type = in-list
      step = airports
      field = airport_code
      values.reference.step = delays_top_ten
      values.reference.field = airport_fk
    }
  }
  ...
Caution
In-list filtering using a reference step can be very slow if there are many values in the reference step. A standard SQL join might be more efficient if filtering over more than 1000 values.

Hash

The hash deriver can be used to append a new column that is populated with the MD5 hash of all the pre-existing fields. This can be useful for providing an (extremely likely) unique value for a row, which assists with change data capture and with the requirements of data vault modeling.

The MD5 hash is applied to the concatenation of all of the pre-existing values of a row. By default the concatenation will use an empty string delimiter and will replace nulls with the string '__NULL__'. The appended column will contain the 32-character hexadecimal string representation of the MD5 hash.

The name of the dependency step to be hashed can be specified with the step configuration. If this configuration is not provided then the step must only have one dependency.

The delimiter and null replacement strings can be overridden using the delimiter and null-string configurations.
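
For example, a sketch of a hash deriver with these defaults overridden (the delimiter and null replacement values shown are purely illustrative):

deriver {
  type = hash
  step = previous_step
  delimiter = "|"
  null-string = "N/A"
}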

Hash example

For the step previous_step:

col1   col2  col3  col4  col5
hello  1     NULL  -1.0  true

And the default behavior of the deriver:

...
  hash_step {
    dependencies = [previous_step]
    deriver {
      type = hash
    }
  }
...

This data will be generated by hash_step:

col1   col2  col3  col4  col5  hash
hello  1     NULL  -1.0  true  4891a9d87f8f46a5c8c19c3059864146

Latest

The latest deriver can be used to filter a dataset down to only the latest record of each key. This can be useful if the dependency dataset holds the history of versions of each key, but only the latest version of each key needs to be selected. The 'latest' record of a key is the one with the highest value in a designated timestamp field, where that value can be of any data type.

Note
In general a valid dataset will not have more than one version of a key with the same timestamp. However, where more than one version of a key shares the same highest timestamp, only one of them will be selected, and it is not defined which one.

The dependency step to run the deriver on is specified by the step configuration. The list of key field names is specified by the key-fields configuration. The field name of the timestamp is specified by the timestamp-field configuration.

Latest example

For the step previous_step:

key   value  ts
1001  hello  2018-10-19 14:30:12
1001  world  2018-10-20 05:19:46
1002  beep   2018-10-19 14:30:12
1002  boop   2018-10-20 05:19:46

And this configuration of the deriver:

...
  latest_step {
    dependencies = [previous_step]
    deriver {
      type = latest
      key-field-names = [key]
      timestamp-field = ts
    }
  }
...

This data will be generated by latest_step:

key   value  ts
1001  world  2018-10-20 05:19:46
1002  boop   2018-10-20 05:19:46

Translate

The translate deriver can be used to translate a single field value into multiple fields. This is useful when a field contains a serialized record and where the serialized record’s fields need to be individually accessed. When the deriver translates a field it removes the pre-translated field and appends the translated fields to the end of the row.

Rows that could not be translated (because the translator threw an exception) will be available as a separate step, named after the step that the translate deriver is specified in plus the string "_errored". For example, errored rows from a step named 'customers' will be found in the step 'customers_errored'.

The step configuration specifies the dependency step that will be used. The field configuration specifies the name of the field that will be translated. The translator configuration specifies the translator that will translate the field.

Example

For the step previous_step:

col1  col2         col3
1000  hello:world  false

And this configuration of the deriver:

...
  translate_step {
    dependencies = [previous_step]
    deriver {
      type = translate
      field = col2
      translator {
        type = delimited
        delimiter = ":"
        schema {
          type = flat
          field.names = [col4, col5]
          field.types = [string, string]
        }
      }
    }
  }
...

This data will be generated by translate_step:

col1  col3   col4   col5
1000  false  hello  world

Spark ML

The sparkml deriver can be used to execute Spark ML pipeline models over a previous step. This can be useful for executing machine learning algorithms in an Envelope pipeline in a distributed fashion. Spark ML supports many common machine learning algorithms.

The pipeline model should be created in advance using Spark ML and saved using the PipelineModel#save method. This deriver can then reference the saved pipeline model via the model-path configuration.

The step configuration can be used to specify which step to execute the model over; however, it is optional when the deriver step only contains one dependency.

For an example of creating a Spark ML pipeline model see the Spark ML documentation.
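
A minimal sketch of a sparkml deriver step, assuming a pipeline model that was previously saved to an HDFS path (the path and step names here are illustrative):

...
  scored {
    dependencies = [features]
    deriver {
      type = sparkml
      step = features
      model-path = "hdfs:///models/my_pipeline_model"
    }
  }
...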

Parse JSON

The parse-json deriver can be used to parse JSON record strings into corresponding fields on the original record. The deriver uses Spark’s built-in JSON parsing functionality.

The step and field configurations are used to specify the dependency step and the field within that step that contains the JSON strings.

By default the deriver will place the top-level fields from the parsed JSON as fields on the original record, and remove the JSON string. If there is a field name conflict with existing fields the deriver can instead place the parsed fields within a struct field on the original record. This can be set by specifying the as-struct configuration to true, and the struct-field configuration to the name to be given to the struct field.

The schema of the JSON records must be specified using the schema configuration.

Spark’s JSON parser allows multiple options to change the default parsing behaviour. Refer to the Spark documentation for available options. These options can be set as configurations in the deriver with the option. configuration prefix. For example, the configuration option.mode = FAILFAST will set the mode option to FAILFAST.

Example

For the step previous_step:

col1  col2                          col3
1000  {"col4": 1, "col5": "hello"}  false

And this configuration of the deriver:

...
  parse_json_step {
    dependencies = [previous_step]
    deriver {
      type = parse-json
      step = previous_step
      field = col2
      schema {
        type = flat
        field.names = [col4, col5]
        field.types = [int, string]
      }
    }
  }
...

This data will be generated by parse_json_step:

col1  col3   col4  col5
1000  false  1     hello

Custom derivers

In cases where Envelope does not provide a deriver that meets the requirements of a particular derivation, a custom deriver can be developed and provided instead.

Envelope is pluggable so that Envelope itself does not need to be modified. Instead, a separate jar that contains only the deriver(s) for the pipeline can be created.

To create a new deriver, first start a new Java or Scala project that has a dependency on the Envelope version you are using. You do not need to include Envelope in the packaged jar.

For example, if you are using Maven:

<dependency>
  <groupId>com.cloudera.labs.envelope</groupId>
  <artifactId>envelope-core</artifactId>
  <version>**Envelope version being used here**</version>
  <scope>provided</scope>
</dependency>

With the configured project you can develop the deriver by adding a class that implements the Deriver interface.

The two methods in the interface are:

  • configure to receive the configurations of the deriver section of the step. This can be used to retrieve any custom configurations required by the deriver.

  • derive to run a derivation. The dependencies argument provides the name and Spark DataFrame for each of the dependencies of the step that contains the deriver. The return value is the DataFrame that represents the derivation. Access to the SparkSession object is available from the static method Contexts#getSparkSession.
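
As a rough sketch of what such a class can look like (the package, imports, and method signatures below are inferred from the description above and from the class names used elsewhere in this guide, so confirm them against the Envelope version in use):

package com.yourcompany.envelope.deriver;

import java.util.Map;

import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;

import com.cloudera.labs.envelope.derive.Deriver;
import com.typesafe.config.Config;

public class CustomDeriver implements Deriver {

  private String filterField;

  @Override
  public void configure(Config config) {
    // Retrieve any custom configurations from the deriver section of the step
    this.filterField = config.getString("customproperty1");
  }

  @Override
  public Dataset<Row> derive(Map<String, Dataset<Row>> dependencies) {
    // Each dependency of the step is provided by name as a DataFrame. This
    // hypothetical deriver unions all of the dependencies together and drops
    // rows where the configured field is null. If the SparkSession is needed
    // it is available from Contexts.getSparkSession().
    Dataset<Row> result = null;
    for (Dataset<Row> dependency : dependencies.values()) {
      result = (result == null) ? dependency : result.union(dependency);
    }
    return result.filter(result.col(filterField).isNotNull());
  }

}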

To reference the deriver in your pipeline simply use the deriver’s fully qualified class name (or alias, see below) as the deriver type. For example:

...
deriver {
   type = com.yourcompany.envelope.deriver.CustomDeriver
   customproperty1 = ...
   ...
}
...

Using Aliases

To use an alias in configuration files, Envelope needs to be able to find your class. First, your class will need to implement the ProvidesAlias interface. Next, place the implementation’s fully qualified class name in a META-INF/services/com.cloudera.labs.envelope.derive.Deriver file on the class path - the usual method is to package the file with your JAR.
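
For example, if the CustomDeriver class sketched above implements ProvidesAlias and provides the alias custom, the services file would contain its fully qualified class name on a single line:

com.yourcompany.envelope.deriver.CustomDeriver

The deriver could then be referenced in the pipeline configuration with type = custom instead of the fully qualified class name.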

Adding at runtime

With the project compiled into a jar file the deriver can be submitted as part of the Envelope pipeline similarly to:

spark-submit --jars customderiver.jar envelope-*.jar pipeline.conf
Note
CDH5 uses spark2-submit instead of spark-submit for Spark 2 applications such as Envelope.

The jar file can contain multiple derivers, and other pluggable classes such as custom inputs, outputs, etc.

When developing a custom deriver keep in mind:

  • Derivers are only for deriving new data, and should not lead to side effects outside of the deriver, such as writing to an output or changing external metadata.

  • Derivers are often highly reusable, so avoid hard-coding values or field names into the deriver and have them be given at runtime through configuration instead.

  • Derivers are usually most efficient when they operate only on the Dataset/DataFrame API. If possible avoid converting to the RDD API and then back again.

  • You can look at the code of the provided derivers for hints on how to structure your own deriver.

  • There are utility classes in the .utils package that may already provide some of the functionality you need to put together your derivation logic.