Skip to content

Commit

Permalink
Add namespace and use event submitter to send events
Browse files Browse the repository at this point in the history
  • Loading branch information
Will-Lo committed Aug 11, 2023
1 parent b26419d commit 0f7ad19
Showing 1 changed file with 6 additions and 5 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -29,8 +29,8 @@

import org.apache.gobblin.configuration.State;
import org.apache.gobblin.instrumented.Instrumented;
import org.apache.gobblin.metrics.GobblinTrackingEvent;
import org.apache.gobblin.metrics.MetricContext;
import org.apache.gobblin.metrics.event.EventSubmitter;
import org.apache.gobblin.metrics.event.GobblinEventBuilder;


Expand All @@ -46,6 +46,7 @@ public class InstrumentedGobblinOrcWriter extends GobblinOrcWriter {
public static final String METRICS_BUFFER_RESIZES = "bufferResizes";
public static final String METRICS_BUFFER_SIZE = "bufferSize";
public static final String ORC_WRITER_METRICS_NAME = "OrcWriterMetrics";
private static final String ORC_WRITER_NAMESPACE = "gobblin.orc.writer";

public InstrumentedGobblinOrcWriter(FsDataWriterBuilder<Schema, GenericRecord> builder, State properties) throws IOException {
super(builder, properties);
Expand All @@ -60,7 +61,7 @@ protected synchronized void closeInternal() throws IOException {
this.orcFileWriter.close();
this.closed = true;
log.info("Emitting ORC event metrics");
this.metricContext.submitEvent(this.createOrcWriterMetadataEvent());
this.sendOrcWriterMetadataEvent();
this.recycleRowBatchPool();
} else {
// Throw fatal exception if there's outstanding buffered data since there's risk losing data if proceeds.
Expand All @@ -70,8 +71,8 @@ protected synchronized void closeInternal() throws IOException {
}
}

GobblinTrackingEvent createOrcWriterMetadataEvent() throws IOException {
GobblinEventBuilder builder = new GobblinEventBuilder(ORC_WRITER_METRICS_NAME);
private void sendOrcWriterMetadataEvent() {
GobblinEventBuilder builder = new GobblinEventBuilder(ORC_WRITER_METRICS_NAME, ORC_WRITER_NAMESPACE);
Map<String, String> eventMetadataMap = Maps.newHashMap();
eventMetadataMap.put(METRICS_SCHEMA_NAME, this.inputSchema.getName());
eventMetadataMap.put(METRICS_BYTES_WRITTEN, String.valueOf(this.bytesWritten()));
Expand All @@ -80,6 +81,6 @@ GobblinTrackingEvent createOrcWriterMetadataEvent() throws IOException {
eventMetadataMap.put(METRICS_BUFFER_SIZE, String.valueOf(this.batchSize));

builder.addAdditionalMetadata(eventMetadataMap);
return builder.build();
EventSubmitter.submit(metricContext, builder);
}
}

0 comments on commit 0f7ad19

Please sign in to comment.