Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add FederatedQueryPlanner #2216

Open
wants to merge 64 commits into
base: integration
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from 14 commits
Commits
Show all changes
64 commits
Select commit Hold shift + click to select a range
cfefd87
Enrich 'i' and 'ri' rows in metadata table with event date
lbschanno Sep 7, 2023
0e4f806
Merge branch 'integration' into task/datedIndexMetadata
ivakegg Sep 29, 2023
f2aa20b
Merge branch 'integration' into task/datedIndexMetadata
ivakegg Oct 4, 2023
ab9ee4c
Merge branch 'integration' into task/datedIndexMetadata
ivakegg Oct 17, 2023
43aeae8
Merge branch 'integration' into task/datedIndexMetadata
lbschanno Nov 7, 2023
83014c1
Merge branch 'integration' into task/datedIndexMetadata
ivakegg Nov 24, 2023
ab14fe2
Add counts to 'i' and 'ri' rows
lbschanno Dec 19, 2023
9aad9f6
Merge branch 'integration' into task/datedIndexMetadata
lbschanno Jan 10, 2024
da6ee69
Initial federated query planner implementation
lbschanno Oct 25, 2023
b40201b
code formatting
lbschanno Jan 12, 2024
8ca62b3
Fixed issues with FederatedQueryIterable
lbschanno Jan 13, 2024
59d3be9
Fix test failures
lbschanno Jan 13, 2024
bde374d
Fix failing tests
lbschanno Jan 13, 2024
461a526
Additional test fixes
lbschanno Jan 13, 2024
7784b4c
pr feedback
lbschanno Jan 18, 2024
b288196
Use new MetadataHelper function version
lbschanno Jan 23, 2024
c34a543
Extract fields to filter index holes
lbschanno Jan 24, 2024
e0ef160
Correct logic for determining sub date ranges
lbschanno Jan 24, 2024
9ea3a84
Merge branch 'integration' into task/federatedQueryPlanner
lbschanno Jan 24, 2024
7052bcb
Remove unnecessary check
lbschanno Jan 24, 2024
d44e0f0
code formatting
lbschanno Jan 24, 2024
fca2fff
Merge branch 'integration' into task/federatedQueryPlanner
lbschanno Jan 27, 2024
e341c72
Add check for null query model
lbschanno Jan 27, 2024
8779a99
Limit config arg to function scope
lbschanno Jan 29, 2024
a875a20
Update metadata-utils submodule commit
lbschanno Jan 29, 2024
cd20ca5
code formatting
lbschanno Jan 29, 2024
56e3bad
Merge branch 'integration' into task/federatedQueryPlanner
lbschanno Feb 6, 2024
39ecec4
Fix failing tests
lbschanno Feb 6, 2024
906f3ee
Additional test fixes
lbschanno Feb 6, 2024
9478a62
Ensure all original tests pass
lbschanno Feb 6, 2024
1bf9201
Add federated planner tests and chained schedulers
lbschanno Feb 27, 2024
941607f
Merge branch 'integration' into task/federatedQueryPlanner
lbschanno Feb 27, 2024
c019c78
pr feedback
lbschanno Feb 29, 2024
9c72dab
metadata-utils 3.0.3 tag
ivakegg Mar 6, 2024
e5af693
Fixed the index hole data ingest to set appropriate time stamps on th…
ivakegg Mar 6, 2024
0842cec
Merge branch 'integration' into task/federatedQueryPlanner
ivakegg Mar 6, 2024
d774f6f
Updated applyModel to use the passed in script
ivakegg Mar 6, 2024
3da641d
Remove unneeded changes
lbschanno Mar 6, 2024
235dad8
Make FederatedQueryPlanner the default
lbschanno Mar 7, 2024
b63b24c
Restore original log4j.properties
lbschanno Mar 8, 2024
0c4ccff
Merge branch 'integration' into task/federatedQueryPlanner
lbschanno Mar 8, 2024
5515fdc
code formatting
lbschanno Mar 8, 2024
fede1a0
Fix QueryPlanTest
lbschanno Mar 9, 2024
6ed7a39
Updated to test with teardown
ivakegg Mar 9, 2024
1d506e3
Test debugging edits
lbschanno Mar 11, 2024
0578bc2
Updated formatting
ivakegg Mar 13, 2024
e9a76e6
Concatenate sub-plans
lbschanno Mar 13, 2024
195dabe
Merge branch 'integration' into task/federatedQueryPlanner
lbschanno Mar 13, 2024
d6354ce
Make FederatedQueryPlanner implement Cloneable
lbschanno Mar 14, 2024
c2a57f0
code formatting
lbschanno Mar 14, 2024
6e1f131
Merge branch 'integration' into task/federatedQueryPlanner
lbschanno Jun 3, 2024
a2da80c
Merge branch 'integration' into task/federatedQueryPlanner
lbschanno Jun 11, 2024
49c4bc6
Merge branch 'integration' into task/federatedQueryPlanner
lbschanno Jun 13, 2024
53a3ea8
Merge branch 'integration' into task/federatedQueryPlanner
ivakegg Jun 25, 2024
cbe8d87
Merge remote-tracking branch 'origin/integration' into task/federated…
ivakegg Jul 1, 2024
74c9db8
* Updated with metadata-utils 4.0.5 (index markers and avoid non-inde…
ivakegg Jul 2, 2024
6608e5f
Merge branch 'integration' into task/federatedQueryPlanner
ivakegg Jul 2, 2024
390ae48
* Allow subclasses of ShardQueryConfiguration
ivakegg Jul 5, 2024
5b52f8b
Merge remote-tracking branch 'origin/integration' into task/federated…
ivakegg Jul 5, 2024
5516f73
Merge branch 'integration' into task/federatedQueryPlanner
ivakegg Jul 8, 2024
8370ea2
Updated to throw a NoResultsException for am empty query.
ivakegg Jul 8, 2024
a9cb2fa
Merge branch 'task/federatedQueryPlanner' of github.com:NationalSecur…
ivakegg Jul 8, 2024
fece27b
import reorg
ivakegg Jul 8, 2024
9aca68a
Updated to avoid expanding unfielded if disabled, and to assume no in…
ivakegg Jul 8, 2024
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -91,7 +91,7 @@
<version.microservice.base-rest-responses>3.0.0</version.microservice.base-rest-responses>
<version.microservice.common-utils>2.0.0</version.microservice.common-utils>
<version.microservice.dictionary-api>3.0.0</version.microservice.dictionary-api>
<version.microservice.metadata-utils>3.0.0</version.microservice.metadata-utils>
<version.microservice.metadata-utils>3.0.2-SNAPSHOT</version.microservice.metadata-utils>
<version.microservice.metrics-reporter>2.0.0</version.microservice.metrics-reporter>
<version.microservice.query-metric-api>3.0.0</version.microservice.query-metric-api>
<version.microservice.type-utils>2.0.1</version.microservice.type-utils>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -89,24 +89,27 @@
*/
public class EventMetadata implements RawRecordMetadata {

private MetadataWithMostRecentDate compositeFieldsInfo = new MetadataWithMostRecentDate(ColumnFamilyConstants.COLF_CI);
private MetadataWithMostRecentDate compositeSeparators = new MetadataWithMostRecentDate(ColumnFamilyConstants.COLF_CISEP);
private MetadataWithMostRecentDate dataTypeFieldsInfo = new MetadataWithMostRecentDate(ColumnFamilyConstants.COLF_T);
private MetadataWithMostRecentDate normalizedFieldsInfo = new MetadataWithMostRecentDate(ColumnFamilyConstants.COLF_N);

private static final Logger log = getLogger(EventMetadata.class);
private final Text metadataTableName;
private final Text loadDatesTableName;

private final MetadataWithMostRecentDate compositeFieldsInfo = new MetadataWithMostRecentDate(ColumnFamilyConstants.COLF_CI);
private final MetadataWithMostRecentDate compositeSeparators = new MetadataWithMostRecentDate(ColumnFamilyConstants.COLF_CISEP);
private final MetadataWithMostRecentDate dataTypeFieldsInfo = new MetadataWithMostRecentDate(ColumnFamilyConstants.COLF_T);
private final MetadataWithMostRecentDate normalizedFieldsInfo = new MetadataWithMostRecentDate(ColumnFamilyConstants.COLF_N);

// stores field name, data type, and most recent event date
private final MetadataWithMostRecentDate eventFieldsInfo = new MetadataWithMostRecentDate(ColumnFamilyConstants.COLF_E);
private final MetadataWithMostRecentDate indexedFieldsInfo = new MetadataWithMostRecentDate(ColumnFamilyConstants.COLF_I);
private final MetadataWithMostRecentDate reverseIndexedFieldsInfo = new MetadataWithMostRecentDate(ColumnFamilyConstants.COLF_RI);
private final MetadataWithMostRecentDate termFrequencyFieldsInfo = new MetadataWithMostRecentDate(ColumnFamilyConstants.COLF_TF);

// stores counts
private final MetadataCounterGroup frequencyCounts = new MetadataCounterGroup(ColumnFamilyConstants.COLF_F); // by event date
private final MetadataCounterGroup indexedFieldsLoadDateCounts;
private final MetadataCounterGroup reverseIndexedFieldsLoadDateCounts;
private boolean frequency = false;
private final MetadataCounterGroup indexedCounts = new MetadataCounterGroup(ColumnFamilyConstants.COLF_I);
private final MetadataCounterGroup reverseIndexedCounts = new MetadataCounterGroup(ColumnFamilyConstants.COLF_RI);

private boolean writeFrequencyCounts = false;

/**
* @param shardTableName
Expand All @@ -119,32 +122,32 @@ public class EventMetadata implements RawRecordMetadata {
* used as part of column family within LOAD_DATES_TABLE_NAME
* @param loadDatesTableName
* the table name for the load dates
* @param frequency
* @param writeFrequencyCounts
* whether to add to the metadata table's counts for field name by event date
*/
public EventMetadata(@SuppressWarnings("UnusedParameters") Text shardTableName, Text metadataTableName, Text loadDatesTableName, Text shardIndexTableName,
Text shardReverseIndexTableName, boolean frequency) {
Text shardReverseIndexTableName, boolean writeFrequencyCounts) {
this.metadataTableName = metadataTableName;
this.loadDatesTableName = loadDatesTableName;
this.frequency = frequency;
this.writeFrequencyCounts = writeFrequencyCounts;

this.indexedFieldsLoadDateCounts = new MetadataCounterGroup("FIELD_NAME", shardIndexTableName);
this.reverseIndexedFieldsLoadDateCounts = new MetadataCounterGroup("FIELD_NAME", shardReverseIndexTableName);
}

@Override
public void addEvent(IngestHelperInterface helper, RawRecordContainer event, Multimap<String,NormalizedContentInterface> fields, long loadTimeInMillis) {
addEvent(helper, event, fields, this.frequency, INCLUDE_LOAD_DATES, loadTimeInMillis);
addEvent(helper, event, fields, this.writeFrequencyCounts, INCLUDE_LOAD_DATES, loadTimeInMillis);
}

@Override
public void addEvent(IngestHelperInterface helper, RawRecordContainer event, Multimap<String,NormalizedContentInterface> fields) {
addEvent(helper, event, fields, this.frequency, INCLUDE_LOAD_DATES, System.currentTimeMillis());
addEvent(helper, event, fields, this.writeFrequencyCounts, INCLUDE_LOAD_DATES, System.currentTimeMillis());
}

@Override
public void addEventWithoutLoadDates(IngestHelperInterface helper, RawRecordContainer event, Multimap<String,NormalizedContentInterface> fields) {
addEvent(helper, event, fields, this.frequency, EXCLUDE_LOAD_DATES, System.currentTimeMillis());
addEvent(helper, event, fields, this.writeFrequencyCounts, EXCLUDE_LOAD_DATES, System.currentTimeMillis());
}

@Override
Expand Down Expand Up @@ -241,17 +244,17 @@ protected void addToFrequencyCounts(RawRecordContainer event, String fieldName,

protected void updateForIndexedField(@SuppressWarnings("UnusedParameters") IngestHelperInterface helper, RawRecordContainer event,
Multimap<String,NormalizedContentInterface> fields, long countDelta, String loadDate, String tokenDesignator, String fieldName) {
update(event, fields.get(fieldName), tokenDesignator, countDelta, loadDate, indexedFieldsInfo, indexedFieldsLoadDateCounts);
update(event, fields.get(fieldName), tokenDesignator, countDelta, loadDate, indexedCounts, indexedFieldsLoadDateCounts);
}

private void updateForCompositeField(@SuppressWarnings("UnusedParameters") IngestHelperInterface helper, RawRecordContainer event,
Multimap<String,NormalizedContentInterface> fields, long countDelta, String loadDate, String tokenDesignator, String fieldName) {
update(event, fields.get(fieldName), tokenDesignator, countDelta, loadDate, indexedFieldsInfo, indexedFieldsLoadDateCounts);
update(event, fields.get(fieldName), tokenDesignator, countDelta, loadDate, indexedCounts, indexedFieldsLoadDateCounts);
}

protected void updateForReverseIndexedField(@SuppressWarnings("UnusedParameters") IngestHelperInterface helper, RawRecordContainer event,
Multimap<String,NormalizedContentInterface> fields, long countDelta, String loadDate, String tokenDesignator, String fieldName) {
update(event, fields.get(fieldName), tokenDesignator, countDelta, loadDate, reverseIndexedFieldsInfo, reverseIndexedFieldsLoadDateCounts);
update(event, fields.get(fieldName), tokenDesignator, countDelta, loadDate, reverseIndexedCounts, reverseIndexedFieldsLoadDateCounts);
}

protected void update(List<datawave.data.type.Type<?>> types, RawRecordContainer event, Collection<NormalizedContentInterface> norms,
Expand Down Expand Up @@ -300,6 +303,28 @@ protected void update(RawRecordContainer event, Collection<NormalizedContentInte
}
}

protected void update(RawRecordContainer event, Collection<NormalizedContentInterface> norms, String tokenDesignator, long countDelta, String loadDate,
MetadataWithEventDate metadata, MetadataCounterGroup counts) {
for (NormalizedContentInterface norm : norms) {
String field = norm.getIndexedFieldName() + tokenDesignator;
metadata.put(field, event.getDataType().outputName(), event.getDate());
if (null != loadDate) {
updateLoadDateCounters(counts, event, field, countDelta, loadDate);
}
}
}

protected void update(RawRecordContainer event, Collection<NormalizedContentInterface> norms, String tokenDesignator, long countDelta, String loadDate,
MetadataCounterGroup metadata, MetadataCounterGroup counts) {
for (NormalizedContentInterface norm : norms) {
String field = norm.getIndexedFieldName() + tokenDesignator;
metadata.addToCount(countDelta, event.getDataType().outputName(), field, DateHelper.format(event.getDate()));
if (null != loadDate) {
updateLoadDateCounters(counts, event, field, countDelta, loadDate);
}
}
}

protected void updateMetadata(MetadataWithMostRecentDate metadataInfo, @SuppressWarnings("UnusedParameters") IngestHelperInterface helper,
RawRecordContainer event, Multimap<String,NormalizedContentInterface> fields, String fieldName) {
// will ignore the update load counters because it's null
Expand Down Expand Up @@ -381,10 +406,9 @@ public Multimap<BulkIngestKey,Value> getBulkMetadata() {
addIndexedFieldToMetadata(bulkData, eventFieldsInfo);
addIndexedFieldToMetadata(bulkData, termFrequencyFieldsInfo);

addIndexedFieldToMetadata(bulkData, indexedFieldsInfo);
addIndexedFieldToMetadata(bulkData, reverseIndexedFieldsInfo);

addFrequenciesToMetadata(bulkData);
addCountsToMetadata(bulkData, indexedCounts);
addCountsToMetadata(bulkData, reverseIndexedCounts);
addCountsToMetadata(bulkData, frequencyCounts);

addIndexedFieldToMetadata(bulkData, dataTypeFieldsInfo);
addIndexedFieldToMetadata(bulkData, normalizedFieldsInfo);
Expand All @@ -400,7 +424,7 @@ public Multimap<BulkIngestKey,Value> getBulkMetadata() {

protected void addToLoadDates(Multimap<BulkIngestKey,Value> results, MetadataCounterGroup countsGroup) {
if (loadDatesTableName != null) {
for (MetadataCounterGroup.CountAndKeyComponents entry : countsGroup.getEntries()) {
for (MetadataCounterGroup.Components entry : countsGroup.getEntries()) {
Long count = entry.getCount();
Key k = new Key(new Text(entry.getRowId()), countsGroup.getColumnFamily(), new Text(entry.getDate() + DELIMITER + entry.getDataType()));

Expand All @@ -409,13 +433,14 @@ protected void addToLoadDates(Multimap<BulkIngestKey,Value> results, MetadataCou
}
}

protected void addFrequenciesToMetadata(Multimap<BulkIngestKey,Value> results) {
if (frequency) {
for (MetadataCounterGroup.CountAndKeyComponents entry : frequencyCounts.getEntries()) {
protected void addCountsToMetadata(Multimap<BulkIngestKey,Value> results, MetadataCounterGroup frequencies) {
// Do not write the counts if these are for "f" rows and writeFrequencyCounts is false.
if (!frequencies.getColumnFamily().equals(ColumnFamilyConstants.COLF_F) || writeFrequencyCounts) {
for (MetadataCounterGroup.Components entry : frequencies.getEntries()) {
Long count = entry.getCount();
Key k = new Key(new Text(entry.getRowId()), frequencyCounts.getColumnFamily(), new Text(entry.getDataType() + DELIMITER + entry.getDate()),
Key key = new Key(new Text(entry.getRowId()), frequencies.getColumnFamily(), new Text(entry.getDataType() + DELIMITER + entry.getDate()),
DateHelper.parse(entry.getDate()).getTime());
addToResults(results, count, k, this.metadataTableName);
addToResults(results, count, key, this.metadataTableName);
}
}
}
Expand All @@ -426,7 +451,7 @@ protected void addToResults(Multimap<BulkIngestKey,Value> results, Long value, K
}

protected void addIndexedFieldToMetadata(Multimap<BulkIngestKey,Value> results, MetadataWithMostRecentDate mostRecentDates) {
for (MetadataWithMostRecentDate.MostRecentEventDateAndKeyComponents entry : mostRecentDates.entries()) {
for (MetadataWithMostRecentDate.Components entry : mostRecentDates.entries()) {
long mostRecentDate = entry.getMostRecentDate();
Text fieldName = new Text(entry.getFieldName());
Text colq = new Text(entry.getDataType());
Expand All @@ -445,10 +470,9 @@ public void clear() {
this.termFrequencyFieldsInfo.clear();
this.frequencyCounts.clear();

this.indexedFieldsInfo.clear();
this.indexedFieldsLoadDateCounts.clear();
this.indexedCounts.clear();
this.reverseIndexedCounts.clear();

this.reverseIndexedFieldsInfo.clear();
this.reverseIndexedFieldsLoadDateCounts.clear();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@
public class MetadataCounterGroup {
private static final Logger log = Logger.getLogger(MetadataCounterGroup.class);
private final Text columnFamily;
private HashMap<String,CountAndKeyComponents> counts = new HashMap<>();
private HashMap<String,Components> counts = new HashMap<>();

public MetadataCounterGroup(String groupName, Text tableName) {
this.columnFamily = new Text(groupName + RawRecordMetadata.DELIMITER + tableName);
Expand All @@ -28,9 +28,9 @@ private static String createKey(String dataType, String rowid, String date) {
/* rowId is either the fieldName or the Lac */
public void addToCount(long countDelta, String dataType, String rowId, String date) {
String hashMapKey = createKey(dataType, rowId, date);
CountAndKeyComponents value = counts.get(hashMapKey);
Components value = counts.get(hashMapKey);
if (null == value) {
counts.put(hashMapKey, new CountAndKeyComponents(dataType, rowId, date, countDelta));
counts.put(hashMapKey, new Components(dataType, rowId, date, countDelta));
} else {
value.incrementCount(countDelta);
}
Expand All @@ -44,17 +44,17 @@ public Text getColumnFamily() {
return columnFamily;
}

public Collection<CountAndKeyComponents> getEntries() {
public Collection<Components> getEntries() {
return counts.values();
}

public class CountAndKeyComponents {
public static class Components {
private final String dataType;
private final String rowId;
private final String date;
private long count;

public CountAndKeyComponents(String dataType, String rowId, String date, long countDelta) {
public Components(String dataType, String rowId, String date, long countDelta) {
this.dataType = dataType;
this.rowId = rowId;
this.date = date;
Expand Down Expand Up @@ -84,18 +84,18 @@ public int hashCode() {
return hashCode;
}

public long getCount() {
return count;
}

@Override
public boolean equals(Object o) {
if (!(o instanceof CountAndKeyComponents)) {
if (!(o instanceof Components)) {
return false;
}
CountAndKeyComponents other = (CountAndKeyComponents) o;
Components other = (Components) o;
return Objects.equal(this.dataType, other.dataType) && Objects.equal(this.rowId, other.rowId) && Objects.equal(this.date, other.date)
&& count == other.count;
}

public long getCount() {
return count;
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,74 @@
package datawave.ingest.metadata;

import java.util.HashSet;
import java.util.Objects;
import java.util.Set;

import org.apache.hadoop.io.Text;

public class MetadataWithEventDate {

private final Text columnFamily;
private final Set<Components> entries = new HashSet<>();

public MetadataWithEventDate(Text columnFamily) {
this.columnFamily = columnFamily;
}

public Text getColumnFamily() {
return columnFamily;
}

public void put(String fieldName, String dataTypeName, long date) {
entries.add(new Components(fieldName, dataTypeName, date));
}

public void clear() {
entries.clear();
}

public Set<Components> getEntries() {
return entries;
}

public static class Components {
private final String fieldName;
private final String dataType;
private final long date;

public Components(String fieldName, String dataType, long date) {
this.fieldName = fieldName;
this.dataType = dataType;
this.date = date;
}

public String getFieldName() {
return fieldName;
}

public String getDataType() {
return dataType;
}

public long getDate() {
return date;
}

@Override
public boolean equals(Object o) {
if (this == o) {
return true;
}
if (o == null || getClass() != o.getClass()) {
return false;
}
Components that = (Components) o;
return Objects.equals(fieldName, that.fieldName) && Objects.equals(dataType, that.dataType) && Objects.equals(date, that.date);
}

@Override
public int hashCode() {
return Objects.hash(fieldName, dataType, date);
}
}
}
Loading
Loading