Skip to content

Commit

Permalink
chore | adding the support for querying multi interaction filters (#203)
Browse files Browse the repository at this point in the history
* chore | adding the support for querying multi interaction filters
  • Loading branch information
aman-bansal authored Nov 29, 2023
1 parent 4055aaf commit dae8a7a
Show file tree
Hide file tree
Showing 8 changed files with 161 additions and 87 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@
import org.slf4j.LoggerFactory;

class GatewayServiceEntityEdgeFetcher {

private static final Logger LOG = LoggerFactory.getLogger(GatewayServiceEntityEdgeFetcher.class);
static final EdgeResultSet EMPTY_EDGE_RESULT_SET = new ConvertedEdgeResultSet(List.of());

Expand Down Expand Up @@ -119,16 +120,22 @@ private Maybe<Edge> buildEdge(

return zip(
this.attributeMapConverter.convert(
edgeSetGroupRequest.attributeRequests(), response.getAttributeMap()),
edgeSetGroupRequest.edgeSetRequests().get(neighbor.type()).attributeRequests(),
response.getAttributeMap()),
this.baselineMetricAggregationContainerMapConverter.convert(
edgeSetGroupRequest.metricAggregationRequests(), response.getMetricsMap()),
edgeSetGroupRequest
.edgeSetRequests()
.get(neighbor.type())
.metricAggregationRequests(),
response.getMetricsMap()),
(attributes, metrics) -> (Edge) new ConvertedEdge(neighbor, attributes, metrics))
.toMaybe();
}

@lombok.Value
@Accessors(fluent = true)
private static class ConvertedEdge implements Edge {

Entity neighbor;
Map<AttributeExpression, Object> attributeValues;
Map<AttributeExpression, BaselinedMetricAggregationContainer> metricContainers;
Expand All @@ -147,6 +154,7 @@ public BaselinedMetricAggregationContainer metric(AttributeExpression attributeE
@lombok.Value
@Accessors(fluent = true)
private static class ConvertedEdgeResultSet implements EdgeResultSet {

List<Edge> results;

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,8 +21,10 @@
import org.hypertrace.core.graphql.common.utils.Converter;
import org.hypertrace.gateway.service.v1.common.Expression;
import org.hypertrace.gateway.service.v1.common.Filter;
import org.hypertrace.gateway.service.v1.common.Operator;
import org.hypertrace.gateway.service.v1.entity.InteractionsRequest;
import org.hypertrace.graphql.entity.request.EdgeSetGroupRequest;
import org.hypertrace.graphql.entity.request.EdgeSetRequest;
import org.hypertrace.graphql.metric.request.MetricAggregationRequest;

class GatewayServiceEntityInteractionRequestBuilder {
Expand All @@ -44,7 +46,7 @@ class GatewayServiceEntityInteractionRequestBuilder {
}

Single<InteractionsRequest> build(EdgeSetGroupRequest edgeSetRequestGroup) {
if (edgeSetRequestGroup.entityTypes().isEmpty()) {
if (edgeSetRequestGroup.edgeSetRequests().isEmpty()) {
return Single.just(InteractionsRequest.getDefaultInstance());
}

Expand All @@ -61,40 +63,64 @@ Single<InteractionsRequest> build(EdgeSetGroupRequest edgeSetRequestGroup) {

private Single<Set<Expression>> collectSelectionsAndAggregations(EdgeSetGroupRequest request) {
return this.selectionConverter
.convert(request.attributeRequests())
.mergeWith(this.aggregationConverter.convert(request.metricAggregationRequests()))
.convert(getAllAttributeRequests(request))
.mergeWith(this.aggregationConverter.convert(getAllMetricAggregationRequests(request)))
.toObservable()
.flatMap(Observable::fromIterable)
.collect(Collectors.toUnmodifiableSet());
}

private Set<AttributeRequest> getAllAttributeRequests(EdgeSetGroupRequest request) {
return request.edgeSetRequests().values().stream()
.map(EdgeSetRequest::attributeRequests)
.flatMap(Collection::stream)
.collect(Collectors.toUnmodifiableSet());
}

private Set<MetricAggregationRequest> getAllMetricAggregationRequests(
EdgeSetGroupRequest request) {
return request.edgeSetRequests().values().stream()
.map(EdgeSetRequest::metricAggregationRequests)
.flatMap(Collection::stream)
.collect(Collectors.toUnmodifiableSet());
}

private Single<Filter> buildEntityInteractionFilter(EdgeSetGroupRequest request) {
return Observable.fromIterable(request.entityTypes()) // add entity types filter
.collect(Collectors.toUnmodifiableSet())
// Todo: we should be using converter taking argument as logical filters with filter arg schema
return Observable.fromIterable(request.edgeSetRequests().entrySet())
.map(
entry ->
Stream.concat(
Stream.of(buildEntityTypeFilter(request, entry.getKey())),
entry.getValue().filterArguments().stream())
.collect(Collectors.toUnmodifiableList()))
.flatMapSingle(this.filterConverter::convert)
.collect(Collectors.toUnmodifiableList())
.map(
entityTypes ->
AttributeAssociation.<FilterArgument>of(
request.neighborTypeAttribute().attributeExpressionAssociation().attribute(),
new EntityNeighborTypeFilter(
request.neighborTypeAttribute().attributeExpressionAssociation().value(),
entityTypes)))
.flatMap(
filterAssociation ->
this.filterConverter.convert(
Stream.concat(
request.filterArguments().stream(), // add all other filters
Stream.of(filterAssociation))
.collect(Collectors.toUnmodifiableSet())));
childFilters ->
Filter.newBuilder()
.setOperator(Operator.OR)
.addAllChildFilter(childFilters)
.build());
}

private AttributeAssociation<FilterArgument> buildEntityTypeFilter(
EdgeSetGroupRequest request, String entityType) {
return AttributeAssociation.of(
request.neighborTypeAttribute().attributeExpressionAssociation().attribute(),
new EntityNeighborTypeFilter(
request.neighborTypeAttribute().attributeExpressionAssociation().value(), entityType));
}

@Value
@Accessors(fluent = true)
private static class EntityNeighborTypeFilter implements FilterArgument {

FilterType type = FilterType.ATTRIBUTE;
String key = null;
AttributeExpression keyExpression;
FilterOperatorType operator = FilterOperatorType.IN;
Collection<String> value;
FilterOperatorType operator = FilterOperatorType.EQUALS;
String value;
AttributeScope idType = null;
String idScope = null;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -52,14 +52,14 @@
import org.hypertrace.core.graphql.utils.schema.SelectionQuery;
import org.hypertrace.graphql.entity.dao.EntityDao;
import org.hypertrace.graphql.entity.request.EdgeSetGroupRequest;
import org.hypertrace.graphql.entity.request.EdgeSetRequest;
import org.hypertrace.graphql.entity.request.EntityLabelRequest;
import org.hypertrace.graphql.entity.request.EntityLabelRequestBuilder;
import org.hypertrace.graphql.entity.request.EntityRequest;
import org.hypertrace.graphql.entity.schema.Entity;
import org.hypertrace.graphql.entity.schema.EntityJoinable;
import org.hypertrace.graphql.entity.schema.EntityResultSet;
import org.hypertrace.graphql.entity.schema.argument.EntityTypeStringArgument;
import org.hypertrace.graphql.metric.request.MetricAggregationRequest;
import org.hypertrace.graphql.metric.request.MetricRequest;
import org.hypertrace.graphql.metric.request.MetricRequestBuilder;
import org.hypertrace.graphql.metric.schema.argument.AggregatableOrderArgument;
Expand Down Expand Up @@ -317,12 +317,9 @@ private static class DefaultEntityRequest implements EntityRequest {
@Value
@Accessors(fluent = true)
private static class EmptyEdgeSetGroupRequest implements EdgeSetGroupRequest {
Set<String> entityTypes = Collections.emptySet();
Collection<AttributeRequest> attributeRequests = Collections.emptyList();
Collection<MetricAggregationRequest> metricAggregationRequests = Collections.emptyList();
Collection<AttributeAssociation<FilterArgument>> filterArguments = Collections.emptyList();
AttributeRequest neighborIdAttribute = null;
AttributeRequest neighborTypeAttribute = null;
Map<String, EdgeSetRequest> edgeSetRequests = Collections.emptyMap();

@Override
public Single<EntityRequest> buildNeighborRequest(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,11 +3,12 @@
import static io.reactivex.rxjava3.core.Single.zip;

import graphql.schema.SelectedField;
import io.grpc.Status;
import io.reactivex.rxjava3.core.Observable;
import io.reactivex.rxjava3.core.Single;
import java.util.Collection;
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
import java.util.Optional;
import java.util.Set;
import java.util.function.BiFunction;
Expand Down Expand Up @@ -89,35 +90,18 @@ private Single<EdgeSetGroupRequest> buildEdgeRequest(
Stream<SelectedField> edgeSetFields,
EdgeType edgeType) {
Set<SelectedField> edgeFields = edgeSetFields.collect(Collectors.toUnmodifiableSet());
List<FilterArgument> filterArguments = this.getFilters(edgeFields);

if (!filterArguments.isEmpty() && edgeFields.size() > 1) {
throw Status.UNIMPLEMENTED
.withDescription("Cannot specify more than one edge type with edge filters")
.asRuntimeException();
}

Map<String, Set<SelectedField>> edgesByType = this.getEdgesByType(edgeFields.stream());
Set<SelectedField> allEdges =
edgesByType.values().stream()
.flatMap(Collection::stream)
.collect(Collectors.toUnmodifiableSet());
Map<String, Set<SelectedField>> edgesSelectionsByType =
this.getEdgesSelectionByType(edgeFields.stream());

return zip(
this.getRequestedAndRequiredAttributes(context, allEdges, edgeType),
this.getNeighborIdAttribute(context, edgeType),
this.getNeighborTypeAttribute(context, edgeType),
this.metricAggregationRequestBuilder.build(
context, HypertraceAttributeScopeString.INTERACTION, allEdges.stream()),
this.filterRequestBuilder.build(
context, HypertraceAttributeScopeString.INTERACTION, filterArguments),
(attributeRequests, neighborIdRequest, neighborTypeRequest, metricRequests, filters) ->
this.getEntityTypeToEdgeSetRequest(context, edgeType, edgeFields),
(neighborIdRequest, neighborTypeRequest, edgeRequests) ->
new DefaultEdgeSetGroupRequest(
edgesByType.keySet(),
attributeRequests,
metricRequests,
neighborIdRequest,
neighborTypeRequest,
edgeRequests,
(entityType, neighborIds) ->
this.neighborEntitiesRequestBuilderProvider
.get()
Expand All @@ -127,12 +111,44 @@ private Single<EdgeSetGroupRequest> buildEdgeRequest(
timeRange,
space,
neighborIds,
edgesByType.get(entityType)),
filters));
edgesSelectionsByType.get(entityType))));
}

private Single<Map<String, EdgeSetRequest>> getEntityTypeToEdgeSetRequest(
GraphQlRequestContext context, EdgeType edgeType, Set<SelectedField> edgeFields) {
return Observable.fromIterable(edgeFields)
.collect(Collectors.groupingBy(this::getEntityType, Collectors.toUnmodifiableSet()))
.flatMap(edgeFieldsMap -> this.getEdgeSetRequestMap(context, edgeType, edgeFieldsMap));
}

private Map<String, Set<SelectedField>> getEdgesByType(Stream<SelectedField> edgeSetStream) {
private Single<Map<String, EdgeSetRequest>> getEdgeSetRequestMap(
GraphQlRequestContext context,
EdgeType edgeType,
Map<String, Set<SelectedField>> entityTypeToEdgeFieldsMap) {
return Observable.fromIterable(entityTypeToEdgeFieldsMap.entrySet())
.flatMapSingle(
entry ->
this.getEdgeSetRequestEntry(context, edgeType, entry.getKey(), entry.getValue()))
.collect(Collectors.toUnmodifiableMap(Entry::getKey, Entry::getValue));
}

private Single<Map.Entry<String, EdgeSetRequest>> getEdgeSetRequestEntry(
GraphQlRequestContext context,
EdgeType edgeType,
String entityType,
Set<SelectedField> edgeFields) {
return zip(
this.getRequestedAndRequiredAttributes(context, edgeFields, edgeType),
this.getMetricAggregationRequestAttributes(context, edgeFields),
this.getFilterArguments(context, edgeFields),
(requestAttributes, metricAttributes, filters) ->
Map.entry(
entityType,
new DefaultEdgeSetRequest(requestAttributes, metricAttributes, filters)));
}

private Map<String, Set<SelectedField>> getEdgesSelectionByType(
Stream<SelectedField> edgeSetStream) {
return edgeSetStream.collect(
Collectors.groupingBy(
this::getEntityType,
Expand All @@ -155,11 +171,25 @@ private String getEntityType(SelectedField edgeSetField) {
.orElseThrow();
}

private Single<List<MetricAggregationRequest>> getMetricAggregationRequestAttributes(
GraphQlRequestContext context, Collection<SelectedField> edges) {
Set<SelectedField> selections =
edges.stream()
.collect(
Collectors.flatMapping(this::getEdgesForEdgeSet, Collectors.toUnmodifiableSet()));
return this.metricAggregationRequestBuilder.build(
context, HypertraceAttributeScopeString.INTERACTION, selections.stream());
}

private Single<List<AttributeRequest>> getRequestedAndRequiredAttributes(
GraphQlRequestContext context, Collection<SelectedField> edges, EdgeType edgeType) {
Set<SelectedField> selections =
edges.stream()
.collect(
Collectors.flatMapping(this::getEdgesForEdgeSet, Collectors.toUnmodifiableSet()));
return this.attributeRequestBuilder
.buildForAttributeQueryableFields(
context, HypertraceAttributeScopeString.INTERACTION, edges.stream())
context, HypertraceAttributeScopeString.INTERACTION, selections.stream())
.mergeWith(this.getNeighborIdAttribute(context, edgeType))
.mergeWith(this.getNeighborTypeAttribute(context, edgeType))
.collect(Collectors.toUnmodifiableList());
Expand All @@ -183,15 +213,21 @@ private Single<AttributeRequest> getNeighborIdAttribute(
}
}

private List<FilterArgument> getFilters(Set<SelectedField> selectedFields) {
return selectedFields.stream()
.map(
selectedField ->
this.argumentDeserializer.deserializeObjectList(
selectedField.getArguments(), FilterArgument.class))
.flatMap(Optional::stream)
.flatMap(Collection::stream)
.collect(Collectors.toUnmodifiableList());
private Single<List<AttributeAssociation<FilterArgument>>> getFilterArguments(
GraphQlRequestContext context, Set<SelectedField> edgeFields) {
Set<FilterArgument> filterArguments =
edgeFields.stream()
.collect(Collectors.flatMapping(this::getFilter, Collectors.toUnmodifiableSet()));

return this.filterRequestBuilder.build(
context, HypertraceAttributeScopeString.INTERACTION, filterArguments);
}

private Stream<FilterArgument> getFilter(SelectedField selectedField) {
return this.argumentDeserializer
.deserializeObjectList(selectedField.getArguments(), FilterArgument.class)
.stream()
.flatMap(Collection::stream);
}

private Single<AttributeRequest> getNeighborTypeAttribute(
Expand Down Expand Up @@ -220,19 +256,23 @@ private enum EdgeType {
@Value
@Accessors(fluent = true)
private static class DefaultEdgeSetGroupRequest implements EdgeSetGroupRequest {

Set<String> entityTypes;
Collection<AttributeRequest> attributeRequests;
Collection<MetricAggregationRequest> metricAggregationRequests;
AttributeRequest neighborIdAttribute;
AttributeRequest neighborTypeAttribute;
Map<String, EdgeSetRequest> edgeSetRequests;
BiFunction<String, Collection<String>, Single<EntityRequest>> neighborRequestBuilder;
Collection<AttributeAssociation<FilterArgument>> filterArguments;

@Override
public Single<EntityRequest> buildNeighborRequest(
String entityType, Collection<String> neighborIds) {
return this.neighborRequestBuilder.apply(entityType, neighborIds);
}
}

@Value
@Accessors(fluent = true)
private static class DefaultEdgeSetRequest implements EdgeSetRequest {
Collection<AttributeRequest> attributeRequests;
Collection<MetricAggregationRequest> metricAggregationRequests;
Collection<AttributeAssociation<FilterArgument>> filterArguments;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -2,26 +2,15 @@

import io.reactivex.rxjava3.core.Single;
import java.util.Collection;
import java.util.Set;
import org.hypertrace.core.graphql.common.request.AttributeAssociation;
import java.util.Map;
import org.hypertrace.core.graphql.common.request.AttributeRequest;
import org.hypertrace.core.graphql.common.schema.results.arguments.filter.FilterArgument;
import org.hypertrace.graphql.metric.request.MetricAggregationRequest;

public interface EdgeSetGroupRequest {

Set<String> entityTypes();

// Includes neighbor id and type
Collection<AttributeRequest> attributeRequests();

Collection<MetricAggregationRequest> metricAggregationRequests();

AttributeRequest neighborIdAttribute();

AttributeRequest neighborTypeAttribute();

Single<EntityRequest> buildNeighborRequest(String entityType, Collection<String> neighborIds);

Collection<AttributeAssociation<FilterArgument>> filterArguments();
Map<String, EdgeSetRequest> edgeSetRequests();
}
Loading

0 comments on commit dae8a7a

Please sign in to comment.