Skip to content

Commit

Permalink
Merge pull request #481 from consensusnetworks/enhancement/analytics-cdk
Browse files Browse the repository at this point in the history
Update the analytics CDK stack
  • Loading branch information
hawyar authored Dec 5, 2023
2 parents 1d8ad27 + e0d42b5 commit 35d8f6e
Show file tree
Hide file tree
Showing 4 changed files with 40 additions and 75 deletions.
9 changes: 3 additions & 6 deletions common/data/src/providers/schema.ts
Original file line number Diff line number Diff line change
Expand Up @@ -41,10 +41,7 @@ export class Schema {

let type: GlueType = glue.Schema[typeKey]

if (name.endsWith("_at")) type = glue.Schema.TIMESTAMP
if (name.endsWith("_balance")) type = glue.Schema.BIG_INT
if (name == "amount") type = glue.Schema.BIG_INT
if (name === "price") type = glue.Schema.FLOAT
if (name == "timestamp") type = glue.Schema.TIMESTAMP

const comment = property.description
return { name, type, comment }
Expand Down Expand Up @@ -84,10 +81,10 @@ export class Schema {

const comment = property.description
if (comment.includes("PK")) column += " PRIMARY KEY"

const defaultValue = property.default
if (defaultValue !== undefined) column += ` DEFAULT ${defaultValue}`

return column
})

Expand Down
53 changes: 15 additions & 38 deletions common/data/src/schemas/event.schema.json
Original file line number Diff line number Diff line change
Expand Up @@ -4,66 +4,43 @@
"$comment": "analytics",
"title": "Event",
"type": "object",
"description": "Event schema covers all events emitted by the Casimir contracts (e.g. Factory, Manager, ..)",
"properties": {
"chain": {
"type": "string",
"description": "The chain which the event belongs to (e.g. iotex, ethereum)"
},
"network": {
"type": "string",
"description": "Network type (e.g. mainnet, testnet)"
},
"provider": {
"type": "string",
"description": "The provider used to source the event (e.g. infura, consensus)"
"description": "Network type (e.g. mainnet, goerli)"
},
"type": {
"type": "string",
"description": "The type of event (e.g. block, transaction)"
},
"height": {
"block_number": {
"type": "integer",
"description": "The height of the block the event belongs to"
"description": "Block number"
},
"block": {
"block_hash": {
"type": "string",
"description": "The block hash"
"description": "Block hash"
},
"transaction": {
"tx_hash": {
"type": "string",
"description": "The transaction hash"
},
"received_at": {
"timestamp": {
"type": "integer",
"description": "Timestamp of the event in unix format"
},
"sender": {
"type": "string",
"description": "The sender's address"
"description": "The timestamp of the log"
},
"recipient": {
"contract": {
"type": "string",
"description": "The recipient's address"
"description": "The contract address which emitted the event"
},
"sender_balance": {
"event": {
"type": "string",
"description": "The sender's balance at the time of the event"
"description": "Type of casimir contract event (e.g. StakeDeposit, StakeRebalanced, WithrawalFulfilled)"
},
"recipient_balance": {
"sender": {
"type": "string",
"description": "The recipient's balance at the time of the event"
"description": "The sender's address"
},
"amount": {
"type": "string",
"description": "The amount transferred in the event"
},
"price": {
"type": "string",
"description": "The exchange price of the coin at the time of the event"
},
"gas_fee": {
"type": "integer",
"description": "The gas fee paid for the transaction"
}
}
}
34 changes: 17 additions & 17 deletions infrastructure/cdk/src/providers/analytics.ts
Original file line number Diff line number Diff line change
Expand Up @@ -2,10 +2,11 @@ import { Construct } from "constructs"
import * as cdk from "aws-cdk-lib"
import * as s3 from "aws-cdk-lib/aws-s3"
import * as glue from "@aws-cdk/aws-glue-alpha"
import { Schema, eventSchema, actionSchema } from "@casimir/data"
import { Schema, eventSchema } from "@casimir/data"
import { kebabCase, pascalCase, snakeCase } from "@casimir/format"
import { Config } from "./config"
import { AnalyticsStackProps } from "../interfaces/StackProps"
import { CfnWorkGroup } from "aws-cdk-lib/aws-athena"

/**
* Data analytics stack
Expand All @@ -19,38 +20,37 @@ export class AnalyticsStack extends cdk.Stack {
const config = new Config()

const eventColumns = new Schema(eventSchema).getGlueColumns()
const actionColumns = new Schema(actionSchema).getGlueColumns()

const database = new glue.Database(this, config.getFullStackResourceName(this.name, "database"), {
databaseName: snakeCase(config.getFullStackResourceName(this.name, "database"))
const database = new glue.Database(this, config.getFullStackResourceName(this.name, "database", config.dataVersion), {
databaseName: snakeCase(config.getFullStackResourceName(this.name, "database", config.dataVersion)),
})

const eventBucket = new s3.Bucket(this, config.getFullStackResourceName(this.name, "event-bucket", config.dataVersion), {
bucketName: kebabCase(config.getFullStackResourceName(this.name, "event-bucket", config.dataVersion))
})

const actionBucket = new s3.Bucket(this, config.getFullStackResourceName(this.name, "action-bucket", config.dataVersion), {
bucketName: kebabCase(config.getFullStackResourceName(this.name, "action-bucket", config.dataVersion))
})

new s3.Bucket(this, config.getFullStackResourceName(this.name, "output-bucket", config.dataVersion), {
const outputBucket = new s3.Bucket(this, config.getFullStackResourceName(this.name, "output-bucket", config.dataVersion), {
bucketName: kebabCase(config.getFullStackResourceName(this.name, "output-bucket", config.dataVersion))
})

new CfnWorkGroup(this, config.getFullStackResourceName(this.name, "workGroup", config.dataVersion), {
name: config.getFullStackResourceName(this.name, "workGroup", config.dataVersion),
recursiveDeleteOption: true,
state: "ENABLED",
workGroupConfiguration: {
resultConfiguration: {
outputLocation: `s3://${outputBucket.bucketName}/`,
},
},
tags: [{ key: "version", value: config.dataVersion.toString() }],
})

new glue.Table(this, config.getFullStackResourceName(this.name, "event-table", config.dataVersion), {
database: database,
tableName: snakeCase(config.getFullStackResourceName(this.name, "event-table", config.dataVersion)),
bucket: eventBucket,
columns: eventColumns,
dataFormat: glue.DataFormat.JSON,
})

new glue.Table(this, config.getFullStackResourceName(this.name, "action-table", config.dataVersion), {
database: database,
tableName: snakeCase(config.getFullStackResourceName(this.name, "action-table", config.dataVersion)),
bucket: actionBucket,
columns: actionColumns,
dataFormat: glue.DataFormat.JSON,
})
}
}
19 changes: 5 additions & 14 deletions infrastructure/cdk/test/analytics.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@ import * as cdk from "aws-cdk-lib"
import * as assertions from "aws-cdk-lib/assertions"
import { Config } from "../src/providers/config"
import { AnalyticsStack } from "../src/providers/analytics"
import { Schema, eventSchema, actionSchema } from "@casimir/data"
import { Schema, eventSchema, } from "@casimir/data"

test("Analytics stack created", () => {
const config = new Config()
Expand Down Expand Up @@ -30,17 +30,8 @@ test("Analytics stack created", () => {
expect(columnName).toEqual(name)
}

const actionTable = Object.keys(resource).filter(key => key.includes("ActionTable"))[0]
const actionColumns = resource[actionTable].Properties.TableInput.StorageDescriptor.Columns
const actionGlueSchema = new Schema(actionSchema).getGlueColumns()


for (const column of actionColumns) {
const { Name: name, Type: type } = column
const columnName = Object.keys(actionSchema.properties).filter(key => key === name)[0]
const columnType = actionGlueSchema.filter(key => key.name === name)[0].type.inputString

expect(columnType).toEqual(type)
expect(columnName).toEqual(name)
}
const workgroup = analyticsTemplate.findResources("AWS::Athena::WorkGroup")
const outputBucket = Object.keys(analyticsTemplate.findResources("AWS::S3::Bucket")).filter(key => key.includes("OutputBucket"))[0]
const workgroupRef = workgroup[Object.keys(workgroup)[0]].Properties.WorkGroupConfiguration.ResultConfiguration.OutputLocation["Fn::Join"][1][1]["Ref"]
expect(workgroupRef).toEqual(outputBucket)
})

0 comments on commit 35d8f6e

Please sign in to comment.