support throwing away duplicate rows during realtime ingestion in RealtimePlumber (apache#5693)
kaijianding authored and b-slim committed May 4, 2018
1 parent 2c5f003 commit c12c163
Showing 26 changed files with 318 additions and 57 deletions.
1 change: 1 addition & 0 deletions docs/content/ingestion/stream-pull.md
@@ -156,6 +156,7 @@ The tuningConfig is optional and default parameters will be used if no tuningCon
|handoffConditionTimeout|long|Milliseconds to wait for segment handoff. It must be >= 0, where 0 means to wait forever.|no (default == 0)|
|alertTimeout|long|Milliseconds timeout after which an alert is created if the task isn't finished by then. This allows users to monitor tasks that are failing to finish and give up the worker slot for any unexpected errors.|no (default == 0)|
|segmentWriteOutMediumFactory|String|Segment write-out medium to use when creating segments. See [Indexing Service Configuration](../configuration/indexing-service.html) page, "SegmentWriteOutMediumFactory" section for explanation and available options.|no (not specified by default, the value from `druid.peon.defaultSegmentWriteOutMediumFactory` is used)|
|dedupColumn|String|The column used to decide whether a row duplicates one already in this segment; duplicate rows are thrown away. For String-typed columns, a long hashcode of the column's value is compared instead of the value itself to reduce heap cost, so there is a very small chance that a row which was never ingested before is thrown away (a sketch of this check follows the table).|no (default == null)|
|indexSpec|Object|Tune how data is indexed. See below for more information.|no|
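The Sink-side code that implements this check is not expanded in this diff, so the following is a hedged sketch of the behavior the `dedupColumn` row describes, not the committed implementation; the class name, method names, and the 64-bit hash are all illustrative:

```java
import java.util.HashSet;
import java.util.Set;

// Hypothetical sketch of the per-segment dedup check described above.
class DedupChecker
{
  // Only long hashes are kept, so heap use is bounded by the number of
  // distinct dedupColumn values, not by their string lengths.
  private final Set<Long> seenValues = new HashSet<>();

  // Returns true when a row carrying this dedupColumn value was already
  // ingested into the segment and the new row should be thrown away.
  boolean isDuplicate(String dedupValue)
  {
    if (dedupValue == null) {
      return false; // rows missing the column are never treated as duplicates
    }
    // Hash collisions mean distinct values can map to the same key, which is
    // the "very small chance" of dropping a never-ingested row noted above.
    return !seenValues.add(longHash(dedupValue)); // add() is false if present
  }

  private static long longHash(String value)
  {
    long h = 1125899906842597L; // arbitrary seed for a simple 64-bit hash
    for (int i = 0; i < value.length(); i++) {
      h = 31 * h + value.charAt(i);
    }
    return h;
  }
}
```

Because only hashes are retained, memory cost grows with the number of distinct values per segment rather than with the size of the values themselves.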

Before enabling thread priority settings, users are highly encouraged to read the [original pull request](https://github.com/druid-io/druid/pull/984) and other documentation about proper use of `-XX:+UseThreadPriorities`.
1 change: 1 addition & 0 deletions docs/content/operations/metrics.md
@@ -107,6 +107,7 @@ These metrics are only available if the RealtimeMetricsMonitor is included in th
|------|-----------|----------|------------|
|`ingest/events/thrownAway`|Number of events rejected because they are outside the windowPeriod.|dataSource.|0|
|`ingest/events/unparseable`|Number of events rejected because the events are unparseable.|dataSource.|0|
|`ingest/events/duplicate`|Number of events rejected because the events are duplicated.|dataSource.|0|
|`ingest/events/processed`|Number of events successfully processed per emission period.|dataSource.|Equal to your # of events per emission period.|
|`ingest/rows/output`|Number of Druid rows persisted.|dataSource.|Your # of events with rollup.|
|`ingest/persists/count`|Number of times persist occurred.|dataSource.|Depends on configuration.|
@@ -54,6 +54,9 @@
"ingest/events/unparseable": [
"dataSource"
],
"ingest/events/duplicate": [
"dataSource"
],
"ingest/events/processed": [
"dataSource"
],
@@ -32,6 +32,7 @@

"ingest/events/thrownAway" : { "dimensions" : ["dataSource"], "type" : "count" },
"ingest/events/unparseable" : { "dimensions" : ["dataSource"], "type" : "count" },
"ingest/events/duplicate" : { "dimensions" : ["dataSource"], "type" : "count" },
"ingest/events/processed" : { "dimensions" : ["dataSource"], "type" : "count" },
"ingest/rows/output" : { "dimensions" : ["dataSource"], "type" : "count" },
"ingest/persist/count" : { "dimensions" : ["dataSource"], "type" : "count" },
@@ -40,6 +40,7 @@
import io.druid.segment.IndexMergerV9;
import io.druid.segment.QueryableIndex;
import io.druid.segment.SegmentUtils;
import io.druid.segment.incremental.IncrementalIndexAddResult;
import io.druid.segment.incremental.IndexSizeExceededException;
import io.druid.segment.indexing.DataSchema;
import io.druid.segment.indexing.RealtimeTuningConfig;
@@ -107,7 +108,8 @@ public Plumber findPlumber(
version,
config.getMaxRowsInMemory(),
TuningConfigs.getMaxBytesInMemoryOrDefault(config.getMaxBytesInMemory()),
config.isReportParseExceptions()
config.isReportParseExceptions(),
config.getDedupColumn()
);

// Temporary directory to hold spilled segments.
@@ -125,20 +127,20 @@ public Object startJob()
}

@Override
public int add(InputRow row, Supplier<Committer> committerSupplier) throws IndexSizeExceededException
public IncrementalIndexAddResult add(InputRow row, Supplier<Committer> committerSupplier) throws IndexSizeExceededException
{
Sink sink = getSink(row.getTimestampFromEpoch());
if (sink == null) {
return -1;
return Plumber.THROWAWAY;
}

final int numRows = sink.add(row, false).getRowCount();
final IncrementalIndexAddResult addResult = sink.add(row, false);

if (!sink.canAppendRow()) {
persist(committerSupplier.get());
}

return numRows;
return addResult;
}

private Sink getSink(long timestamp)
@@ -919,6 +919,7 @@ private RealtimeIndexTask makeRealtimeTask(
reportParseExceptions,
handoffTimeout,
null,
null,
null
);
return new RealtimeIndexTask(
@@ -554,6 +554,7 @@ public Plumber findPlumber(
true,
null,
null,
null,
null
)
),
@@ -1278,6 +1278,7 @@ private RealtimeIndexTask newRealtimeIndexTask()
null,
null,
null,
null,
null
);
FireDepartment fireDepartment = new FireDepartment(dataSchema, realtimeIOConfig, realtimeTuningConfig);
@@ -30,16 +30,29 @@ public class IncrementalIndexAddResult

@Nullable
private final ParseException parseException;
@Nullable
private String reasonOfNotAdded;

public IncrementalIndexAddResult(
int rowCount,
long bytesInMemory,
@Nullable ParseException parseException
@Nullable ParseException parseException,
@Nullable String reasonOfNotAdded
)
{
this.rowCount = rowCount;
this.bytesInMemory = bytesInMemory;
this.parseException = parseException;
this.reasonOfNotAdded = reasonOfNotAdded;
}

public IncrementalIndexAddResult(
int rowCount,
long bytesInMemory,
@Nullable ParseException parseException
)
{
this(rowCount, bytesInMemory, parseException, null);
}

public int getRowCount()
@@ -57,4 +70,10 @@ public ParseException getParseException()
{
return parseException;
}

@Nullable
public String getReasonOfNotAdded()
{
return reasonOfNotAdded;
}
}
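The sentinel results `Plumber.THROWAWAY` and `Plumber.NOT_WRITABLE`, returned by the plumbers later in this diff, come from `Plumber.java`, whose change is not expanded here. Given that `RealtimeManager.runFirehoseV2` below treats a row count of -2 as a duplicate and any other negative count as thrown away, a plausible reconstruction looks like this; the reason strings and the placement of the duplicate sentinel are assumptions:

```java
public interface Plumber
{
  // Hypothetical reconstruction; the Plumber.java change is collapsed in this
  // diff. Row counts match the checks in runFirehoseV2: -2 flags a duplicate,
  // other negative values are counted as thrownAway.
  IncrementalIndexAddResult THROWAWAY =
      new IncrementalIndexAddResult(-1, -1, null, "row too late or no sink for timestamp");
  IncrementalIndexAddResult NOT_WRITABLE =
      new IncrementalIndexAddResult(-1, -1, null, "sink already started handoff");
  IncrementalIndexAddResult DUPLICATE =
      new IncrementalIndexAddResult(-2, -1, null, "duplicate row");

  // ... existing Plumber methods unchanged ...
}
```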
@@ -53,6 +53,7 @@ public class RealtimeTuningConfig implements TuningConfig, AppenderatorConfig
private static final Boolean defaultReportParseExceptions = Boolean.FALSE;
private static final long defaultHandoffConditionTimeout = 0;
private static final long defaultAlertTimeout = 0;
private static final String defaultDedupColumn = null;

private static File createNewBasePersistDirectory()
{
@@ -87,7 +88,8 @@ public static RealtimeTuningConfig makeDefaultTuningConfig(final @Nullable File
defaultReportParseExceptions,
defaultHandoffConditionTimeout,
defaultAlertTimeout,
null
null,
defaultDedupColumn
);
}

@@ -108,6 +110,8 @@ public static RealtimeTuningConfig makeDefaultTuningConfig(final @Nullable File
private final long alertTimeout;
@Nullable
private final SegmentWriteOutMediumFactory segmentWriteOutMediumFactory;
@Nullable
private final String dedupColumn;

@JsonCreator
public RealtimeTuningConfig(
@@ -128,7 +132,8 @@ public RealtimeTuningConfig(
@JsonProperty("reportParseExceptions") Boolean reportParseExceptions,
@JsonProperty("handoffConditionTimeout") Long handoffConditionTimeout,
@JsonProperty("alertTimeout") Long alertTimeout,
@JsonProperty("segmentWriteOutMediumFactory") @Nullable SegmentWriteOutMediumFactory segmentWriteOutMediumFactory
@JsonProperty("segmentWriteOutMediumFactory") @Nullable SegmentWriteOutMediumFactory segmentWriteOutMediumFactory,
@JsonProperty("dedupColumn") @Nullable String dedupColumn
)
{
this.maxRowsInMemory = maxRowsInMemory == null ? defaultMaxRowsInMemory : maxRowsInMemory;
@@ -160,6 +165,7 @@ public RealtimeTuningConfig(
this.alertTimeout = alertTimeout == null ? defaultAlertTimeout : alertTimeout;
Preconditions.checkArgument(this.alertTimeout >= 0, "alertTimeout must be >= 0");
this.segmentWriteOutMediumFactory = segmentWriteOutMediumFactory;
this.dedupColumn = dedupColumn == null ? defaultDedupColumn : dedupColumn;
}

@Override
@@ -276,6 +282,13 @@ public SegmentWriteOutMediumFactory getSegmentWriteOutMediumFactory()
return segmentWriteOutMediumFactory;
}

@JsonProperty
@Nullable
public String getDedupColumn()
{
return dedupColumn;
}

public RealtimeTuningConfig withVersioningPolicy(VersioningPolicy policy)
{
return new RealtimeTuningConfig(
Expand All @@ -295,7 +308,8 @@ public RealtimeTuningConfig withVersioningPolicy(VersioningPolicy policy)
reportParseExceptions,
handoffConditionTimeout,
alertTimeout,
segmentWriteOutMediumFactory
segmentWriteOutMediumFactory,
dedupColumn
);
}

Expand All @@ -318,7 +332,8 @@ public RealtimeTuningConfig withBasePersistDirectory(File dir)
reportParseExceptions,
handoffConditionTimeout,
alertTimeout,
segmentWriteOutMediumFactory
segmentWriteOutMediumFactory,
dedupColumn
);
}
}
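Taken together, the constructor defaulting and the `with*` copies above imply that dedup is off unless `dedupColumn` is set, and that a configured value survives config rebuilds. A small usage sketch under those assumptions:

```java
import java.io.File;

// Default config: dedupColumn is null, so no dedup check is performed.
RealtimeTuningConfig defaults = RealtimeTuningConfig.makeDefaultTuningConfig(null);
assert defaults.getDedupColumn() == null;

// The with* methods rebuild the config but pass dedupColumn through,
// so the setting is not lost when the base persist directory changes.
RealtimeTuningConfig relocated = defaults.withBasePersistDirectory(new File("/tmp/druid/persist"));
assert relocated.getDedupColumn() == defaults.getDedupColumn();
```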
@@ -31,6 +31,7 @@ public class FireDepartmentMetrics
private final AtomicLong processedWithErrorsCount = new AtomicLong(0);
private final AtomicLong thrownAwayCount = new AtomicLong(0);
private final AtomicLong unparseableCount = new AtomicLong(0);
private final AtomicLong dedupCount = new AtomicLong(0);
private final AtomicLong rowOutputCount = new AtomicLong(0);
private final AtomicLong numPersists = new AtomicLong(0);
private final AtomicLong persistTimeMillis = new AtomicLong(0);
@@ -60,6 +61,11 @@ public void incrementThrownAway()
thrownAwayCount.incrementAndGet();
}

public void incrementDedup()
{
dedupCount.incrementAndGet();
}

public void incrementUnparseable()
{
unparseableCount.incrementAndGet();
@@ -145,6 +151,11 @@ public long unparseable()
return unparseableCount.get();
}

public long dedup()
{
return dedupCount.get();
}

public long rowOutput()
{
return rowOutputCount.get();
@@ -217,6 +228,7 @@ public FireDepartmentMetrics snapshot()
retVal.processedWithErrorsCount.set(processedWithErrorsCount.get());
retVal.thrownAwayCount.set(thrownAwayCount.get());
retVal.unparseableCount.set(unparseableCount.get());
retVal.dedupCount.set(dedupCount.get());
retVal.rowOutputCount.set(rowOutputCount.get());
retVal.numPersists.set(numPersists.get());
retVal.persistTimeMillis.set(persistTimeMillis.get());
@@ -247,6 +259,7 @@ public FireDepartmentMetrics merge(FireDepartmentMetrics other)
thrownAwayCount.addAndGet(otherSnapshot.thrownAway());
rowOutputCount.addAndGet(otherSnapshot.rowOutput());
unparseableCount.addAndGet(otherSnapshot.unparseable());
dedupCount.addAndGet(otherSnapshot.dedup());
numPersists.addAndGet(otherSnapshot.numPersists());
persistTimeMillis.addAndGet(otherSnapshot.persistTimeMillis());
persistBackPressureMillis.addAndGet(otherSnapshot.persistBackPressureMillis());
@@ -49,6 +49,7 @@
import io.druid.query.QueryToolChest;
import io.druid.query.SegmentDescriptor;
import io.druid.query.spec.SpecificSegmentSpec;
import io.druid.segment.incremental.IncrementalIndexAddResult;
import io.druid.segment.indexing.DataSchema;
import io.druid.segment.indexing.RealtimeTuningConfig;
import io.druid.segment.realtime.plumber.Committers;
@@ -335,14 +336,17 @@ private boolean runFirehoseV2(FirehoseV2 firehose)
return false;
}
InputRow inputRow = null;
int numRows = 0;
try {
inputRow = firehose.currRow();
if (inputRow != null) {
numRows = plumber.add(inputRow, committerSupplier);
if (numRows < 0) {
IncrementalIndexAddResult addResult = plumber.add(inputRow, committerSupplier);
int numRows = addResult.getRowCount();
if (numRows == -2) {
metrics.incrementDedup();
log.debug("Throwing away duplicate event[%s]", inputRow);
} else if (numRows < 0) {
metrics.incrementThrownAway();
log.debug("Throwing away event[%s]", inputRow);
log.debug("Throwing away event[%s] due to %s", inputRow, addResult.getReasonOfNotAdded());
} else {
metrics.incrementProcessed();
}
@@ -80,6 +80,12 @@ public boolean doMonitor(ServiceEmitter emitter)
log.error("[%,d] Unparseable events! Turn on debug logging to see exception stack trace.", unparseable);
}
emitter.emit(builder.build("ingest/events/unparseable", unparseable));
final long dedup = metrics.dedup() - previous.dedup();
if (dedup > 0) {
log.warn("[%,d] duplicate events!", dedup);
}
emitter.emit(builder.build("ingest/events/duplicate", dedup));

emitter.emit(builder.build("ingest/events/processed", metrics.processed() - previous.processed()));
emitter.emit(builder.build("ingest/rows/output", metrics.rowOutput() - previous.rowOutput()));
emitter.emit(builder.build("ingest/persists/count", metrics.numPersists() - previous.numPersists()));
@@ -355,7 +355,8 @@ private Sink getOrCreateSink(final SegmentIdentifier identifier)
identifier.getVersion(),
tuningConfig.getMaxRowsInMemory(),
TuningConfigs.getMaxBytesInMemoryOrDefault(tuningConfig.getMaxBytesInMemory()),
tuningConfig.isReportParseExceptions()
tuningConfig.isReportParseExceptions(),
null
);

try {
@@ -1027,6 +1028,7 @@ public int compare(File o1, File o2)
tuningConfig.getMaxRowsInMemory(),
TuningConfigs.getMaxBytesInMemoryOrDefault(tuningConfig.getMaxBytesInMemory()),
tuningConfig.isReportParseExceptions(),
null,
hydrants
);
sinks.put(identifier, currSink);
@@ -29,20 +29,21 @@
import com.google.common.collect.Lists;
import com.google.common.util.concurrent.FutureCallback;
import com.google.common.util.concurrent.Futures;
import io.druid.java.util.emitter.EmittingLogger;
import io.druid.common.guava.ThreadRenamingCallable;
import io.druid.java.util.common.concurrent.Execs;
import io.druid.data.input.Committer;
import io.druid.data.input.InputRow;
import io.druid.java.util.common.DateTimes;
import io.druid.java.util.common.ISE;
import io.druid.java.util.common.StringUtils;
import io.druid.java.util.common.concurrent.Execs;
import io.druid.java.util.common.concurrent.ScheduledExecutors;
import io.druid.java.util.common.granularity.Granularity;
import io.druid.java.util.common.guava.Sequence;
import io.druid.java.util.emitter.EmittingLogger;
import io.druid.query.Query;
import io.druid.query.QueryPlus;
import io.druid.query.QueryRunner;
import io.druid.segment.incremental.IncrementalIndexAddResult;
import io.druid.segment.incremental.IndexSizeExceededException;
import io.druid.segment.indexing.DataSchema;
import io.druid.segment.indexing.RealtimeTuningConfig;
@@ -145,23 +146,21 @@ public Object startJob()
}

@Override
public int add(InputRow row, Supplier<Committer> committerSupplier) throws IndexSizeExceededException
public IncrementalIndexAddResult add(InputRow row, Supplier<Committer> committerSupplier) throws IndexSizeExceededException
{
final SegmentIdentifier identifier = getSegmentIdentifier(row.getTimestampFromEpoch());
if (identifier == null) {
return -1;
return Plumber.THROWAWAY;
}

final int numRows;

try {
numRows = appenderator.add(identifier, row, committerSupplier).getNumRowsInSegment();
final Appenderator.AppenderatorAddResult addResult = appenderator.add(identifier, row, committerSupplier);
lastCommitterSupplier = committerSupplier;
return numRows;
return new IncrementalIndexAddResult(addResult.getNumRowsInSegment(), 0, addResult.getParseException());
}
catch (SegmentNotWritableException e) {
// Segment already started handoff
return -1;
return Plumber.NOT_WRITABLE;
}
}
