Skip to content

Commit

Permalink
Allow fetching counts for date ranges spanning one day (#31)
Browse files Browse the repository at this point in the history
  • Loading branch information
lbschanno committed Mar 6, 2024
1 parent d56dbeb commit 0824633
Show file tree
Hide file tree
Showing 2 changed files with 98 additions and 82 deletions.
49 changes: 33 additions & 16 deletions src/main/java/datawave/query/util/MetadataHelper.java
Original file line number Diff line number Diff line change
Expand Up @@ -1194,37 +1194,54 @@ public Long getCountsByFieldForDays(String fieldName, Date begin, Date end) {
return getCountsByFieldForDays(fieldName, begin, end, UniversalSet.instance());
}

public Long getCountsByFieldForDays(String fieldName, Date begin, Date end, Set<String> ingestTypeFilter) {
/**
* Returns the sum of all counts for the given fields and datatypes from the start date to the end date.
*
* @param fieldName
* the field name to filter on
* @param begin
* the begin date
* @param end
* the end date
* @param dataTypes
* the datatypes to filter on
* @return the total counts
*/
public Long getCountsByFieldForDays(String fieldName, Date begin, Date end, Set<String> dataTypes) {
Preconditions.checkNotNull(fieldName);
Preconditions.checkNotNull(begin);
Preconditions.checkNotNull(end);
Preconditions.checkArgument(begin.before(end));
Preconditions.checkNotNull(ingestTypeFilter);
Preconditions.checkArgument((begin.before(end) || begin.getTime() == end.getTime()));
Preconditions.checkNotNull(dataTypes);

Date truncatedBegin = DateUtils.truncate(begin, Calendar.DATE);
Date truncatedEnd = DateUtils.truncate(end, Calendar.DATE);

if (truncatedEnd.getTime() != end.getTime()) {
// If we don't have the same time for both, we actually truncated
// the end,
// and, as such, we want to bump out the date range to include the
// end
// If we don't have the same time for both, we actually truncated the end, and, as such, we want to bump out the date range to include the end.
truncatedEnd = new Date(truncatedEnd.getTime() + 86400000);
}

Calendar cal = Calendar.getInstance(TimeZone.getTimeZone("UTC"));
cal.setTime(truncatedBegin);

long sum = 0l;
while (cal.getTime().before(truncatedEnd)) {
Date curDate = cal.getTime();
String desiredDate = DateHelper.format(curDate);

sum += getCountsByFieldInDayWithTypes(fieldName, desiredDate, ingestTypeFilter);
cal.add(Calendar.DATE, 1);
// If the start and end date are the same, return the count for just the start date.
// TODO - Verify if this is the correct behavior, i.e. treating the end date as inclusive vs. not. It should probably match query date range behavior.
if (truncatedBegin.getTime() == truncatedEnd.getTime()) {
String desiredDate = DateHelper.format(cal.getTime());
return getCountsByFieldInDayWithTypes(fieldName, desiredDate, dataTypes);
} else {
// Otherwise, sum up the counts across the given date range.
long sum = 0L;
while (cal.getTime().before(truncatedEnd)) {
Date curDate = cal.getTime();
String desiredDate = DateHelper.format(curDate);

sum += getCountsByFieldInDayWithTypes(fieldName, desiredDate, dataTypes);
cal.add(Calendar.DATE, 1);
}
return sum;
}

return sum;
}

/**
Expand Down
131 changes: 65 additions & 66 deletions src/test/java/datawave/query/util/MetadataHelperTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,12 @@

import static org.junit.Assert.assertEquals;

import java.io.File;
import java.net.URISyntaxException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Objects;
import java.util.Set;

import org.apache.accumulo.core.client.AccumuloClient;
Expand All @@ -16,8 +21,11 @@
import org.apache.accumulo.core.data.Mutation;
import org.apache.accumulo.core.data.Value;
import org.apache.accumulo.core.security.Authorizations;
import org.junit.Before;
import org.junit.Test;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;

import com.google.common.collect.Maps;
import com.google.common.collect.Sets;
Expand All @@ -30,92 +38,83 @@ public class MetadataHelperTest {

private static final String TABLE_METADATA = "testMetadataTable";
private static final String[] AUTHS = {"FOO"};
private static AccumuloClient accumuloClient;
private static MetadataHelper mdh;
private AccumuloClient accumuloClient;
private MetadataHelper helper;

private static AllFieldMetadataHelper createAllFieldMetadataHelper() {
final Set<Authorizations> allMetadataAuths = Collections.emptySet();
final Set<Authorizations> auths = Collections.singleton(new Authorizations(AUTHS));
TypeMetadataHelper tmh = new TypeMetadataHelper(Maps.newHashMap(), allMetadataAuths, accumuloClient, TABLE_METADATA, auths, false);
CompositeMetadataHelper cmh = new CompositeMetadataHelper(accumuloClient, TABLE_METADATA, auths);

return new AllFieldMetadataHelper(tmh, cmh, accumuloClient, TABLE_METADATA, auths, allMetadataAuths);
}

private static void addFields(Mutation m) throws TableNotFoundException {
BatchWriterConfig config = new BatchWriterConfig();
config.setMaxMemory(0);
try (BatchWriter writer = accumuloClient.createBatchWriter(TABLE_METADATA, config)) {
writer.addMutation(m);
writer.flush();
} catch (MutationsRejectedException e) {
throw new RuntimeException(e);
}
}

private static void clearTable() throws TableNotFoundException, AccumuloException, AccumuloSecurityException {
accumuloClient.tableOperations().deleteRows(TABLE_METADATA, null, null);
@BeforeAll
static void beforeAll() throws URISyntaxException {
File dir = new File(Objects.requireNonNull(ClassLoader.getSystemClassLoader().getResource(".")).toURI());
File targetDir = dir.getParentFile();
System.setProperty("hadoop.home.dir", targetDir.getAbsolutePath());
}

@Before
@BeforeEach
public void setup() throws TableNotFoundException, AccumuloException, TableExistsException, AccumuloSecurityException {
accumuloClient = new InMemoryAccumuloClient("root", new InMemoryInstance(MetadataHelperTest.class.toString()));
if (!accumuloClient.tableOperations().exists(TABLE_METADATA)) {
accumuloClient.tableOperations().create(TABLE_METADATA);
}
mdh = new MetadataHelper(createAllFieldMetadataHelper(), Collections.emptySet(), accumuloClient, TABLE_METADATA, Collections.emptySet(),
helper = new MetadataHelper(createAllFieldMetadataHelper(), Collections.emptySet(), accumuloClient, TABLE_METADATA, Collections.emptySet(),
Collections.emptySet());
}

private AllFieldMetadataHelper createAllFieldMetadataHelper() {
final Set<Authorizations> allMetadataAuths = Collections.emptySet();
final Set<Authorizations> auths = Collections.singleton(new Authorizations(AUTHS));
TypeMetadataHelper tmh = new TypeMetadataHelper(Maps.newHashMap(), allMetadataAuths, accumuloClient, TABLE_METADATA, auths, false);
CompositeMetadataHelper cmh = new CompositeMetadataHelper(accumuloClient, TABLE_METADATA, auths);
return new AllFieldMetadataHelper(tmh, cmh, accumuloClient, TABLE_METADATA, auths, allMetadataAuths);
}

@AfterEach
void tearDown() throws AccumuloException, TableNotFoundException, AccumuloSecurityException {
accumuloClient.tableOperations().delete(TABLE_METADATA);
}

@Test
public void testSingleFieldFilter() throws TableNotFoundException, AccumuloException, AccumuloSecurityException {
clearTable();
Mutation m = new Mutation("rowA");
m.put("t", "dataTypeA", new Value("value"));
addFields(m);
public void testSingleFieldFilter() throws TableNotFoundException {
writeMutation("rowA", "t", "dataTypeA", new Value("value"));

testFilter(Collections.singleton("rowA"), mdh.getAllFields(Collections.singleton("dataTypeA")));
testFilter(Collections.singleton("rowA"), mdh.getAllFields(null));
testFilter(Collections.EMPTY_SET, mdh.getAllFields(Collections.EMPTY_SET));
Assertions.assertEquals(Collections.singleton("rowA"), helper.getAllFields(Collections.singleton("dataTypeA")));
Assertions.assertEquals(Collections.singleton("rowA"), helper.getAllFields(null));
Assertions.assertEquals(Collections.singleton("rowA"), helper.getAllFields(Collections.emptySet()));
}

@Test
public void testMultipleFieldFilter() throws TableNotFoundException, AccumuloException, AccumuloSecurityException {
clearTable();
Mutation m1 = new Mutation("rowA");
m1.put("t", "dataTypeA", new Value("value"));
addFields(m1);

Mutation m2 = new Mutation("rowB");
m2.put("t", "dataTypeB", new Value("value"));
addFields(m2);
public void testMultipleFieldFilter() throws TableNotFoundException {
writeMutation("rowA", "t", "dataTypeA", new Value("value"));
writeMutation("rowB", "t", "dataTypeB", new Value("value"));

testFilter(Collections.singleton("rowB"), mdh.getAllFields(Collections.singleton("dataTypeB")));
testFilter(Sets.newHashSet("rowA", "rowB"), mdh.getAllFields(null));
testFilter(Collections.EMPTY_SET, mdh.getAllFields(Collections.EMPTY_SET));
Assertions.assertEquals(Collections.singleton("rowB"), helper.getAllFields(Collections.singleton("dataTypeB")));
Assertions.assertEquals(Sets.newHashSet("rowA", "rowB"), helper.getAllFields(null));
Assertions.assertEquals(Sets.newHashSet("rowA", "rowB"), helper.getAllFields(Collections.emptySet()));
}

@Test
public void testMultipleFieldFilter2() throws TableNotFoundException, AccumuloException, AccumuloSecurityException {
clearTable();
Mutation m1 = new Mutation("rowA");
m1.put("t", "dataTypeA", new Value("value"));
addFields(m1);

Mutation m2 = new Mutation("rowA");
m2.put("t", "dataTypeB", new Value("value"));
addFields(m2);

Mutation m3 = new Mutation("rowB");
m3.put("t", "dataTypeC", new Value("value"));
addFields(m3);
public void testMultipleFieldFilter2() throws TableNotFoundException {
writeMutation("rowA", "t", "dataTypeA", new Value("value"));
writeMutation("rowB", "t", "dataTypeB", new Value("value"));
writeMutation("rowC", "t", "dataTypeC", new Value("value"));

testFilter(Collections.singleton("rowA"), mdh.getAllFields(Collections.singleton("dataTypeB")));
testFilter(Sets.newHashSet("rowA", "rowB"), mdh.getAllFields(null));
testFilter(Collections.EMPTY_SET, mdh.getAllFields(Collections.EMPTY_SET));
Assertions.assertEquals(Collections.singleton("rowB"), helper.getAllFields(Collections.singleton("dataTypeB")));
Assertions.assertEquals(Sets.newHashSet("rowA", "rowB", "rowC"), helper.getAllFields(null));
Assertions.assertEquals(Sets.newHashSet("rowA", "rowB", "rowC"), helper.getAllFields(Collections.emptySet()));
}

private void testFilter(Set<String> expected, Set<String> actual) throws TableNotFoundException {
assertEquals(expected, actual);
private void writeMutation(String row, String columnFamily, String columnQualifier, Value value) throws TableNotFoundException {
Mutation mutation = new Mutation(row);
mutation.put(columnFamily, columnQualifier, value);
writeMutation(mutation);
}

private void writeMutation(Mutation m) throws TableNotFoundException {
BatchWriterConfig config = new BatchWriterConfig();
config.setMaxMemory(0);
try (BatchWriter writer = accumuloClient.createBatchWriter(TABLE_METADATA, config)) {
writer.addMutation(m);
writer.flush();
} catch (MutationsRejectedException e) {
throw new RuntimeException(e);
}
}
}

0 comments on commit 0824633

Please sign in to comment.