Skip to content

Commit cef2f65

Browse files
Watson1978daipom
andauthored
output: add metrics for number of writing events in secondary (#4971)
**Which issue(s) this PR fixes**: Fixes # **What this PR does / why we need it**: Add metrics for number of writing events in secondary to Output plugins This metrics can be used to determine whether recovery is required from secondary if failure occurs. **Docs Changes**: Need to update https://docs.fluentd.org/input/monitor_agent#output-example **Release Note**: Same as the title. --------- Signed-off-by: Shizuo Fujita <[email protected]> Co-authored-by: Daijiro Fukuda <[email protected]>
1 parent adaeae3 commit cef2f65

File tree

6 files changed

+41
-2
lines changed

6 files changed

+41
-2
lines changed

lib/fluent/plugin/output.rb

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -186,6 +186,7 @@ def initialize
186186
@emit_records_metrics = nil
187187
@emit_size_metrics = nil
188188
@write_count_metrics = nil
189+
@write_secondary_count_metrics = nil
189190
@rollback_count_metrics = nil
190191
@flush_time_count_metrics = nil
191192
@slow_flush_count_metrics = nil
@@ -254,6 +255,7 @@ def configure(conf)
254255
@emit_records_metrics = metrics_create(namespace: "fluentd", subsystem: "output", name: "emit_records", help_text: "Number of emit records")
255256
@emit_size_metrics = metrics_create(namespace: "fluentd", subsystem: "output", name: "emit_size", help_text: "Total size of emit events")
256257
@write_count_metrics = metrics_create(namespace: "fluentd", subsystem: "output", name: "write_count", help_text: "Number of writing events")
258+
@write_secondary_count_metrics = metrics_create(namespace: "fluentd", subsystem: "output", name: "write_secondary_count", help_text: "Number of writing events in secondary")
257259
@rollback_count_metrics = metrics_create(namespace: "fluentd", subsystem: "output", name: "rollback_count", help_text: "Number of rollbacking operations")
258260
@flush_time_count_metrics = metrics_create(namespace: "fluentd", subsystem: "output", name: "flush_time_count", help_text: "Count of flush time")
259261
@slow_flush_count_metrics = metrics_create(namespace: "fluentd", subsystem: "output", name: "slow_flush_count", help_text: "Count of slow flush occurred time(s)")
@@ -1188,6 +1190,7 @@ def try_flush
11881190
if output.delayed_commit
11891191
log.trace "executing delayed write and commit", chunk: dump_unique_id_hex(chunk.unique_id)
11901192
@write_count_metrics.inc
1193+
@write_secondary_count_metrics.inc if using_secondary
11911194
@dequeued_chunks_mutex.synchronize do
11921195
# delayed_commit_timeout for secondary is configured in <buffer> of primary (<secondary> don't get <buffer>)
11931196
@dequeued_chunks << DequeuedChunkInfo.new(chunk.unique_id, Time.now, self.delayed_commit_timeout)
@@ -1200,6 +1203,7 @@ def try_flush
12001203
dump_chunk_id = dump_unique_id_hex(chunk_id)
12011204
log.trace "adding write count", instance: self.object_id
12021205
@write_count_metrics.inc
1206+
@write_secondary_count_metrics.inc if using_secondary
12031207
log.trace "executing sync write", chunk: dump_chunk_id
12041208

12051209
output.write(chunk)
@@ -1567,6 +1571,7 @@ def statistics
15671571
'retry_count' => @num_errors_metrics.get,
15681572
'emit_count' => @emit_count_metrics.get,
15691573
'write_count' => @write_count_metrics.get,
1574+
'write_secondary_count' => @write_secondary_count_metrics.get,
15701575
'rollback_count' => @rollback_count_metrics.get,
15711576
'slow_flush_count' => @slow_flush_count_metrics.get,
15721577
'flush_time_count' => @flush_time_count_metrics.get,

test/plugin/test_in_monitor_agent.rb

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -181,6 +181,7 @@ def test_enable_input_metrics(with_config)
181181
"emit_records" => Integer,
182182
"emit_size" => Integer,
183183
"write_count" => Integer,
184+
"write_secondary_count" => Integer,
184185
"rollback_count" => Integer,
185186
"slow_flush_count" => Integer,
186187
"flush_time_count" => Integer,
@@ -203,6 +204,7 @@ def test_enable_input_metrics(with_config)
203204
"emit_records" => Integer,
204205
"emit_size" => Integer,
205206
"write_count" => Integer,
207+
"write_secondary_count" => Integer,
206208
"rollback_count" => Integer,
207209
"slow_flush_count" => Integer,
208210
"flush_time_count" => Integer,
@@ -320,6 +322,7 @@ def test_enable_input_metrics(with_config)
320322
"emit_records" => Integer,
321323
"emit_size" => Integer,
322324
"write_count" => Integer,
325+
"write_secondary_count" => Integer,
323326
"rollback_count" => Integer,
324327
"slow_flush_count" => Integer,
325328
"flush_time_count" => Integer,
@@ -342,6 +345,7 @@ def test_enable_input_metrics(with_config)
342345
"emit_records" => Integer,
343346
"emit_size" => Integer,
344347
"write_count" => Integer,
348+
"write_secondary_count" => Integer,
345349
"rollback_count" => Integer,
346350
"slow_flush_count" => Integer,
347351
"flush_time_count" => Integer,
@@ -416,6 +420,7 @@ def test_enable_input_metrics(with_config)
416420
"emit_records" => Integer,
417421
"emit_size" => Integer,
418422
"write_count" => Integer,
423+
"write_secondary_count" => Integer,
419424
"rollback_count" => Integer,
420425
"slow_flush_count" => Integer,
421426
"flush_time_count" => Integer,
@@ -430,6 +435,7 @@ def test_enable_input_metrics(with_config)
430435
"emit_records" => Integer,
431436
"emit_size" => Integer,
432437
"write_count" => Integer,
438+
"write_secondary_count" => Integer,
433439
"rollback_count" => Integer,
434440
"slow_flush_count" => Integer,
435441
"flush_time_count" => Integer,
@@ -568,6 +574,7 @@ def get(uri, header = {})
568574
"emit_records" => Integer,
569575
"emit_size" => Integer,
570576
"write_count" => Integer,
577+
"write_secondary_count" => Integer,
571578
"rollback_count" => Integer,
572579
"slow_flush_count" => Integer,
573580
"flush_time_count" => Integer,
@@ -682,6 +689,7 @@ def get(uri, header = {})
682689
"emit_records" => Integer,
683690
"emit_size" => Integer,
684691
"write_count" => Integer,
692+
"write_secondary_count" => Integer,
685693
"rollback_count" => Integer,
686694
"slow_flush_count" => Integer,
687695
"flush_time_count" => Integer,
@@ -813,6 +821,7 @@ def write(chunk)
813821
"emit_records" => Integer,
814822
"emit_size" => Integer,
815823
"write_count" => Integer,
824+
"write_secondary_count" => Integer,
816825
"rollback_count" => Integer,
817826
'slow_flush_count' => Integer,
818827
'flush_time_count' => Integer,

test/plugin/test_out_forward.rb

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1395,4 +1395,15 @@ def plugin_id_for_test?
13951395
end
13961396
end
13971397
end
1398+
1399+
test 'can use metrics plugins and fallback methods' do
1400+
@d = create_driver
1401+
1402+
%w[healthy_nodes_count_metrics registered_nodes_count_metrics].each do |metric_name|
1403+
assert_true @d.instance.instance_variable_get(:"@#{metric_name}").is_a?(Fluent::Plugin::Metrics)
1404+
end
1405+
1406+
assert_equal 0, @d.instance.healthy_nodes_count
1407+
assert_equal 0, @d.instance.registered_nodes_count
1408+
end
13981409
end

test/plugin/test_output.rb

Lines changed: 6 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -226,17 +226,21 @@ def waiting(seconds)
226226
test 'can use metrics plugins and fallback methods' do
227227
@i.configure(config_element())
228228

229-
%w[num_errors_metrics emit_count_metrics emit_size_metrics emit_records_metrics
230-
write_count_metrics rollback_count_metrics flush_time_count_metrics slow_flush_count_metrics].each do |metric_name|
229+
%w[num_errors_metrics emit_count_metrics emit_size_metrics emit_records_metrics write_count_metrics
230+
write_secondary_count_metrics rollback_count_metrics flush_time_count_metrics slow_flush_count_metrics].each do |metric_name|
231231
assert_true @i.instance_variable_get(:"@#{metric_name}").is_a?(Fluent::Plugin::Metrics)
232232
end
233233

234234
assert_equal 0, @i.num_errors
235235
assert_equal 0, @i.emit_count
236+
assert_equal 0, @i.emit_records
236237
assert_equal 0, @i.emit_size
237238
assert_equal 0, @i.emit_records
238239
assert_equal 0, @i.write_count
240+
assert_equal 0, @i.write_secondary_count
239241
assert_equal 0, @i.rollback_count
242+
assert_equal 0, @i.flush_time_count
243+
assert_equal 0, @i.slow_flush_count
240244
end
241245

242246
data(:new_api => :chunk,

test/plugin/test_output_as_buffered_secondary.rb

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -308,6 +308,7 @@ def dummy_event_stream
308308

309309
assert_equal 0, @i.write_count
310310
assert_equal 0, @i.num_errors
311+
assert_equal 0, @i.write_secondary_count
311312

312313
@i.enqueue_thread_wait
313314
@i.flush_thread_wakeup
@@ -348,6 +349,8 @@ def dummy_event_stream
348349
assert{ @i.write_count > prev_write_count }
349350
assert{ @i.num_errors == prev_num_errors }
350351

352+
assert{ @i.write_secondary_count > 0 }
353+
351354
assert_nil @i.retry
352355

353356
assert_equal [ 'test.tag.1', event_time('2016-04-13 18:33:00').to_i, {"name" => "moris", "age" => 36, "message" => "data1"} ], written[0]
@@ -443,6 +446,8 @@ def dummy_event_stream
443446
assert{ @i.buffer.dequeued[chunks[0].unique_id].nil? }
444447
assert{ chunks.first.empty? }
445448

449+
assert{ @i.write_secondary_count > 0 }
450+
446451
assert_nil @i.retry
447452

448453
logs = @i.log.out.logs
@@ -737,6 +742,7 @@ def dummy_event_stream
737742

738743
assert_equal 0, @i.write_count
739744
assert_equal 0, @i.num_errors
745+
assert_equal 0, @i.write_secondary_count
740746

741747
@i.enqueue_thread_wait
742748
@i.flush_thread_wakeup
@@ -765,6 +771,8 @@ def dummy_event_stream
765771
prev_write_count = @i.write_count
766772
end
767773

774+
assert{ @i.write_secondary_count > 0 }
775+
768776
# retry_timeout == 60(sec), retry_secondary_threshold == 0.8
769777

770778
assert{ now >= first_failure + 60 * 0.8 }

test/test_plugin_classes.rb

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -151,6 +151,7 @@ def initialize
151151
@emit_records_metrics = FluentTest::FluentTestCounterMetrics.new
152152
@emit_size_metrics = FluentTest::FluentTestCounterMetrics.new
153153
@write_count_metrics = FluentTest::FluentTestCounterMetrics.new
154+
@write_secondary_count_metrics = FluentTest::FluentTestCounterMetrics.new
154155
@rollback_count_metrics = FluentTest::FluentTestCounterMetrics.new
155156
@flush_time_count_metrics = FluentTest::FluentTestCounterMetrics.new
156157
@slow_flush_count_metrics = FluentTest::FluentTestCounterMetrics.new
@@ -281,6 +282,7 @@ def initialize
281282
@emit_records_metrics = FluentTest::FluentTestCounterMetrics.new
282283
@emit_size_metrics = FluentTest::FluentTestCounterMetrics.new
283284
@write_count_metrics = FluentTest::FluentTestCounterMetrics.new
285+
@write_secondary_count_metrics = FluentTest::FluentTestCounterMetrics.new
284286
@rollback_count_metrics = FluentTest::FluentTestCounterMetrics.new
285287
@flush_time_count_metrics = FluentTest::FluentTestCounterMetrics.new
286288
@slow_flush_count_metrics = FluentTest::FluentTestCounterMetrics.new

0 commit comments

Comments
 (0)