Skip to content

Commit

Permalink
Merge pull request #4 from polinabee/hash_comparison
Browse files Browse the repository at this point in the history
Implemented hash comparison for SCD Type 2 updates
  • Loading branch information
lewish authored May 9, 2023
2 parents 030eb83 + cf0a5e1 commit 2f3772b
Show file tree
Hide file tree
Showing 5 changed files with 53 additions and 17 deletions.
1 change: 1 addition & 0 deletions .gitignore
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@

.df-credentials.json
node_modules/
.idea/
16 changes: 12 additions & 4 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -24,10 +24,12 @@ scd("source_data_scd", {
uniqueKey: "user_id",
// A field that stores a timestamp or date of when the row was last changed.
timestamp: "updated_at",
// The source table to build slowly changing dimensions from.
source: {
schema: "dataform_scd_example",
name: "source_data",
// A field that stores the hash value of the fields that we want to track changes in. If you do not want to use the hash comparison, you may omit this field or set it to null
hash: "hash_value", // OPTIONAL
// The source table to build slowly changing dimensions from.
source: {
schema: "dataform_scd_example",
name: "source_data",
},
// Any configuration parameters to apply to the incremental table that will be created.
incrementalConfig: {
Expand All @@ -44,6 +46,12 @@ For more advanced customization of outputs, see the [example.js](https://github.

Slowly changing dimensions can only by updated as quickly as these models are run. These models should typically be scheduled to run every day or every hour, depending on the granularity of changes you want to capture.

### Hash comparison option

Depending on your data update method, you may want to use the hash field option to compare rows on each execution and only add the ones that have been changed or added. To do this, please make sure your table contains a hash field created using the hash function of your choice. You can find a list of the hash functions available in BigQuery [here](https://cloud.google.com/bigquery/docs/reference/standard-sql/hash_functions). On each incremental run, the query will compare the hashes for each unique identifier to the ones in the updated table. It will only keep the rows where the hash has changed or where the row ID is not found in the current data.

If you do not want to use the hash comparison, simply omit the hash parameter from the config file or set it to `null`. If you do this, all rows with an updated timestamp will be added to the `{name}_updates` table, even if the data did not otherwise change.

## Data models

This package will create two relations in the warehouse, for a given `name` these will be:
Expand Down
6 changes: 5 additions & 1 deletion definitions/example.js
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,8 @@ const { updates, view } = scd("source_data_scd", {
uniqueKey: "user_id",
// A field that stores a timestamp or date of when the row was last changed.
timestamp: "updated_at",
// A field that stores the hash value of the fields that we want to track changes in. If you do not want to use the hash comparison, you may omit this field or set it to null
hash: "hash_value",
// The source table to build slowly changing dimensions from.
source: {
schema: "dataform_scd_example",
Expand All @@ -16,7 +18,7 @@ const { updates, view } = scd("source_data_scd", {
// Any tags that will be added to actions.
tags: ["slowly-changing-dimensions"],
// Optional documentation of table columns
columns: {user_id: "User ID", some_field: "Data Field", updated_at: "Timestamp for updates"},
columns: {user_id: "User ID", some_field: "Data Field", hash_value: "Hash of all fields to compare",updated_at: "Timestamp for updates"},
// Any configuration parameters to apply to the incremental table that will be created.
incrementalConfig: {
bigquery: {
Expand All @@ -27,5 +29,7 @@ const { updates, view } = scd("source_data_scd", {

// Additional customization of the created models can be done by using the returned actions objects.
updates.config({
// You can specify the output schema here if it is different than the default
schema: "dataform_scd_example",
description: "Updates table for SCD",
});
18 changes: 13 additions & 5 deletions definitions/source_data.sqlx
Original file line number Diff line number Diff line change
Expand Up @@ -5,24 +5,32 @@ config {
WITH example_dataset AS (
SELECT
user_id,
some_field,
field_a,
field_b,
updated_at
FROM
(
SELECT
1 as user_id,
'b' AS some_field,
'b' AS field_a,
12.3 AS field_b,
date_add(current_date(), interval 1 day) AS updated_at
)
UNION ALL
(
SELECT
2 as user_id,
'a' AS some_field,
'c' AS field_a,
23.4 AS field_b,
date_add(current_date(), interval 0 day) AS updated_at
)
)
SELECT
*
*,
md5(concat(
field_a,
cast(field_b AS string)
)
) AS hash_value
FROM
example_dataset
example_dataset
29 changes: 22 additions & 7 deletions index.js
Original file line number Diff line number Diff line change
Expand Up @@ -2,8 +2,8 @@
* Builds a type-2 slowly changing dimensions table and view.
*/
module.exports = (
name,
{ uniqueKey, timestamp, source, tags, incrementalConfig, columns = {} }
name,
{ uniqueKey, hash, timestamp, source, tags, incrementalConfig, columns = {} }
) => {
// Create an incremental table with just pure updates, for a full history of the table.
const updates = publish(`${name}_updates`, {
Expand All @@ -12,15 +12,30 @@ module.exports = (
columns,
...incrementalConfig,
}).query(
(ctx) => `
!!hash ?
(ctx) => `
${ctx.when(
ctx.incremental(), `with ids_to_update as \
(select ${uniqueKey}, ${hash} from ${ctx.ref(source)}\
except distinct \
(select ${uniqueKey}, ${hash} from ${ctx.self()}))`
)}
select * from ${ctx.ref(source)}
${ctx.when(
ctx.incremental(),
`where ${timestamp} > (select max(${timestamp}) from ${ctx.self()})
and ${uniqueKey} in (select ${uniqueKey} from ids_to_update)`
)}`
:
(ctx) => `
select * from ${ctx.ref(source)}
${ctx.when(
ctx.incremental(),
`where ${timestamp} > (select max(${timestamp}) from ${ctx.self()})`
)}
`
)}`
);


// Create a view on top of the raw updates table that contains computed valid_from and valid_to fields.
const view = publish(name, {
type: "view",
Expand All @@ -31,7 +46,7 @@ module.exports = (
scd_valid_to: `The timestamp until which this row is valid for the given ${uniqueKey}, or null if this it the latest value.`,
},
}).query(
(ctx) => `
(ctx) => `
select
*,
${timestamp} as scd_valid_from,
Expand All @@ -43,4 +58,4 @@ module.exports = (

// Returns the tables so they can be customized.
return { view, updates };
};
};

0 comments on commit 2f3772b

Please sign in to comment.