diff --git a/examples/terraform/aws/kinesis/time_travel/config/const.libsonnet b/examples/terraform/aws/kinesis/time_travel/config/const.libsonnet index 04fdc7ef..e3d453ad 100644 --- a/examples/terraform/aws/kinesis/time_travel/config/const.libsonnet +++ b/examples/terraform/aws/kinesis/time_travel/config/const.libsonnet @@ -1,10 +1,12 @@ local sub = import '../../../../../../build/config/substation.libsonnet'; { + is_process: [ + sub.cnd.str.eq({ obj: { src: 'event.category' }, value: 'process' }), + sub.cnd.str.eq({ obj: { src: 'event.type' }, value: 'start' }), + ], kv_store: sub.kv_store.aws_dynamodb({ table_name: 'substation', attributes: { partition_key: 'PK', sort_key: 'SK', ttl: 'TTL', value: 'cache' }, }), - field: 'context', - field_exists: sub.cnd.num.len.gt({ obj: { src: $.field }, value: 0 }), } diff --git a/examples/terraform/aws/kinesis/time_travel/config/enrichment/config.jsonnet b/examples/terraform/aws/kinesis/time_travel/config/enrichment/config.jsonnet index c0bc284a..b56e9581 100644 --- a/examples/terraform/aws/kinesis/time_travel/config/enrichment/config.jsonnet +++ b/examples/terraform/aws/kinesis/time_travel/config/enrichment/config.jsonnet @@ -1,3 +1,4 @@ +// Puts process metadata into the KV store. local sub = import '../../../../../../../build/config/substation.libsonnet'; local const = import '../const.libsonnet'; @@ -5,14 +6,11 @@ local const = import '../const.libsonnet'; // The concurrency is set to 1 to ensure that the KV store is not updated in parallel. concurrency: 1, transforms: [ - // If the field exists, then put the value into the KV store. If the data stream is - // at risk of write heavy activity, then consider first querying the KV store to see - // if the value already exists and only writing if it does not. + // If the event is a process, then store the process metadata in the KV store + // indexed by the PID. The data is stored in the KV store for 90 days. sub.pattern.tf.conditional( - condition=sub.cnd.all(const.field_exists), - // The ttl_offset is low for the purposes of this example. It should be set to a - // value that is appropriate for the data stream (usually hours or days). - transform=sub.tf.enrich.kv_store.set({ obj: { src: 'ip', trg: const.field }, ttl_offset: '30s', kv_store: const.kv_store }), + condition=sub.cnd.all(const.is_process), + transform=sub.tf.enrich.kv_store.set({ obj: { src: 'process.pid', trg: 'process' }, prefix: 'process', ttl_offset: std.format('%dh', 24 * 90), kv_store: const.kv_store, close_kv_store: false }), ), ], } diff --git a/examples/terraform/aws/kinesis/time_travel/config/subscriber/config.jsonnet b/examples/terraform/aws/kinesis/time_travel/config/subscriber/config.jsonnet deleted file mode 100644 index a2f1b8c2..00000000 --- a/examples/terraform/aws/kinesis/time_travel/config/subscriber/config.jsonnet +++ /dev/null @@ -1,16 +0,0 @@ -local sub = import '../../../../../../../build/config/substation.libsonnet'; -local const = import '../const.libsonnet'; - -{ - concurrency: 2, - transforms: [ - // If the field doesn't exist, then get the value from the KV store. - // The value should have been previously placed into the store by the - // enrichment node. - sub.pattern.tf.conditional( - condition=sub.cnd.none(const.field_exists), - transform=sub.tf.enrich.kv_store.get({ obj: { src: 'ip', trg: const.field }, kv_store: const.kv_store }), - ), - sub.tf.send.stdout(), - ], -} diff --git a/examples/terraform/aws/kinesis/time_travel/config/transform/config.jsonnet b/examples/terraform/aws/kinesis/time_travel/config/transform/config.jsonnet new file mode 100644 index 00000000..3305a7d4 --- /dev/null +++ b/examples/terraform/aws/kinesis/time_travel/config/transform/config.jsonnet @@ -0,0 +1,28 @@ +// All values in the KV store were put there by the `enrichment` function. +local sub = import '../../../../../../../build/config/substation.libsonnet'; +local const = import '../const.libsonnet'; + +{ + concurrency: 2, + transforms: [ + // process.* + // + // This is only applied to non-process events. + sub.pattern.tf.conditional( + condition=sub.cnd.none(const.is_process), + transform=sub.tf.enrich.kv_store.get({ obj: { src: 'process.pid', trg: 'process' }, prefix: 'process', kv_store: const.kv_store }), + ), + // process.parent.* + sub.pattern.tf.conditional( + condition=sub.cnd.num.len.gt({ obj: { src: 'process.parent.pid' }, value: 0 }), + transform=sub.tf.enrich.kv_store.get({ obj: { src: 'process.parent.pid', trg: 'process.parent' }, prefix: 'process', kv_store: const.kv_store }), + ), + // process.parent.parent.* + sub.pattern.tf.conditional( + condition=sub.cnd.num.len.gt({ obj: { src: 'process.parent.parent.pid' }, value: 0 }), + transform=sub.tf.enrich.kv_store.get({ obj: { src: 'process.parent.parent.pid', trg: 'process.parent.parent' }, prefix: 'process', kv_store: const.kv_store }), + ), + // Print the results. + sub.tf.send.stdout(), + ], +} diff --git a/examples/terraform/aws/kinesis/time_travel/data.jsonl b/examples/terraform/aws/kinesis/time_travel/data.jsonl new file mode 100644 index 00000000..c0c390c5 --- /dev/null +++ b/examples/terraform/aws/kinesis/time_travel/data.jsonl @@ -0,0 +1,4 @@ +{"event":{"category":"network","type":"connection"},"process":{"name":"Spotify","pid":"d3a6c0b9d3751559f206e12fb1b8f226"},"server":{"ip":"35.186.224.39","port":443},"@timestamp":"2024-03-29T04:02:38.470000Z"} +{"event":{"category":"process","type":"start"},"process":{"command_line":"/sbin/launchd","name":"launchd","pid":"f23e8b548d2e5e1ef3e122a9c5e08a63","start":"2024-03-13T16:17:45.000000Z","parent":{"pid":"b745f7a7c3a98ac5f087be7420e6e3f9"}}} +{"event":{"category":"process","type":"start"},"process":{"command_line":"/usr/libexec/runningboardd","name":"runningboardd","pid":"8faae8aa27f9b4faff6fd98e60201e3d","start":"2024-03-13T16:17:49.000000Z","parent":{"pid":"f23e8b548d2e5e1ef3e122a9c5e08a63"}}} +{"event":{"category":"process","type":"start"},"process":{"command_line":"/Applications/Spotify.app/Contents/MacOS/Spotify","name":"Spotify","pid":"d3a6c0b9d3751559f206e12fb1b8f226","start":"2024-03-13T16:29:17.000000Z","parent":{"pid":"8faae8aa27f9b4faff6fd98e60201e3d"}}} diff --git a/examples/terraform/aws/kinesis/time_travel/post_deploy.sh b/examples/terraform/aws/kinesis/time_travel/post_deploy.sh new file mode 100644 index 00000000..6de657d7 --- /dev/null +++ b/examples/terraform/aws/kinesis/time_travel/post_deploy.sh @@ -0,0 +1,2 @@ +sleep 5 +AWS_DEFAULT_REGION=$AWS_REGION python3 ../build/scripts/aws/kinesis/put_records.py substation terraform/aws/kinesis/time_travel/data.jsonl --print-response diff --git a/examples/terraform/aws/kinesis/time_travel/terraform/_resources.tf b/examples/terraform/aws/kinesis/time_travel/terraform/_resources.tf index fd5f7661..ff885f0c 100644 --- a/examples/terraform/aws/kinesis/time_travel/terraform/_resources.tf +++ b/examples/terraform/aws/kinesis/time_travel/terraform/_resources.tf @@ -1,68 +1,15 @@ -data "aws_caller_identity" "caller" {} - -# KMS encryption key that is shared by all Substation infrastructure -module "kms" { - source = "../../../../../../build/terraform/aws/kms" - - config = { - name = "alias/substation" - policy = data.aws_iam_policy_document.kms.json - } -} - -# This policy is required to support encrypted SNS topics. -# More information: https://repost.aws/knowledge-center/cloudwatch-receive-sns-for-alarm-trigger -data "aws_iam_policy_document" "kms" { - # Allows CloudWatch to access encrypted SNS topic. - statement { - sid = "CloudWatch" - effect = "Allow" - actions = [ - "kms:Decrypt", - "kms:GenerateDataKey" - ] - - principals { - type = "Service" - identifiers = ["cloudwatch.amazonaws.com"] - } - - resources = ["*"] - } - - # Default key policy for KMS. - # https://docs.aws.amazon.com/kms/latest/developerguide/determining-access-key-policy.html - statement { - sid = "KMS" - effect = "Allow" - actions = [ - "kms:*", - ] - - principals { - type = "AWS" - identifiers = ["arn:aws:iam::${data.aws_caller_identity.caller.account_id}:root"] - } - - resources = ["*"] - } -} - module "appconfig" { source = "../../../../../../build/terraform/aws/appconfig" config = { - name = "substation" - environments = [{ - name = "example" - }] + name = "substation" + environments = [{ name = "example" }] } } # Repository for the core Substation application. module "ecr" { source = "../../../../../../build/terraform/aws/ecr" - kms = module.kms config = { name = "substation" @@ -73,7 +20,6 @@ module "ecr" { # Repository for the autoscaling application. module "ecr_autoscale" { source = "../../../../../../build/terraform/aws/ecr" - kms = module.kms config = { name = "autoscale" @@ -83,29 +29,17 @@ module "ecr_autoscale" { # SNS topic for Kinesis Data Stream autoscaling alarms. resource "aws_sns_topic" "autoscaling_topic" { - name = "autoscale" - kms_master_key_id = module.kms.id -} - -# API Gateway that sends data to Kinesis. -module "gateway" { - source = "../../../../../../build/terraform/aws/api_gateway/kinesis_data_stream" - # Always required for the Kinisis Data Stream integration. - kinesis_data_stream = module.kinesis - - config = { - name = "gateway" - } + name = "autoscale" } # Kinesis Data Stream that stores data sent from pipeline sources. module "kinesis" { source = "../../../../../../build/terraform/aws/kinesis_data_stream" - kms = module.kms config = { name = "substation" autoscaling_topic = aws_sns_topic.autoscaling_topic.arn + shards = 1 } access = [ @@ -113,15 +47,12 @@ module "kinesis" { module.lambda_autoscaling.role.name, # Consumes data from the stream. module.lambda_enrichment.role.name, - module.lambda_subscriber.role.name, - # Publishes data to the stream. - module.gateway.role.name, + module.lambda_transform.role.name, ] } module "dynamodb" { source = "../../../../../../build/terraform/aws/dynamodb" - kms = module.kms config = { name = "substation" @@ -143,6 +74,6 @@ module "dynamodb" { access = [ module.lambda_enrichment.role.name, - module.lambda_subscriber.role.name, + module.lambda_transform.role.name, ] } diff --git a/examples/terraform/aws/kinesis/time_travel/terraform/autoscaler.tf b/examples/terraform/aws/kinesis/time_travel/terraform/autoscaler.tf index c0d95021..f196d78e 100644 --- a/examples/terraform/aws/kinesis/time_travel/terraform/autoscaler.tf +++ b/examples/terraform/aws/kinesis/time_travel/terraform/autoscaler.tf @@ -10,11 +10,6 @@ module "lambda_autoscaling" { image_uri = "${module.ecr_autoscale.url}:v1.2.0" image_arm = true } - - depends_on = [ - module.appconfig.name, - module.ecr_autoscale.url, - ] } resource "aws_sns_topic_subscription" "autoscaling_subscription" { diff --git a/examples/terraform/aws/kinesis/time_travel/terraform/enrichment.tf b/examples/terraform/aws/kinesis/time_travel/terraform/enrichment.tf index 79655353..32eb059e 100644 --- a/examples/terraform/aws/kinesis/time_travel/terraform/enrichment.tf +++ b/examples/terraform/aws/kinesis/time_travel/terraform/enrichment.tf @@ -14,11 +14,6 @@ module "lambda_enrichment" { "SUBSTATION_DEBUG" : true } } - - depends_on = [ - module.appconfig.name, - module.ecr.url, - ] } resource "aws_lambda_event_source_mapping" "lambda_enrichment" { @@ -27,5 +22,8 @@ resource "aws_lambda_event_source_mapping" "lambda_enrichment" { maximum_batching_window_in_seconds = 5 batch_size = 100 parallelization_factor = 1 - starting_position = "LATEST" + # In this example, we start from the beginning of the stream, + # but in a prod environment, you may want to start from the end + # of the stream to avoid processing old data ("LATEST"). + starting_position = "TRIM_HORIZON" } diff --git a/examples/terraform/aws/kinesis/time_travel/terraform/subscriber.tf b/examples/terraform/aws/kinesis/time_travel/terraform/transform.tf similarity index 57% rename from examples/terraform/aws/kinesis/time_travel/terraform/subscriber.tf rename to examples/terraform/aws/kinesis/time_travel/terraform/transform.tf index feff9332..12523b29 100644 --- a/examples/terraform/aws/kinesis/time_travel/terraform/subscriber.tf +++ b/examples/terraform/aws/kinesis/time_travel/terraform/transform.tf @@ -1,31 +1,29 @@ -module "lambda_subscriber" { +module "lambda_transform" { source = "../../../../../../build/terraform/aws/lambda" appconfig = module.appconfig config = { - name = "subscriber" + name = "transform" description = "Substation node that reads from Kinesis with a delay to support enrichment" image_uri = "${module.ecr.url}:v1.2.0" image_arm = true env = { - "SUBSTATION_CONFIG" : "http://localhost:2772/applications/substation/environments/example/configurations/subscriber" + "SUBSTATION_CONFIG" : "http://localhost:2772/applications/substation/environments/example/configurations/transform" "SUBSTATION_LAMBDA_HANDLER" : "AWS_KINESIS_DATA_STREAM" "SUBSTATION_DEBUG" : true } } - - depends_on = [ - module.appconfig.name, - module.ecr.url, - ] } -resource "aws_lambda_event_source_mapping" "lambda_subscriber" { +resource "aws_lambda_event_source_mapping" "lambda_transform" { event_source_arn = module.kinesis.arn - function_name = module.lambda_subscriber.arn + function_name = module.lambda_transform.arn maximum_batching_window_in_seconds = 15 batch_size = 100 parallelization_factor = 1 - starting_position = "LATEST" + # In this example, we start from the beginning of the stream, + # but in a prod environment, you may want to start from the end + # of the stream to avoid processing old data ("LATEST"). + starting_position = "TRIM_HORIZON" }