Skip to content

Commit

Permalink
Merge pull request #189 from thingsboard/agg-fix
Browse files Browse the repository at this point in the history
Use sort order in aggregation queries
  • Loading branch information
dmytro-landiak authored Dec 11, 2024
2 parents f2c3f56 + 79d3046 commit 687f983
Show file tree
Hide file tree
Showing 7 changed files with 51 additions and 32 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -150,7 +150,7 @@ public DeferredResult<ResponseEntity> getTimeseries(
@RequestParam(name = "interval", defaultValue = "0") Long interval,
@RequestParam(name = "limit", defaultValue = "100") Integer limit,
@RequestParam(name = "agg", defaultValue = "NONE") String aggStr,
@RequestParam(name = "orderBy", defaultValue = "DESC") String orderBy,
@RequestParam(name = "orderBy", defaultValue = BrokerConstants.DESC_ORDER) String orderBy,
@RequestParam(name = "useStrictDataTypes", required = false, defaultValue = "true") Boolean useStrictDataTypes) throws ThingsboardException {
try {
checkParameter(ENTITY_ID, entityId);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,9 @@ public class BrokerConstants {
public static final String WS = "WS";
public static final String WSS = "WSS";

public static final String ASC_ORDER = "ASC";
public static final String DESC_ORDER = "DESC";

public static final String ENTITY_ID_TOTAL = "total";
public static final String INCOMING_MSGS = "incomingMsgs";
public static final String OUTGOING_MSGS = "outgoingMsgs";
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,8 @@
import lombok.Data;
import lombok.EqualsAndHashCode;

import static org.thingsboard.mqtt.broker.common.data.BrokerConstants.DESC_ORDER;

@Data
@EqualsAndHashCode(callSuper = true)
public class BaseReadTsKvQuery extends BaseTsKvQuery implements ReadTsKvQuery {
Expand All @@ -28,7 +30,7 @@ public class BaseReadTsKvQuery extends BaseTsKvQuery implements ReadTsKvQuery {
private final String order;

public BaseReadTsKvQuery(String key, long startTs, long endTs, long interval, int limit, Aggregation aggregation) {
this(key, startTs, endTs, interval, limit, aggregation, "DESC");
this(key, startTs, endTs, interval, limit, aggregation, DESC_ORDER);
}

public BaseReadTsKvQuery(String key, long startTs, long endTs, long interval, int limit, Aggregation aggregation, String order) {
Expand All @@ -40,7 +42,7 @@ public BaseReadTsKvQuery(String key, long startTs, long endTs, long interval, in
}

public BaseReadTsKvQuery(String key, long startTs, long endTs) {
this(key, startTs, endTs, endTs - startTs, 1, Aggregation.AVG, "DESC");
this(key, startTs, endTs, endTs - startTs, 1, Aggregation.AVG, DESC_ORDER);
}

public BaseReadTsKvQuery(String key, long startTs, long endTs, int limit, String order) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -140,7 +140,7 @@ public ListenableFuture<List<TsKvEntry>> findAllAsync(String entityId, ReadTsKvQ
futures.add(aggregateTsKvEntry);
startPeriod = endTs;
}
return getTsKvEntriesFuture(Futures.allAsList(futures));
return getTsKvEntriesFuture(Futures.allAsList(futures), query.getOrder());
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,35 +15,49 @@
*/
package org.thingsboard.mqtt.broker.dao.sqlts;

import com.google.common.base.Function;
import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.ListenableFuture;
import jakarta.annotation.Nullable;
import lombok.extern.slf4j.Slf4j;
import org.springframework.util.CollectionUtils;
import org.thingsboard.mqtt.broker.common.data.BrokerConstants;
import org.thingsboard.mqtt.broker.common.data.kv.TsKvEntry;
import org.thingsboard.mqtt.broker.dao.DaoUtil;
import org.thingsboard.mqtt.broker.dao.JpaAbstractDaoListeningExecutorService;
import org.thingsboard.mqtt.broker.dao.model.sqlts.AbstractTsKvEntity;

import java.util.Collections;
import java.util.List;
import java.util.Optional;
import java.util.stream.Collectors;

@Slf4j
public abstract class BaseAbstractSqlTimeseriesDao extends JpaAbstractDaoListeningExecutorService {

protected ListenableFuture<List<TsKvEntry>> getTsKvEntriesFuture(ListenableFuture<List<Optional<? extends AbstractTsKvEntity>>> future) {
return Futures.transform(future, new Function<>() {
@Nullable
@Override
public List<TsKvEntry> apply(@Nullable List<Optional<? extends AbstractTsKvEntity>> results) {
if (results == null || results.isEmpty()) {
return null;
}
List<? extends AbstractTsKvEntity> data = results.stream().filter(Optional::isPresent).map(Optional::get).collect(Collectors.toList());
return DaoUtil.convertDataList(data);
protected ListenableFuture<List<TsKvEntry>> getTsKvEntriesFuture(ListenableFuture<List<Optional<? extends AbstractTsKvEntity>>> future,
String order) {
return Futures.transform(future, results -> {
if (CollectionUtils.isEmpty(results)) {
return null;
}
return DaoUtil.convertDataList(collectWithOrder(results, order));
}, service);
}

private List<? extends AbstractTsKvEntity> collectWithOrder(List<Optional<? extends AbstractTsKvEntity>> results,
String order) {
List<? extends AbstractTsKvEntity> data = collectData(results);
if (BrokerConstants.DESC_ORDER.equals(order)) {
Collections.reverse(data);
}
return data;
}

private List<? extends AbstractTsKvEntity> collectData(List<Optional<? extends AbstractTsKvEntity>> results) {
return results
.stream()
.filter(Optional::isPresent)
.map(Optional::get)
.collect(Collectors.toList());
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -56,13 +56,12 @@
import java.util.function.Function;

import static java.time.ZoneOffset.UTC;
import static org.thingsboard.mqtt.broker.common.data.BrokerConstants.DESC_ORDER;

@Slf4j
@Component
public class SqlTimeseriesLatestDao extends BaseAbstractSqlTimeseriesDao implements TimeseriesLatestDao {

private static final String DESC_ORDER = "DESC";

@Autowired
private TsKvLatestRepository tsKvLatestRepository;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,8 @@

import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNotNull;
import static org.thingsboard.mqtt.broker.common.data.BrokerConstants.ASC_ORDER;
import static org.thingsboard.mqtt.broker.common.data.BrokerConstants.DESC_ORDER;

@Slf4j
@DaoSqlTest
Expand All @@ -51,7 +53,6 @@ public class TimeseriesServiceTest extends AbstractServiceTest {
private final String LONG_KEY = "incomingMsgs";

private final long TS = 42L;
private final String DESC_ORDER = "DESC";

KvEntry longKvEntry = new LongDataEntry(LONG_KEY, Long.MAX_VALUE);

Expand Down Expand Up @@ -96,7 +97,7 @@ public void testFindByQueryAscOrder() throws Exception {
saveEntries(entityId, TS - 1);

List<ReadTsKvQuery> queries = new ArrayList<>();
queries.add(new BaseReadTsKvQuery(LONG_KEY, TS - 3, TS, 0, 1000, Aggregation.NONE, "ASC"));
queries.add(new BaseReadTsKvQuery(LONG_KEY, TS - 3, TS, 0, 1000, Aggregation.NONE, ASC_ORDER));

List<TsKvEntry> entries = tsService.findAll(entityId, queries).get(MAX_TIMEOUT, TimeUnit.SECONDS);
Assert.assertEquals(3, entries.size());
Expand All @@ -114,7 +115,7 @@ public void testFindByQueryDescOrder() throws Exception {
saveEntries(entityId, TS - 1);

List<ReadTsKvQuery> queries = new ArrayList<>();
queries.add(new BaseReadTsKvQuery(LONG_KEY, TS - 3, TS, 0, 1000, Aggregation.NONE, "DESC"));
queries.add(new BaseReadTsKvQuery(LONG_KEY, TS - 3, TS, 0, 1000, Aggregation.NONE, DESC_ORDER));

List<TsKvEntry> entries = tsService.findAll(entityId, queries).get(MAX_TIMEOUT, TimeUnit.SECONDS);
Assert.assertEquals(3, entries.size());
Expand All @@ -130,7 +131,7 @@ public void testFindByQuery_whenPeriodEqualsOneMillisecondPeriod() throws Except
saveEntries(entityId, TS);
saveEntries(entityId, TS + 1L);

List<ReadTsKvQuery> queries = List.of(new BaseReadTsKvQuery(LONG_KEY, TS, TS, 1, 1, Aggregation.COUNT, DESC_ORDER));
List<ReadTsKvQuery> queries = List.of(new BaseReadTsKvQuery(LONG_KEY, TS, TS, 1, 1, Aggregation.COUNT, ASC_ORDER));

List<TsKvEntry> entries = tsService.findAll(entityId, queries).get();
Assert.assertEquals(1, entries.size());
Expand All @@ -146,7 +147,7 @@ public void testFindByQuery_whenPeriodEqualsInterval() throws Exception {
}
saveEntries(entityId, TS + 100L + 1L);

List<ReadTsKvQuery> queries = List.of(new BaseReadTsKvQuery(LONG_KEY, TS, TS + 100, 100, 1, Aggregation.COUNT, DESC_ORDER));
List<ReadTsKvQuery> queries = List.of(new BaseReadTsKvQuery(LONG_KEY, TS, TS + 100, 100, 1, Aggregation.COUNT, ASC_ORDER));

List<TsKvEntry> entries = tsService.findAll(entityId, queries).get();
Assert.assertEquals(1, entries.size());
Expand All @@ -162,7 +163,7 @@ public void testFindByQuery_whenPeriodHaveTwoIntervalWithEqualsLength() throws E
}
saveEntries(entityId, TS + 100000L + 1L);

List<ReadTsKvQuery> queries = List.of(new BaseReadTsKvQuery(LONG_KEY, TS, TS + 99999, 50000, 1, Aggregation.COUNT, DESC_ORDER));
List<ReadTsKvQuery> queries = List.of(new BaseReadTsKvQuery(LONG_KEY, TS, TS + 99999, 50000, 1, Aggregation.COUNT, ASC_ORDER));

List<TsKvEntry> entries = tsService.findAll(entityId, queries).get();
Assert.assertEquals(2, entries.size());
Expand All @@ -179,7 +180,7 @@ public void testFindByQuery_whenPeriodHaveTwoInterval_whereSecondShorterThanFirs
}
saveEntries(entityId, TS + 80000L + 1L);

List<ReadTsKvQuery> queries = List.of(new BaseReadTsKvQuery(LONG_KEY, TS, TS + 80000, 50000, 1, Aggregation.COUNT, DESC_ORDER));
List<ReadTsKvQuery> queries = List.of(new BaseReadTsKvQuery(LONG_KEY, TS, TS + 80000, 50000, 1, Aggregation.COUNT, ASC_ORDER));

List<TsKvEntry> entries = tsService.findAll(entityId, queries).get();
Assert.assertEquals(2, entries.size());
Expand All @@ -194,7 +195,7 @@ public void testFindByQuery_whenPeriodHaveTwoIntervalWithEqualsLength_whereNotAl
saveEntries(entityId, i);
}

List<ReadTsKvQuery> queries = List.of(new BaseReadTsKvQuery(LONG_KEY, TS, TS + 99999, 50000, 1, Aggregation.COUNT, DESC_ORDER));
List<ReadTsKvQuery> queries = List.of(new BaseReadTsKvQuery(LONG_KEY, TS, TS + 99999, 50000, 1, Aggregation.COUNT, ASC_ORDER));

List<TsKvEntry> entries = tsService.findAll(entityId, queries).get();
Assert.assertEquals(2, entries.size());
Expand All @@ -209,7 +210,7 @@ public void testFindByQuery_whenPeriodHaveTwoInterval_whereSecondShorterThanFirs
saveEntries(entityId, i);
}

List<ReadTsKvQuery> queries = List.of(new BaseReadTsKvQuery(LONG_KEY, TS, TS + 80000, 50000, 1, Aggregation.COUNT, DESC_ORDER));
List<ReadTsKvQuery> queries = List.of(new BaseReadTsKvQuery(LONG_KEY, TS, TS + 80000, 50000, 1, Aggregation.COUNT, ASC_ORDER));

List<TsKvEntry> entries = tsService.findAll(entityId, queries).get();
Assert.assertEquals(2, entries.size());
Expand Down Expand Up @@ -263,7 +264,7 @@ public void testFindDeviceTsData() throws Exception {
assertEquals(Optional.of(400L), list.get(2).getLongValue());

list = tsService.findAll(entityId, Collections.singletonList(new BaseReadTsKvQuery(LONG_KEY, 0,
60000, 20000, 3, Aggregation.AVG))).get(MAX_TIMEOUT, TimeUnit.SECONDS);
60000, 20000, 3, Aggregation.AVG, ASC_ORDER))).get(MAX_TIMEOUT, TimeUnit.SECONDS);
assertEquals(3, list.size());
assertEquals(10000, list.get(0).getTs());
assertEquals(Optional.of(150.0), list.get(0).getDoubleValue());
Expand All @@ -275,7 +276,7 @@ public void testFindDeviceTsData() throws Exception {
assertEquals(Optional.of(550.0), list.get(2).getDoubleValue());

list = tsService.findAll(entityId, Collections.singletonList(new BaseReadTsKvQuery(LONG_KEY, 0,
60000, 20000, 3, Aggregation.SUM))).get(MAX_TIMEOUT, TimeUnit.SECONDS);
60000, 20000, 3, Aggregation.SUM, ASC_ORDER))).get(MAX_TIMEOUT, TimeUnit.SECONDS);

assertEquals(3, list.size());
assertEquals(10000, list.get(0).getTs());
Expand All @@ -288,7 +289,7 @@ public void testFindDeviceTsData() throws Exception {
assertEquals(Optional.of(1100L), list.get(2).getLongValue());

list = tsService.findAll(entityId, Collections.singletonList(new BaseReadTsKvQuery(LONG_KEY, 0,
60000, 20000, 3, Aggregation.MIN))).get(MAX_TIMEOUT, TimeUnit.SECONDS);
60000, 20000, 3, Aggregation.MIN, ASC_ORDER))).get(MAX_TIMEOUT, TimeUnit.SECONDS);

assertEquals(3, list.size());
assertEquals(10000, list.get(0).getTs());
Expand All @@ -301,7 +302,7 @@ public void testFindDeviceTsData() throws Exception {
assertEquals(Optional.of(500L), list.get(2).getLongValue());

list = tsService.findAll(entityId, Collections.singletonList(new BaseReadTsKvQuery(LONG_KEY, 0,
60000, 20000, 3, Aggregation.MAX))).get(MAX_TIMEOUT, TimeUnit.SECONDS);
60000, 20000, 3, Aggregation.MAX, ASC_ORDER))).get(MAX_TIMEOUT, TimeUnit.SECONDS);

assertEquals(3, list.size());
assertEquals(10000, list.get(0).getTs());
Expand All @@ -314,7 +315,7 @@ public void testFindDeviceTsData() throws Exception {
assertEquals(Optional.of(600L), list.get(2).getLongValue());

list = tsService.findAll(entityId, Collections.singletonList(new BaseReadTsKvQuery(LONG_KEY, 0,
60000, 20000, 3, Aggregation.COUNT))).get(MAX_TIMEOUT, TimeUnit.SECONDS);
60000, 20000, 3, Aggregation.COUNT, ASC_ORDER))).get(MAX_TIMEOUT, TimeUnit.SECONDS);

assertEquals(3, list.size());
assertEquals(10000, list.get(0).getTs());
Expand Down

0 comments on commit 687f983

Please sign in to comment.