Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Improved patterns command with new algorithm #3263

Open
wants to merge 7 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from 3 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view

Large diffs are not rendered by default.

28 changes: 28 additions & 0 deletions core/src/main/java/org/opensearch/sql/analysis/Analyzer.java
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@
import org.opensearch.sql.analysis.symbol.Namespace;
import org.opensearch.sql.analysis.symbol.Symbol;
import org.opensearch.sql.ast.AbstractNodeVisitor;
import org.opensearch.sql.ast.dsl.AstDSL;
import org.opensearch.sql.ast.expression.Argument;
import org.opensearch.sql.ast.expression.Field;
import org.opensearch.sql.ast.expression.Let;
Expand All @@ -58,6 +59,7 @@
import org.opensearch.sql.ast.tree.ML;
import org.opensearch.sql.ast.tree.Paginate;
import org.opensearch.sql.ast.tree.Parse;
import org.opensearch.sql.ast.tree.Pattern;
import org.opensearch.sql.ast.tree.Project;
import org.opensearch.sql.ast.tree.RareTopN;
import org.opensearch.sql.ast.tree.Relation;
Expand Down Expand Up @@ -107,6 +109,7 @@
import org.opensearch.sql.planner.logical.LogicalSort;
import org.opensearch.sql.planner.logical.LogicalTrendline;
import org.opensearch.sql.planner.logical.LogicalValues;
import org.opensearch.sql.planner.logical.LogicalWindow;
import org.opensearch.sql.planner.physical.datasource.DataSourceTable;
import org.opensearch.sql.storage.Table;
import org.opensearch.sql.utils.ParseUtils;
Expand Down Expand Up @@ -471,6 +474,31 @@ public LogicalPlan visitParse(Parse node, AnalysisContext context) {
return child;
}

@Override
public LogicalPlan visitPattern(Pattern node, AnalysisContext context) {
LogicalPlan child = node.getChild().get(0).accept(this, context);
WindowExpressionAnalyzer windowAnalyzer =
new WindowExpressionAnalyzer(expressionAnalyzer, child);
child = windowAnalyzer.analyze(node.getPatternWindowFunction(), context);
java.util.Map<String, Literal> arguments = node.getArguments();
Literal alias = arguments.getOrDefault("new_field", AstDSL.stringLiteral("patterns_field"));

TypeEnvironment curEnv = context.peek();
if (child instanceof LogicalWindow patternWindow) {
NamedExpression namedExpression =
new NamedExpression(
patternWindow.getWindowFunction().getNameOrAlias(),
new ReferenceExpression(
patternWindow.getWindowFunction().getNameOrAlias(),
patternWindow.getWindowFunction().getDelegated().type()));
curEnv.define(
new Symbol(Namespace.FIELD_NAME, namedExpression.getNameOrAlias()),
namedExpression.type());
}

return child;
}

/** Build {@link LogicalSort}. */
@Override
public LogicalPlan visitSort(Sort node, AnalysisContext context) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,7 @@
import org.opensearch.sql.ast.tree.ML;
import org.opensearch.sql.ast.tree.Paginate;
import org.opensearch.sql.ast.tree.Parse;
import org.opensearch.sql.ast.tree.Pattern;
import org.opensearch.sql.ast.tree.Project;
import org.opensearch.sql.ast.tree.RareTopN;
import org.opensearch.sql.ast.tree.Relation;
Expand Down Expand Up @@ -215,6 +216,10 @@ public T visitParse(Parse node, C context) {
return visitChildren(node, context);
}

public T visitPattern(Pattern node, C context) {
return visitChildren(node, context);
}

public T visitLet(Let node, C context) {
return visitChildren(node, context);
}
Expand Down
18 changes: 18 additions & 0 deletions core/src/main/java/org/opensearch/sql/ast/dsl/AstDSL.java
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@
import org.opensearch.sql.ast.expression.Not;
import org.opensearch.sql.ast.expression.Or;
import org.opensearch.sql.ast.expression.ParseMethod;
import org.opensearch.sql.ast.expression.PatternMethod;
import org.opensearch.sql.ast.expression.QualifiedName;
import org.opensearch.sql.ast.expression.ScoreFunction;
import org.opensearch.sql.ast.expression.Span;
Expand All @@ -54,6 +55,7 @@
import org.opensearch.sql.ast.tree.Head;
import org.opensearch.sql.ast.tree.Limit;
import org.opensearch.sql.ast.tree.Parse;
import org.opensearch.sql.ast.tree.Pattern;
import org.opensearch.sql.ast.tree.Project;
import org.opensearch.sql.ast.tree.RareTopN;
import org.opensearch.sql.ast.tree.RareTopN.CommandType;
Expand Down Expand Up @@ -489,6 +491,22 @@ public static Parse parse(
return new Parse(parseMethod, sourceField, pattern, arguments, input);
}

public static Pattern pattern(
UnresolvedPlan input,
PatternMethod patternMethod,
UnresolvedExpression sourceField,
String alias,
java.util.Map<String, Literal> arguments) {
return new Pattern(
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Just wonder if pattern is mostly syntax sugar for pattern window function, is new logical operator still required?

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

After the integration with LogicalWindow, I see they are quite similar. Yeah, I think Pattern operator is probably not needed.

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I don't find the Window unresolvedPlan. So replace Pattern with Window instead.

new Alias(
"patterns_field",
new WindowFunction(
new Function(patternMethod.getName(), List.of(sourceField)), List.of(), List.of()),
alias),
arguments,
input);
}

public static FillNull fillNull(UnresolvedExpression replaceNullWithMe, Field... fields) {
return new FillNull(
FillNull.ContainNullableFieldFill.ofSameValue(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,8 +11,7 @@
@RequiredArgsConstructor
public enum ParseMethod {
REGEX("regex"),
GROK("grok"),
PATTERNS("patterns");
GROK("grok");

@Getter private final String name;
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,20 @@
/*
* SPDX-License-Identifier: Apache-2.0
*
* The OpenSearch Contributors require contributions made to
* this file be licensed under the Apache-2.0 license or a
* compatible open source license.
*/

package org.opensearch.sql.ast.expression;

import lombok.Getter;
import lombok.RequiredArgsConstructor;

@RequiredArgsConstructor
public enum PatternMethod {
SIMPLE("simple_pattern"),
BRAIN("brain");

@Getter final String name;
}
53 changes: 53 additions & 0 deletions core/src/main/java/org/opensearch/sql/ast/tree/Pattern.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,53 @@
/*
* SPDX-License-Identifier: Apache-2.0
*
* The OpenSearch Contributors require contributions made to
* this file be licensed under the Apache-2.0 license or a
* compatible open source license.
*/

package org.opensearch.sql.ast.tree;

import com.google.common.collect.ImmutableList;
import java.util.List;
import java.util.Map;
import lombok.AllArgsConstructor;
import lombok.EqualsAndHashCode;
import lombok.Getter;
import lombok.RequiredArgsConstructor;
import lombok.Setter;
import lombok.ToString;
import org.opensearch.sql.ast.AbstractNodeVisitor;
import org.opensearch.sql.ast.expression.Literal;
import org.opensearch.sql.ast.expression.UnresolvedExpression;

@Getter
@Setter
@ToString
@EqualsAndHashCode(callSuper = false)
@RequiredArgsConstructor
@AllArgsConstructor
public class Pattern extends UnresolvedPlan {

private final UnresolvedExpression patternWindowFunction;

private final Map<String, Literal> arguments;

private UnresolvedPlan child;

@Override
public Pattern attach(UnresolvedPlan child) {
this.child = child;
return this;
}

@Override
public List<UnresolvedPlan> getChild() {
return ImmutableList.of(this.child);
}

@Override
public <T, C> T accept(AbstractNodeVisitor<T, C> nodeVisitor, C context) {
return nodeVisitor.visitPattern(this, context);
}
}
8 changes: 8 additions & 0 deletions core/src/main/java/org/opensearch/sql/expression/DSL.java
Original file line number Diff line number Diff line change
Expand Up @@ -973,6 +973,14 @@ public static FunctionExpression utc_timestamp(
return compile(functionProperties, BuiltinFunctionName.UTC_TIMESTAMP, args);
}

public static FunctionExpression brain() {
return compile(FunctionProperties.None, BuiltinFunctionName.BRAIN);
}

public static FunctionExpression simple_pattern() {
return compile(FunctionProperties.None, BuiltinFunctionName.SIMPLE_PATTERN);
}

@SuppressWarnings("unchecked")
private static <T extends FunctionImplementation> T compile(
FunctionProperties functionProperties, BuiltinFunctionName bfn, Expression... args) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -219,6 +219,9 @@ public enum BuiltinFunctionName {
RANK(FunctionName.of("rank")),
DENSE_RANK(FunctionName.of("dense_rank")),

SIMPLE_PATTERN(FunctionName.of("simple_pattern")),
BRAIN(FunctionName.of("brain")),

INTERVAL(FunctionName.of("interval")),

/** Data Type Convert Function. */
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,7 @@ public PatternsExpression(Expression sourceField, Expression pattern, Expression
}

@Override
ExprValue parseValue(ExprValue value) throws ExpressionEvaluationException {
public ExprValue parseValue(ExprValue value) throws ExpressionEvaluationException {
String rawString = value.stringValue();
if (useCustomPattern) {
return new ExprStringValue(pattern.matcher(rawString).replaceAll(""));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,8 +6,10 @@
package org.opensearch.sql.expression.window;

import static java.util.Collections.emptyList;
import static org.opensearch.sql.data.type.ExprCoreType.STRING;

import com.google.common.collect.ImmutableMap;
import java.util.Collections;
import java.util.function.Supplier;
import lombok.experimental.UtilityClass;
import org.opensearch.sql.expression.function.BuiltinFunctionName;
Expand All @@ -16,6 +18,8 @@
import org.opensearch.sql.expression.function.FunctionBuilder;
import org.opensearch.sql.expression.function.FunctionName;
import org.opensearch.sql.expression.function.FunctionSignature;
import org.opensearch.sql.expression.window.patterns.BufferPatternWindowFunction;
import org.opensearch.sql.expression.window.patterns.StreamPatternWindowFunction;
import org.opensearch.sql.expression.window.ranking.DenseRankFunction;
import org.opensearch.sql.expression.window.ranking.RankFunction;
import org.opensearch.sql.expression.window.ranking.RankingWindowFunction;
Expand All @@ -34,6 +38,8 @@ public void register(BuiltinFunctionRepository repository) {
repository.register(rowNumber());
repository.register(rank());
repository.register(denseRank());
repository.register(brain());
repository.register(simplePattern());
Comment on lines +43 to +44
Copy link
Collaborator

@dai-chen dai-chen Jan 27, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm thinking are these new algorithm really window function? If users specify order by or partition by, does it still generate meaningful result?

}

private DefaultFunctionResolver rowNumber() {
Expand All @@ -48,6 +54,26 @@ private DefaultFunctionResolver denseRank() {
return rankingFunction(BuiltinFunctionName.DENSE_RANK.getName(), DenseRankFunction::new);
}

private DefaultFunctionResolver brain() {
FunctionName functionName = BuiltinFunctionName.BRAIN.getName();
FunctionSignature functionSignature =
new FunctionSignature(functionName, Collections.singletonList(STRING));
FunctionBuilder functionBuilder =
(functionProperties, arguments) -> new BufferPatternWindowFunction(arguments);
return new DefaultFunctionResolver(
functionName, ImmutableMap.of(functionSignature, functionBuilder));
}

private DefaultFunctionResolver simplePattern() {
FunctionName functionName = BuiltinFunctionName.SIMPLE_PATTERN.getName();
FunctionSignature functionSignature =
new FunctionSignature(functionName, Collections.singletonList(STRING));
FunctionBuilder functionBuilder =
(functionProperties, arguments) -> new StreamPatternWindowFunction(arguments);
return new DefaultFunctionResolver(
functionName, ImmutableMap.of(functionSignature, functionBuilder));
}

private DefaultFunctionResolver rankingFunction(
FunctionName functionName, Supplier<RankingWindowFunction> constructor) {
FunctionSignature functionSignature = new FunctionSignature(functionName, emptyList());
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,62 @@
/*
* SPDX-License-Identifier: Apache-2.0
*
* The OpenSearch Contributors require contributions made to
* this file be licensed under the Apache-2.0 license or a
* compatible open source license.
*/

package org.opensearch.sql.expression.window.frame;

import com.google.common.collect.PeekingIterator;
import java.util.ArrayList;
import java.util.List;
import lombok.EqualsAndHashCode;
import lombok.Getter;
import lombok.ToString;
import org.opensearch.sql.common.patterns.BrainLogParser;
import org.opensearch.sql.data.model.ExprValue;
import org.opensearch.sql.expression.Expression;
import org.opensearch.sql.expression.window.WindowDefinition;

@EqualsAndHashCode(callSuper = true)
@ToString
public class BufferPatternRowsWindowFrame extends PeerRowsWindowFrame {

private final Expression sourceField;

@Getter private final BrainLogParser logParser;

private final List<List<String>> preprocessedMessages;

public BufferPatternRowsWindowFrame(
WindowDefinition windowDefinition, BrainLogParser logParser, Expression sourceField) {
super(windowDefinition);
this.logParser = logParser;
this.sourceField = sourceField;
this.preprocessedMessages = new ArrayList<>();
}

@Override
public void load(PeekingIterator<ExprValue> it) {
if (hasNext()) {
return;
}

loadAllRows(it);

List<String> logMessages =
peers.stream()
.map(
exprValue -> {
ExprValue value = sourceField.valueOf(exprValue.bindingTuples());
return value.stringValue();
})
.toList();
this.preprocessedMessages.addAll(logParser.preprocessAllLogs(logMessages));
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

BufferPatternRowsWindowFrame should have it own spec, doet it means over all rows? @dai-chen WindowFrame definition and How to use WIndowFrame should be seperate, right?

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

For simplicity, I set the spec to empty partition by and empty sort by during AST tree parsing unresolved WindowFunction, which treats the window frame range is all rows on coordinator node. Because I haven't seen requirements on sorting and partitioning on other columns.

I think we can add partition by and sort by syntax if we see values in case users want to specify them. Thoughts?

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes, I think users are free to use pattern functions added with any window frame definition.

}

public List<String> currentPreprocessedMessage() {
return this.preprocessedMessages.get(position);
}
}
Loading
Loading