Skip to content

Commit

Permalink
[Cosmos] Allows Conditional Search to execute with _maxParallel (#3600)
Browse files Browse the repository at this point in the history
* Allows Conditional Search to execute in max parallel
* Expanding the usage of max parallelism validation, and adding a new HTTP header to handle max parallelism.
* Added new test to check if the OptimizeConcurrency flag is sent to the search service.
* Add new tests validating if the FHIR Context property bag is correctly populated

Co-authored-by: apurvabhale <[email protected]>
Co-authored-by: Fernando Henrique Inocêncio Borba Ferreira <[email protected]>
  • Loading branch information
3 people authored Jan 6, 2024
1 parent b07b02b commit ac93e95
Show file tree
Hide file tree
Showing 32 changed files with 828 additions and 290 deletions.
3 changes: 3 additions & 0 deletions src/Microsoft.Health.Fhir.Core/Features/KnownHeaders.cs
Original file line number Diff line number Diff line change
Expand Up @@ -30,5 +30,8 @@ public static class KnownHeaders
public const string ProfileValidation = "x-ms-profile-validation";
public const string CustomAuditHeaderPrefix = "X-MS-AZUREFHIR-AUDIT-";
public const string FhirUserHeader = "x-ms-fhiruser";

// #conditionalQueryParallelism - Header used to activate parallel conditional-query processing.
public const string ConditionalQueryProcessingLogic = "x-conditionalquery-processing-logic";
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,11 @@ public static class KnownQueryParameterNames

public const string Container = "_container";

/// <summary>
/// Originally for CosmosDB workloads to hint that this request should run with a max parallel setting.
/// </summary>
public const string OptimizeConcurrency = "_optimizeConcurrency";

/// <summary>
/// The anonymization configuration
/// </summary>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,10 @@ public sealed class ConditionalCreateResourceRequest : ConditionalResourceReques
{
private static readonly string[] Capabilities = new string[1] { "conditionalCreate = true" };

public ConditionalCreateResourceRequest(ResourceElement resource, IReadOnlyList<Tuple<string, string>> conditionalParameters, BundleResourceContext bundleResourceContext = null)
public ConditionalCreateResourceRequest(
ResourceElement resource,
IReadOnlyList<Tuple<string, string>> conditionalParameters,
BundleResourceContext bundleResourceContext = null)
: base(resource.InstanceType, conditionalParameters, bundleResourceContext)
{
EnsureArg.IsNotNull(resource, nameof(resource));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ public ConditionalPatchResourceRequest(
string resourceType,
PatchPayload payload,
IReadOnlyList<Tuple<string, string>> conditionalParameters,
BundleResourceContext bundleResourceContext,
BundleResourceContext bundleResourceContext = null,
WeakETag weakETag = null)
: base(resourceType, conditionalParameters, bundleResourceContext)
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,10 @@ public sealed class ConditionalUpsertResourceRequest : ConditionalResourceReques
{
private static readonly string[] Capabilities = new string[1] { "conditionalUpdate = true" };

public ConditionalUpsertResourceRequest(ResourceElement resource, IReadOnlyList<Tuple<string, string>> conditionalParameters, BundleResourceContext bundleResourceContext = null)
public ConditionalUpsertResourceRequest(
ResourceElement resource,
IReadOnlyList<Tuple<string, string>> conditionalParameters,
BundleResourceContext bundleResourceContext = null)
: base(resource.InstanceType, conditionalParameters, bundleResourceContext)
{
EnsureArg.IsNotNull(resource, nameof(resource));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,11 +14,10 @@
using EnsureThat;
using Microsoft.Azure.Cosmos;
using Microsoft.Extensions.Logging;
using Microsoft.Health.Core;
using Microsoft.Health.Core.Features.Context;
using Microsoft.Health.Extensions.DependencyInjection;
using Microsoft.Health.Fhir.Core.Exceptions;
using Microsoft.Health.Fhir.Core.Extensions;
using Microsoft.Health.Fhir.Core.Features;
using Microsoft.Health.Fhir.Core.Features.Context;
using Microsoft.Health.Fhir.Core.Features.Persistence;
using Microsoft.Health.Fhir.Core.Features.Search;
Expand All @@ -40,7 +39,6 @@ internal class FhirCosmosSearchService : SearchService
private readonly RequestContextAccessor<IFhirRequestContext> _requestContextAccessor;
private readonly CosmosDataStoreConfiguration _cosmosConfig;
private readonly ICosmosDbCollectionPhysicalPartitionInfo _physicalPartitionInfo;
private readonly QueryPartitionStatisticsCache _queryPartitionStatisticsCache;
private readonly Lazy<IReadOnlyCollection<IExpressionVisitorWithInitialContext<object, Expression>>> _expressionRewriters;
private readonly ILogger<FhirCosmosSearchService> _logger;
private readonly SearchParameterInfo _resourceTypeSearchParameter;
Expand All @@ -54,7 +52,6 @@ public FhirCosmosSearchService(
RequestContextAccessor<IFhirRequestContext> requestContextAccessor,
CosmosDataStoreConfiguration cosmosConfig,
ICosmosDbCollectionPhysicalPartitionInfo physicalPartitionInfo,
QueryPartitionStatisticsCache queryPartitionStatisticsCache,
CompartmentSearchRewriter compartmentSearchRewriter,
SmartCompartmentSearchRewriter smartCompartmentSearchRewriter,
ILogger<FhirCosmosSearchService> logger)
Expand All @@ -65,7 +62,6 @@ public FhirCosmosSearchService(
EnsureArg.IsNotNull(requestContextAccessor, nameof(requestContextAccessor));
EnsureArg.IsNotNull(cosmosConfig, nameof(cosmosConfig));
EnsureArg.IsNotNull(physicalPartitionInfo, nameof(physicalPartitionInfo));
EnsureArg.IsNotNull(queryPartitionStatisticsCache, nameof(queryPartitionStatisticsCache));
EnsureArg.IsNotNull(compartmentSearchRewriter, nameof(compartmentSearchRewriter));
EnsureArg.IsNotNull(smartCompartmentSearchRewriter, nameof(smartCompartmentSearchRewriter));
EnsureArg.IsNotNull(logger, nameof(logger));
Expand All @@ -75,7 +71,6 @@ public FhirCosmosSearchService(
_requestContextAccessor = requestContextAccessor;
_cosmosConfig = cosmosConfig;
_physicalPartitionInfo = physicalPartitionInfo;
_queryPartitionStatisticsCache = queryPartitionStatisticsCache;
_logger = logger;
_resourceTypeSearchParameter = SearchParameterInfo.ResourceTypeSearchParameter;
_resourceIdSearchParameter = new SearchParameterInfo(SearchParameterNames.Id, SearchParameterNames.Id);
Expand Down Expand Up @@ -403,10 +398,9 @@ protected override async Task<SearchResult> SearchForReindexInternalAsync(
// Additionally, when we query sequentially, we would like to gradually fan out the parallelism, but the Cosmos DB SDK
// does not currently properly support that. See https://github.com/Azure/azure-cosmos-dotnet-v3/issues/2290

QueryPartitionStatistics queryPartitionStatistics = null;
IFhirRequestContext fhirRequestContext = null;
ConcurrentBag<CosmosResponseMessage> messagesList = null;
if (_physicalPartitionInfo.PhysicalPartitionCount > 1 && queryRequestOptionsOverride == null)
if (queryRequestOptionsOverride == null)
{
if (searchOptions.Sort?.Count > 0)
{
Expand All @@ -417,21 +411,24 @@ protected override async Task<SearchResult> SearchForReindexInternalAsync(
if (searchOptions.Expression != null && // without a filter the query will not be selective
string.IsNullOrEmpty(searchOptions.ContinuationToken))
{
// Telemetry currently shows that when there is a continuation token, the the query only hits one partition.
// Telemetry currently shows that when there is a continuation token, then the query only hits one partition.
// This may not be true forever, in which case we would want to encode the max concurrency in the continuation token.

queryPartitionStatistics = _queryPartitionStatisticsCache.GetQueryPartitionStatistics(searchOptions.Expression);
if (IsQuerySelective(queryPartitionStatistics))
{
feedOptions.MaxConcurrency = _cosmosConfig.ParallelQueryOptions.MaxQueryConcurrency;
}

// plant a ConcurrentBag int the request context's properties, so the CosmosResponseProcessor
// knows to add the individual ResponseMessages sent as part of this search.

fhirRequestContext = _requestContextAccessor.RequestContext;
if (fhirRequestContext != null)
{
// Check if the "Optimize Concurrency" flag is present in the FHIR context, then Cosmos DB
// will be able to maximize the number of concurrent operations.
if (fhirRequestContext.Properties.TryGetValue(KnownQueryParameterNames.OptimizeConcurrency, out object maxParallelAsObject))
{
if (maxParallelAsObject != null && Convert.ToBoolean(maxParallelAsObject))
{
feedOptions.MaxConcurrency = _cosmosConfig.ParallelQueryOptions.MaxQueryConcurrency;
}
}

// Plant a ConcurrentBag in the request context's properties, so the CosmosResponseProcessor
// knows to add the individual ResponseMessages sent as part of this search.
messagesList = new ConcurrentBag<CosmosResponseMessage>();
fhirRequestContext.Properties[Constants.CosmosDbResponseMessagesProperty] = messagesList;
}
Expand Down Expand Up @@ -468,68 +465,31 @@ protected override async Task<SearchResult> SearchForReindexInternalAsync(
(results, nextContinuationToken) = await _fhirDataStore.ExecuteDocumentQueryAsync<T>(sqlQuerySpec, feedOptions, continuationToken, searchOptions.MaxItemCountSpecifiedByClient, feedOptions.MaxConcurrency == null ? searchEnumerationTimeoutOverrideIfSequential : null, cancellationToken);
}

if (queryPartitionStatistics != null && messagesList != null)
if (messagesList != null)
{
if ((results == null || results.Count < desiredItemCount) && !string.IsNullOrEmpty(nextContinuationToken))
{
// ExecuteDocumentQueryAsync gave up on filling the pages. This suggests that we would have been better off querying in parallel.
queryPartitionStatistics.Update(_physicalPartitionInfo.PhysicalPartitionCount);

_logger.LogInformation(
"Failed to fill items, found {ItemCount}, needed {DesiredItemCount}. Updating statistics to {PhysicalPartitionCount}",
"Failed to fill items, found {ItemCount}, needed {DesiredItemCount}. Physical partition count {PhysicalPartitionCount}",
results.Count,
desiredItemCount,
_physicalPartitionInfo.PhysicalPartitionCount);
}
else
{
// determine the number of unique physical partitions queried as part of this search.
int physicalPartitionCount = messagesList.Select(r => r.Headers["x-ms-documentdb-partitionkeyrangeid"]).Distinct().Count();
queryPartitionStatistics.Update(physicalPartitionCount);
}
}

return (results, nextContinuationToken, feedOptions.MaxConcurrency);
}
finally
{
if (queryPartitionStatistics != null && fhirRequestContext != null)
if (fhirRequestContext != null)
{
fhirRequestContext.Properties.Remove(Constants.CosmosDbResponseMessagesProperty);
}
}
}

/// <summary>
/// Simple heuristic that decides whether a query should be treated as selective.
/// A selective query is a candidate for querying partitions in parallel; a non-selective
/// one is expected to fill a page from the first partition and is better run sequentially.
/// </summary>
/// <param name="queryPartitionStatistics">Statistics recorded for this query; supplies the average partition count.</param>
/// <returns>
/// <c>true</c> when statistics usage is enabled, an average partition count is available, and that
/// average is at least half of the physical partition count; otherwise <c>false</c>.
/// </returns>
private bool IsQuerySelective(QueryPartitionStatistics queryPartitionStatistics)
{
    int? observedAverage = queryPartitionStatistics.GetAveragePartitionCount();

    // Without a recorded average (presumably a query not seen before) or with
    // statistics turned off in configuration, there is nothing to base the decision on.
    if (!observedAverage.HasValue || !_cosmosConfig.UseQueryStatistics)
    {
        return false;
    }

    double hitRatio = (double)observedAverage.Value / _physicalPartitionInfo.PhysicalPartitionCount;

    // Kept as a positive ">=" test on purpose: a NaN ratio must evaluate to "not selective".
    bool isSelective = hitRatio >= 0.5;

    if (isSelective)
    {
        _logger.LogInformation(
            "Query was Selective. Avg. Partitions: {AvgPartitions} / Physical Partitions: {PhysicalPartitionCount} = {FractionOfPartitionsHit}",
            observedAverage.Value,
            _physicalPartitionInfo.PhysicalPartitionCount,
            hitRatio);
    }

    return isSelective;
}

private async Task<int> ExecuteCountSearchAsync(
QueryDefinition sqlQuerySpec,
CancellationToken cancellationToken)
Expand Down

This file was deleted.

Original file line number Diff line number Diff line change
Expand Up @@ -222,11 +222,6 @@ private static IFhirServerBuilder AddCosmosDbPersistence(this IFhirServerBuilder
.AsSelf()
.AsImplementedInterfaces();

services.Add<QueryPartitionStatisticsCache>()
.Singleton()
.AsSelf()
.AsImplementedInterfaces();

services.Add<PurgeOperationCapabilityProvider>()
.Transient()
.AsImplementedInterfaces();
Expand Down
Loading

0 comments on commit ac93e95

Please sign in to comment.