Skip to content

Commit

Permalink
Merge pull request #13 from dataform-co/dataform_move_to_dataform_core
Browse files: browse the repository at this point in the history
Dataform move to dataform core
  • Loading branch information
DanLee authored Nov 9, 2020
2 parents 547d21f + 7084b6a commit 29bc03b
Show file tree
Hide file tree
Showing 7 changed files with 109 additions and 144 deletions.
59 changes: 0 additions & 59 deletions includes/crossdb.js

This file was deleted.

44 changes: 24 additions & 20 deletions includes/sessionized_events.js
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
const crossdb = require("./crossdb");
const sql = require("@dataform/sql")();
const segmentCommon = require("./common");

module.exports = (params) => {
Expand Down Expand Up @@ -42,16 +42,18 @@ select
*,
coalesce(
(
${crossdb.timestampDiff(`millisecond`,
crossdb.windowFunction({
func: "lag",
value: "timestamp",
ignore_nulls: false,
partition_fields: "user_id",
order_fields: "timestamp asc",
frame_clause: " " // supplying empty frame clause as frame clause is not valid for a lag
}),
`segment_events_mapped.timestamp`
${sql.timestamps.diff(`millisecond`,
sql.windowFunction(
"lag",
"timestamp",
false,
{
partitionFields: ["user_id"],
orderFields: ["timestamp asc"]
}
),
`
segment_events_mapped.timestamp `
)}
) >= ${params.sessionTimeoutMillis},
true
Expand All @@ -64,22 +66,24 @@ with_session_index as (
-- add a session_index (users first session = 1, users second session = 2 etc)
select
*,
${crossdb.windowFunction({
func: "sum",
value: "case when session_start_event then 1 else 0 end",
ignore_nulls: false,
partition_fields: "user_id",
order_fields: 'session_starts.timestamp asc',
frame_clause: "rows between unbounded preceding and current row"
})} as session_index
${sql.windowFunction(
"sum",
"case when session_start_event then 1 else 0 end",
false,
{
partitionFields: ["user_id"],
orderFields: ['session_starts.timestamp asc'],
frameClause: "rows between unbounded preceding and current row"
}
)} as session_index
from
session_starts
)
-- add a unique session_id to each session
select
*,
${crossdb.generateSurrogateKey(["session_index", "user_id"])} as session_id
${sql.surrogateKey(["session_index", "user_id"])} as session_id
from
with_session_index
Expand Down
78 changes: 43 additions & 35 deletions includes/sessions.js
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
const segmentCommon = require("./common");
const crossdb = require("./crossdb");
const sql = require("@dataform/sql")();

module.exports = (params) => {

Expand All @@ -24,23 +24,27 @@ ${params.includePages ?
select distinct
session_id,
${Object.entries(segmentCommon.allPageFields(params)).map(
([key, value]) => `${crossdb.windowFunction({
func: "first_value",
value: value,
ignore_nulls: false,
partition_fields: "session_id",
order_fields: 'sessionized_pages.timestamp asc',
frame_clause: "rows between unbounded preceding and unbounded following",
})} as first_${value}`).join(",\n ")},
([key, value]) => `${sql.windowFunction(
"first_value",
value,
false,
{
partitionFields: ["session_id"],
orderFields: ['sessionized_pages.timestamp asc'],
frameClause: "rows between unbounded preceding and unbounded following"
}
)} as first_${value}`).join(",\n ")},
${Object.entries(segmentCommon.allPageFields(params)).map(
([key, value]) => `${crossdb.windowFunction({
func: "last_value",
value: value,
ignore_nulls: false,
partition_fields: "session_id",
order_fields: 'sessionized_pages.timestamp asc',
frame_clause: "rows between unbounded preceding and unbounded following",
})} as last_${value}`).join(",\n ")}
([key, value]) => `${sql.windowFunction(
"last_value",
value,
false,
{
partitionFields: ["session_id"],
orderFields: ['sessionized_pages.timestamp asc'],
frameClause: "rows between unbounded preceding and unbounded following"
}
)} as last_${value}`).join(",\n ")}
from
${ctx.ref(params.defaultConfig.schema, "segment_sessionized_pages")} as sessionized_pages
),` : ``}
Expand All @@ -50,23 +54,27 @@ ${params.includeScreens ?
select distinct
session_id,
${Object.entries(segmentCommon.allScreenFields(params)).map(
([key, value]) => `${crossdb.windowFunction({
func: "first_value",
value: value,
ignore_nulls: false,
partition_fields: "session_id",
order_fields: 'sessionized_screens.timestamp asc',
frame_clause: "rows between unbounded preceding and unbounded following",
})} as first_${value}`).join(",\n ")},
([key, value]) => `${sql.windowFunction(
"first_value",
value,
false,
{
partitionFields: ["session_id"],
orderFields: ['sessionized_screens.timestamp asc'],
frameClause: "rows between unbounded preceding and unbounded following"
}
)} as first_${value}`).join(",\n ")},
${Object.entries(segmentCommon.allScreenFields(params)).map(
([key, value]) => `${crossdb.windowFunction({
func: "last_value",
value: value,
ignore_nulls: false,
partition_fields: "session_id",
order_fields: 'sessionized_screens.timestamp asc',
frame_clause: "rows between unbounded preceding and unbounded following",
})} as last_${value}`).join(",\n ")}
([key, value]) => `${sql.windowFunction(
"last_value",
value,
false,
{
partitionFields: ["session_id"],
orderFields: ['sessionized_screens.timestamp asc'],
frameClause: "rows between unbounded preceding and unbounded following"
}
)} as last_${value}`).join(",\n ")}
from
${ctx.ref(params.defaultConfig.schema, "segment_sessionized_screens")} as sessionized_screens
),` : ``}
Expand All @@ -84,7 +92,7 @@ select
${ctx.when(global.dataform.projectConfig.warehouse == "bigquery", `struct(\n `)}
${segmentCommon.enabledEvents(params).map((event) =>
`count(segment_sessionized_events.${event}_id) as total_${event}s`).join(`,\n `)},
${crossdb.timestampDiff("millisecond", "min(segment_sessionized_events.timestamp)", "max(segment_sessionized_events.timestamp)")} as duration_millis
${sql.timestamps.diff("millisecond", "min(segment_sessionized_events.timestamp)", "max(segment_sessionized_events.timestamp)")} as duration_millis
${ctx.when(global.dataform.projectConfig.warehouse == "bigquery", `) as stats`)}
-- first values in the session for page fields
Expand Down Expand Up @@ -168,4 +176,4 @@ group by
)
select * from output`)
}
}
20 changes: 11 additions & 9 deletions includes/user_map.js
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
const crossdb = require("./crossdb");
const sql = require("@dataform/sql")();
const segmentCommon = require("./common");

module.exports = (params) => {
Expand Down Expand Up @@ -33,14 +33,16 @@ ${segmentCommon.enabledEvents(params).map((event) =>
select distinct
anonymous_id,
${crossdb.windowFunction({
func: "last_value",
value: "user_id",
ignore_nulls: false,
partition_fields: "anonymous_id",
order_fields: "anonymous_id_user_id_pairs.timestamp asc",
frame_clause: "rows between unbounded preceding and unbounded following"
})} as user_id
${sql.windowFunction(
"last_value",
"user_id",
false,
{
partitionFields: ["anonymous_id"],
orderFields: ["anonymous_id_user_id_pairs.timestamp asc"],
frameClause: "rows between unbounded preceding and unbounded following"
}
)} as user_id
from
anonymous_id_user_id_pairs
where
Expand Down
38 changes: 21 additions & 17 deletions includes/users.js
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
const crossdb = require("./crossdb");
const sql = require("@dataform/sql")();

let USER = `coalesce(
identifies.user_id,
Expand All @@ -21,23 +21,27 @@ module.exports = (params) => {
select distinct
${USER} as user_id,
${crossdb.windowFunction({
func: "first_value",
value: "identifies.timestamp",
ignore_nulls: true,
partition_fields: USER,
order_fields: "identifies.timestamp asc",
frame_clause: "rows between unbounded preceding and unbounded following",
})} as first_seen_at
${sql.windowFunction(
"first_value",
"identifies.timestamp",
true,
{
partitionFields: [USER],
orderFields: ["identifies.timestamp asc"],
frameClause: "rows between unbounded preceding and unbounded following"
}
)} as first_seen_at
${params.customUserFields.length ? `,` : ``}
${params.customUserFields.map(f => `${crossdb.windowFunction({
func: "first_value",
value: f,
ignore_nulls: true,
partition_fields: USER,
order_fields: "identifies.timestamp desc",
frame_clause: "rows between unbounded preceding and unbounded following",
})} as ${f}`).join(",\n ")}
${params.customUserFields.map(f => `${sql.windowFunction(
"first_value",
f,
true,
{
partitionFields: [USER],
orderFields: ["identifies.timestamp desc"],
frameClause: "rows between unbounded preceding and unbounded following",
}
)} as ${f}`).join(",\n ")}
from
${ctx.ref(params.defaultConfig.schema, "segment_user_map")} as segment_user_anonymous_map
left join ${ctx.ref(params.segmentSchema, "identifies")} as identifies
Expand Down
11 changes: 8 additions & 3 deletions package-lock.json

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

3 changes: 2 additions & 1 deletion package.json
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
{
"dependencies": {
"@dataform/core": "1.15.3"
"@dataform/core": "1.15.5",
"@dataform/sql": "0.2.0"
}
}

0 comments on commit 29bc03b

Please sign in to comment.