Implement geoip udf with Calcite #3604

Merged: 13 commits, May 28, 2025
Original file line number Diff line number Diff line change
@@ -338,7 +338,7 @@ public RexNode visitFunction(Function node, CalcitePlanContext context) {
List<RexNode> arguments =
node.getFuncArgs().stream().map(arg -> analyze(arg, context)).toList();
RexNode resolvedNode =
PPLFuncImpTable.INSTANCE.resolveSafe(
PPLFuncImpTable.INSTANCE.resolve(
context.rexBuilder, node.getFuncName(), arguments.toArray(new RexNode[0]));
if (resolvedNode != null) {
return resolvedNode;
Original file line number Diff line number Diff line change
@@ -11,6 +11,7 @@

import com.google.common.collect.ImmutableMap;
import java.math.BigDecimal;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
@@ -19,13 +20,12 @@
import org.apache.calcite.rel.type.RelDataType;
import org.apache.calcite.rex.RexBuilder;
import org.apache.calcite.rex.RexNode;
import org.apache.calcite.runtime.PairList;
import org.apache.calcite.sql.SqlOperator;
import org.apache.calcite.sql.fun.SqlLibraryOperators;
import org.apache.calcite.sql.fun.SqlStdOperatorTable;
import org.apache.calcite.sql.fun.SqlTrimFunction.Flag;
import org.apache.calcite.sql.type.SqlTypeName;
import org.checkerframework.checker.nullness.qual.Nullable;
import org.apache.commons.lang3.tuple.Pair;
import org.opensearch.sql.executor.QueryType;

public class PPLFuncImpTable {
@@ -91,21 +91,44 @@ default List<RelDataType> getParams() {
INSTANCE = new PPLFuncImpTable(builder);
}

private final ImmutableMap<BuiltinFunctionName, PairList<CalciteFuncSignature, FunctionImp>> map;
Collaborator:

[non-blocking] What is the reason for changing PairList to List<Pair>?

Contributor Author:

We may move away from Calcite in the future, so this change removes the dependency on the Calcite library for general-purpose data types such as this one.

/**
* The registry for built-in functions. Functions defined by the PPL specification, whose
* implementations are independent of any specific data storage, should be registered here
* internally.
*/
private final ImmutableMap<BuiltinFunctionName, List<Pair<CalciteFuncSignature, FunctionImp>>>
functionRegistry;

/**
* The external function registry. Functions whose implementations depend on a specific data
* engine should be registered here. This reduces coupling between the core module and particular
* storage backends.
*/
private final Map<BuiltinFunctionName, List<Pair<CalciteFuncSignature, FunctionImp>>>
externalFunctionRegistry;
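
The split described in the two Javadoc comments above (a frozen internal registry plus a mutable external one) can be sketched with JDK collections only. This is a minimal illustration, not the PR's code: `RegistrySketch` is a hypothetical name, `Map.Entry` stands in for the commons-lang3 `Pair`, a `String` stands in for `CalciteFuncSignature`, and `UnaryOperator<String>` stands in for `FunctionImp`.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Locale;
import java.util.Map;
import java.util.function.UnaryOperator;

// Hypothetical stand-in for PPLFuncImpTable; only the registry layout is modelled.
final class RegistrySketch {
  // Internal registry: frozen at construction time, like functionRegistry in the diff.
  final Map<String, List<Map.Entry<String, UnaryOperator<String>>>> internal;
  // External registry: stays mutable so engine-specific functions can be added later.
  final Map<String, List<Map.Entry<String, UnaryOperator<String>>>> external = new HashMap<>();

  RegistrySketch(Map<String, List<Map.Entry<String, UnaryOperator<String>>>> builderMap) {
    Map<String, List<Map.Entry<String, UnaryOperator<String>>>> frozen = new HashMap<>();
    // List.copyOf returns an unmodifiable copy of each overload list.
    builderMap.forEach((name, overloads) -> frozen.put(name, List.copyOf(overloads)));
    this.internal = Map.copyOf(frozen);
  }

  // Builds a registry holding one overload of a made-up UPPER function.
  static RegistrySketch demo() {
    UnaryOperator<String> upper = s -> s.toUpperCase(Locale.ROOT);
    List<Map.Entry<String, UnaryOperator<String>>> overloads = new ArrayList<>();
    overloads.add(Map.entry("UPPER(STRING)", upper));
    Map<String, List<Map.Entry<String, UnaryOperator<String>>>> builderMap = new HashMap<>();
    builderMap.put("UPPER", overloads);
    return new RegistrySketch(builderMap);
  }
}
```

Because the internal lists are unmodifiable copies, later changes to the builder's lists cannot leak into the frozen registry, while `external` can still accept registrations at runtime.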

private PPLFuncImpTable(Builder builder) {
final ImmutableMap.Builder<BuiltinFunctionName, PairList<CalciteFuncSignature, FunctionImp>>
final ImmutableMap.Builder<BuiltinFunctionName, List<Pair<CalciteFuncSignature, FunctionImp>>>
mapBuilder = ImmutableMap.builder();
builder.map.forEach((k, v) -> mapBuilder.put(k, v.immutable()));
this.map = ImmutableMap.copyOf(mapBuilder.build());
builder.map.forEach((k, v) -> mapBuilder.put(k, List.copyOf(v)));
this.functionRegistry = ImmutableMap.copyOf(mapBuilder.build());
this.externalFunctionRegistry = new HashMap<>();
}

public @Nullable RexNode resolveSafe(
final RexBuilder builder, final String functionName, RexNode... args) {
try {
return resolve(builder, functionName, args);
} catch (Exception e) {
return null;
/**
* Register a function implementation from external services dynamically.
*
* @param functionName the name of the function, has to be defined in BuiltinFunctionName
* @param functionImp the implementation of the function
*/
public void registerExternalFunction(BuiltinFunctionName functionName, FunctionImp functionImp) {
CalciteFuncSignature signature =
new CalciteFuncSignature(functionName.getName(), functionImp.getParams());
if (externalFunctionRegistry.containsKey(functionName)) {
externalFunctionRegistry.get(functionName).add(Pair.of(signature, functionImp));
} else {
externalFunctionRegistry.put(
functionName, new ArrayList<>(List.of(Pair.of(signature, functionImp))));
}
}
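
The containsKey/get/put branching in `registerExternalFunction` can also be written with `Map.computeIfAbsent`. The sketch below is an alternative shown for comparison, not what the PR merged; `ExternalRegistrySketch` is a hypothetical name, and plain `String` keys and values stand in for `BuiltinFunctionName` and `Pair<CalciteFuncSignature, FunctionImp>`.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical, simplified model of the external function registry.
final class ExternalRegistrySketch {
  private final Map<String, List<String>> registry = new HashMap<>();

  // Creates the overload list on first use, then appends to it.
  void register(String functionName, String implementation) {
    registry.computeIfAbsent(functionName, k -> new ArrayList<>()).add(implementation);
  }

  // Returns the registered implementations in registration order, or an empty list.
  List<String> implementationsOf(String functionName) {
    return registry.getOrDefault(functionName, List.of());
  }
}
```

If registration could race with query planning, a concurrent map would be one option; whether that matters here depends on when `OpenSearchExecutionEngine` instances are constructed, which this page does not show.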

@@ -119,7 +142,14 @@ public RexNode resolve(final RexBuilder builder, final String functionName, RexN

public RexNode resolve(
final RexBuilder builder, final BuiltinFunctionName functionName, RexNode... args) {
final PairList<CalciteFuncSignature, FunctionImp> implementList = map.get(functionName);
// Check the external function registry first. This allows the data-storage-dependent
// function implementations to override the internal ones with the same name.
List<Pair<CalciteFuncSignature, FunctionImp>> implementList =
Collaborator:

[discussion] It occurs to me that it may make more sense to check the external function registry before the core function registry. That would let a module's implementation override the core implementation, although we have no such requirement for now. Alternatively, we could defer this change until we actually need it.

What's your opinion? @yuancu @LantaoJin @penghuo

Contributor Author:

If the change is trivial, we can implement it now to leave room for future evolution.

Contributor Author:

Update: implemented. resolve now retrieves implementations from the external function registry before checking the internal one.

externalFunctionRegistry.get(functionName);
// If the function is not part of the external registry, check the internal registry.
if (implementList == null) {
implementList = functionRegistry.get(functionName);
}
if (implementList == null || implementList.isEmpty()) {
throw new IllegalStateException(String.format("Cannot resolve function: %s", functionName));
}
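
The lookup order agreed on in the review thread (external registry first, internal registry as fallback) can be sketched as follows. `TwoTierLookupSketch` and its `String`-based types are hypothetical simplifications; the real method goes on to match argument types against each signature, which is omitted here.

```java
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical, simplified model of the resolve() lookup order in the diff.
final class TwoTierLookupSketch {
  private final Map<String, List<String>> internal;
  private final Map<String, List<String>> external = new HashMap<>();

  TwoTierLookupSketch(Map<String, List<String>> internal) {
    this.internal = Map.copyOf(internal);
  }

  void registerExternal(String name, String implementation) {
    external.put(name, List.of(implementation));
  }

  List<String> resolve(String name) {
    // External entries win, so an engine-specific implementation can override a core one.
    List<String> candidates = external.get(name);
    if (candidates == null) {
      candidates = internal.get(name);
    }
    if (candidates == null || candidates.isEmpty()) {
      throw new IllegalStateException(String.format("Cannot resolve function: %s", name));
    }
    return candidates;
  }
}
```

One consequence of this order: an external entry shadows every internal overload of the same name instead of being merged with them, so an external registration has to cover all the signatures it wants to support.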
@@ -401,17 +431,17 @@ void populate() {
}

private static class Builder extends AbstractBuilder {
private final Map<BuiltinFunctionName, PairList<CalciteFuncSignature, FunctionImp>> map =
private final Map<BuiltinFunctionName, List<Pair<CalciteFuncSignature, FunctionImp>>> map =
new HashMap<>();

@Override
void register(BuiltinFunctionName functionName, FunctionImp implement) {
CalciteFuncSignature signature =
new CalciteFuncSignature(functionName.getName(), implement.getParams());
if (map.containsKey(functionName)) {
map.get(functionName).add(signature, implement);
map.get(functionName).add(Pair.of(signature, implement));
} else {
map.put(functionName, PairList.of(signature, implement));
map.put(functionName, new ArrayList<>(List.of(Pair.of(signature, implement))));
}
}
}
Original file line number Diff line number Diff line change
@@ -12,7 +12,6 @@ public class CalciteGeoIpFunctionsIT extends GeoIpFunctionsIT {
public void init() throws Exception {
super.init();
enableCalcite();
// TODO: "https://github.com/opensearch-project/sql/issues/3506"
// disallowCalciteFallback();
disallowCalciteFallback();
}
}
Original file line number Diff line number Diff line change
@@ -18,7 +18,6 @@
import java.util.List;
import java.util.Map;
import java.util.concurrent.atomic.AtomicReference;
import lombok.RequiredArgsConstructor;
import org.apache.calcite.plan.RelOptUtil;
import org.apache.calcite.rel.RelNode;
import org.apache.calcite.rel.RelRoot;
@@ -38,21 +37,33 @@
import org.opensearch.sql.executor.ExecutionEngine.Schema.Column;
import org.opensearch.sql.executor.Explain;
import org.opensearch.sql.executor.pagination.PlanSerializer;
import org.opensearch.sql.expression.function.BuiltinFunctionName;
import org.opensearch.sql.expression.function.PPLFuncImpTable;
import org.opensearch.sql.opensearch.client.OpenSearchClient;
import org.opensearch.sql.opensearch.executor.protector.ExecutionProtector;
import org.opensearch.sql.opensearch.functions.GeoIpFunction;
import org.opensearch.sql.opensearch.util.JdbcOpenSearchDataTypeConvertor;
import org.opensearch.sql.planner.physical.PhysicalPlan;
import org.opensearch.sql.storage.TableScanOperator;

/** OpenSearch execution engine implementation. */
@RequiredArgsConstructor
public class OpenSearchExecutionEngine implements ExecutionEngine {

private final OpenSearchClient client;

private final ExecutionProtector executionProtector;
private final PlanSerializer planSerializer;

public OpenSearchExecutionEngine(
OpenSearchClient client,
ExecutionProtector executionProtector,
PlanSerializer planSerializer) {
this.client = client;
this.executionProtector = executionProtector;
this.planSerializer = planSerializer;
registerOpenSearchFunctions();
}

@Override
public void execute(PhysicalPlan physicalPlan, ResponseListener<QueryResponse> listener) {
execute(physicalPlan, ExecutionContext.emptyExecutionContext(), listener);
@@ -224,4 +235,12 @@ private void buildResultSet(
QueryResponse response = new QueryResponse(schema, values, null);
listener.onResponse(response);
}

/** Registers opensearch-dependent functions */
private void registerOpenSearchFunctions() {
PPLFuncImpTable.FunctionImp geoIpImpl =
(builder, args) ->
builder.makeCall(new GeoIpFunction(client.getNodeClient()).toUDF("GEOIP"), args);
PPLFuncImpTable.INSTANCE.registerExternalFunction(BuiltinFunctionName.GEOIP, geoIpImpl);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,110 @@
/*
* Copyright OpenSearch Contributors
* SPDX-License-Identifier: Apache-2.0
*/

package org.opensearch.sql.opensearch.functions;

import java.util.*;
import java.util.stream.Collectors;
import lombok.Getter;
import org.apache.calcite.adapter.enumerable.NotNullImplementor;
import org.apache.calcite.adapter.enumerable.NullPolicy;
import org.apache.calcite.adapter.enumerable.RexToLixTranslator;
import org.apache.calcite.linq4j.tree.Expression;
import org.apache.calcite.linq4j.tree.Expressions;
import org.apache.calcite.rel.type.RelDataType;
import org.apache.calcite.rel.type.RelDataTypeFactory;
import org.apache.calcite.rex.RexCall;
import org.apache.calcite.sql.type.SqlReturnTypeInference;
import org.apache.calcite.sql.type.SqlTypeName;
import org.opensearch.geospatial.action.IpEnrichmentActionClient;
import org.opensearch.sql.common.utils.StringUtils;
import org.opensearch.sql.data.model.ExprStringValue;
import org.opensearch.sql.data.model.ExprTupleValue;
import org.opensearch.sql.data.model.ExprValue;
import org.opensearch.sql.expression.function.ImplementorUDF;
import org.opensearch.transport.client.node.NodeClient;

/**
* {@code GEOIP(dataSourceName, ipAddress[, options])} looks up location information from given IP
* addresses via OpenSearch GeoSpatial plugin API. The options is a comma-separated list of fields
* to be returned. If not specified, all fields are returned.
*
* <p>Signatures:
*
* <ul>
* <li>(STRING, STRING) -> MAP
* <li>(STRING, STRING, STRING) -> MAP
* </ul>
*/
public class GeoIpFunction extends ImplementorUDF {
public GeoIpFunction(NodeClient nodeClient) {
super(new GeoIPImplementor(nodeClient), NullPolicy.ANY);
}

@Override
public SqlReturnTypeInference getReturnTypeInference() {
return op -> {
RelDataTypeFactory typeFactory = op.getTypeFactory();
RelDataType varcharType = typeFactory.createSqlType(SqlTypeName.VARCHAR);
RelDataType anyType = typeFactory.createSqlType(SqlTypeName.ANY);
return typeFactory.createMapType(varcharType, anyType);
};
}

public static class GeoIPImplementor implements NotNullImplementor {
@Getter private static NodeClient nodeClient;

public GeoIPImplementor(NodeClient nodeClient) {
GeoIPImplementor.nodeClient = nodeClient;
}

@Override
public Expression implement(
RexToLixTranslator translator, RexCall call, List<Expression> translatedOperands) {
if (getNodeClient() == null) {
throw new IllegalStateException("nodeClient is null.");
}
List<Expression> operandsWithClient = new ArrayList<>(translatedOperands);
// Since a NodeClient cannot be passed as a parameter using Expressions.constant,
// it is instead provided through a function call.
operandsWithClient.add(Expressions.call(GeoIPImplementor.class, "getNodeClient"));
return Expressions.call(GeoIPImplementor.class, "fetchIpEnrichment", operandsWithClient);
}

public static Map<String, ?> fetchIpEnrichment(
String dataSource, String ipAddress, NodeClient nodeClient) {
return fetchIpEnrichment(dataSource, ipAddress, Collections.emptySet(), nodeClient);
}

public static Map<String, ?> fetchIpEnrichment(
String dataSource, String ipAddress, String commaSeparatedOptions, NodeClient nodeClient) {
String unquotedOptions = StringUtils.unquoteText(commaSeparatedOptions);
final Set<String> options =
Arrays.stream(unquotedOptions.split(",")).map(String::trim).collect(Collectors.toSet());
return fetchIpEnrichment(dataSource, ipAddress, options, nodeClient);
}

private static Map<String, ?> fetchIpEnrichment(
String dataSource, String ipAddress, Set<String> options, NodeClient nodeClient) {
IpEnrichmentActionClient ipClient = new IpEnrichmentActionClient(nodeClient);
dataSource = StringUtils.unquoteText(dataSource);
try {
Map<String, Object> geoLocationData = ipClient.getGeoLocationData(ipAddress, dataSource);
Map<String, ExprValue> enrichmentResult =
geoLocationData.entrySet().stream()
.filter(entry -> options.isEmpty() || options.contains(entry.getKey()))
.collect(
Collectors.toMap(
Map.Entry::getKey, v -> new ExprStringValue(v.getValue().toString())));
@SuppressWarnings("unchecked")
Map<String, ?> result =
(Map<String, ?>) ExprTupleValue.fromExprValueMap(enrichmentResult).valueForCalcite();
return result;
} catch (Exception e) {
throw new RuntimeException(e);
}
}
}
}
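
The option handling in `fetchIpEnrichment` (split the comma-separated string, trim each name, then keep only the requested keys) can be tried in isolation. This is a sketch under stated assumptions: `GeoIpOptionsSketch` is a hypothetical class, the lookup result is a hard-coded map rather than a call to `IpEnrichmentActionClient`, and values are plain strings instead of `ExprStringValue`. Unlike the diff, it also drops empty option names, so that an empty options string means "return all fields".

```java
import java.util.Arrays;
import java.util.Map;
import java.util.Set;
import java.util.stream.Collectors;

// Hypothetical helper mirroring the options parsing and field filtering in GeoIpFunction.
final class GeoIpOptionsSketch {
  // Splits "city, country" into {"city", "country"}; blank names are discarded (an addition).
  static Set<String> parseOptions(String commaSeparated) {
    return Arrays.stream(commaSeparated.split(","))
        .map(String::trim)
        .filter(name -> !name.isEmpty())
        .collect(Collectors.toSet());
  }

  // Keeps every field when no options are given, otherwise only the requested ones.
  static Map<String, String> select(Map<String, Object> data, Set<String> options) {
    return data.entrySet().stream()
        .filter(entry -> options.isEmpty() || options.contains(entry.getKey()))
        .collect(Collectors.toMap(Map.Entry::getKey, entry -> String.valueOf(entry.getValue())));
  }
}
```

For example, `select(data, parseOptions("city"))` returns only the `city` entry, while `select(data, parseOptions(""))` returns every field with its value converted to a string.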
Original file line number Diff line number Diff line change
@@ -233,7 +233,7 @@ public UnresolvedExpression visitTakeAggFunctionCall(
/** Eval function. */
@Override
public UnresolvedExpression visitBooleanFunctionCall(BooleanFunctionCallContext ctx) {
final String functionName = ctx.conditionFunctionName().getText().toLowerCase();
final String functionName = ctx.conditionFunctionName().getText().toLowerCase(Locale.ROOT);
return buildFunction(
FUNCTION_NAME_MAPPING.getOrDefault(functionName, functionName),
ctx.functionArgs().functionArg());
@@ -287,15 +287,15 @@ private Function buildFunction(
public UnresolvedExpression visitSingleFieldRelevanceFunction(
SingleFieldRelevanceFunctionContext ctx) {
return new Function(
ctx.singleFieldRelevanceFunctionName().getText().toLowerCase(),
ctx.singleFieldRelevanceFunctionName().getText().toLowerCase(Locale.ROOT),
singleFieldRelevanceArguments(ctx));
}

@Override
public UnresolvedExpression visitMultiFieldRelevanceFunction(
MultiFieldRelevanceFunctionContext ctx) {
return new Function(
ctx.multiFieldRelevanceFunctionName().getText().toLowerCase(),
ctx.multiFieldRelevanceFunctionName().getText().toLowerCase(Locale.ROOT),
multiFieldRelevanceArguments(ctx));
}

@@ -506,7 +506,7 @@ private List<UnresolvedExpression> singleFieldRelevanceArguments(
v ->
builder.add(
new UnresolvedArgument(
v.relevanceArgName().getText().toLowerCase(),
v.relevanceArgName().getText().toLowerCase(Locale.ROOT),
new Literal(
StringUtils.unquoteText(v.relevanceArgValue().getText()),
DataType.STRING))));
@@ -534,7 +534,7 @@ private List<UnresolvedExpression> multiFieldRelevanceArguments(
v ->
builder.add(
new UnresolvedArgument(
v.relevanceArgName().getText().toLowerCase(),
v.relevanceArgName().getText().toLowerCase(Locale.ROOT),
new Literal(
StringUtils.unquoteText(v.relevanceArgValue().getText()),
DataType.STRING))));
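
The hunks in this file replace `toLowerCase()` with `toLowerCase(Locale.ROOT)`. The page does not state the motivation, but a likely one is that the no-argument form uses the JVM's default locale, and some locales lowercase ASCII letters differently. A small sketch of the difference, using a hypothetical `LocaleLowercaseSketch` class and the Turkish locale as the example:

```java
import java.util.Locale;

// Hypothetical demonstration of locale-sensitive lowercasing of a function name.
final class LocaleLowercaseSketch {
  // Locale-independent normalization, as in the updated parser code.
  static String normalize(String functionName) {
    return functionName.toLowerCase(Locale.ROOT);
  }

  // What toLowerCase() would produce on a JVM whose default locale is Turkish.
  static String lowerInTurkish(String functionName) {
    return functionName.toLowerCase(Locale.forLanguageTag("tr"));
  }
}
```

Under Turkish rules the capital `I` lowercases to the dotless `ı` (U+0131), so `lowerInTurkish("GEOIP")` no longer equals `"geoip"` and a name-based lookup such as `FUNCTION_NAME_MAPPING.getOrDefault` could miss.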