Skip to content

Implement geoip udf with Calcite #3604

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 13 commits into from
May 28, 2025
Merged

Conversation

yuancu
Copy link
Contributor

@yuancu yuancu commented May 6, 2025

Description

Implement GEOIP on Calcite engine with a UDF.

Additionally, this PR fixed a function name issue which can be reproduced with:

./gradlew ':integ-test:integTest' --tests "org.opensearch.sql.calcite.standalone.CalcitePPLIPFunctionIT.testCidrMatch" -Dtests.locale=az-Latn-AZ
CalcitePPLIPFunctionIT > testCidrMatch FAILED

    java.lang.IllegalArgumentException: Unsupported operator: cıdrmatch

Implementation

  • Talk to the geospatial plugin to retrieve GEOIP information
  • The exact implementation is primarily copied from PPL command expression implementation for geoip #3228
  • A NodeClient instance is necessary for the implementation of GEOIP. It is first saved into DataSourceServiceImpl from SqlPlugin, then passed to CalcitePlanContext in QueryService, so that the client can be accessed when visiting geoip function expressions.

Dependencies changed

  • geospatial-client and opensearch-rest-high-level-client dependencies are added to core as org.opensearch.geospatial.action.IpEnrichmentActionClient and org.opensearch.transport.client.node.NodeClient are necessary to communicate with the geospatial plugin

Related Issues

Resolves #3506

Check List

  • New functionality includes testing.
  • New functionality has been documented.
  • New functionality has javadoc added.
  • New functionality has a user manual doc added.
  • API changes companion pull request created.
  • Commits are signed per the DCO using --signoff.
  • Public documentation issue/PR created.

By submitting this pull request, I confirm that my contribution is made under the terms of the Apache 2.0 license.
For more information on following Developer Certificate of Origin and signing off your commits, please check here.

yuancu added 2 commits May 6, 2025 16:33
Signed-off-by: Yuanchun Shen <[email protected]>
@yuancu yuancu marked this pull request as ready for review May 6, 2025 09:52
@yuancu yuancu changed the title Implement geoip UDF Implement geoip udf with Calcite May 6, 2025
@noCharger noCharger added the calcite calcite migration releated label May 6, 2025
List<Expression> operandsWithClient = new ArrayList<>(translatedOperands);
// Since a NodeClient cannot be passed as a parameter using Expressions.constant,
// it is instead provided through a function call.
operandsWithClient.add(Expressions.call(GeoIPImplementor.class, "getNodeClient"));
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I cannot find a way to pass a NodeClient instance to fetchIpEnrichment as an Expression. Therefore, I created a static member variable nodeClient and pass it to fetchIpEnrichment with a static function.

This is not a standard way to pass variables. Please feel free to suggest if you have a better solution.

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Will Expressions.constant work?

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@yuancu What exception will it throw if using Expressions.constant?

Copy link
Contributor Author

@yuancu yuancu May 12, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

java.sql.SQLException: Error while preparing plan [LogicalProject(name=[$0], ip=[$1], enrichmentResult=[GEOIP('dummycityindex':VARCHAR, $1, 'city':VARCHAR)])
  CalciteLogicalIndexScan(table=[[OpenSearch, opensearch-sql_test_index_geoip]])...
...
Caused by: java.lang.RuntimeException: Error while compiling generated Java code:

public org.apache.calcite.linq4j.Enumerable bind(final org.apache.calcite.DataContext root) {
  final org.opensearch.sql.opensearch.storage.scan.CalciteEnumerableIndexScan v1stashed = (org.opensearch.sql.opensearch.storage.scan.CalciteEnumerableIndexScan) root.get("v1stashed");
  final org.apache.calcite.linq4j.Enumerable _inputEnumerable = v1stashed.scan();
  return new org.apache.calcite.linq4j.AbstractEnumerable(){
      public org.apache.calcite.linq4j.Enumerator enumerator() {
        return new org.apache.calcite.linq4j.Enumerator(){
            public final org.apache.calcite.linq4j.Enumerator inputEnumerator = _inputEnumerable.enumerator();
            public void reset() {
              inputEnumerator.reset();
            }

            public boolean moveNext() {
              return inputEnumerator.moveNext();
            }

            public void close() {
              inputEnumerator.close();
            }

            public Object current() {
              final Object[] current = (Object[]) inputEnumerator.current();
              final String input_value1 = current[1] == null ? null : current[1].toString();
              return new Object[] {
                  current[0],
                  current[1],
                  input_value1 == null ? null : org.opensearch.sql.expression.function.udf.ip.GeoIpFunction.GeoIPImplementor.fetchIpEnrichment("dummycityindex", input_value1, "city", org.opensearch.transport.client.node.NodeClient@52804e28)};
            }

          };
      }

    };
}


public Class getElementType() {
  return java.lang.Object[].class;
}

The expression org.opensearch.sql.expression.function.udf.ip.GeoIpFunction.GeoIPImplementor.fetchIpEnrichment("dummycityindex", input_value1, "city", org.opensearch.transport.client.node.NodeClient@52804e28)}; can not be correctly compiled.

This is because in ConstantExpression.write, there isn't a corresponding handling if for the NodeClient class. A constant of nodeClient falls to the default case writer.append(value), which invokes:

    @Override
    public StringBuilder append(Object obj) {
        return append(String.valueOf(obj));
    }

Therefore, Expressions.constant(nodeClient) doesn't work.

@@ -347,6 +348,14 @@ public RexNode visitLet(Let node, CalcitePlanContext context) {
public RexNode visitFunction(Function node, CalcitePlanContext context) {
List<RexNode> arguments =
node.getFuncArgs().stream().map(arg -> analyze(arg, context)).collect(Collectors.toList());

// GEOIP needs NodeClient to perform RPC calls. Therefore, we handle it separately.
if (BuiltinFunctionName.GEOIP.equals(BuiltinFunctionName.of(node.getFuncName()).orElse(null))) {
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Could you evaluate how many functions will require NodeClient?

I would prefer not to construct FunctionImp for GEOIP each time of visitFunction, instead, we should enhance PPLFuncImpTable to allow it to support registering dynamic functions after the service has launched?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I have only came across with one function, geoip that requires NodeClient.

@qianheng-aws I have moved geoip implementation to the opensearch package. The function is registered inside OpenSearchExecutionEngine during its construction.

List<Expression> operandsWithClient = new ArrayList<>(translatedOperands);
// Since a NodeClient cannot be passed as a parameter using Expressions.constant,
// it is instead provided through a function call.
operandsWithClient.add(Expressions.call(GeoIPImplementor.class, "getNodeClient"));
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Will Expressions.constant work?

String option = StringUtils.unquoteText(commaSeparatedOptions);
// Convert the option into a set.
final Set<String> options = new HashSet<>();
if (!commaSeparatedOptions.isEmpty()) {
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

To avoid add if on a constant value in plan, you could extract the main logic into a private function, and 2 functions of different signatures has its own handling logic on option.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@qianheng-aws Neither Expressions.constant nor Expressions.variable work in this case.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Done

@yuancu yuancu requested a review from qianheng-aws May 21, 2025 03:42
@@ -91,13 +91,29 @@ default List<RelDataType> getParams() {
INSTANCE = new PPLFuncImpTable(builder);
}

private final ImmutableMap<BuiltinFunctionName, PairList<CalciteFuncSignature, FunctionImp>> map;
private final ImmutableMap<BuiltinFunctionName, PairList<CalciteFuncSignature, FunctionImp>>
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can we replace PairList to List<Pair>?

Copy link
Contributor Author

@yuancu yuancu May 21, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Technically yes. But I don't see an obvious gain in doing so. Besides, we may have to introduce javafx as a new dependency to the core module since Pair resides in javafx.util. (Although we can also use the counterpart in apache commons-lang3)

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Done

private final ImmutableMap<BuiltinFunctionName, PairList<CalciteFuncSignature, FunctionImp>> map;
private final ImmutableMap<BuiltinFunctionName, PairList<CalciteFuncSignature, FunctionImp>>
functionRegistry;
private final Map<BuiltinFunctionName, PairList<CalciteFuncSignature, FunctionImp>>
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

can you add some javadocs on these two functionRegistrys to explain how to choose them in developing udf.

Copy link
Contributor Author

@yuancu yuancu May 21, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Sure, added.

@@ -91,13 +92,45 @@ default List<RelDataType> getParams() {
INSTANCE = new PPLFuncImpTable(builder);
}

private final ImmutableMap<BuiltinFunctionName, PairList<CalciteFuncSignature, FunctionImp>> map;
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

[non-blocking] what's the concern of changing PairList to List<Pair>?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is because we may move away from Calcite in the future. We remove dependency on the Calcite library for such general data types.

@@ -119,7 +152,12 @@ public RexNode resolve(final RexBuilder builder, final String functionName, RexN

public RexNode resolve(
final RexBuilder builder, final BuiltinFunctionName functionName, RexNode... args) {
final PairList<CalciteFuncSignature, FunctionImp> implementList = map.get(functionName);
List<Pair<CalciteFuncSignature, FunctionImp>> implementList =
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

[discussion] It comes to me that it may make more sense to use external function registry prior to core function registry. Then it provides the functionality of overriding core implementation with module's implementation, although we don't have such requirements for now. Or we'd better do this change until we really need it.

What's your opinion? @yuancu @LantaoJin @penghuo

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If the change is trivial, we can implement it now to leave room for future evolvement.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Update: implemented by retrieving implementations from the external function registry before checking the internal one.

…ta-storage-dependent overriding

Signed-off-by: Yuanchun Shen <[email protected]>
} else {
externalFunctionRegistry.put(
functionName, new ArrayList<>(List.of(Pair.of(signature, functionImp))));
}
}

public @Nullable RexNode resolveSafe(
Copy link
Collaborator

@qianheng-aws qianheng-aws May 27, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Not related to this PR. Could you delete this method, which is no longer needed since we've migrated all functions? It should be done in the previous migration PR.

This method will swallow the important exception message. There seems a flakey error in the CI, but we couldn't find the cause easily.

Copy link
Collaborator

@qianheng-aws qianheng-aws May 27, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Emm, I'm guessing it's caused by some charset is missing in that machine. The function name in the log is weird. Should be i instead of ı.

java.lang.IllegalArgumentException: Unsupported operator: cıdrmatch

Copy link
Contributor Author

@yuancu yuancu May 27, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Fixed. It was because I was converted to ı in AstExpressionBuilder.java when system locale uses Turkic languages (e.g. az-Latn-AZ). Fixed by replacing toLowercase() with toLowercase(Locale.ROOT).

@LantaoJin LantaoJin merged commit 92cb089 into opensearch-project:main May 28, 2025
22 checks passed
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
calcite calcite migration releated
Projects
None yet
Development

Successfully merging this pull request may close these issues.

[FEATURE][Calcite engine] Support function geoip
5 participants