Skip to content

Commit 4e0b8b7

Browse files
zhixuanjiajjia
andauthored
feat(OpenAPI v3): Improve generic scroll API to have advanced pagination and facets (#14877)
Co-authored-by: jjia <[email protected]>
1 parent df6e6a9 commit 4e0b8b7

File tree

11 files changed

+286
-46
lines changed

11 files changed

+286
-46
lines changed

metadata-io/src/main/java/com/linkedin/metadata/search/elasticsearch/query/request/SearchRequestHandler.java

Lines changed: 38 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -10,6 +10,7 @@
1010
import com.linkedin.common.urn.Urn;
1111
import com.linkedin.data.schema.PathSpec;
1212
import com.linkedin.data.template.DoubleMap;
13+
import com.linkedin.data.template.StringMap;
1314
import com.linkedin.metadata.config.ConfigUtils;
1415
import com.linkedin.metadata.config.search.CustomConfiguration;
1516
import com.linkedin.metadata.config.search.ElasticSearchConfiguration;
@@ -442,22 +443,49 @@ public ScrollResult extractScrollResult(
442443
boolean supportsPointInTime) {
443444
int totalCount = (int) searchResponse.getHits().getTotalHits().value;
444445
size = ConfigUtils.applyLimit(searchServiceConfig, size);
445-
Collection<SearchEntity> resultList = getRestrictedResults(opContext, searchResponse);
446+
447+
// Build per-hit results and attach a per-element scrollId
448+
final SearchHit[] searchHits = searchResponse.getHits().getHits();
449+
long expirationTimeMs = 0L;
450+
if (keepAlive != null && supportsPointInTime) {
451+
expirationTimeMs =
452+
TimeValue.parseTimeValue(keepAlive, "expirationTime").getMillis()
453+
+ System.currentTimeMillis();
454+
}
455+
456+
List<SearchEntity> results = new ArrayList<>(searchHits.length);
457+
for (SearchHit hit : searchHits) {
458+
// Build base SearchEntity
459+
SearchEntity entity = getResult(hit);
460+
// Compute per-hit scrollId using this hit's sort values
461+
Object[] sort = hit.getSortValues();
462+
String perHitScrollId =
463+
new SearchAfterWrapper(sort, searchResponse.pointInTimeId(), expirationTimeMs)
464+
.toScrollId();
465+
// Merge into existing extraFields if present
466+
StringMap extra = entity.getExtraFields();
467+
if (extra == null) {
468+
entity.setExtraFields(new StringMap(Map.of("scrollId", perHitScrollId)));
469+
} else {
470+
extra.put("scrollId", perHitScrollId);
471+
entity.setExtraFields(extra);
472+
}
473+
results.add(entity);
474+
}
475+
476+
// Apply access control restrictions while preserving order
477+
Collection<SearchEntity> resultList =
478+
ESAccessControlUtil.restrictSearchResult(opContext, results);
479+
446480
SearchResultMetadata searchResultMetadata =
447481
extractSearchResultMetadata(opContext, searchResponse, filter);
448-
SearchHit[] searchHits = searchResponse.getHits().getHits();
482+
449483
// Only return next scroll ID if there are more results, indicated by full size results
450484
String nextScrollId = null;
451485
if (searchHits.length == size && searchHits.length > 0) {
452-
Object[] sort = searchHits[searchHits.length - 1].getSortValues();
453-
long expirationTimeMs = 0L;
454-
if (keepAlive != null && supportsPointInTime) {
455-
expirationTimeMs =
456-
TimeValue.parseTimeValue(keepAlive, "expirationTime").getMillis()
457-
+ System.currentTimeMillis();
458-
}
486+
Object[] lastSort = searchHits[searchHits.length - 1].getSortValues();
459487
nextScrollId =
460-
new SearchAfterWrapper(sort, searchResponse.pointInTimeId(), expirationTimeMs)
488+
new SearchAfterWrapper(lastSort, searchResponse.pointInTimeId(), expirationTimeMs)
461489
.toScrollId();
462490
}
463491

metadata-service/openapi-servlet/models/src/main/java/io/datahubproject/openapi/v3/models/Criterion.java

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -32,7 +32,9 @@ public enum Condition {
3232
ENDS_WITH,
3333
EXISTS,
3434
IN,
35-
CONTAIN
35+
CONTAIN,
36+
GREATER_THAN,
37+
LESS_THAN
3638
}
3739

3840
/** Convert this criterion to its counterpart in RecordTemplate. */
Lines changed: 14 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,14 @@
1+
package io.datahubproject.openapi.v3.models;
2+
3+
import java.util.Map;
4+
import lombok.Builder;
5+
import lombok.Data;
6+
import lombok.extern.jackson.Jacksonized;
7+
8+
@Data
9+
@Jacksonized
10+
@Builder
11+
public class FacetMetadata {
12+
private String field;
13+
private Map<String, Long> aggregations;
14+
}

metadata-service/openapi-servlet/models/src/main/java/io/datahubproject/openapi/v3/models/GenericEntityScrollResultV3.java

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -11,5 +11,6 @@ public class GenericEntityScrollResultV3
1111
implements GenericEntityScrollResult<GenericAspectV3, GenericEntityV3> {
1212
private String scrollId;
1313
private List<GenericEntityV3> entities;
14+
private List<FacetMetadata> facets;
1415
private int totalCount;
1516
}

metadata-service/openapi-servlet/src/main/java/io/datahubproject/openapi/controller/GenericEntitiesController.java

Lines changed: 7 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -39,6 +39,7 @@
3939
import com.linkedin.metadata.query.filter.SortOrder;
4040
import com.linkedin.metadata.search.ScrollResult;
4141
import com.linkedin.metadata.search.SearchEntityArray;
42+
import com.linkedin.metadata.search.SearchResultMetadata;
4243
import com.linkedin.metadata.search.SearchService;
4344
import com.linkedin.metadata.search.utils.QueryUtils;
4445
import com.linkedin.metadata.timeseries.TimeseriesAspectService;
@@ -117,11 +118,13 @@ public abstract class GenericEntitiesController<
117118
protected abstract S buildScrollResult(
118119
@Nonnull OperationContext opContext,
119120
SearchEntityArray searchEntities,
121+
SearchResultMetadata searchResultMetadata,
120122
Set<String> aspectNames,
121123
boolean withSystemMetadata,
122124
@Nullable String scrollId,
123125
boolean expandEmpty,
124-
int totalCount)
126+
int totalCount,
127+
boolean includeScrollIdPerEntity)
125128
throws URISyntaxException;
126129

127130
protected List<E> buildEntityList(
@@ -292,11 +295,13 @@ public ResponseEntity<S> getEntities(
292295
buildScrollResult(
293296
opContext,
294297
result.getEntities(),
298+
null,
295299
mergedAspects,
296300
withSystemMetadata,
297301
result.getScrollId(),
298302
true,
299-
result.getNumEntities()));
303+
result.getNumEntities(),
304+
false));
300305
}
301306

302307
@Tag(name = "Generic Entities")

metadata-service/openapi-servlet/src/main/java/io/datahubproject/openapi/v2/controller/EntityController.java

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -26,6 +26,7 @@
2626
import com.linkedin.metadata.models.AspectSpec;
2727
import com.linkedin.metadata.search.SearchEntity;
2828
import com.linkedin.metadata.search.SearchEntityArray;
29+
import com.linkedin.metadata.search.SearchResultMetadata;
2930
import com.linkedin.metadata.utils.AuditStampUtils;
3031
import com.linkedin.metadata.utils.GenericRecordUtils;
3132
import com.linkedin.metadata.utils.SystemMetadataUtils;
@@ -126,11 +127,13 @@ public ResponseEntity<BatchGetUrnResponseV2<GenericAspectV2, GenericEntityV2>> g
126127
public GenericEntityScrollResultV2 buildScrollResult(
127128
@Nonnull OperationContext opContext,
128129
SearchEntityArray searchEntities,
130+
SearchResultMetadata searchResultMetadata,
129131
Set<String> aspectNames,
130132
boolean withSystemMetadata,
131133
@Nullable String scrollId,
132134
boolean expandEmpty,
133-
int totalCount)
135+
int totalCount,
136+
boolean includeScrollIdPerEntity)
134137
throws URISyntaxException {
135138
return GenericEntityScrollResultV2.builder()
136139
.results(

metadata-service/openapi-servlet/src/main/java/io/datahubproject/openapi/v3/OpenAPIV3Generator.java

Lines changed: 45 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -61,6 +61,8 @@ public class OpenAPIV3Generator {
6161
private static final String NAME_PIT_KEEP_ALIVE = "pitKeepAlive";
6262
private static final String NAME_SLICE_ID = "sliceId";
6363
private static final String NAME_SLICE_MAX = "sliceMax";
64+
private static final String NAME_SKIP_AGGREGATION = "skipAggregation";
65+
private static final String NAME_SCROLL_ID_PER_ENTITY = "scrollIdPerEntity";
6466
private static final String PROPERTY_VALUE = "value";
6567
private static final String PROPERTY_URN = "urn";
6668
private static final String PROPERTY_PATCH = "patch";
@@ -120,6 +122,7 @@ public static OpenAPI generateOpenApiSpec(
120122
buildEntitySchema(filteredAspectSpec, aspectNames, true));
121123
components.addSchemas(
122124
"Scroll" + ENTITIES + ENTITY_RESPONSE_SUFFIX, buildEntitiesScrollSchema());
125+
components.addSchemas("FacetMetadata", buildFacetMetadataSchema());
123126
components.addSchemas(ASPECT_PATCH_PROPERTY, buildAspectPatchPropertySchema());
124127

125128
// --> Aspect components
@@ -661,6 +664,11 @@ private static PathItem buildGenericListEntitiesPath() {
661664
.name(NAME_SKIP_CACHE)
662665
.description("Skip cache when listing entities.")
663666
.schema(newSchema().type(TYPE_BOOLEAN)._default(false)),
667+
new Parameter()
668+
.in(NAME_QUERY)
669+
.name(NAME_SKIP_AGGREGATION)
670+
.description("Skip aggregations when listing entities.")
671+
.schema(newSchema().type(TYPE_BOOLEAN)._default(true)),
664672
new Parameter()
665673
.in(NAME_QUERY)
666674
.name(NAME_PIT_KEEP_ALIVE)
@@ -687,6 +695,11 @@ private static PathItem buildGenericListEntitiesPath() {
687695
.items(newSchema().type(TYPE_STRING)._default(PROPERTY_URN))),
688696
new Parameter().$ref("#/components/parameters/PaginationCount" + MODEL_VERSION),
689697
new Parameter().$ref("#/components/parameters/ScrollId" + MODEL_VERSION),
698+
new Parameter()
699+
.in(NAME_QUERY)
700+
.name(NAME_SCROLL_ID_PER_ENTITY)
701+
.description("Return a scroll id per entity.")
702+
.schema(newSchema().type(TYPE_BOOLEAN)._default(false)),
690703
new Parameter().$ref("#/components/parameters/ScrollQuery" + MODEL_VERSION),
691704
new Parameter().$ref("#/components/parameters/SliceId" + MODEL_VERSION),
692705
new Parameter().$ref("#/components/parameters/SliceMax" + MODEL_VERSION));
@@ -1277,6 +1290,12 @@ private static Schema buildEntitySchema(
12771290
properties.put(
12781291
PROPERTY_URN,
12791292
newSchema().type(TYPE_STRING).description("Unique id for " + entity.getName()));
1293+
// Per-element scrollId for pagination (search-after token)
1294+
properties.put(
1295+
NAME_SCROLL_ID,
1296+
newSchema()
1297+
.types(TYPE_STRING_NULLABLE)
1298+
.description("Per-entity scroll id for pagination."));
12801299

12811300
final Map<String, Schema> aspectProperties =
12821301
entity.getAspectSpecMap().entrySet().stream()
@@ -1349,6 +1368,12 @@ private static Schema buildEntitySchema(
13491368
a.getValue().getPegasusSchema().getName(), withSystemMetadata)));
13501369
properties.put(
13511370
PROPERTY_URN, newSchema().type(TYPE_STRING).description("Unique id for " + ENTITIES));
1371+
// Per-element scrollId for pagination (search-after token)
1372+
properties.put(
1373+
NAME_SCROLL_ID,
1374+
newSchema()
1375+
.types(TYPE_STRING_NULLABLE)
1376+
.description("Per-entity scroll id for pagination."));
13521377

13531378
return newSchema()
13541379
.type(TYPE_OBJECT)
@@ -1403,9 +1428,7 @@ private static Schema buildFilterSchema() {
14031428
"condition",
14041429
newSchema()
14051430
.type(TYPE_STRING)
1406-
._enum(
1407-
Arrays.asList(
1408-
"EQUAL", "STARTS_WITH", "ENDS_WITH", "EXISTS", "IN", "CONTAIN"))
1431+
._enum(Arrays.stream(Criterion.Condition.values()).map(Enum::name).toList())
14091432
._default("EQUAL")
14101433
.description("The condition for the criterion."))
14111434
.addProperties(
@@ -1562,13 +1585,32 @@ NAME_SCROLL_ID, newSchema().type(TYPE_STRING).description("Scroll id for paginat
15621585
.$ref(
15631586
String.format(
15641587
"#/components/schemas/%s%s", ENTITIES, ENTITY_RESPONSE_SUFFIX))))
1588+
.addProperty(
1589+
"facets",
1590+
newSchema()
1591+
.type(TYPE_ARRAY)
1592+
.description("List of facet aggregations for the result set.")
1593+
.items(newSchema().$ref("#/components/schemas/FacetMetadata")))
15651594
.addProperty(
15661595
"totalCount",
15671596
newSchema()
15681597
.type(TYPE_INTEGER)
15691598
.description("Total number of entities satisfy the criteria."));
15701599
}
15711600

1601+
private static Schema buildFacetMetadataSchema() {
1602+
return newSchema()
1603+
.type(TYPE_OBJECT)
1604+
.description("Facet aggregation metadata.")
1605+
.addProperty("field", newSchema().type(TYPE_STRING).description("Facet field name."))
1606+
.addProperty(
1607+
"aggregations",
1608+
newSchema()
1609+
.type(TYPE_OBJECT)
1610+
.description("Counts per facet value.")
1611+
.additionalProperties(newSchema().type(TYPE_INTEGER).format("int64")));
1612+
}
1613+
15721614
private static Schema buildEntityScrollSchema(final EntitySpec entity) {
15731615
return newSchema()
15741616
.type(TYPE_OBJECT)

0 commit comments

Comments
 (0)