Skip to content

Add earliest and latest in aggregation and window function #3640

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 18 commits into
base: main
Choose a base branch
from
Open
Original file line number Diff line number Diff line change
Expand Up @@ -227,9 +227,9 @@ static RelBuilder.AggCall makeAggCall(
RexNode field,
List<RexNode> argList) {
switch (functionName) {
case MAX:
case MAX, LATEST:
return context.relBuilder.max(field);
case MIN:
case MIN, EARLIEST:
return context.relBuilder.min(field);
case AVG:
return context.relBuilder.avg(distinct, null, field);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -188,6 +188,8 @@ public enum BuiltinFunctionName {
TAKE(FunctionName.of("take")),
// t-digest percentile which is used in OpenSearch core by default.
PERCENTILE_APPROX(FunctionName.of("percentile_approx")),
EARLIEST(FunctionName.of("earliest")),
LATEST(FunctionName.of("latest")),
// Not always an aggregation query
NESTED(FunctionName.of("nested")),

Expand Down Expand Up @@ -315,6 +317,8 @@ public enum BuiltinFunctionName {
.put("take", BuiltinFunctionName.TAKE)
.put("percentile", BuiltinFunctionName.PERCENTILE_APPROX)
.put("percentile_approx", BuiltinFunctionName.PERCENTILE_APPROX)
.put("earliest", BuiltinFunctionName.EARLIEST)
.put("latest", BuiltinFunctionName.LATEST)
.build();

private static final Map<String, BuiltinFunctionName> WINDOW_FUNC_MAPPING =
Expand All @@ -331,6 +335,8 @@ public enum BuiltinFunctionName {
.put("stddev", BuiltinFunctionName.STDDEV_POP)
.put("stddev_pop", BuiltinFunctionName.STDDEV_POP)
.put("stddev_samp", BuiltinFunctionName.STDDEV_SAMP)
.put("earliest", BuiltinFunctionName.EARLIEST)
.put("latest", BuiltinFunctionName.LATEST)
.build();

public static Optional<BuiltinFunctionName> of(String str) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,8 @@
import org.opensearch.sql.data.type.ExprCoreType;
import org.opensearch.sql.expression.datetime.DateTimeFunctions;
import org.opensearch.sql.expression.function.udf.CryptographicFunction;
import org.opensearch.sql.expression.function.udf.condition.EarliestFunction;
import org.opensearch.sql.expression.function.udf.condition.LatestFunction;
import org.opensearch.sql.expression.function.udf.datetime.AddSubDateFunction;
import org.opensearch.sql.expression.function.udf.datetime.CurrentFunction;
import org.opensearch.sql.expression.function.udf.datetime.DateAddSubFunction;
Expand Down Expand Up @@ -67,6 +69,10 @@ public class PPLBuiltinOperators extends ReflectiveSqlOperatorTable {
public static final SqlOperator SHA2 = CryptographicFunction.sha2().toUDF("SHA2");
public static final SqlOperator CIDRMATCH = new CidrMatchFunction().toUDF("CIDRMATCH");

// Condition function
public static final SqlOperator EARLIEST = new EarliestFunction().toUDF("EARLIEST");
public static final SqlOperator LATEST = new LatestFunction().toUDF("LATEST");

// Datetime function
public static final SqlOperator TIMESTAMP = new TimestampFunction().toUDF("TIMESTAMP");
public static final SqlOperator DATE =
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -234,6 +234,8 @@ void populate() {
registerOperator(IS_NULL, SqlStdOperatorTable.IS_NULL);
registerOperator(IF, SqlStdOperatorTable.CASE);
registerOperator(IFNULL, SqlStdOperatorTable.COALESCE);
registerOperator(EARLIEST, PPLBuiltinOperators.EARLIEST);
registerOperator(LATEST, PPLBuiltinOperators.LATEST);
registerOperator(COALESCE, SqlStdOperatorTable.COALESCE);

// Register library operator
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,64 @@
/*
* Copyright OpenSearch Contributors
* SPDX-License-Identifier: Apache-2.0
*/

package org.opensearch.sql.expression.function.udf.condition;

import static org.opensearch.sql.calcite.utils.UserDefinedFunctionUtils.prependFunctionProperties;
import static org.opensearch.sql.utils.DateTimeUtils.getRelativeZonedDateTime;

import java.time.Clock;
import java.time.Instant;
import java.time.ZonedDateTime;
import java.util.List;
import org.apache.calcite.adapter.enumerable.NotNullImplementor;
import org.apache.calcite.adapter.enumerable.NullPolicy;
import org.apache.calcite.adapter.enumerable.RexToLixTranslator;
import org.apache.calcite.linq4j.tree.Expression;
import org.apache.calcite.linq4j.tree.Expressions;
import org.apache.calcite.rel.type.RelDataType;
import org.apache.calcite.rex.RexCall;
import org.apache.calcite.rex.RexNode;
import org.apache.calcite.sql.type.ReturnTypes;
import org.apache.calcite.sql.type.SqlReturnTypeInference;
import org.opensearch.sql.calcite.utils.UserDefinedFunctionUtils;
import org.opensearch.sql.data.model.ExprValue;
import org.opensearch.sql.expression.function.FunctionProperties;
import org.opensearch.sql.expression.function.ImplementorUDF;

public class EarliestFunction extends ImplementorUDF {
public EarliestFunction() {
super(new EarliestImplementor(), NullPolicy.ANY);
}

@Override
public SqlReturnTypeInference getReturnTypeInference() {
return ReturnTypes.BOOLEAN;
}

public static class EarliestImplementor implements NotNullImplementor {
@Override
public Expression implement(
RexToLixTranslator rexToLixTranslator, RexCall rexCall, List<Expression> list) {
List<RelDataType> types = rexCall.getOperands().stream().map(RexNode::getType).toList();
return Expressions.call(
EarliestFunction.class,
"earliest",
prependFunctionProperties(
UserDefinedFunctionUtils.convertToExprValues(list, types), rexToLixTranslator));
}
}

public static Boolean earliest(Object... inputs) {
String expression = ((ExprValue) inputs[1]).stringValue();
Instant candidate = ((ExprValue) inputs[2]).timestampValue();
FunctionProperties functionProperties = (FunctionProperties) inputs[0];
Clock clock = functionProperties.getQueryStartClock();
ZonedDateTime candidateDatetime = ZonedDateTime.ofInstant(candidate, clock.getZone());
ZonedDateTime earliest =
getRelativeZonedDateTime(
expression, ZonedDateTime.ofInstant(clock.instant(), clock.getZone()));
return earliest.isBefore(candidateDatetime);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,64 @@
/*
* Copyright OpenSearch Contributors
* SPDX-License-Identifier: Apache-2.0
*/

package org.opensearch.sql.expression.function.udf.condition;

import static org.opensearch.sql.calcite.utils.UserDefinedFunctionUtils.prependFunctionProperties;
import static org.opensearch.sql.utils.DateTimeUtils.getRelativeZonedDateTime;

import java.time.Clock;
import java.time.Instant;
import java.time.ZonedDateTime;
import java.util.List;
import org.apache.calcite.adapter.enumerable.NotNullImplementor;
import org.apache.calcite.adapter.enumerable.NullPolicy;
import org.apache.calcite.adapter.enumerable.RexToLixTranslator;
import org.apache.calcite.linq4j.tree.Expression;
import org.apache.calcite.linq4j.tree.Expressions;
import org.apache.calcite.rel.type.RelDataType;
import org.apache.calcite.rex.RexCall;
import org.apache.calcite.rex.RexNode;
import org.apache.calcite.sql.type.ReturnTypes;
import org.apache.calcite.sql.type.SqlReturnTypeInference;
import org.opensearch.sql.calcite.utils.UserDefinedFunctionUtils;
import org.opensearch.sql.data.model.ExprValue;
import org.opensearch.sql.expression.function.FunctionProperties;
import org.opensearch.sql.expression.function.ImplementorUDF;

public class LatestFunction extends ImplementorUDF {
public LatestFunction() {
super(new LatestFunction.LatestImplementor(), NullPolicy.ANY);
}

@Override
public SqlReturnTypeInference getReturnTypeInference() {
return ReturnTypes.BOOLEAN;
}

public static class LatestImplementor implements NotNullImplementor {
@Override
public Expression implement(
RexToLixTranslator rexToLixTranslator, RexCall rexCall, List<Expression> list) {
List<RelDataType> types = rexCall.getOperands().stream().map(RexNode::getType).toList();
return Expressions.call(
LatestFunction.class,
"latest",
prependFunctionProperties(
UserDefinedFunctionUtils.convertToExprValues(list, types), rexToLixTranslator));
}
}

public static Boolean latest(Object... inputs) {
String expression = ((ExprValue) inputs[1]).stringValue();
Instant candidate = ((ExprValue) inputs[2]).timestampValue();
FunctionProperties functionProperties = (FunctionProperties) inputs[0];
Clock clock = functionProperties.getQueryStartClock();
ZonedDateTime candidateDatetime = ZonedDateTime.ofInstant(candidate, clock.getZone());
ZonedDateTime latest =
getRelativeZonedDateTime(
expression, ZonedDateTime.ofInstant(clock.instant(), clock.getZone()));
return latest.isAfter(candidateDatetime);
}
}
107 changes: 107 additions & 0 deletions core/src/main/java/org/opensearch/sql/utils/DateTimeUtils.java
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,10 @@
import java.time.ZoneId;
import java.time.ZoneOffset;
import java.time.ZonedDateTime;
import java.time.format.DateTimeFormatter;
import java.time.format.DateTimeParseException;
import java.time.temporal.ChronoUnit;
import java.util.regex.Pattern;
import lombok.experimental.UtilityClass;
import org.opensearch.sql.data.model.ExprTimeValue;
import org.opensearch.sql.data.model.ExprValue;
Expand All @@ -19,6 +23,10 @@
@UtilityClass
public class DateTimeUtils {

private static final Pattern OFFSET_PATTERN = Pattern.compile("([+-])(\\d+)([smhdwMy]?)");
private static final DateTimeFormatter DIRECT_FORMATTER =
DateTimeFormatter.ofPattern("MM/dd/yyyy:HH:mm:ss");

/**
* Util method to round the date/time with given unit.
*
Expand Down Expand Up @@ -151,4 +159,103 @@ public static LocalDate extractDate(ExprValue value, FunctionProperties function
? ((ExprTimeValue) value).dateValue(functionProperties)
: value.dateValue();
}

public static ZonedDateTime getRelativeZonedDateTime(String input, ZonedDateTime baseTime) {
try {
Instant localDateTime =
LocalDateTime.parse(input, DIRECT_FORMATTER).toInstant(ZoneOffset.UTC);
return localDateTime.atZone(baseTime.getZone());
} catch (DateTimeParseException ignored) {
}

if ("now".equalsIgnoreCase(input) || "now()".equalsIgnoreCase(input)) {
return baseTime;
}

ZonedDateTime result = baseTime;
int i = 0;
while (i < input.length()) {
char c = input.charAt(i);
if (c == '@') {
// parse snap
int j = i + 1;
while (j < input.length() && Character.isLetter(input.charAt(j))) {
j++;
}
String snapUnit = input.substring(i + 1, j);
result = applySnap(result, snapUnit);
i = j;
} else if (c == '+' || c == '-') {
// parse offset
int j = i + 1;
while (j < input.length() && Character.isDigit(input.charAt(j))) {
j++;
}
int value = Integer.parseInt(input.substring(i + 1, j));
// optional unit
int k = j;
while (k < input.length() && Character.isLetter(input.charAt(k))) {
k++;
}
String unit = input.substring(j, k);
if (unit.isEmpty()) {
unit = "s"; // default to seconds
}
result = applyOffset(result, String.valueOf(c), value, unit);
i = k;
} else {
throw new IllegalArgumentException("Wrong relative time expression: " + input);
}
}

return result;
}

private static ZonedDateTime applyOffset(
ZonedDateTime base, String sign, int value, String unit) {
ChronoUnit chronoUnit = parseUnit(unit);
return sign.equals("-") ? base.minus(value, chronoUnit) : base.plus(value, chronoUnit);
}

private static ZonedDateTime applySnap(ZonedDateTime base, String unit) {
switch (unit) {
case "s":
return base.truncatedTo(ChronoUnit.SECONDS);
case "m":
return base.truncatedTo(ChronoUnit.MINUTES);
case "h":
return base.truncatedTo(ChronoUnit.HOURS);
case "d":
return base.truncatedTo(ChronoUnit.DAYS);
case "w":
return base.minusDays((base.getDayOfWeek().getValue() % 7)).truncatedTo(ChronoUnit.DAYS);
case "M":
return base.withDayOfMonth(1).truncatedTo(ChronoUnit.DAYS);
case "y":
return base.withDayOfYear(1).truncatedTo(ChronoUnit.DAYS);
default:
throw new IllegalArgumentException("Unsupported snap unit: " + unit);
}
}

private static ChronoUnit parseUnit(String unit) {
switch (unit) {
case "s":
return ChronoUnit.SECONDS;
case "m":
return ChronoUnit.MINUTES;
case "h":
return ChronoUnit.HOURS;
case "d":
return ChronoUnit.DAYS;
case "w":
return ChronoUnit.WEEKS;
case "M":
return ChronoUnit.MONTHS;
case "y":
return ChronoUnit.YEARS;
default:
throw new IllegalArgumentException("Unsupported time unit: " + unit);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -6,10 +6,14 @@
package org.opensearch.sql.utils;

import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertThrows;
import static org.opensearch.sql.utils.DateTimeUtils.getRelativeZonedDateTime;

import java.time.Instant;
import java.time.LocalDateTime;
import java.time.ZoneId;
import java.time.ZonedDateTime;
import java.time.format.DateTimeFormatter;
import java.util.concurrent.TimeUnit;
import org.junit.jupiter.api.Test;

Expand All @@ -29,4 +33,50 @@ void round() {
.toEpochMilli(),
Instant.ofEpochMilli(rounded).toEpochMilli());
}

@Test
void testRelativeZonedDateTimeWithNow() {
ZonedDateTime now = ZonedDateTime.ofInstant(Instant.now(), ZoneId.systemDefault());
assertEquals(getRelativeZonedDateTime("now", now), now);
assertEquals(getRelativeZonedDateTime("now()", now), now);
}

@Test
void testRelativeZonedDateTimeWithSnap() {
String dateTimeString = "2025-10-22 10:32:12";
DateTimeFormatter formatter = DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss");

LocalDateTime localDateTime = LocalDateTime.parse(dateTimeString, formatter);
ZonedDateTime zonedDateTime = localDateTime.atZone(ZoneId.systemDefault());
ZonedDateTime snap1 = getRelativeZonedDateTime("-1d@d", zonedDateTime);
ZonedDateTime snap2 = getRelativeZonedDateTime("-3d-2h@h", zonedDateTime);
assertEquals(snap1.toLocalDateTime().toString(), "2025-10-21T00:00");
assertEquals(snap2.toLocalDateTime().toString(), "2025-10-19T08:00");
}

@Test
void testRelativeZonedDateTimeWithOffset() {
String dateTimeString = "2025-10-22 10:32:12";
DateTimeFormatter formatter = DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss");

LocalDateTime localDateTime = LocalDateTime.parse(dateTimeString, formatter);
ZonedDateTime zonedDateTime = localDateTime.atZone(ZoneId.systemDefault());
ZonedDateTime snap1 = getRelativeZonedDateTime("-1d+1y@M", zonedDateTime);
ZonedDateTime snap2 = getRelativeZonedDateTime("-3d@d-2h+10m@h", zonedDateTime);
assertEquals(snap1.toLocalDateTime().toString(), "2026-10-01T00:00");
assertEquals(snap2.toLocalDateTime().toString(), "2025-10-18T22:00");
}

@Test
void testRelativeZonedDateTimeWithWrongInput() {
String dateTimeString = "2025-10-22 10:32:12";
DateTimeFormatter formatter = DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss");

LocalDateTime localDateTime = LocalDateTime.parse(dateTimeString, formatter);
ZonedDateTime zonedDateTime = localDateTime.atZone(ZoneId.systemDefault());
IllegalArgumentException e =
assertThrows(
IllegalArgumentException.class, () -> getRelativeZonedDateTime("1d+1y", zonedDateTime));
assertEquals(e.getMessage(), "Wrong relative time expression: 1d+1y");
}
}
Loading
Loading