Implement geoip udf with Calcite #3604
Signed-off-by: Yuanchun Shen <[email protected]>
Signed-off-by: Yuanchun Shen <[email protected]>
List<Expression> operandsWithClient = new ArrayList<>(translatedOperands);
// Since a NodeClient cannot be passed as a parameter using Expressions.constant,
// it is instead provided through a function call.
operandsWithClient.add(Expressions.call(GeoIPImplementor.class, "getNodeClient"));
I cannot find a way to pass a NodeClient instance to fetchIpEnrichment as an Expression. Therefore, I created a static member variable nodeClient and pass it to fetchIpEnrichment through a static function.
This is not a standard way to pass variables. Please feel free to suggest a better solution if you have one.
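The workaround described in this comment can be sketched in plain Java, without Calcite. This is only an illustration under assumptions: the class name ClientHolder and its methods are hypothetical and are not the PR's actual code. The idea is that generated source can always name a static method, even when it cannot embed a live object.

```java
import java.util.Objects;

/** Hypothetical holder; the names here are illustrative, not taken from the PR. */
final class ClientHolder {
  // Set once at startup. volatile so threads running generated code see the value.
  private static volatile Object client;

  private ClientHolder() {}

  /** Called by the host application before any query is compiled. */
  static void setClient(Object c) {
    client = Objects.requireNonNull(c, "client");
  }

  /** Generated code can reference this by name: ClientHolder.getClient(). */
  static Object getClient() {
    Object c = client;
    if (c == null) {
      throw new IllegalStateException("client has not been initialized");
    }
    return c;
  }
}
```

The trade-off is the one the comment already points out: the object becomes global state, so its lifecycle and any tests that replace it need extra care.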
Will Expressions.constant work?
@yuancu What exception does it throw if you use Expressions.constant?
java.sql.SQLException: Error while preparing plan [LogicalProject(name=[$0], ip=[$1], enrichmentResult=[GEOIP('dummycityindex':VARCHAR, $1, 'city':VARCHAR)])
CalciteLogicalIndexScan(table=[[OpenSearch, opensearch-sql_test_index_geoip]])...
...
Caused by: java.lang.RuntimeException: Error while compiling generated Java code:
public org.apache.calcite.linq4j.Enumerable bind(final org.apache.calcite.DataContext root) {
final org.opensearch.sql.opensearch.storage.scan.CalciteEnumerableIndexScan v1stashed = (org.opensearch.sql.opensearch.storage.scan.CalciteEnumerableIndexScan) root.get("v1stashed");
final org.apache.calcite.linq4j.Enumerable _inputEnumerable = v1stashed.scan();
return new org.apache.calcite.linq4j.AbstractEnumerable(){
public org.apache.calcite.linq4j.Enumerator enumerator() {
return new org.apache.calcite.linq4j.Enumerator(){
public final org.apache.calcite.linq4j.Enumerator inputEnumerator = _inputEnumerable.enumerator();
public void reset() {
inputEnumerator.reset();
}
public boolean moveNext() {
return inputEnumerator.moveNext();
}
public void close() {
inputEnumerator.close();
}
public Object current() {
final Object[] current = (Object[]) inputEnumerator.current();
final String input_value1 = current[1] == null ? null : current[1].toString();
return new Object[] {
current[0],
current[1],
input_value1 == null ? null : org.opensearch.sql.expression.function.udf.ip.GeoIpFunction.GeoIPImplementor.fetchIpEnrichment("dummycityindex", input_value1, "city", org.opensearch.transport.client.node.NodeClient@52804e28)};
}
};
}
};
}
public Class getElementType() {
return java.lang.Object[].class;
}
The expression org.opensearch.sql.expression.function.udf.ip.GeoIpFunction.GeoIPImplementor.fetchIpEnrichment("dummycityindex", input_value1, "city", org.opensearch.transport.client.node.NodeClient@52804e28) cannot be compiled, because its last argument is not valid Java source.
This is because ConstantExpression.write has no dedicated branch for the NodeClient class. A nodeClient constant falls through to the default case, writer.append(value), which invokes:
@Override
public StringBuilder append(Object obj) {
return append(String.valueOf(obj));
}
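The generated source therefore contains the client's toString() output rather than a reference to the object, so Expressions.constant(nodeClient) doesn't work.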
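The failure mode can be reproduced with the JDK alone. The sketch below is not Calcite code: FakeClient is a hypothetical stand-in for NodeClient, and renderAsSource only mimics the String.valueOf fallback quoted above.

```java
final class ConstantRenderingDemo {
  /** Hypothetical stand-in for a service object; it does not override toString(). */
  static final class FakeClient {}

  /** Mimics the fallback path: the value is appended as String.valueOf(value). */
  static String renderAsSource(Object value) {
    return new StringBuilder().append(String.valueOf(value)).toString();
  }

  public static void main(String[] args) {
    String rendered = renderAsSource(new FakeClient());
    // Prints the class name, an '@', and a hex hash code.
    // That text is an identity string, not a Java expression, so a compiler rejects it.
    System.out.println(rendered);
  }
}
```

Any object whose class relies on Object.toString() ends up rendered this way, which matches the NodeClient@52804e28 token in the generated code above.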
core/src/main/java/org/opensearch/sql/datasource/DataSourceService.java
…CalcitePlanContext Signed-off-by: Yuanchun Shen <[email protected]>
@@ -347,6 +348,14 @@ public RexNode visitLet(Let node, CalcitePlanContext context) {
public RexNode visitFunction(Function node, CalcitePlanContext context) {
  List<RexNode> arguments =
      node.getFuncArgs().stream().map(arg -> analyze(arg, context)).collect(Collectors.toList());

  // GEOIP needs NodeClient to perform RPC calls. Therefore, we handle it separately.
  if (BuiltinFunctionName.GEOIP.equals(BuiltinFunctionName.of(node.getFuncName()).orElse(null))) {
Could you evaluate how many functions will require NodeClient?
I would prefer not to construct a FunctionImp for GEOIP on every call to visitFunction. Instead, should we enhance PPLFuncImpTable so that it supports registering functions dynamically after the service has launched?
I have only come across one function, geoip, that requires NodeClient.
@qianheng-aws I have moved the geoip implementation to the opensearch package. The function is registered inside OpenSearchExecutionEngine during its construction.
List<Expression> operandsWithClient = new ArrayList<>(translatedOperands);
// Since a NodeClient cannot be passed as a parameter using Expressions.constant,
// it is instead provided through a function call.
operandsWithClient.add(Expressions.call(GeoIPImplementor.class, "getNodeClient"));
Will Expressions.constant work?
String option = StringUtils.unquoteText(commaSeparatedOptions);
// Convert the option into a set.
final Set<String> options = new HashSet<>();
if (!commaSeparatedOptions.isEmpty()) {
To avoid adding an if on a constant value in the plan, you could extract the main logic into a private function, so that the two functions with different signatures each have their own handling logic for option.
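One way to read this suggestion is sketched below. The class and method names are hypothetical and the body is a placeholder, not the PR's fetchIpEnrichment logic: each public overload does its own option handling and then delegates to one private method, so the no-options path never tests whether the option string is empty.

```java
import java.util.Collections;
import java.util.LinkedHashSet;
import java.util.Set;

/** Hypothetical sketch; names and return values are illustrative only. */
final class GeoIpOptionsSketch {
  private GeoIpOptionsSketch() {}

  /** Overload without options: nothing to parse, no runtime check on a constant. */
  static String lookup(String datasource, String ip) {
    return lookup(datasource, ip, Collections.<String>emptySet());
  }

  /** Overload with options: parse the comma-separated string, then delegate. */
  static String lookup(String datasource, String ip, String commaSeparatedOptions) {
    Set<String> options = new LinkedHashSet<>();
    for (String option : commaSeparatedOptions.split(",")) {
      String trimmed = option.trim();
      if (!trimmed.isEmpty()) {
        options.add(trimmed);
      }
    }
    return lookup(datasource, ip, options);
  }

  /** Shared main logic; a placeholder that only echoes its inputs. */
  private static String lookup(String datasource, String ip, Set<String> options) {
    return datasource + ":" + ip + ":" + options;
  }
}
```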
@qianheng-aws Neither Expressions.constant nor Expressions.variable works in this case.
Done
…ons versions Signed-off-by: Yuanchun Shen <[email protected]>
…ExecutionEngine Signed-off-by: Yuanchun Shen <[email protected]>
Signed-off-by: Yuanchun Shen <[email protected]>
Signed-off-by: Yuanchun Shen <[email protected]>
@@ -91,13 +91,29 @@ default List<RelDataType> getParams() {
  INSTANCE = new PPLFuncImpTable(builder);
}

private final ImmutableMap<BuiltinFunctionName, PairList<CalciteFuncSignature, FunctionImp>> map;
private final ImmutableMap<BuiltinFunctionName, PairList<CalciteFuncSignature, FunctionImp>>
Can we replace PairList with List<Pair>?
Technically yes, but I don't see an obvious gain in doing so. Besides, we might have to introduce javafx as a new dependency of the core module, since Pair resides in javafx.util. (We could also use the counterpart in Apache commons-lang3.)
Done
private final ImmutableMap<BuiltinFunctionName, PairList<CalciteFuncSignature, FunctionImp>> map;
private final ImmutableMap<BuiltinFunctionName, PairList<CalciteFuncSignature, FunctionImp>>
    functionRegistry;
private final Map<BuiltinFunctionName, PairList<CalciteFuncSignature, FunctionImp>>
Can you add some Javadoc to these two functionRegistry fields to explain how to choose between them when developing a UDF?
Sure, added.
Signed-off-by: Yuanchun Shen <[email protected]>
Signed-off-by: Yuanchun Shen <[email protected]>
@@ -91,13 +92,45 @@ default List<RelDataType> getParams() {
  INSTANCE = new PPLFuncImpTable(builder);
}

private final ImmutableMap<BuiltinFunctionName, PairList<CalciteFuncSignature, FunctionImp>> map;
[non-blocking] What's the concern behind changing PairList to List<Pair>?
This is because we may move away from Calcite in the future, so we are removing the dependency on the Calcite library for such general-purpose data types.
@@ -119,7 +152,12 @@ public RexNode resolve(final RexBuilder builder, final String functionName, RexN

public RexNode resolve(
    final RexBuilder builder, final BuiltinFunctionName functionName, RexNode... args) {
  final PairList<CalciteFuncSignature, FunctionImp> implementList = map.get(functionName);
  List<Pair<CalciteFuncSignature, FunctionImp>> implementList =
[discussion] It occurs to me that it may make more sense to consult the external function registry before the core function registry. That would allow a module's implementation to override the core implementation, although we don't have such a requirement for now. Or should we defer this change until we really need it?
What's your opinion? @yuancu @LantaoJin @penghuo
If the change is trivial, we can implement it now to leave room for future evolution.
Update: implemented by retrieving implementations from the external function registry before checking the internal one.
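The lookup order described in this update can be sketched as follows. This is a simplified illustration under assumptions, not the PPLFuncImpTable code: implementations are represented as plain strings, signature matching is omitted, and the class and method names are hypothetical.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/** Hypothetical two-tier registry: external entries shadow internal ones. */
final class TwoTierRegistry {
  // Core functions, fixed at construction time.
  private final Map<String, List<String>> internal;
  // Functions registered later by storage-specific modules.
  private final Map<String, List<String>> external = new HashMap<>();

  TwoTierRegistry(Map<String, List<String>> core) {
    this.internal = Collections.unmodifiableMap(new HashMap<>(core));
  }

  void registerExternal(String name, String implementation) {
    external.computeIfAbsent(name, k -> new ArrayList<>()).add(implementation);
  }

  /** Checks the external registry first, then falls back to the internal one. */
  String resolve(String name) {
    List<String> candidates = external.get(name);
    if (candidates == null) {
      candidates = internal.get(name);
    }
    if (candidates == null || candidates.isEmpty()) {
      throw new IllegalArgumentException("Unsupported function: " + name);
    }
    // A real resolver would match argument types against each signature here.
    return candidates.get(0);
  }
}
```

With this order, a module that registers its own geoip shadows a core implementation of the same name, which is the overriding behavior raised in the discussion.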
…ta-storage-dependent overriding Signed-off-by: Yuanchun Shen <[email protected]>
  } else {
    externalFunctionRegistry.put(
        functionName, new ArrayList<>(List.of(Pair.of(signature, functionImp))));
  }
}

public @Nullable RexNode resolveSafe(
Not related to this PR, but could you delete this method? It is no longer needed since we've migrated all functions, and it should have been removed in the previous migration PR.
This method swallows important exception messages. There seems to be a flaky error in the CI, and we couldn't easily find its cause.
Hmm, my guess is that a charset is missing on that machine. The function name in the log looks wrong: it should contain i instead of ı.
java.lang.IllegalArgumentException: Unsupported operator: cıdrmatch
Fixed. The cause was that I was converted to ı in AstExpressionBuilder.java when the system locale uses a Turkic language (e.g. az-Latn-AZ). Fixed by replacing toLowerCase() with toLowerCase(Locale.ROOT).
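The locale behavior behind this bug can be reproduced with the JDK alone. The sketch below uses a Turkish locale tag for illustration. The helper name lowerWith is hypothetical, and the no-argument toLowerCase() behaves like the first call whenever the JVM's default locale is Turkish or Azerbaijani.

```java
import java.util.Locale;

final class LocaleLowercaseDemo {
  /** Lowercases with an explicit locale so the result does not depend on the JVM default. */
  static String lowerWith(String s, Locale locale) {
    return s.toLowerCase(locale);
  }

  public static void main(String[] args) {
    Locale turkish = Locale.forLanguageTag("tr-TR");
    // Turkish casing maps 'I' to dotless 'ı' (U+0131).
    System.out.println(lowerWith("CIDRMATCH", turkish)); // cıdrmatch
    // Locale.ROOT gives locale-neutral casing, which suits identifiers and keywords.
    System.out.println(lowerWith("CIDRMATCH", Locale.ROOT)); // cidrmatch
  }
}
```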
Signed-off-by: Yuanchun Shen <[email protected]>
Signed-off-by: Yuanchun Shen <[email protected]>
Description
Implement GEOIP on the Calcite engine with a UDF.
Additionally, this PR fixed a function name issue which can be reproduced with:
Implementation
geoip #3228
A NodeClient instance is necessary for the implementation of GEOIP. It is first saved into DataSourceServiceImpl from SqlPlugin, then passed to CalcitePlanContext in QueryService, so that the client can be accessed when visiting geoip function expressions.
Dependencies changed
The geospatial-client and opensearch-rest-high-level-client dependencies are added to core, as org.opensearch.geospatial.action.IpEnrichmentActionClient and org.opensearch.transport.client.node.NodeClient are necessary to communicate with the geospatial plugin.
Related Issues
Resolves #3506
Check List
--signoff.
By submitting this pull request, I confirm that my contribution is made under the terms of the Apache 2.0 license.
For more information on following Developer Certificate of Origin and signing off your commits, please check here.