-
Notifications
You must be signed in to change notification settings - Fork 13
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
Turn SCD example in to a real package
- Loading branch information
Showing
6 changed files
with
129 additions
and
49 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,52 @@ | ||
Common data models for creating (type-2 slowly changing dimensions tables)[https://en.wikipedia.org/wiki/Slowly_changing_dimension] from mutable data sources in [Dataform](https://github.com/dataform-co/dataform). | ||
|
||
## Supported warehouses | ||
|
||
- BigQuery | ||
- Redshift/PG | ||
- Snowflake | ||
|
||
_If you would like us to add support for another warehouse, please get in touch via [email](mailto:[email protected]) or [Slack](https://slack.dataform.co/)_ | ||
|
||
## Installation | ||
|
||
Add the package to your `package.json` file in your Dataform project. You can find the most up to package version on the [releases page](https://github.com/dataform-co/dataform-scd/releases). | ||
|
||
## Configure the package | ||
|
||
Create a new JS file in your `definitions/` folder create an SCD table with the following example: | ||
|
||
```js | ||
const scd = require("dataform-scd"); | ||
|
||
scd("source_data_scd", { | ||
// A unique identifier for rows in the table. | ||
uniqueKey: "user_id", | ||
// A field that stores a timestamp or date of when the row was last changed. | ||
timestamp: "updated_at", | ||
// The source table to build slowly changing dimensions from. | ||
source: { | ||
schema: "dataform_scd_example", | ||
name: "source_data", | ||
}, | ||
// Any configuration parameters to apply to the incremental table that will be created. | ||
incrementalConfig: { | ||
bigquery: { | ||
partitionBy: "updated_at", | ||
}, | ||
}, | ||
}); | ||
``` | ||
|
||
For more advanced customization of outputs, see the [example.js](https://github.com/dataform-co/dataform-scd/blob/master/definitions/example.js). | ||
|
||
### Scheduling | ||
|
||
Slowly changing dimensions can only by updated as quickly as these models are run. These models should typically be scheduled to run every day or every hour, depending on the granularity of changes you want to capture. | ||
|
||
## Data models | ||
|
||
This package will create two relations in the warehouse, for a given `name` these will be: | ||
|
||
- `{name}` - a view with `scd_valid_from` and `scd_valid_to` fields | ||
- `{name}_updates` - an incremental table that stores the change history of the source table |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,13 @@ | ||
{ | ||
"steps": [ | ||
{ | ||
"name": "docker.io/node", | ||
"entrypoint": "npm", | ||
"args": ["install"] | ||
}, | ||
{ | ||
"name": "docker.io/dataformco/dataform", | ||
"args": ["compile"] | ||
} | ||
] | ||
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -1,15 +1,29 @@ | ||
const scd = require("../includes/scd.js"); | ||
const scd = require("../index"); | ||
|
||
scd("source_data_scd", { | ||
/** | ||
* Create an SCD table on top of the fake table defined in source_data.sqlx. | ||
*/ | ||
const { updates, view } = scd("source_data_scd", { | ||
// A unique identifier for rows in the table. | ||
uniqueKey: "user_id", | ||
// A field that stores a timestamp or date of when the row was last changed. | ||
timestamp: "updated_at", | ||
// The source table to build slowly changing dimensions from. | ||
source: { | ||
schema: "dataform_scd_example", | ||
name: "source_data" | ||
name: "source_data", | ||
}, | ||
// Any tags that will be added to actions. | ||
tags: ["slowly-changing-dimensions"], | ||
// Any configuration parameters to apply to the incremental table that will be created. | ||
incrementalConfig: { | ||
bigquery: { | ||
partitionBy: "updated_at" | ||
} | ||
} | ||
partitionBy: "updated_at", | ||
}, | ||
}, | ||
}); | ||
|
||
// Additional customization of the created models can be done by using the returned actions objects. | ||
updates.config({ | ||
description: "Updates table for SCD", | ||
}); |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file was deleted.
Oops, something went wrong.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,44 @@ | ||
/** | ||
* Builds a type-2 slowly changing dimensions table and view. | ||
*/ | ||
module.exports = ( | ||
name, | ||
{ uniqueKey, timestamp, source, tags, incrementalConfig } | ||
) => { | ||
// Create an incremental table with just pure updates, for a full history of the table. | ||
const updates = publish(`${name}_updates`, { | ||
type: "incremental", | ||
tags, | ||
...incrementalConfig, | ||
}).query( | ||
(ctx) => ` | ||
select * from ${ctx.ref(source)} | ||
${ctx.when( | ||
ctx.incremental(), | ||
`where ${timestamp} > (select max(${timestamp}) from ${ctx.self()})` | ||
)} | ||
` | ||
); | ||
|
||
// Create a view on top of the raw updates table that contains computed valid_from and valid_to fields. | ||
const view = publish(name, { | ||
type: "view", | ||
tags, | ||
columns: { | ||
scd_valid_from: `The timestamp from which this row is valid for the given ${uniqueKey}.`, | ||
scd_valid_to: `The timestamp until which this row is valid for the given ${uniqueKey}, or null if this it the latest value.`, | ||
}, | ||
}).query( | ||
(ctx) => ` | ||
select | ||
*, | ||
${timestamp} as scd_valid_from, | ||
lead(${timestamp}) over (partition by ${uniqueKey} order by ${timestamp} asc) as scd_valid_to | ||
from | ||
${ctx.ref(`${name}_updates`)} | ||
` | ||
); | ||
|
||
// Returns the tables so they can be customized. | ||
return { view, updates }; | ||
}; |