Skip to content

Add lambda function and array related functions #3584

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 21 commits into
base: main
Choose a base branch
from

Conversation

xinyual
Copy link
Contributor

@xinyual xinyual commented Apr 27, 2025

Description

This pr adds lambda function and array related functions. Calcite don't have array related functions so we need to implement by ourselves.
Now the logic for lambda is:
We will consider lambda function as a new PPL expression and parse it regularly to construct rexnode. To get return type for lambda expression, we need to firstly map the argument type in the calciteContext. For example, forall(array(1, 2, 3), x -> x > 0), then x -> INTEGER.
We also have an exception for reduce because the acc is the dynamic type.
The calcite/lin4j generate code according to the input type. For example, reduce(array(1.0,2.0 ,3.0), 0, (acc, x) -> acc + x). Ideally, we should map acc -> INTEGER, x -> DOUBLE. But if we map through this, the code of + would be plus(INTERGER acc, DOUBLE x), then after first apply, the acc would be double, then it will throw exception. Thus, we apply ANY to the acc and infer the return type in getReturnTypeInference

The function is aligned with https://github.com/opensearch-project/opensearch-spark/blob/main/docs/ppl-lang/functions/ppl-collection.md

TODO: nested object is not supported in lambda currently. It will be automatically supported when we support this. E.g. x -> x.a > 0

For detailed implementation and description:

Functions argument description return type implementation
ARRAY ARRAY(value1: ANY, value2:ANY, ...) create an array with input values. Currently we don't allow mixture types. We will infer a least restricted type, for example array(1, "demo") -> ["1", "demo"] ARRAY wrap SqlLibraryOperators.ARRAY
ARRAY_LENGTH ARRAY_LENGTH(value: ARRAY) return array length integer SqlLibraryOperators.ARRAY_LENGTH
FORALL forall(value:ARRAY, function: LAMBDA) check whether all element inside array can meet the lambda function. The function should also return boolean. boolean implement by ourselves since we cannot find matched built-in calcite one.
EXISTS exists(value:ARRAY, function: LAMBDA) check whether existing one of element inside array can meet the lambda function. The function should also return boolean. boolean implement by ourselves since we cannot find matched built-in calcite one.
FILTER filter(value:ARRAY, function: LAMBDA) filter the element in the array by the lambda function. The function should return boolean array implement by ourselves since we cannot find matched built-in calcite one.
TRANSFORM transform(value:ARRAY, function: LAMBDA) transform the element of array one by one using lambda. Transform can accept one more argument like (x, i) -> x + i, where i is the index of element in array. array implement by ourselves since we cannot find matched built-in calcite one.
REDUCE reduce(value:ARRAY, base_value:ANY, acc_function: LAMBDA)/reduce(value:ARRAY, base_value:ANY, acc_function: LAMBDA, reduce_function:LAMBDA) The function will first use acc_function to go through all element and return value to the acc. Then apply reduce function to the acc if exists. The acc_function's lambda format is (acc,x) -> ..., the reduce_function format is (acc) -> ... ANY, according to the lambda function implement by ourselves since we cannot find matched built-in calcite one.

Related Issues

Resolves #[Issue number to be closed when this PR is merged]
#3575

Check List

  • New functionality includes testing.
  • New functionality has been documented.
  • New functionality has javadoc added.
  • New functionality has a user manual doc added.
  • API changes companion pull request created.
  • Commits are signed per the DCO using --signoff.
  • Public documentation issue/PR created.

By submitting this pull request, I confirm that my contribution is made under the terms of the Apache 2.0 license.
For more information on following Developer Certificate of Origin and signing off your commits, please check here.

xinyual added 11 commits April 23, 2025 16:49
Signed-off-by: xinyual <[email protected]>
Signed-off-by: xinyual <[email protected]>
Signed-off-by: xinyual <[email protected]>
Signed-off-by: xinyual <[email protected]>
Signed-off-by: xinyual <[email protected]>
Signed-off-by: xinyual <[email protected]>
Signed-off-by: xinyual <[email protected]>
Signed-off-by: xinyual <[email protected]>
Signed-off-by: xinyual <[email protected]>
Signed-off-by: xinyual <[email protected]>
xinyual added 3 commits April 27, 2025 14:56
Signed-off-by: xinyual <[email protected]>
Signed-off-by: xinyual <[email protected]>
Signed-off-by: xinyual <[email protected]>
@LantaoJin LantaoJin added the calcite calcite migration releated label Apr 29, 2025
xinyual added 2 commits May 26, 2025 15:17
Signed-off-by: xinyual <[email protected]>
Signed-off-by: xinyual <[email protected]>
@@ -44,13 +47,16 @@ public class CalcitePlanContext {

private final Stack<RexCorrelVariable> correlVar = new Stack<>();

@Getter public Map<String, RexLambdaRef> temparolInputMap;
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

what is temparol? typo?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes. Typo error, already rename it.

* will map type for each lambda argument by the order of previous argument. Also, the function
* will add these variables to the context so they can pass visitQualifiedName
*/
private CalcitePlanContext prepareLambdaContext(
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

add UT

@@ -58,6 +58,14 @@ public enum BuiltinFunctionName {
TAN(FunctionName.of("tan")),
SPAN(FunctionName.of("span")),

/** Collection functions */
ARRAY(FunctionName.of("array")),
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Already add it.

Comment on lines 63 to 76
switch (targetType) {
case DOUBLE:
List<Object> unboxed =
IntStream.range(0, args.length - 1)
.mapToObj(i -> ((Number) args[i]).doubleValue())
.collect(Collectors.toList());

return unboxed;
case FLOAT:
List<Object> unboxedFloat =
IntStream.range(0, args.length - 1)
.mapToObj(i -> ((Number) args[i]).floatValue())
.collect(Collectors.toList());
return unboxedFloat;
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

could you explain why this special logic needed?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We need to internally convert it. Otherwise, the calcite will directly cast like DOUBLE to INTEGER, which will raise exception.

import org.apache.calcite.sql.type.SqlTypeName;
import org.opensearch.sql.expression.function.ImplementorUDF;

public class ArrayFunctionImpl extends ImplementorUDF {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

can't we reuse SqlLibraryOperators.ARRAY? Again, please add a reason in PR description for any new added function why it must implement by ourselves.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Already update the implementation. Wrap the implementation of SqlLibraryOperators.ARRAY

import org.apache.calcite.sql.type.SqlReturnTypeInference;
import org.opensearch.sql.expression.function.ImplementorUDF;

public class ExistsFunctionImpl extends ImplementorUDF {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

can't we reuse SqlLibraryOperators.ARRAY_CONTAINS? please check all SqlLibraryOperators.ARRAY_* first.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Confirmed. All SqlLibraryOperators.ARRAY_* is for array related function which is not related to lambda. We use SqlLibraryOperators .array_length

xinyual added 5 commits May 30, 2025 10:55
Signed-off-by: xinyual <[email protected]>
Signed-off-by: xinyual <[email protected]>
Signed-off-by: xinyual <[email protected]>
Signed-off-by: xinyual <[email protected]>
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
calcite calcite migration releated
Projects
None yet
Development

Successfully merging this pull request may close these issues.

3 participants