Configuration specification

This page specifies the configurations available in Envelope.

Example configuration

As illustration, a typical Envelope batch application that reads HDFS JSON files, extracts a subset of data, and writes the results to S3 in Parquet might have the following configuration.

application {
  name = Envelope configuration example
  executor.instances = 3
  executor.memory = 4G
}
steps {
  exampleInput {
    input {
      type = filesystem
      path = "hdfs://..."
      format = json
    }
  }
  exampleStep {
    dependencies = [exampleInput]
    deriver {
      type = sql
      query.literal = "SELECT MY_UPPER(foo) AS foo FROM exampleInput WHERE MY_LOWER(bar) = 'blag'"
    }
    planner {
      type = append
    }
    output {
      type = filesystem
      path = "s3a://..."
      format = parquet
    }
  }
}
udfs : [
  {
    name = my_upper
    class = com...
  },
  {
    name = my_lower
    class = com...
  }
]

Application

Application-level configurations have the application. prefix.

Configuration suffix Description

name

The application name in YARN.

executor.instances

The number of executors to be requested for the application. If not specified then Spark dynamic allocation will be used.

executor.initial.instances

The initial number of executors to be requested for the application when using Spark dynamic allocation. This can help more quickly warm up the job’s resources without using a static allocation.

executor.cores

The number of cores per executor. Default is 1.

executor.memory

The amount of memory per executor. Default is 1G.

batch.milliseconds

The length of the micro-batch in milliseconds. Default is 1000. Ignored if the application does not have a streaming input.

pipeline.threads

The number of threads that Envelope will use to run pipeline steps. This is effectively a limit on the number of outputs that can be writing at once. Default is 20.

spark.conf.*

Used to pass configurations directly to Spark. The spark.conf. prefix is removed and the configuration is set in the SparkConf object used to create the Spark context.

hive.enabled

Enables Hive support. Default is true. Must be enabled before reading and writing data stored in Apache Hive. Setting the value to false when Hive integration is not required avoids the associated overhead.

configuration.validation.enabled

Enables upfront validation of the provided Envelope configuration. Default is true.

driver.memory

The amount of memory allocated to the Spark driver. Note that this configuration only applies when the application is deployed in cluster mode, and will cause an exception if the deployment mode is client. To set driver memory for applications running in client mode, use Spark’s command line argument --driver-memory.

security.check-interval

How often the security token manager in the driver checks if tokens need refreshing. Default is "60s". Accepts any Typesafe duration string. Recommended to leave at default.

security.renew-factor

At what proportion of a token’s lifetime to request a new token. Defaults to 0.8. Recommended to leave at default.

config-loader

The config loader object that will provide configurations to be merged into the base configuration at the start of the pipeline and for every micro-batch. Note this is typically only required when streaming pipelines require dynamic refreshing of configurations.

Steps

Step configurations have the steps.[stepname]. prefix. All steps can have the below configurations.

Configuration suffix Description

type

The step type. Envelope supports data, loop, decision, task. Default data.

dependencies

The list of step names that Envelope will submit before submitting this step.

Data steps

In addition to the common step configurations, data steps can have the below configurations.

Configuration suffix Description

cache.enabled

If true then Envelope will cache the step’s DataFrame at the storage level specified by cache.storage.level. Default false.

cache.storage.level

If specified then Envelope will cache the step’s DataFrame at the specified storage level. Available storage levels are DISK_ONLY, DISK_ONLY_2, MEMORY_ONLY, MEMORY_ONLY_2, MEMORY_ONLY_SER, MEMORY_ONLY_SER_2, MEMORY_AND_DISK, MEMORY_AND_DISK_2, MEMORY_AND_DISK_SER, MEMORY_AND_DISK_SER_2 and OFF_HEAP. Default MEMORY_ONLY.

hint.small

If true then Envelope will mark the step’s DataFrame as small enough to be used in broadcast joins. Default false.

print.schema.enabled

If true then Envelope will print the step’s DataFrame’s schema to the driver logs. This can be useful for debugging the schema of intermediate data. Default false.

print.data.enabled

If true then Envelope will print the step’s DataFrame’s data to the driver logs. This can be useful for debugging intermediate results. Default false.

print.data.limit

The maximum number of records to print when print.data.enabled is true. This can be useful for avoiding overloading the driver logs with too many printed records. Default unlimited.

repartition.partitions

The number of DataFrame partitions to repartition the step data by. In Spark this will run DataFrame#repartition.

repartition.columns

A list of DataFrame columns to repartition the step data by. In Spark this will run DataFrame#repartition. Per standard Spark convention, this repartitions to the number of partitions defined by the Spark SQL configuration spark.sql.shuffle.partitions, but it can be combined with repartition.partitions to change this default. The list values must identify a DataFrame column name only; no expressions are evaluated.

coalesce.partitions

The number of DataFrame partitions to coalesce the step data by. In Spark this will run DataFrame#coalesce.
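
As an illustrative sketch, a data step combining several of these configurations might look like the following; the step, dependency, and column names are hypothetical.

steps {
  exampleStep {
    dependencies = [exampleInput]
    cache.enabled = true
    cache.storage.level = MEMORY_AND_DISK
    repartition.columns = [account_id]
    repartition.partitions = 50
    print.schema.enabled = true
    deriver {
      type = sql
      query.literal = "SELECT * FROM exampleInput"
    }
  }
}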

Loop steps

In addition to the common step configurations, loop steps can have the below configurations. For more information on loop steps see the looping guide.

Configuration suffix Description

mode

The mode for Envelope to run the iterations of the loop in. If parallel then Envelope will run all iterations of the loop in parallel. If serial then Envelope will run each iteration of the loop in serial order. Note that the order of the step source may not be guaranteed.

parameter

The parameter that Envelope will replace in strings in the configuration of the steps that are dependent on the loop step. For a parameter value iteration_value Envelope will replace the text ${iteration_value} with the iteration value. If no parameter is given then Envelope will not perform parameter replacement.

source

The source of the iteration values for the loop. Envelope supports range, list, and step. range loops over an inclusive range of integers. list loops over an ordered list of values. step loops over values retrieved from the DataFrame of a previous data step.

range.start

If using the range source, the first integer of the range to loop over.

range.end

If using the range source, the last integer of the range to loop over.

list

If using the list source, the list of values to loop over.

step

If using the step source, the name of the previous data step to retrieve the values from. The previous data step must contain only one field, and must not contain more than 1000 values.
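
As a sketch, a serial loop over a range with parameter replacement in a dependent step might be configured as below; the step names, parameter name, and query are hypothetical.

steps {
  loopOverMonths {
    type = loop
    mode = serial
    source = range
    range.start = 1
    range.end = 12
    parameter = month_number
  }
  loadMonth {
    dependencies = [loopOverMonths]
    deriver {
      type = sql
      query.literal = "SELECT * FROM exampleInput WHERE month = ${month_number}"
    }
  }
}

Each iteration would replace ${month_number} in the dependent step’s query with the current value of the range.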

Decision steps

In addition to the common step configurations, decision steps can have the below configurations. For more information on decision steps see the decisions guide.

Configuration suffix Description

if-true-steps

Required. The list of dependent step names that will be kept if the decision result is true. The steps listed must directly depend on the decision step. The remaining directly dependent steps of the decision step will be kept if the decision result is false. Any steps subsequently dependent on the removed steps will also be removed.

method

Required. The method by which the decision step will make the decision. Envelope supports literal, step_by_key, step_by_value.

result

Required if method is literal. The true or false result for the decision.

step

Required if method is step_by_key or step_by_value. The name of the previous step from which to extract the decision result.

key

Required if method is step_by_key. The specific key of the previous step to look up the boolean result by.
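
A minimal sketch of a decision step using the step_by_key method follows; the step names and key are hypothetical, and checkFlags is assumed to produce key/boolean pairs.

steps {
  shouldReload {
    type = decision
    dependencies = [checkFlags]
    if-true-steps = [reloadTable]
    method = step_by_key
    step = checkFlags
    key = reload_required
  }
}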

Task steps

In addition to the common step configurations, task steps can have the below configurations. For more information on task steps see the tasks guide.

Configuration suffix Description

class

Required. The alias or fully qualified class name of the Task implementation.

Inputs

Input configurations belong to data steps, and have the steps.[stepname].input. prefix. For more information on inputs see the inputs guide.

Configuration suffix Description

type

The input type to be used. Envelope provides filesystem, hive, jdbc, kafka, kudu. To use a custom input, specify the fully qualified name or alias of the Input implementation class.

Filesystem

Input type = filesystem.

Configuration suffix Description

path

The Hadoop filesystem path to read as the input. Typically a Cloudera EDH will point to HDFS by default. Use s3a:// for Amazon S3.

format

The file format of the files of the input directory. Envelope supports formats parquet, json, csv, input-format, text.

schema

Optional. Applies to csv and json input formats. Refer to the Schema documentation. If a schema is not provided for csv/json input, Spark will infer the schema as described in the org.apache.spark.sql.DataFrameReader documentation (unless infer-schema = false)

separator

(csv) Spark option sep; sets the single character as a separator for each field and value. (default ,)

encoding

(csv) Spark option encoding; decodes the CSV files by the given encoding type. (default UTF-8)

quote

(csv) Spark option quote; sets the single character used for escaping quoted values where the separator can be part of the value. To turn off quoting, set this to an empty string rather than null. (default ")

escape

(csv) Spark option escape; sets the single character used for escaping quotes inside an already quoted value. (default \)

comment

(csv) Spark option comment; sets the single character used for skipping lines beginning with this character. By default, it is disabled. (default empty string)

header

(csv) Spark option header; uses the first line as names of columns. (default false)

infer-schema

(csv) Spark option inferSchema; infers the input schema automatically from data. It requires one extra pass over the data. (default false)

ignore-leading-ws

(csv) Spark option ignoreLeadingWhiteSpace; defines whether or not leading whitespaces from values being read should be skipped. (default false)

ignore-trailing-ws

(csv) Spark option ignoreTrailingWhiteSpace; defines whether or not trailing whitespaces from values being read should be skipped. (default false)

null-value

(csv) Spark option nullValue; sets the string representation of a null value. This applies to all supported types including the string type. (default empty string)

nan-value

(csv) Spark option nanValue; sets the string representation of a "non-number" value. (default NaN)

positive-infinity

(csv) Spark option positiveInf; sets the string representation of a positive infinity value. (default Inf)

negative-infinity

(csv) Spark option negativeInf; sets the string representation of a negative infinity value. (default -Inf)

date-format

(csv) Spark option dateFormat; sets the string that indicates a date format. Custom date formats follow the formats at java.text.SimpleDateFormat. This applies to date type. (default yyyy-MM-dd)

timestamp-format

(csv) Spark option timestampFormat; sets the string that indicates a timestamp format. Custom date formats follow the formats at java.text.SimpleDateFormat. This applies to timestamp type. (default yyyy-MM-dd'T'HH:mm:ss.SSSZZ)

max-columns

(csv) Spark option maxColumns; defines a hard limit of how many columns a record can have. (default 20480)

max-chars-per-column

(csv) Spark option maxCharsPerColumn; defines the maximum number of characters allowed for any given value being read. By default, it is -1 meaning unlimited length. (default -1)

max-malformed-logged

(csv) Spark option maxMalformedLogPerPartition; sets the maximum number of malformed rows Spark will log for each partition. Malformed records beyond this number will be ignored. (default 10)

mode

(csv) Spark option mode; allows a mode for dealing with corrupt records during parsing.

PERMISSIVE: sets other fields to null when it meets a corrupted record. When a schema is set by user, it sets null for extra fields.

DROPMALFORMED: ignores the whole corrupted records.

FAILFAST: throws an exception when it meets corrupted records.

(default PERMISSIVE)

format-class

(input-format) The org.apache.hadoop.mapreduce.InputFormat canonical class name.

translator

(input-format, text) The Translator class to use to convert the InputFormat’s Key/Value pairs into Dataset Rows. See Translators for details. This is optional for text, and if it is omitted then the input will read the whole lines into a single string field named value.
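
Putting several of these options together, a CSV filesystem input might be configured as in the sketch below; the path and schema fields are placeholders.

input {
  type = filesystem
  path = "hdfs://..."
  format = csv
  header = true
  separator = "|"
  null-value = "NULL"
  schema {
    type = flat
    field.names = [id, name, created]
    field.types = [long, string, timestamp]
  }
}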

Hive

Input type = hive.

Configuration suffix Description

table

The Hive metastore table name (including database prefix, if required) to read as the input.

JDBC

Input type = jdbc.

Configuration suffix Description

url

The JDBC URL for the remote database.

tablename

The name of the table of the remote database to be read as the input.

username

The username to use to connect to the remote database.

password

The password to use to connect to the remote database.

Kafka

Input type = kafka.

Configuration suffix Description

brokers

The hosts and ports of the brokers of the Kafka cluster, in the form host1:port1,host2:port2,…​,hostn:portn.

topics

The list of Kafka topics to be consumed.

group.id

The Kafka consumer group ID for the input. When offset management is enabled use a unique group ID for each pipeline so that Envelope can track one execution of the pipeline to the next. If not provided Envelope will use a random UUID for each pipeline execution.

window.enabled

If true then Envelope will enable Spark Streaming windowing on the input. Ignored if the step does not contain a streaming input. Default false.

window.milliseconds

The duration in milliseconds of the Spark Streaming window for the input.

window.slide.milliseconds

The interval in milliseconds at which the Spark Streaming window operation is performed if using sliding windows.

offsets.manage

If true, Envelope will manage the Kafka offsets that have been processed so that application restarts will continue in the topic from where they left off. Default true. Unless offsets.output is set, Kafka’s internal offset commit API will be used.

offsets.output

If offsets.manage is true then this output specification can be used to define external alternatives (rather than Kafka’s internal offset commit API) for where Envelope will store and retrieve the latest offsets that have been successfully processed. The output must support random upsert mutations (e.g. Kudu, HBase).

parameter.*

Used to pass configurations directly to Kafka. The parameter. prefix is removed and the configuration is set in the Kafka parameters map object used to create the Kafka direct stream.
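
For example, a Kafka input with managed offsets and a delimited translator (see Translators below) might resemble this sketch; the broker addresses, topic, group ID, and fields are hypothetical.

input {
  type = kafka
  brokers = "broker1:9092,broker2:9092"
  topics = [orders]
  group.id = example_orders_pipeline
  offsets.manage = true
  translator {
    type = delimited
    delimiter = ","
    schema {
      type = flat
      field.names = [order_id, symbol, quantity]
      field.types = [long, string, int]
    }
  }
}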

Kudu

Input type = kudu.

Configuration suffix Description

connection

The hosts and ports of the masters of the Kudu cluster, in the form "host1:port1,host2:port2,…​,hostn:portn".

table.name

The name of the Kudu table to be read as the input.

Translators

Translator configurations belong to data steps, and have the steps.[stepname].input.translator. prefix. For more information on translators, see the Translators section of the Inputs Guide.

Configuration suffix Description

type

The translator type to be used. Envelope provides avro, delimited, kvp, morphline, protobuf, raw. To use a custom translator, specify the fully qualified name or alias of the Translator implementation class.

append.raw.enabled

If true then the translator will append the raw fields to the translated row. Each appended field name will be prefixed with an underscore, e.g. _value. Default false.

Avro

Translator type = avro.

Configuration suffix Description

schema

The schema to translate to. Refer to the Schema documentation

Delimited

Translator type = delimited.

Configuration suffix Description

delimiter

The delimiter that separates the fields of the message.

delimiter-regex

If true, the delimiter string will be interpreted as a regular expression. Default false (interpret as a literal value).

schema

The schema to translate to. Refer to the Schema documentation

timestamp.formats

Optional list of timestamp format patterns. For the timestamp field type, one or more patterns may be supplied in Joda timestamp format. If this configuration is supplied, a timestamp must conform to one of these patterns to be considered valid. For performance-sensitive processing, list the patterns in order of probability of occurrence. If this configuration is not supplied, timestamp data must conform to ISO 8601 date, time or datetime format.

KVP

Translator type = kvp.

Configuration suffix Description

delimiter.kvp

The delimiter that separates the key-value pairs of the message.

delimiter.field

The delimiter that separates the key and value of each key-value pair.

schema

The schema to translate to. Refer to the Schema documentation

timestamp.formats

Optional list of timestamp format patterns. For the timestamp field type, one or more patterns may be supplied in Joda timestamp format. If this configuration is supplied, a timestamp must conform to one of these patterns to be considered valid. For performance-sensitive processing, list the patterns in order of probability of occurrence. If this configuration is not supplied, timestamp data must conform to ISO 8601 date, time or datetime format.
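
As a sketch, a KVP translator for messages such as symbol=FOO|qty=100 might be configured like this; the delimiters and fields are illustrative.

translator {
  type = kvp
  delimiter.kvp = "|"
  delimiter.field = "="
  schema {
    type = flat
    field.names = [symbol, qty]
    field.types = [string, int]
  }
}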

Morphline

Translator type = morphline.

Configuration suffix Description

encoding.key

The character set of the incoming key, stored in the Record field _attachment_key_charset. This must match the encoding of the Envelope input. The key value is stored in the field _attachment_key.

encoding.message

The character set of the incoming message, stored in the Record field _attachment_charset. This must match the encoding of the Envelope input. The message value is stored in the field _attachment.

morphline.file

The filename of the Morphline configuration found in the local directory of the executor. See the --files option for spark-submit.

morphline.id

The optional identifier of the Morphline pipeline within the configuration file.

schema

The schema to translate to. Refer to the Schema documentation

error.on.empty

If true then all input rows must map to an output row, otherwise an error will be thrown. Default true.

Protobuf

Translator type = protobuf.

Configuration suffix Description

schema

The schema to translate to. Refer to the Schema documentation (Currently only the Protobuf schema type is supported for the Protobuf translator, schema.type = protobuf)

Raw

Translator type = raw.

This translator has no additional configurations.

Derivers

Deriver configurations belong to data steps, and have the steps.[stepname].deriver. prefix. For more information on derivers see the derivers guide.

Configuration suffix Description

type

The deriver type to be used. Envelope provides morphline, nest, passthrough, sql, pivot, exclude, distinct, select, in-list, hash, latest, dq, translate, sparkml, parse-json. To use a custom deriver, specify the fully qualified name or alias of the Deriver implementation class.

Morphline

Deriver type = morphline.

Configuration suffix Description

step.name

The name of the dependency step whose records will be run through the Morphline pipeline.

morphline.file

The filename of the Morphline configuration found in the local directory of the executor. See the --files option for spark-submit.

morphline.id

The optional identifier of the Morphline pipeline within the configuration file.

schema

The schema definition. Refer to the Schema documentation

Nest

Deriver type = nest.

Configuration suffix Description

nest.into

The name of the step whose records will be appended with the nesting of nest.from. Must be a dependency of the encapsulating step.

nest.from

The name of the step whose records will be nested into nest.into. Must be a dependency of the encapsulating step.

key.field.names

The list of field names that make up the common key of the two steps. This key will be used to determine which nest.from records will be nested into each nest.into record. There should only be one record in nest.into for each unique key of nest.from.

nested.field.name

The name to be given to the appended field that contains the nested records.

Passthrough

Deriver type = passthrough.

This deriver has no custom configurations.

SQL

Deriver type = sql.

Configuration suffix Description

query.literal

The literal query to be submitted to Spark SQL. Previously submitted steps can be referenced as tables by their step name.

query.file

The path to the file containing the query to be submitted to Spark SQL.

parameter.parameter_name (or any parameter.*)

All references to '${parameter_name}' within the query string will be replaced with the value of this configuration. For more information see the derivers guide.
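
For example, a SQL deriver that reads its query from a file and substitutes a parameter might look like the sketch below; the file path, parameter name, and value are placeholders.

deriver {
  type = sql
  query.file = "hdfs://..."
  parameter.run_date = "2019-01-01"
}

With this configuration every occurrence of '${run_date}' in the query text would be replaced with 2019-01-01 before the query is submitted to Spark SQL.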

Pivot

Deriver type = pivot.

Configuration suffix Description

step.name

The name of the dependency step that will be pivoted.

entity.key.field.names

The list of field names that represents the entity key to group on. The derived DataFrame will contain one record per distinct entity key.

pivot.key.field.name

The field name of the key to pivot on. It is expected that there will only be one of each pivot key per entity key. The derived DataFrame will contain one additional column per distinct pivot key.

pivot.value.field.name

The field name of the value to be pivoted.

pivot.keys.source

The source of the keys to pivot into additional columns. If static then pivot.keys.list provides the list of keys. If dynamic then the list of keys is determined dynamically from the step, at the cost of additional computation time. Default is dynamic.

pivot.keys.list

The list of keys to pivot into additional columns. Only used if pivot.keys.source is set to static.
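
A pivot deriver using a static list of pivot keys might be sketched as follows; the step and field names are hypothetical.

deriver {
  type = pivot
  step.name = measurements
  entity.key.field.names = [device_id]
  pivot.key.field.name = metric_name
  pivot.value.field.name = metric_value
  pivot.keys.source = static
  pivot.keys.list = [temperature, humidity]
}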

Exclude

Deriver type = exclude.

Configuration suffix Description

compare

The name of the dataset whose records will be compared and if matched, then excluded from the output of the current step.

with

The name of the dataset whose records will supply the matching patterns for the comparison. The records are not modified; this step only queries the dataset.

field.names

The names of the fields used to match between the two datasets. The fields must be identical in name and type. A row is excluded if all of the fields are equal between the datasets.

Select

Deriver type = select.

Configuration suffix Description

step

The name of the dependency step from which to select columns as output of the current step.

include-fields

List of column names that are required in the output of the current step. If the input dataset schema doesn’t contain the column name(s) then the deriver will generate a runtime error.

exclude-fields

List of column names that are not required in the output of the current step. If the input dataset schema doesn’t contain the column name(s) then the deriver will generate a runtime error. include-fields and exclude-fields cannot both be provided at the same time.

Data quality

Deriver type = dq.

Configuration suffix Description

scope

Required. The scope at which to apply the DQ deriver. dataset or row.

rules

Required. A nested object of rules. Each defined object should contain a field type, which defines the type of the DQ rule, either a built-in or a fully-qualified classname. Type specific configs are listed below.

checknulls

fields

Required. The list of fields to check. The contents should be a list of strings.

enum

fields

Required. String list of field names.

fieldtype

Optional. Type of the field to check for defined values: must be string, long, int, or decimal. Defaults to string.

values

Required. List of values. For strings and decimals define the values using string literals. For integral types use number literals.

case-sensitive

Optional. For string values, whether the value matches should be case-sensitive. Defaults to true.

range

fields

Required. List of field names on which to apply the range checks.

fieldtype

Optional. The field type to use when doing range checks. Range values will be interpreted as this type. Must be numeric: allowed values are int, long, double, float, decimal. Take care when using floating point values as exact boundary matches may not behave as expected - use decimal if exact boundaries are required. Defaults to long.

range

Required. Two element list of numeric literals, e.g. [1,10] or [1.5,10.45]. Both boundaries are inclusive.

ignore-nulls

Optional. If true then range check will pass for a null value, or if false will fail. Defaults to false.

regex

fields

Required. String list of field names, which should all have type string.

regex

Required. Regular expression with which to match field values. Note that extra escape parameters are not required. For example to match any number up to 999 you could use: \d{1,3}.

count

expected.literal

Either this or expected.dependency required. A long literal with the expected number of rows in the dataset.

expected.dependency

Either this or expected.literal required. A string indicating the dependency in which the expected count is defined. It must be a dataframe with a single field of type long.

checkschema

schema

The schema definition. Refer to the Schema documentation

exactmatch

Optional. Whether the schema of the Rows must exactly match the specified schema. If false the actual row can contain other fields not specified in the fields configuration. Those that are specified must match both name and type. Defaults to false.
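
A row-scoped data quality deriver combining several of the built-in rules might be sketched as below; the rule names and fields are illustrative only.

deriver {
  type = dq
  scope = row
  rules {
    notnulls {
      type = checknulls
      fields = [order_id, symbol]
    }
    validqty {
      type = range
      fields = [quantity]
      fieldtype = int
      range = [1, 10000]
    }
    validsymbol {
      type = regex
      fields = [symbol]
      regex = "[A-Z]{1,5}"
    }
  }
}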

Distinct

Deriver type = distinct.

Configuration suffix Description

step

The name of the dataset whose records will be deduplicated. Only required if there is more than one dependency, otherwise optional.

In-List

Deriver type = in-list.

Configuration suffix Description

step

The name of the dataset whose records will be filtered based on the supplied list of values. Only required if there is more than one dependency, otherwise optional. If provided, the dataset must be present in the list of dependencies.

field

The name of the field in the dataset’s schema whose values will be compared with the supplied list of values. Only required if the dataset schema contains more than one field, otherwise optional.

values.literal

A list of values that will be used as a filter against designated field’s content. Required unless the list is going to be derived via reference (see below).

values.reference.step

Step whose records will be used to generate a set of values to filter records against. Can only be specified when the literal list (values.literal) is not provided. If specified, the name must be present in the dependencies.

values.reference.field

The name of the field in values.reference.step schema whose values will be used to populate the filter. Only required if values.reference.step is specified, and its schema has more than one field.

values.reference.batch-size

The size of the filter batch when generating the values of the IN list. Defaults to 1000.
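
As an illustration, an in-list deriver that filters one dependency by values taken from another might be configured as in this sketch; the step and field names are hypothetical.

deriver {
  type = in-list
  step = transactions
  field = account_id
  values.reference.step = activeAccounts
  values.reference.field = account_id
}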

Hash

Deriver type = hash.

Configuration suffix Description

step

The name of the dataset whose records will be hashed. Only required if there is more than one dependency, otherwise optional.

hash-field

The name of the field that will be added with the hash string. Default 'hash'.

delimiter

The delimiter that the deriver will use to concatenate the field values of a row. Default empty string.

null-string

The string that the deriver will use in place of NULLs when concatenating the field values of a row. Default '__NULL__'.

include-fields

The list of field names that will contribute to the hash. By default all fields are included. Cannot be used with exclude-fields.

exclude-fields

The list of field names that will not contribute to the hash. By default no fields are excluded. Cannot be used with include-fields.

Latest

Deriver type = latest.

Configuration suffix Description

step

The name of the dataset whose records will be filtered. Only required if there is more than one dependency, otherwise optional.

key-fields

The list of field names that make up the key of the dataset. The result of this deriver will be exactly one record per unique key in the dependency step dataset.

timestamp-field

The name of the field used to order the records for an individual key. Only the record with the highest value of this field for a key will be included in the deriver result.

Translate

Deriver type = translate.

Configuration suffix Description

step

The name of the dataset that contains the field to be translated.

field

The name of the field to be translated.

translator

The configuration object for the translator that will translate the field. See the derivers guide for more information on this syntax.

Spark ML

Deriver type = sparkml.

Configuration suffix Description

step

The name of the dataset that the pipeline model will be executed over. Only required if there is more than one dependency, otherwise optional.

model-path

The path to the pipeline model directory that was created by the Spark ML pipeline model save.

Parse JSON

Deriver type = parse-json.

Configuration suffix Description

step

The name of the dependency step that contains the field that contains the JSON strings.

field

The name of the field that contains the JSON strings.

as-struct

Whether to place the parsed fields within their own struct field, instead of directly on the record. Default false.

struct-field

If as-struct is true, then the name of the struct field to place the parsed fields within.

schema

The schema of the JSON records. Refer to the Schema documentation

option.*

Passes through Spark-specific JSON options to Spark. For example, option.mode = FAILFAST will set the option mode to FAILFAST. Refer to the Spark JSON documentation for available options.
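
For example, a parse-json deriver that parses a JSON payload field into a nested struct might look like the following sketch; the step, field, and schema are placeholders.

deriver {
  type = parse-json
  step = rawEvents
  field = payload
  as-struct = true
  struct-field = event
  option.mode = FAILFAST
  schema {
    type = flat
    field.names = [event_type, event_time]
    field.types = [string, timestamp]
  }
}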

Partitioners

Partitioner configurations belong to data steps, and have the steps.[stepname].partitioner. prefix.

Configuration suffix Description

type

The partitioner type to be used. Envelope provides range, hash, uuid. To use a custom partitioner, specify the fully qualified name or alias of the ConfigurablePartitioner implementation class. If no partitioner type is specified, Envelope will use the range partitioner.

Planners

Planner configurations belong to data steps, and have the steps.[stepname].planner. prefix. For more information on planners see the planners guide.

Configuration suffix Description

type

The planner type to be used. Envelope provides append, bitemporal, delete, eventtimeupsert, history, overwrite, upsert. To use a custom planner, specify the fully qualified name or alias of the Planner implementation class.

Append

Planner type = append.

Configuration suffix Description

fields.key

The list of field names that make up the natural key of the record. Only required if uuid.key.enabled is true.

field.last.updated

The field name for the last updated attribute. If specified then Envelope will add this field and populate it with the system timestamp string.

uuid.key.enabled

If true then Envelope will overwrite the first key field with a UUID string.

Bitemporal

Planner type = bitemporal.

Configuration suffix Description

fields.key

The list of field names that make up the natural key of the record.

fields.values

The list of field names that are used to determine if an arriving record is different to an existing record.

fields.timestamp

The list of field names of the event time of the record.

fields.event.time.effective.from

The list of field names of the event-time effective-from timestamp attribute on the output.

fields.event.time.effective.to

The list of field names of the event-time effective-to timestamp attribute on the output.

fields.system.time.effective.from

The list of field names of the system-time effective-from timestamp attribute on the output.

fields.system.time.effective.to

The list of field names of the system-time effective-to timestamp attribute on the output.

field.surrogate.key

The field name of the surrogate key string attribute on the output. If this configuration is set the planner will populate the field with a UUID string for new records.

field.current.flag

The field name of the current flag attribute on the output.

current.flag.value.yes

The flag indicating current record. Overrides the default value (Y).

current.flag.value.no

The flag indicating non-current record. Overrides the default value (N).

carry.forward.when.null

If true then Envelope will overwrite null values of the arriving record with the corresponding values of the most recent existing record for the same key.

time.model.event

The time model for interpreting the event time of the arriving and existing records, and for generating the event time effective from/to values.

time.model.system

The time model for interpreting the system time of the existing records, and for generating the system time effective from/to values.

Event time upsert

Planner type = eventtimeupsert.

Configuration suffix Description

fields.key

The list of field names that make up the natural key of the record.

field.last.updated

The field name for the last updated attribute. If specified then Envelope will add this field and populate it with the system timestamp.

fields.timestamp

The list of field names of the event time of the record.

fields.values

The list of field names that are used to determine if an arriving record is different to an existing record.

field.surrogate.key

The field name of the surrogate key string attribute on the output. If this configuration is set the planner will populate the field with a UUID string for new records.

time.model.event

The time model for interpreting the event time of the arriving and existing records.

time.model.last.updated

The time model for generating the last updated values.
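
A sketch of an event time upsert planner with explicit time models follows; the field names are hypothetical, and the time model configurations are described under Time models below.

planner {
  type = eventtimeupsert
  fields.key = [order_id]
  fields.values = [status, quantity]
  fields.timestamp = [updated_at]
  field.last.updated = last_updated
  time.model.event.type = timestamp
  time.model.last.updated.type = stringdatetime
}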

History

Planner type = history.

Configuration suffix Description

fields.key

The list of field names that make up the natural key of the record.

fields.values

The list of field names that are used to determine if an arriving record is different to an existing record.

fields.timestamp

The list of field names of the event time of the record.

fields.effective.from

The list of field names of the event-time effective-from timestamp attribute on the output.

fields.effective.to

The list of field names of the event-time effective-to timestamp attribute on the output.

field.current.flag

The field name of the current flag attribute on the output.

current.flag.value.yes

The flag indicating current record. Overrides the default value (Y).

current.flag.value.no

The flag indicating non-current record. Overrides the default value (N).

fields.last.updated

The list of field names for the last updated attribute. If specified then Envelope will add this field and populate it with the system timestamp.

field.surrogate.key

The field name of the surrogate key string attribute on the output. If this configuration is set the planner will populate the field with a UUID string for new records.

carry.forward.when.null

If true then Envelope will overwrite null values of the arriving record with the corresponding values of the most recent existing record for the same key.

time.model.event

The time model for interpreting the event time of the arriving and existing records, and for generating the effective from/to values.

time.model.last.updated

The time model for generating the last updated values.

Overwrite

Planner type = overwrite.

This planner has no custom configurations.

Delete

Planner type = delete.

This planner has no custom configurations.

Upsert

Planner type = upsert.

Configuration suffix Description

field.last.updated

The field name for the last updated attribute. If specified then Envelope will add this field and populate it with the system timestamp string.

Time models

Time model configurations belong to planners, and have the steps.[stepname].planner.time.model.[timename] prefix. For more information on time models see the planners guide.

Configuration suffix Description

type

The time model type to be used. Envelope provides longmillis, nanoswithseqnum, stringdate, stringdatetime, timestamp. To use a custom time model, specify the fully qualified name or alias of the TimeModel implementation class.

Long milliseconds

Time model type = longmillis.

This time model has no custom configurations.

Nanoseconds with sequence number

Time model type = nanoswithseqnum.

This time model has no custom configurations.

String date

Time model type = stringdate.

Configuration suffix Description

format

The Java SimpleDateFormat format of the date values. Default "yyyy-MM-dd".

String date-time

Time model type = stringdatetime.

Configuration suffix Description

format

The Java SimpleDateFormat format of the date-time values. Default "yyyy-MM-dd HH:mm:ss.SSS".

Timestamp

Time model type = timestamp.

This time model has no custom configurations.

Outputs

Output configurations belong to data steps, and have the steps.[stepname].output. prefix.

Configuration suffix Description

type

The output type to be used. Envelope provides filesystem, hive, jdbc, kafka, kudu, log, hbase, zookeeper. To use a custom output, specify the fully qualified name or alias of the Output implementation class.

Filesystem

Output type = filesystem.

Configuration suffix Description

path

The Hadoop filesystem path to write as the output. Typically a Cloudera EDH will point to HDFS by default. Use s3a:// for Amazon S3.

format

The file format for the files of the output directory. Envelope supports formats parquet, csv and json.

partition.by

The list of columns to partition the write output. Optional.

separator

(csv) Spark option sep; sets the single character as a separator for each field and value. (default ,)

quote

(csv) Spark option quote; sets the single character used for escaping quoted values where the separator can be part of the value. (default ")

escape

(csv) Spark option escape; sets the single character used for escaping quotes inside an already quoted value. (default \)

escape-quotes

(csv) Spark option escapeQuotes; a flag indicating whether values containing quotes should always be enclosed in quotes. Default is to escape all values containing a quote character. (default true)

quote-all

(csv) Spark option quoteAll; a flag indicating whether all values should always be enclosed in quotes. Default is to only escape values containing a quote character. (default false)

header

(csv) Spark option header; writes the names of columns as the first line. (default false)

null-value

(csv) Spark option nullValue; sets the string representation of a null value. (default empty string)

compression

(csv) Spark option compression; compression codec to use when saving to file. This can be one of the known case-insensitive shorten names (none, bzip2, gzip, lz4, snappy, and deflate). (default null)

date-format

(csv) Spark option dateFormat; sets the string that indicates a date format. Custom date formats follow the formats at java.text.SimpleDateFormat. This applies to date type. (default yyyy-MM-dd)

timestamp-format

(csv) Spark option timestampFormat; sets the string that indicates a timestamp format. Custom date formats follow the formats at java.text.SimpleDateFormat. This applies to timestamp type. (default yyyy-MM-dd'T'HH:mm:ss.SSSZZ)

Hive

Output type = hive.

Configuration suffix Description

table

The name of the Hive table targeted for write. The name can include the database prefix, e.g. example.SampleTableName. If the table does not exist, Envelope will create a Parquet-formatted table. If the table has been created outside of Envelope, the format is determined and managed by Hive itself, i.e. any Hive SerDe.

location

Optional. The HDFS location for the underlying files of a table. Typically only defined during table creation, during which the table is created as EXTERNAL, otherwise the table is created in the default Hive warehouse and set to MANAGED.

partition.by

Optional. The list of Hive table partition names to dynamically partition the write by.

align.columns

If true then Envelope will attempt to align the output schema by matching (case-insensitive, unless spark.sql.caseSensitive is set) the step’s column names with those of the target Hive table. Step columns without a match in the target table will not be included in the aligned output, and similarly, target Hive table columns not available in the step schema will be NULL.

options

Used to pass additional configuration parameters. The parameters are set as a Map object and passed directly to the Spark DataFrameWriter.

JDBC

Output type = jdbc.

Configuration suffix Description

url

The JDBC URL for the remote database.

tablename

The name of the table of the remote database to write as the output.

username

The username to use to connect to the remote database.

password

The password to use to connect to the remote database.

Kafka

Output type = kafka.

Configuration suffix Description

brokers

Required. The hosts and ports of the brokers of the Kafka cluster, in the form host1:port1,host2:port2,…​,hostn:portn.

topic

Required. The Kafka topic to write to.

serializer.type

Required. The type of serialization to use for writing the row in to the topic. Valid types are delimited and avro.

serializer.field.delimiter

Required if serializer.type is delimited. The delimiter string to separate the field values with.

serializer.use.for.null

Used if serializer.type is delimited. The string to use if a field value is null. Defaults to the empty string.

serializer.schema.path

Required if serializer.type is avro. The path to the Avro schema file for serializing the rows, e.g. hdfs:/your/path/to/schema.avsc.

parameter.*

Used to pass configurations directly to the Kafka client. The parameter. prefix is removed and the configuration is set in the Kafka parameters map object used to create the KafkaProducer.
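
As an example sketch, a Kafka output using delimited serialization might be configured like this; the broker addresses and topic name are placeholders.

output {
  type = kafka
  brokers = "broker1:9092,broker2:9092"
  topic = exampleTopic
  serializer.type = delimited
  serializer.field.delimiter = "|"
  serializer.use.for.null = "NULL"
  parameter.acks = "all"
}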

Kudu

Output type = kudu.

Configuration suffix Description

connection

The hosts and ports of the masters of the Kudu cluster, in the form "host1:port1,host2:port2,…​,hostn:portn".

table.name

The name of the Kudu table to write to.

insert.ignore

Ignore duplicate rows in Kudu (default: true)

ignore.missing.columns

Ignore writing columns that do not exist in the Kudu schema (default: false)

secure

Is the target Kudu cluster secured by Kerberos? This must be set to true for automatic token management to take effect, which is a requirement for random output planners. (default: true if hadoop.security.authentication is kerberos, false otherwise).

Log

Output type = log.

Configuration suffix Description

delimiter

The delimiter string to separate the field values with. Default is ,.

level

The log4j level for the written logs. Default is INFO.

HBase

Output type = hbase.

Configuration suffix Description

table.name

Required. The table for the output, specified in the format [namespace:]name, e.g. envelopetest:test.

zookeeper

Optional. In non-secure setups it is not a strict requirement to supply an hbase-site.xml file on the classpath, so the ZooKeeper quorum can be specified with this property with the usual HBase configuration syntax. Note that this will supersede any quorum specified in any hbase-site.xml file on the classpath.

hbase.conf.*

Optional. Pass-through options to set on the HBase connection. The hbase.conf prefix will be stripped. For example:

hbase {
  conf {
    hbase.client.retries.number = 5
    hbase.client.operation.timeout = 30000
  }
}

Note that non-String parameters are automatically cast to Strings, but the underlying HBase code will do any required conversions from String.

mapping.serde

Optional. The fully qualified class name of the implementation to use when converting Spark Row objects into HBase Puts and Gets, and converting HBase Results into Rows. Defaults to default, which maps to com.cloudera.labs.envelope.utils.hbase.HBase.DefaultMappingSerde. The default serde configuration syntax adheres as closely as possible to that of the Spark-HBase DataSource at the expense of some additional functionality - this is with a view to moving to the HBaseRelation at some point in the future.

mapping.rowkey.columns

Required for the default serde. The ordered list of columns which comprise the HBase row key. These are expected to be separated by mapping.rowkey.separator in HBase, e.g. ["symbol", "transacttime"].

mapping.rowkey.separator

Optional. The separator to use when constructing the row key. This is interpreted as a Unicode string so for binary separators use the \uXXXX syntax. Defaults to ":".

mapping.columns

Required for default serde. A map of column definitions specifying how to map Row fields into HBase columns. Each column requires three attributes: the column family cf, the column qualifier col and the column type type. The columns which comprise the row key are denoted with cf = rowkey. Supported types are int, long, boolean, float, double and string. For example:

mapping.columns {
  symbol {
    cf = "rowkey"
    col = "symbol"
    type = "string"
  }
  transacttime {
    cf = "rowkey"
    col = "transacttime"
    type = "long"
  }
  clordid {
    cf = "cf1"
    col = "clordid"
    type = "string"
  }
  orderqty {
    cf = "cf1"
    col = "orderqty"
    type = "int"
  }
}

batch.size

Optional. An integer value with default 1000. The number of mutations to accumulate before making an HBase RPC call. For larger cell sizes you may want to reduce this number or increase the relevant client buffers.

ZooKeeper

Output type = zookeeper.

Configuration suffix Description

connection

The ZooKeeper quorum to connect to, in the format host1:port1,…​.

schema

The schema definition. Refer to the Schema documentation

key.field.names

The list of field names that constitute the unique key of the output. Must be a subset of field.names. Must always be provided in the same order across pipeline executions.

znode.prefix

The znode path prefix that the data will be stored under. Used to isolate the use of the output from other uses of the output, and from non-Envelope paths in ZooKeeper. Default /envelope.

session.timeout.millis

The client session timeout in milliseconds. Default 1000.

connection.timeout.millis

The client connection timeout in milliseconds. Default 10000.

Tasks

For more information on tasks see the tasks guide.

Task configurations are provided at the task step level (i.e. alongside type and class).

Exception

Task class = exception.

Configuration suffix Description

message

The message that will be included on the exception. Mandatory.

Impala DDL

Task class = impala_ddl.

Configuration suffix Description

host

(Required.) The Impala daemon or load balancer fully qualified hostname to which to connect.

port

(Optional.) Port on which to connect to the Impala daemon. Defaults to 21050.

auth

(Optional.) Authentication method to use. Allowed values are none, kerberos, or ldap. If specified, that method will be used; otherwise Envelope defaults to either none or kerberos depending on whether it detects that the cluster is secured.

debug

(Optional.) Display debug information about authentication.

krb-keytab

(Required if using auth=kerberos and no --keytab supplied.) The location of the keytab file relative to the driver process.

krb-user-principal

(Required if using auth=kerberos and no --principal supplied.) The Kerberos principal to authenticate with from the keytab.

krb-realm

(Optional.) If using a non-default realm, specify it in this parameter. Otherwise the default realm is extracted from /etc/krb5.conf.

krb-ticket-renew-interval

(Optional.) Time in seconds in which to re-obtain a Kerberos TGT. If not specified it is derived from the default ticket lifetime of TGTs from /etc/krb5.conf.

ssl

(Optional.) Whether TLS is enabled on the JDBC connection to Impala. Defaults to false.

ssl-truststore

(Optional.) JKS truststore to use when validating the Impala TLS server certificate. Defaults to the in-built JRE truststore.

ssl-truststore-password

(Optional.) If the supplied truststore requires a password to read certificates, supply it here. Defaults to empty.

username

(Required if using auth=ldap.) Username to authenticate with.

password

(Required if using auth=ldap.) Password to authenticate with.

query.type

(Required.) The DDL operation to perform. Currently supported: refresh, invalidate, add_partition, drop_partition.

query.table

(Required.) The table name for the DDL operation.

query.partition.spec

(Required if operation is add_partition and range not supplied.) The partition specification for an HDFS-backed table, e.g. "ymd = 20190101, country = 'US'".

query.partition.location

(Optional.) A location of a partition on HDFS to be specified in the DDL operation.

query.partition.range

(Required if operation is add_partition and partition spec is not supplied.) A config subsection with the following possible parameters.

query.partition.range.value

(Required if using range and boundaries not supplied.) An absolute numeric value for the lower bound of a Kudu range partition.

query.partition.range.start

(Required if using range and value not supplied.) An absolute numeric value for the lower bound of a Kudu range partition. Defaults to inclusive.

query.partition.range.end

(Required if using range and value not supplied.) An absolute numeric value for the upper bound of a Kudu range partition. Defaults to exclusive.

query.partition.range.inclusivity

(Optional.) A string indicating the range operator of the lower and upper bound, "i" for inclusive and "e" for exclusive. Allowed values are "ie", "ii", "ei", "ee".
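
Putting this together, an Impala DDL task step that adds an HDFS-backed partition might be sketched as below; the host, table name, and partition specification are placeholders.

steps {
  addPartition {
    type = task
    dependencies = [exampleStep]
    class = impala_ddl
    host = "impala-lb.example.com"
    query.type = add_partition
    query.table = example_db.events
    query.partition.spec = "ymd = 20190101"
  }
}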

Repetitions

For more information on repetitions see the repetitions guide.

The general configuration parameters for repetitions are:

Configuration suffix Description

type

Required. The repetition type to be used. Envelope provides scheduled and flagfile. To use a custom repetition, specify the fully qualified name of the class (or alias) implementing the Repetition interface.

min-repeat-interval

Optional. To prevent steps from being reloaded too frequently, this represents the minimum interval between repetitions. The value is interpreted as a Typesafe Config duration, e.g. 60s, 5m, 1d, or, without a suffix, as raw milliseconds, e.g. 3600000. Defaults to 60s.

Scheduled

Repetition type = scheduled.

Configuration suffix Description

every

Required. The interval between repetitions. The value is interpreted as a Typesafe Config duration, e.g. 60s, 5m, 1d, or, without a suffix, as raw milliseconds, e.g. 3600000. No default.

Flag file

Repetition type = flagfile.

Configuration suffix Description

file

Required. The path to the flag file. Accepts a fully qualified URI (recommended). If not qualified with a filesystem scheme, the default filesystem implementation will be used (usually HDFS).

trigger

Optional. The mode of the trigger functionality. Can either be present or modified. With present, as soon as the file is detected a repetition is triggered and the flag file is deleted. In modified mode, the file is checked for presence or a modification time greater than the last time the step was loaded. The file is not deleted in modified mode. Defaults to present.

poll-interval

Optional. How often the flag file will be checked. The value is interpreted as a Typesafe Config duration, e.g. 60s, 5m, 1d, or, without a suffix, as raw milliseconds, e.g. 3600000. Defaults to 10s.

fail-after

To prevent intermittent failures to contact the filesystem from killing the job, the repetition will only raise an exception after this many consecutive failures. Defaults to 10.
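
For instance, assuming repetitions are declared in a repetitions block of the step to be repeated (as described in the repetitions guide), a flag file repetition might be sketched as below; the step name and file path are placeholders.

steps {
  referenceData {
    input {
      type = hive
      table = example_db.reference_codes
    }
    repetitions {
      reloadOnFlag {
        type = flagfile
        file = "hdfs://..."
        trigger = present
        poll-interval = 30s
        min-repeat-interval = 5m
      }
    }
  }
}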

Validations

Envelope automatically validates the pipeline configuration before starting execution. This feature can be disabled by setting configuration.validation.enabled = false either at the top-level for the whole pipeline, or within any scope that would be validated.

The configurations of a custom Envelope plugin (e.g. a custom deriver) can also be validated by implementing the ProvidesValidations interface. In the less common case that the plugin has its own plugins (similarly to how the data quality deriver has pluggable rules) then the higher-level plugin can implement the InstantiatesComponents interface to provide its own plugins to Envelope for configuration validation. For both of these interfaces see the Envelope code for various examples of their implementations.

User-defined functions

Spark SQL user-defined functions (UDFs) are provided with a list of UDF specifications under udfs, where each specification has the following:

Configuration suffix Description

name

The name of the UDF that will be used in SQL queries.

class

The fully qualified class name of the UDF implementation.

Schema

Envelope provides a number of ways to define the schema for components. Data type mappings are outlined in the Data Type Support section below.

Flat

Schema type = flat.

Configuration suffix Description

field.names

The list of field names in the schema.

field.types

The list of field types in the schema
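
For example, a flat schema with a decimal field might be declared as in the sketch below; the field names are illustrative.

schema {
  type = flat
  field.names = [id, name, amount, created]
  field.types = [long, string, "decimal(10,2)", timestamp]
}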

Avro

Schema type = avro.

Configuration suffix Description

filepath

The path to a file containing the Avro schema definition.

literal

The literal JSON string defining the Avro schema.

Protobuf

Schema type = protobuf.

Configuration suffix Description

descriptor.filepath

The path to a Protobuf descriptor file defining the schema.

descriptor.message

Optional Protobuf schema message parameter.

Data Type Support

Flat

Envelope supports the following Spark data types when defining a schema in-line (for example, using schema.type = flat):

  • string

  • byte

  • short

  • int

  • long

  • float

  • double

  • decimal(precision,scale), or decimal on its own, which defaults to (10,0) per DataTypes.createDecimalType

  • boolean

  • binary

  • date

  • timestamp

Avro

When using an Avro schema to define the Spark schema (for example, schema.type = avro), either via an inline Avro literal or a supporting Avro file, the following Spark data types are supported:

Avro Type Data Type

record

StructType

array

Array

map

Map (note: keys must be Strings)

union

StructType (each column representing the union elements, named memberN)

bytes, fixed

Binary

string, enum

String

int

Integer

long

Long

float

Float

double

Double

boolean

Boolean

null

Null

date (LogicalType, as long)

Date

timestamp-millis (LogicalType, as long)

Timestamp

decimal (LogicalType, as bytes)

Decimal

Protobuf

When using a Protobuf schema to define the Spark schema (for example, schema.type = protobuf), the following Spark data types are supported:

Protobuf FieldDescriptor Type Data Type

BOOLEAN

Boolean

BYTE_STRING

Binary

DOUBLE

Double

ENUM

String

FLOAT

Float

INT

Integer

LONG

Long

MESSAGE

StructType

STRING

String

double

Double

boolean

Boolean