-
Notifications
You must be signed in to change notification settings - Fork 3.2k
Support limit pushdown in Loki connector. #25876
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: master
Are you sure you want to change the base?
Changes from all commits
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -14,6 +14,7 @@ | |
package io.trino.plugin.loki; | ||
|
||
import com.google.common.collect.ImmutableList; | ||
import io.airlift.log.Logger; | ||
import io.github.jeschkies.loki.client.LokiClient; | ||
import io.github.jeschkies.loki.client.LokiClientException; | ||
import io.github.jeschkies.loki.client.model.Matrix; | ||
|
@@ -28,11 +29,14 @@ | |
|
||
import static com.google.common.collect.ImmutableList.toImmutableList; | ||
import static io.trino.plugin.loki.LokiErrorCode.LOKI_CLIENT_ERROR; | ||
import static java.lang.Math.toIntExact; | ||
import static java.util.Objects.requireNonNull; | ||
|
||
public class LokiRecordSet | ||
implements RecordSet | ||
{ | ||
private static final Logger log = Logger.get(LokiRecordSet.class); | ||
|
||
private final List<LokiColumnHandle> columnHandles; | ||
private final List<Type> columnTypes; | ||
|
||
|
@@ -49,7 +53,8 @@ public LokiRecordSet(LokiClient lokiClient, LokiSplit split, List<LokiColumnHand | |
|
||
// Execute the query | ||
try { | ||
this.result = lokiClient.rangeQuery(split.query(), split.start(), split.end(), split.step()); | ||
log.info("querying %s with limit %d", split.query(), split.limit()); | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Change to debug level and move before |
||
this.result = lokiClient.rangeQuery(split.query(), split.start(), split.end(), split.step(), toIntExact(split.limit())); | ||
} | ||
catch (LokiClientException e) { | ||
throw new TrinoException(LOKI_CLIENT_ERROR, e); | ||
|
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -25,6 +25,7 @@ | |
import java.time.ZoneOffset; | ||
import java.time.format.DateTimeFormatter; | ||
import java.time.temporal.ChronoUnit; | ||
import java.util.concurrent.TimeUnit; | ||
|
||
import static java.lang.String.format; | ||
|
||
|
@@ -81,6 +82,42 @@ | |
"VALUES ('line 1')"); | ||
} | ||
|
||
@Test | ||
void testLimitedLogsQuery() | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Please verify LIMIT pushdown is exactly happened. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I'm not sure this is possible other than inferring that it was passed down. We'd have to grab the Loki logs or mock the client. How do you verify it in other database connectors? There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. You can refer to tests using |
||
throws Exception | ||
{ | ||
Instant start = Instant.now().minus(Duration.ofHours(3)); | ||
Instant end = start.plus(Duration.ofHours(2)); | ||
|
||
// Loki has a default of 100. Setting it to 120 verifies that the limit is propagated. | ||
long limit = 120; | ||
long numberOfRows = 150; | ||
|
||
for (int i = 0; i < numberOfRows; i++) { | ||
client.pushLogLine("line " + i, end.minus(Duration.ofSeconds(i)), ImmutableMap.of("test", "limited_logs_query")); | ||
} | ||
client.flush(); | ||
|
||
// We expect the last 120 lines | ||
StringBuilder expected = new StringBuilder("VALUES "); | ||
for (long i = numberOfRows - limit; i < numberOfRows; i++) { | ||
expected.append("('line ").append(i).append("')"); | ||
if (i < numberOfRows - 1) { | ||
expected.append(", "); | ||
} | ||
} | ||
assertQueryEventually(getSession(), format(""" | ||
SELECT value FROM | ||
TABLE(system.query_range( | ||
'{test="limited_logs_query"}', | ||
TIMESTAMP '%s', | ||
TIMESTAMP '%s' | ||
)) | ||
LIMIT %d | ||
""", timestampFormatter.format(start), timestampFormatter.format(end), limit), | ||
expected.toString(), new io.airlift.units.Duration(30, TimeUnit.SECONDS)); | ||
} | ||
|
||
@Test | ||
void testMetricsQuery() | ||
throws Exception | ||
|
@@ -92,7 +129,7 @@ | |
client.pushLogLine("line 2", end.minus(Duration.ofMinutes(2)), ImmutableMap.of("test", "metrics_query")); | ||
client.pushLogLine("line 3", end.minus(Duration.ofMinutes(1)), ImmutableMap.of("test", "metrics_query")); | ||
client.flush(); | ||
assertQuery(format(""" | ||
Check failure on line 132 in plugin/trino-loki/src/test/java/io/trino/plugin/loki/TestLokiIntegration.java
|
||
SELECT value FROM | ||
TABLE(system.query_range( | ||
'count_over_time({test="metrics_query"}[5m])', | ||
|
@@ -115,7 +152,7 @@ | |
client.pushLogLine("line 2", end.minus(Duration.ofMinutes(2)), ImmutableMap.of("test", "labels")); | ||
client.pushLogLine("line 3", end.minus(Duration.ofMinutes(1)), ImmutableMap.of("test", "labels")); | ||
client.flush(); | ||
assertQuery(format(""" | ||
Check failure on line 155 in plugin/trino-loki/src/test/java/io/trino/plugin/loki/TestLokiIntegration.java
|
||
SELECT labels['test'] FROM | ||
TABLE(system.query_range( | ||
'count_over_time({test="labels"}[5m])', | ||
|
@@ -189,7 +226,7 @@ | |
this.client.pushLogLine("line 2", start.plus(Duration.ofHours(2)), ImmutableMap.of("test", "timestamp_metrics_query")); | ||
this.client.pushLogLine("line 3", start.plus(Duration.ofHours(3)), ImmutableMap.of("test", "timestamp_metrics_query")); | ||
this.client.flush(); | ||
assertQuery(format(""" | ||
Check failure on line 229 in plugin/trino-loki/src/test/java/io/trino/plugin/loki/TestLokiIntegration.java
|
||
SELECT to_iso8601(timestamp), value FROM | ||
TABLE(system.query_range( | ||
'count_over_time({test="timestamp_metrics_query"}[5m])', | ||
|
@@ -211,29 +248,27 @@ | |
@Test | ||
void testQueryRangeInvalidArguments() | ||
{ | ||
assertQueryFails( | ||
""" | ||
SELECT to_iso8601(timestamp), value FROM | ||
TABLE(system.query_range( | ||
'count_over_time({test="timestamp_metrics_query"}[5m])', | ||
TIMESTAMP '2012-08-08', | ||
TIMESTAMP '2012-08-09', | ||
-300 | ||
)) | ||
LIMIT 1 | ||
""", | ||
assertQueryFails(""" | ||
SELECT to_iso8601(timestamp), value FROM | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Revert unrelated changes. |
||
TABLE(system.query_range( | ||
'count_over_time({test="timestamp_metrics_query"}[5m])', | ||
TIMESTAMP '2012-08-08', | ||
TIMESTAMP '2012-08-09', | ||
-300 | ||
)) | ||
LIMIT 1 | ||
""", | ||
"step must be positive"); | ||
assertQueryFails( | ||
""" | ||
SELECT to_iso8601(timestamp), value FROM | ||
TABLE(system.query_range( | ||
'count_over_time({test="timestamp_metrics_query"}[5m])', | ||
TIMESTAMP '2012-08-08', | ||
TIMESTAMP '2012-08-09', | ||
NULL | ||
)) | ||
LIMIT 1 | ||
""", | ||
assertQueryFails(""" | ||
SELECT to_iso8601(timestamp), value FROM | ||
TABLE(system.query_range( | ||
'count_over_time({test="timestamp_metrics_query"}[5m])', | ||
TIMESTAMP '2012-08-08', | ||
TIMESTAMP '2012-08-09', | ||
NULL | ||
)) | ||
LIMIT 1 | ||
""", | ||
"step must be positive"); | ||
} | ||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
How does this bump relate to LIMIT pushdown?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The client did not support passing the limit as URL parameter. We need 0.0.5 to pass it.