Skip to content

Commit

Permalink
#4476 Fix streaming analytic issue
Browse files Browse the repository at this point in the history
  • Loading branch information
stroomdev66 committed Sep 26, 2024
1 parent 68f4cf0 commit 80b77fe
Show file tree
Hide file tree
Showing 28 changed files with 680 additions and 429 deletions.
Original file line number Diff line number Diff line change
@@ -1,7 +1,9 @@
package stroom.analytics.impl;

import stroom.query.api.v2.SearchRequest;
import stroom.query.common.v2.CompiledColumns;
import stroom.query.common.v2.StringFieldValue;
import stroom.query.common.v2.ValFilter;
import stroom.query.language.functions.FieldIndex;
import stroom.query.language.functions.Val;
import stroom.query.language.functions.ValuesConsumer;
Expand All @@ -11,38 +13,44 @@
import stroom.search.extraction.MemoryIndex;

import java.util.List;
import java.util.function.Predicate;

abstract class AbstractAnalyticFieldListConsumer implements AnalyticFieldListConsumer {

private final SearchRequest searchRequest;
private final FieldIndex fieldIndex;
private final CompiledColumns compiledColumns;
private final FieldValueExtractor fieldValueExtractor;
private final ValuesConsumer valuesConsumer;
private final MemoryIndex memoryIndex;
private final Long minEventId;
private final Predicate<Val[]> valFilter;

private long eventId;

AbstractAnalyticFieldListConsumer(final SearchRequest searchRequest,
final FieldIndex fieldIndex,
final CompiledColumns compiledColumns,
final FieldValueExtractor fieldValueExtractor,
final ValuesConsumer valuesConsumer,
final MemoryIndex memoryIndex,
final Long minEventId) {
final Long minEventId,
final Predicate<Val[]> valFilter) {
this.searchRequest = searchRequest;
this.fieldIndex = fieldIndex;
this.compiledColumns = compiledColumns;
this.fieldValueExtractor = fieldValueExtractor;
this.valuesConsumer = valuesConsumer;
this.memoryIndex = memoryIndex;
this.minEventId = minEventId;
this.valFilter = valFilter;
}

@Override
public void acceptFieldValues(final List<FieldValue> fieldValues) {
eventId++;

final FieldIndex fieldIndex = compiledColumns.getFieldIndex();
// Filter events if we have already added them to LMDB.
if (minEventId == null || minEventId <= eventId) {

// See if this set of fields matches the rule expression.
if (memoryIndex.match(searchRequest, fieldValues)) {
// We have a match so pass the values on to the receiver.
Expand All @@ -54,7 +62,9 @@ public void acceptFieldValues(final List<FieldValue> fieldValues) {
}
}

valuesConsumer.accept(Val.of(values));
if (valFilter.test(values)) {
valuesConsumer.accept(Val.of(values));
}
}
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Predicate;

public class DetectionConsumerProxy implements ValuesConsumer, ProcessLifecycleAware {

Expand All @@ -46,6 +47,7 @@ public class DetectionConsumerProxy implements ValuesConsumer, ProcessLifecycleA
private ExecutionSchedule executionSchedule;
private Instant executionTime;
private Instant effectiveExecutionTime;
private Predicate<Val[]> valFilter;

@Inject
public DetectionConsumerProxy(final Provider<ErrorReceiverProxy> errorReceiverProxyProvider,
Expand Down Expand Up @@ -106,6 +108,10 @@ public void setCompiledColumns(final CompiledColumns compiledColumns) {
this.compiledColumns = compiledColumns;
}

/**
 * Sets the predicate applied to each extracted {@code Val[]} row before a
 * detection record is written; rows failing the predicate are dropped in
 * {@code accept}. Must be set before values are consumed — {@code accept}
 * dereferences it without a null check.
 */
public void setValFilter(final Predicate<Val[]> valFilter) {
    this.valFilter = valFilter;
}

@Override
public void accept(final Val[] values) {
// Analytics generation search extraction - create records when filters match
Expand All @@ -115,16 +121,17 @@ public void accept(final Val[] values) {
". No values to extract from ", null);
return;
}
final CompiledColumnValue[] outputValues = extractValues(values);
if (outputValues != null) {

if (valFilter.test(values)) {
final ColumnValue[] outputValues = extractValues(values);
writeRecord(outputValues);
}
}

private CompiledColumnValue[] extractValues(final Val[] vals) {
private ColumnValue[] extractValues(final Val[] vals) {
final CompiledColumn[] compiledColumnArray = compiledColumns.getCompiledColumns();
final StoredValues storedValues = compiledColumns.getValueReferenceIndex().createStoredValues();
final CompiledColumnValue[] output = new CompiledColumnValue[compiledColumnArray.length];
final ColumnValue[] output = new ColumnValue[compiledColumnArray.length];
int index = 0;

for (final CompiledColumn compiledColumn : compiledColumnArray) {
Expand All @@ -133,18 +140,7 @@ private CompiledColumnValue[] extractValues(final Val[] vals) {
if (generator != null) {
generator.set(vals, storedValues);
final Val value = generator.eval(storedValues, null);
output[index] = new CompiledColumnValue(compiledColumn, value);

if (compiledColumn.getCompiledFilter() != null) {
// If we are filtering then we need to evaluate this field
// now so that we can filter the resultant value.

if (compiledColumn.getCompiledFilter() != null && value != null
&& !compiledColumn.getCompiledFilter().match(value.toString())) {
// We want to exclude this item.
return null;
}
}
output[index] = new ColumnValue(compiledColumn.getColumn(), value);
}

index++;
Expand All @@ -159,7 +155,7 @@ private void log(final Severity severity, final String message, final Exception
"AlertExtractionReceiver", message, e);
}

private void writeRecord(final CompiledColumnValue[] columnValues) {
private void writeRecord(final ColumnValue[] columnValues) {
// final CompiledField[] compiledFieldArray = compiledFields.getCompiledFields();
if (columnValues == null || columnValues.length == 0) {
return;
Expand Down Expand Up @@ -198,10 +194,9 @@ private void writeRecord(final CompiledColumnValue[] columnValues) {
// select them, e.g.
// eval compound=field1+field2
// select compound
for (final CompiledColumnValue columnValue : columnValues) {
NullSafe.consume(columnValue, CompiledColumnValue::getVal, val -> {
final CompiledColumn compiledColumn = columnValue.getCompiledColumn();
final Column column = compiledColumn.getColumn();
for (final ColumnValue columnValue : columnValues) {
NullSafe.consume(columnValue, ColumnValue::val, val -> {
final Column column = columnValue.column();
final String columnName = column.getName();
if (FieldIndex.isStreamIdFieldName(columnName)) {
streamId.set(getSafeLong(val));
Expand Down Expand Up @@ -256,22 +251,7 @@ public static Long getSafeLong(final Val value) {
return null;
}

private static class CompiledColumnValue {
private record ColumnValue(Column column, Val val) {

private final CompiledColumn compiledColumn;
private final Val val;

public CompiledColumnValue(final CompiledColumn compiledColumn, final Val val) {
this.compiledColumn = compiledColumn;
this.val = val;
}

public CompiledColumn getCompiledColumn() {
return compiledColumn;
}

public Val getVal() {
return val;
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -34,9 +34,11 @@
import stroom.query.common.v2.ResultStoreManager;
import stroom.query.common.v2.ResultStoreManager.RequestAndStore;
import stroom.query.common.v2.SimpleRowCreator;
import stroom.query.common.v2.ValFilter;
import stroom.query.common.v2.format.ColumnFormatter;
import stroom.query.common.v2.format.FormatterFactory;
import stroom.query.language.SearchRequestFactory;
import stroom.query.language.functions.Val;
import stroom.query.language.functions.ref.ErrorConsumer;
import stroom.security.api.SecurityContext;
import stroom.security.api.UserIdentity;
Expand Down Expand Up @@ -67,6 +69,7 @@
import java.util.Map;
import java.util.Optional;
import java.util.function.Consumer;
import java.util.function.Predicate;
import java.util.function.Supplier;

public class ScheduledQueryAnalyticExecutor {
Expand Down Expand Up @@ -355,13 +358,22 @@ private boolean process(final String ruleIdentity,
expressionContext,
tableSettings.getColumns(),
paramMap);
final Predicate<Val[]> valFilter = ValFilter.create(
tableSettings.getValueFilter(),
compiledColumns,
modifiedRequest.getDateTimeSettings(),
paramMap,
errorConsumer::add);

final Provider<DetectionConsumer> detectionConsumerProvider =
detectionConsumerFactory.create(analytic);
final DetectionConsumerProxy detectionConsumerProxy = detectionConsumerProxyProvider.get();
detectionConsumerProxy.setAnalyticRuleDoc(analytic);
detectionConsumerProxy.setExecutionSchedule(executionSchedule);
detectionConsumerProxy.setExecutionTime(executionTime);
detectionConsumerProxy.setEffectiveExecutionTime(effectiveExecutionTime);
detectionConsumerProxy.setCompiledColumns(compiledColumns);
detectionConsumerProxy.setValFilter(valFilter);
detectionConsumerProxy.setDetectionsConsumerProvider(detectionConsumerProvider);

try (final DuplicateCheck duplicateCheck =
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@
import stroom.docref.DocRef;
import stroom.query.api.v2.SearchRequest;
import stroom.security.api.SecurityContext;
import stroom.security.shared.DocumentPermissionNames;
import stroom.util.NullSafe;
import stroom.util.entityevent.EntityAction;
import stroom.util.entityevent.EntityEvent;
Expand All @@ -16,14 +17,13 @@
import stroom.util.logging.LogExecutionTime;
import stroom.util.logging.LogUtil;
import stroom.util.shared.Clearable;
import stroom.util.shared.PermissionException;
import stroom.view.shared.ViewDoc;

import jakarta.inject.Inject;
import jakarta.inject.Provider;
import jakarta.inject.Singleton;

import java.util.Optional;

@Singleton
@EntityEventHandler(
type = AnalyticRuleDoc.DOCUMENT_TYPE,
Expand All @@ -36,7 +36,7 @@ public class StreamingAnalyticCache implements Clearable, EntityEvent.Handler {
private final AnalyticHelper analyticHelper;
private final AnalyticRuleStore analyticRuleStore;
private final AnalyticRuleSearchRequestHelper analyticRuleSearchRequestHelper;
private final LoadingStroomCache<DocRef, Optional<StreamingAnalytic>> cache;
private final LoadingStroomCache<DocRef, StreamingAnalytic> cache;
private final SecurityContext securityContext;

@Inject
Expand All @@ -56,11 +56,15 @@ public StreamingAnalyticCache(final AnalyticHelper analyticHelper,
this::loadStreamingAnalytic);
}

public Optional<StreamingAnalytic> get(final DocRef analyticRuleRef) {
/**
 * Returns the cached {@link StreamingAnalytic} for the supplied analytic rule
 * doc ref, loading it on first access.
 *
 * @param analyticRuleRef the analytic rule document to resolve
 * @return the streaming analytic for the rule
 * @throws PermissionException if the current user lacks USE permission on the doc
 */
public StreamingAnalytic get(final DocRef analyticRuleRef) {
    // Guard with the caller's identity before touching the cache, which is
    // populated as the processing user.
    final boolean permitted = securityContext.hasDocumentPermission(
            analyticRuleRef.getUuid(),
            DocumentPermissionNames.USE);
    if (!permitted) {
        throw new PermissionException(
                securityContext.getUserIdentityForAudit(),
                "You do not have permission to use this analytic doc");
    }
    return cache.get(analyticRuleRef);
}

private Optional<StreamingAnalytic> loadStreamingAnalytic(final DocRef analyticRuleRef) {
private StreamingAnalytic loadStreamingAnalytic(final DocRef analyticRuleRef) {
return securityContext.asProcessingUserResult(() -> {
try {
LOGGER.debug("Loading streaming analytic: {}", analyticRuleRef);
Expand All @@ -85,15 +89,15 @@ private Optional<StreamingAnalytic> loadStreamingAnalytic(final DocRef analyticR
}

LOGGER.info(() -> LogUtil.message("Finished loading rules in {}", logExecutionTime));
return Optional.of(new StreamingAnalytic(
return new StreamingAnalytic(
ruleIdentity,
analyticRuleDoc,
searchRequest,
viewDoc));
viewDoc);
} catch (final RuntimeException e) {
LOGGER.error(e::getMessage, e);
LOGGER.debug(e::getMessage, e);
throw e;
}
return Optional.empty();
});
}

Expand Down
Original file line number Diff line number Diff line change
@@ -1,33 +1,35 @@
package stroom.analytics.impl;

import stroom.analytics.api.NotificationState;
import stroom.query.api.v2.SearchRequest;
import stroom.query.language.functions.FieldIndex;
import stroom.query.common.v2.CompiledColumns;
import stroom.query.language.functions.Val;
import stroom.query.language.functions.ValuesConsumer;
import stroom.search.extraction.FieldValueExtractor;
import stroom.search.extraction.MemoryIndex;
import stroom.search.extraction.ProcessLifecycleAware;

import java.util.List;
import java.util.function.Predicate;

public class StreamingAnalyticFieldListConsumer extends AbstractAnalyticFieldListConsumer {

private final ProcessLifecycleAware detectionConsumerProxy;

public StreamingAnalyticFieldListConsumer(final SearchRequest searchRequest,
final FieldIndex fieldIndex,
final CompiledColumns compiledColumns,
final FieldValueExtractor fieldValueExtractor,
final ValuesConsumer valuesConsumer,
final MemoryIndex memoryIndex,
final Long minEventId,
final DetectionConsumerProxy detectionConsumerProxy) {
final DetectionConsumerProxy detectionConsumerProxy,
final Predicate<Val[]> valFilter) {
super(
searchRequest,
fieldIndex,
compiledColumns,
fieldValueExtractor,
valuesConsumer,
memoryIndex,
minEventId);
minEventId,
valFilter);
this.detectionConsumerProxy = detectionConsumerProxy;
}

Expand Down
Loading

0 comments on commit 80b77fe

Please sign in to comment.