Skip to content
This repository has been archived by the owner on Dec 15, 2021. It is now read-only.

Logstash >= 2.0 #9

Open
wants to merge 4 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
73 changes: 0 additions & 73 deletions lib/logstash-input-dynamodb_jars.rb

This file was deleted.

6 changes: 4 additions & 2 deletions lib/logstash/inputs/DynamoDBLogParser.rb
Original file line number Diff line number Diff line change
Expand Up @@ -65,13 +65,15 @@ def parse_scan(log, new_image_size)
@hash_template["dynamodb"]["sequenceNumber"] = "0"
@hash_template["dynamodb"]["sizeBytes"] = size_bytes
@hash_template["dynamodb"]["streamViewType"] = @view_type.upcase

@hash_template["eventParser"] = "scan"
return parse_view_type(@hash_template)
end

public
def parse_stream(log)
return parse_view_type(JSON.parse(@mapper.writeValueAsString(log))["internalObject"])
data_hash = JSON.parse(@mapper.writeValueAsString(log))["internalObject"]
data_hash["eventParser"] = "stream"
return parse_view_type(data_hash)
end

private
Expand Down
63 changes: 42 additions & 21 deletions lib/logstash/inputs/dynamodb.rb
Original file line number Diff line number Diff line change
Expand Up @@ -182,17 +182,26 @@ def register

public
def run(logstash_queue)
begin
run_with_catch(logstash_queue)
rescue LogStash::ShutdownSignal
exit_threads
until @queue.empty?
@logger.info("Flushing rest of events in logstash queue")
event = @queue.pop()
queue_event(@parser.parse_stream(event), logstash_queue, @host)
end # until [email protected]?
end # begin
end # def run(logstash_queue)
$exit = false;
$logstash_queue = logstash_queue
run_with_catch(logstash_queue)
end

public
def stop
$exit = true

until @scan_queue.empty?
end

until @queue.empty?
@logger.info("Flushing rest of events in logstash queue")
event = @queue.pop()
queue_event(@parser.parse_stream(event), $logstash_queue, @host)
end # until [email protected]?

exit_threads
end

# Starts KCL app in a background thread
# Starts parallel scan if need be in a background thread
Expand Down Expand Up @@ -258,7 +267,7 @@ def setup_stream

kcl_config = KCL::KinesisClientLibConfiguration.new(@checkpointer, stream_arn, @credentials, worker_id) \
.withInitialPositionInStream(KCL::InitialPositionInStream::TRIM_HORIZON)
cloudwatch_client = nil
cloudwatch_client = nil
if @publish_metrics
cloudwatch_client = CloudWatch::AmazonCloudWatchClient.new(@credentials)
else
Expand All @@ -277,13 +286,17 @@ def scan(logstash_queue)
@connector = DynamoDBBootstrap::DynamoDBBootstrapWorker.new(@dynamodb_client, @read_ops, @table_name, @number_of_scan_threads)
start_table_copy_thread

scan_queue = @logstash_writer.getQueue()
@scan_queue = @logstash_writer.getQueue()
while true
event = scan_queue.take()
if event.getEntry().nil? and event.getSize() == -1
break
end # if event.isEmpty()
queue_event(@parser.parse_scan(event.getEntry(), event.getSize()), logstash_queue, @host)
if !@scan_queue.empty?
event = @scan_queue.take()
if event.getEntry().nil? and event.getSize() == -1
break
end # if event.isEmpty()
queue_event(@parser.parse_scan(event.getEntry(), event.getSize()), logstash_queue, @host)
else
sleep(0.01)
end
end # while true
end

Expand All @@ -292,14 +305,22 @@ def stream(logstash_queue)
@logger.info("Starting stream...")
start_kcl_thread

while true
event = @queue.pop()
queue_event(@parser.parse_stream(event), logstash_queue, @host)
while !$exit
if [email protected]?
event = @queue.pop()
queue_event(@parser.parse_stream(event), logstash_queue, @host)
else
sleep(0.01)
end
end # while true
end

private
def exit_threads
unless @worker.nil?
@worker.shutdown()
end # unless @worker.nil?

unless @dynamodb_scan_thread.nil?
@dynamodb_scan_thread.exit
end # unless @dynamodb_scan_thread.nil?
Expand Down
81 changes: 10 additions & 71 deletions logstash-input-dynamodb.gemspec
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
Gem::Specification.new do |s|
s.name = 'logstash-input-dynamodb'
s.version = '1.0.0'
s.version = '1.0.1'
s.licenses = ['Apache License (2.0)']
s.summary = "This input plugin scans a specified DynamoDB table and then reads changes to a DynamoDB table from the associated DynamoDB Stream."
s.description = "This gem is a logstash plugin required to be installed on top of the Logstash core pipeline using $LS_HOME/bin/plugin install gemname. This gem is not a stand-alone program"
Expand All @@ -19,82 +19,21 @@ Gem::Specification.new do |s|
s.metadata = { "logstash_plugin" => "true", "logstash_group" => "input" }

# Gem dependencies
s.add_runtime_dependency "logstash-core", '>= 1.4.0', '< 2.0.0'
s.add_runtime_dependency "logstash-codec-json"
s.add_runtime_dependency "logstash-core", ">= 2.0.0", "< 3.0.0"
s.add_runtime_dependency 'logstash-codec-json'
s.add_runtime_dependency 'stud', '>= 0.0.22'
s.add_runtime_dependency "activesupport-json_encoder"
s.add_development_dependency 'logstash-devutils', '>= 0.0.16'
# Jar dependencies
s.requirements << "jar 'com.amazonaws:aws-java-sdk-elasticbeanstalk', '1.10.11'"
s.requirements << "jar 'com.amazonaws:aws-java-sdk-ses', '1.10.11' "
s.requirements << "jar 'log4j:log4j', '1.2.17'"
s.requirements << "jar 'com.amazonaws:aws-java-sdk-opsworks', '1.10.11'"
s.requirements << "jar 'com.amazonaws:dynamodb-streams-kinesis-adapter', '1.0.0'"
s.requirements << "jar 'com.amazonaws:aws-java-sdk-sqs', '1.10.11'"
s.requirements << "jar 'com.amazonaws:aws-java-sdk-emr', '1.10.11'"
s.requirements << "jar 'com.amazonaws:aws-java-sdk-cloudformation', '1.10.11'"
s.requirements << "jar 'com.beust:jcommander', '1.48'"
s.requirements << "jar 'com.amazonaws:aws-java-sdk-redshift', '1.10.11'"
s.requirements << "jar 'com.amazonaws:aws-java-sdk-iam', '1.10.11'"
s.requirements << "jar 'com.amazonaws:aws-java-sdk-codedeploy', '1.10.11'"
s.requirements << "jar 'com.amazonaws:aws-java-sdk-dynamodb', '1.10.10'"
s.requirements << "jar 'com.amazonaws:aws-java-sdk-directconnect', '1.10.11'"
s.requirements << "jar 'org.apache.httpcomponents:httpclient', '4.3.6'"
s.requirements << "jar 'com.amazonaws:aws-java-sdk-sns', '1.10.11'"
s.requirements << "jar 'com.amazonaws:aws-java-sdk-directory', '1.10.11'"
s.requirements << "jar 'com.google.protobuf:protobuf-java', '2.6.1'"
s.requirements << "jar 'com.amazonaws:aws-java-sdk-cloudfront', '1.10.11'"
s.requirements << "jar 'com.amazonaws:aws-java-sdk-kinesis', '1.10.8'"
s.requirements << "jar 'com.amazonaws:aws-java-sdk-workspaces', '1.10.11'"
s.requirements << "jar 'com.amazonaws:aws-java-sdk-swf-libraries', '1.10.11'"
s.requirements << "jar 'com.amazonaws:aws-java-sdk-cloudhsm', '1.10.11'"
s.requirements << "jar 'com.amazonaws:aws-java-sdk-simpledb', '1.10.11'"
s.requirements << "jar 'com.amazonaws:aws-java-sdk-codepipeline', '1.10.11'"
s.requirements << "jar 'com.amazonaws:aws-java-sdk-s3', '1.10.10'"
s.requirements << "jar 'com.amazonaws:aws-java-sdk-cognitoidentity', '1.10.11'"
s.requirements << "jar 'com.amazonaws:aws-java-sdk-machinelearning', '1.10.11'"
s.requirements << "jar 'com.amazonaws:aws-java-sdk-logs', '1.10.11'"
s.requirements << "jar 'org.apache.commons:commons-lang3', '3.3.2'"
s.requirements << "jar 'commons-codec:commons-codec', '1.6'"
s.requirements << "jar 'com.fasterxml.jackson.core:jackson-annotations', '2.5.0'"
s.requirements << "jar 'com.amazonaws:aws-java-sdk-sts', '1.10.11'"
s.requirements << "jar 'com.amazonaws:aws-java-sdk-route53', '1.10.11'"
s.requirements << "jar 'com.amazonaws:aws-java-sdk-elasticloadbalancing', '1.10.11'"
s.requirements << "jar 'com.amazonaws:aws-java-sdk-storagegateway', '1.10.11'"
s.requirements << "jar 'org.apache.httpcomponents:httpcore', '4.3.3'"
s.requirements << "jar 'com.amazonaws:aws-java-sdk-efs', '1.10.11'"
s.requirements << "jar 'com.amazonaws:aws-java-sdk-ec2', '1.10.11'"
s.requirements << "jar 'com.amazonaws:aws-java-sdk-ssm', '1.10.11'"
s.requirements << "jar 'com.amazonaws:aws-java-sdk-core', '1.10.10'"
s.requirements << "jar 'com.amazonaws:aws-java-sdk', '1.10.67'"
s.requirements << "jar 'com.amazonaws:dynamodb-import-export-tool', '1.0.0'"
s.requirements << "jar 'commons-lang:commons-lang', '2.6'"
s.requirements << "jar 'com.amazonaws:aws-java-sdk-config', '1.10.11'"
s.requirements << "jar 'com.amazonaws:aws-java-sdk-cloudtrail', '1.10.11'"
s.requirements << "jar 'com.amazonaws:aws-java-sdk-elastictranscoder', '1.10.11'"
s.requirements << "jar 'com.amazonaws:aws-java-sdk-codecommit', '1.10.11'"
s.requirements << "jar 'joda-time:joda-time', '2.5'"
s.requirements << "jar 'com.amazonaws:aws-java-sdk-importexport', '1.10.11'"
s.requirements << "jar 'com.fasterxml.jackson.core:jackson-databind', '2.5.3'"
s.requirements << "jar 'com.amazonaws:aws-java-sdk-cloudsearch', '1.10.11'"
s.requirements << "jar 'com.amazonaws:aws-java-sdk', '1.10.11'"
s.requirements << "jar 'com.amazonaws:amazon-kinesis-client', '1.6.0'"
s.requirements << "jar 'com.amazonaws:amazon-kinesis-client', '1.6.2'"
s.requirements << "jar 'org.apache.httpcomponents:httpclient', '4.4.1'"
s.requirements << "jar 'org.apache.httpcomponents:httpcore', '4.4.1'"
s.requirements << "jar 'com.amazonaws:dynamodb-streams-kinesis-adapter', '1.0.2'"
s.requirements << "jar 'com.google.guava:guava', '15.0'"
s.requirements << "jar 'com.fasterxml.jackson.core:jackson-core', '2.5.3'"
s.requirements << "jar 'com.amazonaws:aws-java-sdk-rds', '1.10.11'"
s.requirements << "jar 'com.amazonaws:aws-java-sdk-cognitosync', '1.10.11'"
s.requirements << "jar 'com.amazonaws:aws-java-sdk-datapipeline', '1.10.11'"
s.requirements << "jar 'com.amazonaws:aws-java-sdk-support', '1.10.11'"
s.requirements << "jar 'commons-logging:commons-logging', '1.1.3'"
s.requirements << "jar 'com.amazonaws:aws-java-sdk-cloudwatchmetrics', '1.10.11'"
s.requirements << "jar 'com.amazonaws:aws-java-sdk-glacier', '1.10.11'"
s.requirements << "jar 'com.amazonaws:aws-java-sdk-elasticache', '1.10.11'"
s.requirements << "jar 'com.amazonaws:aws-java-sdk-simpleworkflow', '1.10.11'"
s.requirements << "jar 'com.amazonaws:aws-java-sdk-lambda', '1.10.11'"
s.requirements << "jar 'com.amazonaws:aws-java-sdk-autoscaling', '1.10.11'"
s.requirements << "jar 'com.amazonaws:aws-java-sdk-ecs', '1.10.11'"
s.requirements << "jar 'com.amazonaws:aws-java-sdk-devicefarm', '1.10.11'"
s.requirements << "jar 'com.amazonaws:aws-java-sdk-kms', '1.10.10'"
s.requirements << "jar 'com.amazonaws:aws-java-sdk-cloudwatch', '1.10.8'"
s.add_runtime_dependency 'jar-dependencies'
# Development dependencies
s.add_development_dependency "logstash-devutils"
s.add_development_dependency "mocha"
end