-
Notifications
You must be signed in to change notification settings - Fork 3
/
Copy pathnewrelic_kafka_consumer_lag_agent
executable file
·109 lines (85 loc) · 3.64 KB
/
newrelic_kafka_consumer_lag_agent
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
#! /usr/bin/env ruby
require "rubygems"
require "bundler/setup"
Bundler.require
module KafkaConsumerLagAgent
class Agent < NewRelic::Plugin::Agent::Base
# Plugin identity declared through the agent base-class macros.
agent_guid("com.kareemk.kafka.consumer-lag")
agent_version("0.0.1")

# Settings exposed to instances; host and port are used to build the
# ZooKeeper connection string, name is used as the human-readable label.
agent_config_options(:host, :port, :name)
agent_human_labels("Kafka Consumer") do
  name
end

# Regular-expression source used to decide which consumer groups are tracked.
# Taken from the CONSUMER_FILTER environment variable; matches everything
# when the variable is not set.
CONSUMER_FILTER = ENV.fetch("CONSUMER_FILTER", ".*")
# Runs one polling cycle.
#
# For every topic returned by get_topics_offsets and every consumer that has
# offsets stored for that topic, reports the consumer's lag summed over the
# topic's partitions as metric "<topic>/<consumer>" (units "Lag"). Finally
# reports the largest of those sums as metric "Max".
#
# Per-partition lag is the broker offset minus the consumer's stored offset.
# A partition the consumer has no stored offset for counts as offset 0
# (nil.to_i), i.e. the whole partition is considered unread.
#
# The memoized ZooKeeper handle is always closed and cleared when the cycle
# ends, whether it succeeded or raised.
def poll_cycle
  consumers_offsets = get_consumers_offsets
  topics_offsets = get_topics_offsets
  total_lags = []

  topics_offsets.each do |topic, offsets|
    consumers_offsets[topic].to_a.each do |consumer, consumer_offsets|
      # Seeded with 0 so an empty partition list yields 0 instead of nil.
      total_lag = offsets.inject(0) do |sum, (partition, offset)|
        sum + offset.to_i - consumer_offsets[partition].to_i
      end
      total_lags << total_lag
      report_metric "#{topic}/#{consumer}", "Lag", total_lag
    end
  end

  # Array#max is nil when nothing was tracked; report an explicit 0 rather
  # than handing nil to report_metric.
  report_metric "Max", "Lag", total_lags.max || 0
ensure
  # Clear the memoized handle before closing it so that a failing close
  # cannot leave a stale connection behind for the next cycle.
  zk, @zookeeper = @zookeeper, nil
  zk.close if zk
end
# Reads the stored consumer offsets from ZooKeeper.
#
# Walks /consumers/<consumer>/offsets/<topic>/<partition> for every consumer
# whose name fully matches CONSUMER_FILTER, printing a "Tracking:" or
# "Skipping:" line for each consumer found.
#
# Returns a nested hash:
#   { topic => { consumer => { partition (Integer) => offset (Integer) } } }
#
# A node whose child list comes back as nil is treated as having no children.
def get_consumers_offsets
  # Compile once per call. The pattern is wrapped in a non-capturing group so
  # that alternation in the filter (e.g. "foo|bar") stays inside the anchors,
  # and \A/\z anchor the whole name rather than a single line.
  filter = /\A(?:#{CONSUMER_FILTER})\z/
  consumers_offsets = {}

  zookeeper.get_children(path: '/consumers')[:children].to_a.each do |consumer|
    unless consumer =~ filter
      puts "Skipping: #{consumer}"
      next
    end
    puts "Tracking: #{consumer}"

    offsets_path = "/consumers/#{consumer}/offsets"
    zookeeper.get_children(path: offsets_path)[:children].to_a.each do |topic|
      topic_path = "#{offsets_path}/#{topic}"
      offsets = {}
      zookeeper.get_children(path: topic_path)[:children].to_a.each do |partition|
        offsets[partition.to_i] = zookeeper.get(path: "#{topic_path}/#{partition}")[:data].to_i
      end
      (consumers_offsets[topic] ||= {})[consumer] = offsets
    end
  end

  consumers_offsets
end
# Fetches the latest offset of every available partition of every topic
# listed under /brokers/topics.
#
# Topic metadata is requested from the first broker listed under
# /brokers/ids; the partitions are then grouped by their leader broker and
# each leader is asked for the offsets of its own partitions (time -1,
# at most 1 offset per partition).
#
# Returns { topic => { partition => offset } }. A partition whose response
# carries no offsets is left out of the result.
#
# Every Kafka connection opened here is closed before the method returns.
def get_topics_offsets
  topics = zookeeper.get_children(path: '/brokers/topics')[:children]
  bootstrap_id = zookeeper.get_children(path: "/brokers/ids")[:children].first

  # leader broker id => { topic name => [partition ids] }
  partitions_by_leader = {}
  with_kafka_connection(bootstrap_id) do |kafka|
    kafka.topic_metadata(topics).topics.each do |topic|
      topic.available_partitions.each do |partition|
        by_topic = (partitions_by_leader[partition.leader] ||= {})
        (by_topic[topic.name] ||= []) << partition.id
      end
    end
  end

  topics_offsets = {}
  partitions_by_leader.each do |leader_id, allocation|
    # One connection (and one broker lookup) per leader, reused for all of
    # that leader's topics.
    with_kafka_connection(leader_id) do |kafka|
      allocation.each do |topic, partitions|
        partition_requests = partitions.map do |partition|
          Poseidon::Protocol::PartitionOffsetRequest.new(partition.to_i, -1, 1)
        end
        offset_res = kafka.offset([Poseidon::Protocol::TopicOffsetRequest.new(topic, partition_requests)])
        offset_res[0].partition_offsets.each do |partition_offset|
          latest = partition_offset.offsets.first
          next unless latest # no offset returned for this partition
          (topics_offsets[topic] ||= {})[partition_offset.partition] = latest.offset
        end
      end
    end
  end
  topics_offsets
end

# Looks up the broker's host/port in ZooKeeper under /brokers/ids/<broker_id>,
# opens a Kafka connection to it, yields the connection and closes it when the
# block finishes or raises. Returns the block's value.
def with_kafka_connection(broker_id)
  broker = JSON.load(zookeeper.get(path: "/brokers/ids/#{broker_id}")[:data])
  # The client id spelling is kept exactly as it has always been sent to the brokers.
  kafka = Poseidon::Connection.new(broker["host"], broker["port"], "newrelic-conumser-lag", 1000)
  yield kafka
ensure
  kafka.close if kafka
end
# Returns the ZooKeeper handle for the configured host and port, opening it
# on first use and reusing it for as long as @zookeeper stays set.
def zookeeper
  return @zookeeper if @zookeeper

  address = "#{host}:#{port}"
  @zookeeper = Zookeeper.new(address)
end
end
# Register this module's agent with the plugin framework under the
# :kafka_consumer_lag key, then hand control to the plugin runner.
NewRelic::Plugin::Setup.install_agent(:kafka_consumer_lag, KafkaConsumerLagAgent)
NewRelic::Plugin::Run.setup_and_run
end