-
Notifications
You must be signed in to change notification settings - Fork 155
Implement geoip
udf with Calcite
#3604
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Changes from all commits
fcf4beb
cee8a16
1db8839
e1dcaa0
47a5d7a
927dafd
34132e9
b2d92a3
3b983a2
d89c911
7b3fdc6
ab6eed7
abd15ee
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -11,6 +11,7 @@ | |
|
||
import com.google.common.collect.ImmutableMap; | ||
import java.math.BigDecimal; | ||
import java.util.ArrayList; | ||
import java.util.Arrays; | ||
import java.util.HashMap; | ||
import java.util.List; | ||
|
@@ -19,13 +20,12 @@ | |
import org.apache.calcite.rel.type.RelDataType; | ||
import org.apache.calcite.rex.RexBuilder; | ||
import org.apache.calcite.rex.RexNode; | ||
import org.apache.calcite.runtime.PairList; | ||
import org.apache.calcite.sql.SqlOperator; | ||
import org.apache.calcite.sql.fun.SqlLibraryOperators; | ||
import org.apache.calcite.sql.fun.SqlStdOperatorTable; | ||
import org.apache.calcite.sql.fun.SqlTrimFunction.Flag; | ||
import org.apache.calcite.sql.type.SqlTypeName; | ||
import org.checkerframework.checker.nullness.qual.Nullable; | ||
import org.apache.commons.lang3.tuple.Pair; | ||
import org.opensearch.sql.executor.QueryType; | ||
|
||
public class PPLFuncImpTable { | ||
|
@@ -91,21 +91,44 @@ default List<RelDataType> getParams() { | |
INSTANCE = new PPLFuncImpTable(builder); | ||
} | ||
|
||
private final ImmutableMap<BuiltinFunctionName, PairList<CalciteFuncSignature, FunctionImp>> map; | ||
/** | ||
* The registry for built-in functions. Functions defined by the PPL specification, whose | ||
* implementations are independent of any specific data storage, should be registered here | ||
* internally. | ||
*/ | ||
private final ImmutableMap<BuiltinFunctionName, List<Pair<CalciteFuncSignature, FunctionImp>>> | ||
functionRegistry; | ||
|
||
/** | ||
* The external function registry. Functions whose implementations depend on a specific data | ||
* engine should be registered here. This reduces coupling between the core module and particular | ||
* storage backends. | ||
*/ | ||
private final Map<BuiltinFunctionName, List<Pair<CalciteFuncSignature, FunctionImp>>> | ||
externalFunctionRegistry; | ||
|
||
private PPLFuncImpTable(Builder builder) { | ||
final ImmutableMap.Builder<BuiltinFunctionName, PairList<CalciteFuncSignature, FunctionImp>> | ||
final ImmutableMap.Builder<BuiltinFunctionName, List<Pair<CalciteFuncSignature, FunctionImp>>> | ||
mapBuilder = ImmutableMap.builder(); | ||
builder.map.forEach((k, v) -> mapBuilder.put(k, v.immutable())); | ||
this.map = ImmutableMap.copyOf(mapBuilder.build()); | ||
builder.map.forEach((k, v) -> mapBuilder.put(k, List.copyOf(v))); | ||
this.functionRegistry = ImmutableMap.copyOf(mapBuilder.build()); | ||
this.externalFunctionRegistry = new HashMap<>(); | ||
} | ||
|
||
public @Nullable RexNode resolveSafe( | ||
final RexBuilder builder, final String functionName, RexNode... args) { | ||
try { | ||
return resolve(builder, functionName, args); | ||
} catch (Exception e) { | ||
return null; | ||
/** | ||
* Register a function implementation from external services dynamically. | ||
* | ||
* @param functionName the name of the function, has to be defined in BuiltinFunctionName | ||
* @param functionImp the implementation of the function | ||
*/ | ||
public void registerExternalFunction(BuiltinFunctionName functionName, FunctionImp functionImp) { | ||
CalciteFuncSignature signature = | ||
new CalciteFuncSignature(functionName.getName(), functionImp.getParams()); | ||
if (externalFunctionRegistry.containsKey(functionName)) { | ||
externalFunctionRegistry.get(functionName).add(Pair.of(signature, functionImp)); | ||
} else { | ||
externalFunctionRegistry.put( | ||
functionName, new ArrayList<>(List.of(Pair.of(signature, functionImp)))); | ||
} | ||
} | ||
|
||
|
@@ -119,7 +142,14 @@ public RexNode resolve(final RexBuilder builder, final String functionName, RexN | |
|
||
public RexNode resolve( | ||
final RexBuilder builder, final BuiltinFunctionName functionName, RexNode... args) { | ||
final PairList<CalciteFuncSignature, FunctionImp> implementList = map.get(functionName); | ||
// Check the external function registry first. This allows the data-storage-dependent | ||
// function implementations to override the internal ones with the same name. | ||
List<Pair<CalciteFuncSignature, FunctionImp>> implementList = | ||
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. [discussion] It occurs to me that it may make more sense to consult the external function registry before the core function registry. That would make it possible to override a core implementation with a module's implementation, although we don't have such a requirement for now. Or should we defer this change until we really need it? What's your opinion? @yuancu @LantaoJin @penghuo There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. If the change is trivial, we can implement it now to leave room for future evolution. There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. Update: implemented by retrieving implementations from the external function registry before checking the internal one. |
||
externalFunctionRegistry.get(functionName); | ||
// If the function is not part of the external registry, check the internal registry. | ||
if (implementList == null) { | ||
implementList = functionRegistry.get(functionName); | ||
} | ||
if (implementList == null || implementList.isEmpty()) { | ||
throw new IllegalStateException(String.format("Cannot resolve function: %s", functionName)); | ||
} | ||
|
@@ -401,17 +431,17 @@ void populate() { | |
} | ||
|
||
private static class Builder extends AbstractBuilder { | ||
private final Map<BuiltinFunctionName, PairList<CalciteFuncSignature, FunctionImp>> map = | ||
private final Map<BuiltinFunctionName, List<Pair<CalciteFuncSignature, FunctionImp>>> map = | ||
new HashMap<>(); | ||
|
||
@Override | ||
void register(BuiltinFunctionName functionName, FunctionImp implement) { | ||
CalciteFuncSignature signature = | ||
new CalciteFuncSignature(functionName.getName(), implement.getParams()); | ||
if (map.containsKey(functionName)) { | ||
map.get(functionName).add(signature, implement); | ||
map.get(functionName).add(Pair.of(signature, implement)); | ||
} else { | ||
map.put(functionName, PairList.of(signature, implement)); | ||
map.put(functionName, new ArrayList<>(List.of(Pair.of(signature, implement)))); | ||
} | ||
} | ||
} | ||
|
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,110 @@ | ||
/* | ||
* Copyright OpenSearch Contributors | ||
* SPDX-License-Identifier: Apache-2.0 | ||
*/ | ||
|
||
package org.opensearch.sql.opensearch.functions; | ||
|
||
import java.util.*; | ||
import java.util.stream.Collectors; | ||
import lombok.Getter; | ||
import org.apache.calcite.adapter.enumerable.NotNullImplementor; | ||
import org.apache.calcite.adapter.enumerable.NullPolicy; | ||
import org.apache.calcite.adapter.enumerable.RexToLixTranslator; | ||
import org.apache.calcite.linq4j.tree.Expression; | ||
import org.apache.calcite.linq4j.tree.Expressions; | ||
import org.apache.calcite.rel.type.RelDataType; | ||
import org.apache.calcite.rel.type.RelDataTypeFactory; | ||
import org.apache.calcite.rex.RexCall; | ||
import org.apache.calcite.sql.type.SqlReturnTypeInference; | ||
import org.apache.calcite.sql.type.SqlTypeName; | ||
import org.opensearch.geospatial.action.IpEnrichmentActionClient; | ||
import org.opensearch.sql.common.utils.StringUtils; | ||
import org.opensearch.sql.data.model.ExprStringValue; | ||
import org.opensearch.sql.data.model.ExprTupleValue; | ||
import org.opensearch.sql.data.model.ExprValue; | ||
import org.opensearch.sql.expression.function.ImplementorUDF; | ||
import org.opensearch.transport.client.node.NodeClient; | ||
|
||
/** | ||
* {@code GEOIP(dataSourceName, ipAddress[, options])} looks up location information from given IP | ||
* addresses via OpenSearch GeoSpatial plugin API. The options is a comma-separated list of fields | ||
* to be returned. If not specified, all fields are returned. | ||
* | ||
* <p>Signatures: | ||
* | ||
* <ul> | ||
* <li>(STRING, STRING) -> MAP | ||
* <li>(STRING, STRING, STRING) -> MAP | ||
* </ul> | ||
*/ | ||
public class GeoIpFunction extends ImplementorUDF { | ||
public GeoIpFunction(NodeClient nodeClient) { | ||
super(new GeoIPImplementor(nodeClient), NullPolicy.ANY); | ||
} | ||
|
||
@Override | ||
public SqlReturnTypeInference getReturnTypeInference() { | ||
return op -> { | ||
RelDataTypeFactory typeFactory = op.getTypeFactory(); | ||
RelDataType varcharType = typeFactory.createSqlType(SqlTypeName.VARCHAR); | ||
RelDataType anyType = typeFactory.createSqlType(SqlTypeName.ANY); | ||
return typeFactory.createMapType(varcharType, anyType); | ||
}; | ||
} | ||
|
||
public static class GeoIPImplementor implements NotNullImplementor { | ||
@Getter private static NodeClient nodeClient; | ||
|
||
public GeoIPImplementor(NodeClient nodeClient) { | ||
GeoIPImplementor.nodeClient = nodeClient; | ||
} | ||
|
||
@Override | ||
public Expression implement( | ||
RexToLixTranslator translator, RexCall call, List<Expression> translatedOperands) { | ||
if (getNodeClient() == null) { | ||
throw new IllegalStateException("nodeClient is null."); | ||
} | ||
List<Expression> operandsWithClient = new ArrayList<>(translatedOperands); | ||
// Since a NodeClient cannot be passed as a parameter using Expressions.constant, | ||
// it is instead provided through a function call. | ||
operandsWithClient.add(Expressions.call(GeoIPImplementor.class, "getNodeClient")); | ||
return Expressions.call(GeoIPImplementor.class, "fetchIpEnrichment", operandsWithClient); | ||
} | ||
|
||
public static Map<String, ?> fetchIpEnrichment( | ||
String dataSource, String ipAddress, NodeClient nodeClient) { | ||
return fetchIpEnrichment(dataSource, ipAddress, Collections.emptySet(), nodeClient); | ||
} | ||
|
||
public static Map<String, ?> fetchIpEnrichment( | ||
String dataSource, String ipAddress, String commaSeparatedOptions, NodeClient nodeClient) { | ||
String unquotedOptions = StringUtils.unquoteText(commaSeparatedOptions); | ||
final Set<String> options = | ||
Arrays.stream(unquotedOptions.split(",")).map(String::trim).collect(Collectors.toSet()); | ||
return fetchIpEnrichment(dataSource, ipAddress, options, nodeClient); | ||
} | ||
|
||
private static Map<String, ?> fetchIpEnrichment( | ||
String dataSource, String ipAddress, Set<String> options, NodeClient nodeClient) { | ||
IpEnrichmentActionClient ipClient = new IpEnrichmentActionClient(nodeClient); | ||
dataSource = StringUtils.unquoteText(dataSource); | ||
try { | ||
Map<String, Object> geoLocationData = ipClient.getGeoLocationData(ipAddress, dataSource); | ||
Map<String, ExprValue> enrichmentResult = | ||
geoLocationData.entrySet().stream() | ||
.filter(entry -> options.isEmpty() || options.contains(entry.getKey())) | ||
.collect( | ||
Collectors.toMap( | ||
Map.Entry::getKey, v -> new ExprStringValue(v.getValue().toString()))); | ||
@SuppressWarnings("unchecked") | ||
Map<String, ?> result = | ||
(Map<String, ?>) ExprTupleValue.fromExprValueMap(enrichmentResult).valueForCalcite(); | ||
return result; | ||
} catch (Exception e) { | ||
throw new RuntimeException(e); | ||
} | ||
} | ||
} | ||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
[non-blocking] What is the reason for changing PairList to List<Pair>?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This is because we may move away from Calcite in the future, so we are removing the dependency on the Calcite library for such general-purpose data types.