Skip to content

Commit

Permalink
Export with history and soft deletes (#3519)
Browse files Browse the repository at this point in the history
* Added initial logic for Cosmos history/delete in $export

* Fixed build issue with SQL export orchestrator

* Fixed cosmos export bugs found via manual testing

* Added soft delete extension to export

* Add first pass on Export E2E Tests

* Updated export E2E tests to use test data from fixture.

* Updated long export tests for individual history/delete scenarios

* Updated gitignore to ignore azurite temp files

* Small azure storage explorer / azurite comment update for export tests

* Fixed ExportJobTaskTests to not rely on position of queryParameterList

* Added ExportJobTask unit test for history/delete

Also cleaned up some left over comments from previous commit

* Removing changed settings.json file

* Added historical and soft delete export to rest file

* Added import of soft deleted resources

* Updates per PR review

* Removed import changes - going to another PR

* Removed launch.json changes accidentally committed

* Added initial SQL export with soft delete / history

* oopsies undo change to importresourceprocessor breaking tests

* Fixed logical error in new SQL historical search

* Added searchoptionsfactory test for include history/deleted

* Fixed failing export test.

* Removed long running export flag to see if it runs in pipeline

* restructured export test location for readability

* Fixed export issues found in testing

* Changed SQL script version for merge

* Updated SQL schema version

* Code style cleanup

* Updated export history/deleted query params

* Fixed export included data test

* Changed SQL exporter to use export configuration vs magic numbers in code

* Rolling back unneeded SQL changes

* Fixed parallel export with history/soft delete

* Updated export E2E tests for parallel export multi-job

* fixed merge regression

* Removed unnecessary usings

* Fixed tx issue in export data tests

* testing central export perf

* Rolled back central execution of export tests

* Removing exportlongrunning for pipeline perf test

* Fixed build

* Optimized test structure

* merged from main

* Updated schema version to iterate after merge

* decoupled search options from export

* Fixed test issue

* Removing NA SQL comment

* PR comments on tests fix

* "un-fancified" ResourceTypeVersion enum

* Fix STU3 build error

* Fixed unit tests that no longer use export/history query params

* fixed test mistake for resourcetypeversion

* Fixed cosmos export search function

* Fixed export includeAssociatedData error message

* Default ResourceVersionType to latest

* Fixed history search - added deleted

* fix export count issue?

* small code cleanups
  • Loading branch information
mikaelweave authored Nov 10, 2023
1 parent 5b8e2e4 commit a938337
Show file tree
Hide file tree
Showing 58 changed files with 8,001 additions and 568 deletions.
7 changes: 7 additions & 0 deletions docs/rest/ExportRequests.http
Original file line number Diff line number Diff line change
Expand Up @@ -71,6 +71,13 @@ Accept: application/fhir+json
Prefer: respond-async
Authorization: Bearer {{bearer.response.body.access_token}}

### Export with history and soft deleted records
# @name export
GET https://{{hostname}}/$export?includeAssociatedData=_history,_deleted
Accept: application/fhir+json
Prefer: respond-async
Authorization: Bearer {{bearer.response.body.access_token}}

### Get Export request
GET {{exportLocation}}
Authorization: Bearer {{bearer.response.body.access_token}}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,7 @@ public ValidateExportRequestFilterAttribute()
KnownQueryParameterNames.Format,
KnownQueryParameterNames.TypeFilter,
KnownQueryParameterNames.IsParallel,
KnownQueryParameterNames.IncludeAssociatedData,
KnownQueryParameterNames.MaxCount,
KnownQueryParameterNames.AnonymizationConfigurationCollectionReference,
KnownQueryParameterNames.AnonymizationConfigurationLocation,
Expand Down
18 changes: 18 additions & 0 deletions src/Microsoft.Health.Fhir.Api/Resources.Designer.cs

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

8 changes: 7 additions & 1 deletion src/Microsoft.Health.Fhir.Api/Resources.resx
Original file line number Diff line number Diff line change
Expand Up @@ -408,4 +408,10 @@
<value>Invalid combination of processing logic and bundle type: {0} and {1}.</value>
<comment>Error message when there is a invalid/unknown combination of a bundle type and a processing logic.</comment>
</data>
</root>
<data name="TypeFilterNotSupportedWithHistoryOrDeletedExport" xml:space="preserve">
<value>The request "_typeFilter" cannot be used with an export request with historical or soft deleted resources.</value>
</data>
<data name="InvalidExportAssociatedDataParameter" xml:space="preserve">
<value>The export parameter "includeAssociatedData" contains an invalid value. Supported values are: {0}. </value>
</data>
</root>
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
using System;
using System.Collections.Generic;
using System.Globalization;
using System.Linq;
using System.Linq.Expressions;
using System.Net;
using System.Threading;
Expand All @@ -20,6 +21,7 @@
using Microsoft.Health.Extensions.DependencyInjection;
using Microsoft.Health.Fhir.Core.Configs;
using Microsoft.Health.Fhir.Core.Exceptions;
using Microsoft.Health.Fhir.Core.Features;
using Microsoft.Health.Fhir.Core.Features.Context;
using Microsoft.Health.Fhir.Core.Features.Operations;
using Microsoft.Health.Fhir.Core.Features.Operations.Export;
Expand Down Expand Up @@ -147,7 +149,8 @@ public async Task GivenThereAreTwoPagesOfSearchResults_WhenExecuted_ThenCorrectS
null,
Arg.Is(CreateQueryParametersExpression(KnownResourceTypes.Patient)),
_cancellationToken,
true)
true,
ResourceVersionType.Latest)
.Returns(CreateSearchResult(continuationToken: continuationToken));

bool capturedSearch = false;
Expand All @@ -157,7 +160,8 @@ public async Task GivenThereAreTwoPagesOfSearchResults_WhenExecuted_ThenCorrectS
null,
Arg.Is(CreateQueryParametersExpressionWithContinuationToken(ContinuationTokenConverter.Encode(continuationToken), KnownResourceTypes.Patient)),
_cancellationToken,
true)
true,
ResourceVersionType.Latest)
.Returns(x =>
{
capturedSearch = true;
Expand Down Expand Up @@ -321,40 +325,70 @@ public async Task GivenThereAreMultiplePagesOfSearchResultsWithSinceParameter_Wh
Assert.True(secondCapturedSearch);
}

private Expression<Predicate<IReadOnlyList<Tuple<string, string>>>> CreateQueryParametersExpression(string resourceType)
[Fact]
public async Task GivenAnExportJobWithHistoryAndSoftDeletes_WhenExecuted_ThenAllResourcesAreExportedToTheProperLocation()
{
    bool capturedSearch = false;

    // Job record requesting historical and soft-deleted versions in addition to
    // the latest version of each resource.
    var exportJobRecordIncludeHistory = CreateExportJobRecord(
        exportJobType: ExportJobType.Patient,
        includeHistory: true,
        includeDeleted: true,
        maximumNumberOfResourcesPerQuery: 1);
    SetupExportJobRecordAndOperationDataStore(exportJobRecordIncludeHistory);

    _searchService.SearchAsync(
        null,
        Arg.Is(CreateQueryParametersExpression(KnownResourceTypes.Patient, includeHistory: true, includeDeleted: true)),
        _cancellationToken,
        true,
        // Fix: "ResourceVersionType.Histoy" was a typo and does not compile;
        // the enum member is ResourceVersionType.History.
        ResourceVersionType.Latest | ResourceVersionType.History | ResourceVersionType.SoftDeleted)
        .Returns(x =>
        {
            capturedSearch = true;

            return CreateSearchResult();
        });

    await _exportJobTask.ExecuteAsync(_exportJobRecord, _weakETag, _cancellationToken);

    // The export task must have issued a search carrying the history/soft-delete version flags.
    Assert.True(capturedSearch);
}

/// <summary>
/// Builds an NSubstitute argument-matcher expression that verifies the export query
/// parameter list contains the expected _count, _lastUpdated upper bound, and _type
/// entries, matching by key/value presence rather than positional index so that
/// parameter reordering does not break the expectation.
/// </summary>
/// <param name="resourceType">Resource type expected in the "_type" parameter.</param>
/// <param name="includeHistory">Whether the job requests historical versions.</param>
/// <param name="includeDeleted">Whether the job requests soft-deleted versions.</param>
/// <returns>A predicate expression over the query parameter tuple list.</returns>
private Expression<Predicate<IReadOnlyList<Tuple<string, string>>>> CreateQueryParametersExpression(string resourceType, bool includeHistory = false, bool includeDeleted = false)
{
    // NOTE(review): includeHistory/includeDeleted are accepted but never asserted on in
    // this predicate — confirm whether the history/deleted query parameters should also
    // be matched here, or whether the flags exist only for call-site readability.
    return arg => arg != null &&
        arg.Any(x => x.Item1 == "_count" && x.Item2 == "1") &&
        arg.Any(x => x.Item1 == "_lastUpdated" && x.Item2 == $"le{_exportJobRecord.Till}") &&
        arg.Any(x => x.Item1 == "_type" && x.Item2 == resourceType);
}

/// <summary>
/// Builds an NSubstitute argument-matcher expression for an export search that includes
/// a "_since" lower bound: verifies _count, both _lastUpdated bounds (le-till and ge-since),
/// and _type are present, by key/value rather than positional index.
/// </summary>
/// <param name="since">Expected lower bound used in the "ge{since}" _lastUpdated parameter.</param>
/// <param name="resourceType">Resource type expected in the "_type" parameter.</param>
/// <returns>A predicate expression over the query parameter tuple list.</returns>
private Expression<Predicate<IReadOnlyList<Tuple<string, string>>>> CreateQueryParametersExpression(PartialDateTime since, string resourceType)
{
    return arg => arg != null &&
        arg.Any(x => x.Item1 == "_count" && x.Item2 == "1") &&
        arg.Any(x => x.Item1 == "_lastUpdated" && x.Item2 == $"le{_exportJobRecord.Till}") &&
        arg.Any(x => x.Item1 == "_lastUpdated" && x.Item2 == $"ge{since}") &&
        arg.Any(x => x.Item1 == "_type" && x.Item2 == resourceType);
}

/// <summary>
/// Builds an NSubstitute argument-matcher expression for a follow-up export search page:
/// verifies _count, the _lastUpdated upper bound, _type, and the "ct" continuation token
/// are present, by key/value rather than positional index.
/// </summary>
/// <param name="continuationToken">Expected value of the "ct" parameter.</param>
/// <param name="resourceType">Resource type expected in the "_type" parameter.</param>
/// <returns>A predicate expression over the query parameter tuple list.</returns>
private Expression<Predicate<IReadOnlyList<Tuple<string, string>>>> CreateQueryParametersExpressionWithContinuationToken(string continuationToken, string resourceType)
{
    return arg => arg != null &&
        arg.Any(x => x.Item1 == "_count" && x.Item2 == "1") &&
        arg.Any(x => x.Item1 == "_lastUpdated" && x.Item2 == $"le{_exportJobRecord.Till}") &&
        arg.Any(x => x.Item1 == "_type" && x.Item2 == resourceType) &&
        arg.Any(x => x.Item1 == "ct" && x.Item2 == continuationToken);
}

/// <summary>
/// Builds an NSubstitute argument-matcher expression for a follow-up export search page
/// on a job with a "_since" bound: verifies _count, both _lastUpdated bounds, _type, and
/// the "ct" continuation token are present, by key/value rather than positional index.
/// </summary>
/// <param name="continuationToken">Expected value of the "ct" parameter.</param>
/// <param name="since">Expected lower bound used in the "ge{since}" _lastUpdated parameter.</param>
/// <param name="resourceType">Resource type expected in the "_type" parameter.</param>
/// <returns>A predicate expression over the query parameter tuple list.</returns>
private Expression<Predicate<IReadOnlyList<Tuple<string, string>>>> CreateQueryParametersExpressionWithContinuationToken(string continuationToken, PartialDateTime since, string resourceType)
{
    return arg => arg != null &&
        arg.Any(x => x.Item1 == "_count" && x.Item2 == "1") &&
        arg.Any(x => x.Item1 == "_lastUpdated" && x.Item2 == $"le{_exportJobRecord.Till}") &&
        arg.Any(x => x.Item1 == "_lastUpdated" && x.Item2 == $"ge{since}") &&
        arg.Any(x => x.Item1 == "_type" && x.Item2 == resourceType) &&
        arg.Any(x => x.Item1 == "ct" && x.Item2 == continuationToken);
}

[Fact]
Expand Down Expand Up @@ -834,7 +868,9 @@ public async Task GivenAnExportJobWithTheTypeParameter_WhenExecuted_ThenOnlyReso
true)
.Returns(x =>
{
string[] types = x.ArgAt<IReadOnlyList<Tuple<string, string>>>(1)[3].Item2.Split(',');
string[] types = x.ArgAt<IReadOnlyList<Tuple<string, string>>>(1)
.Where(x => x.Item1 == KnownQueryParameterNames.Type)
.Select(x => x.Item2).First().Split(',');
SearchResultEntry[] entries = new SearchResultEntry[types.Length];

for (int index = 0; index < types.Length; index++)
Expand Down Expand Up @@ -889,7 +925,9 @@ public async Task GivenAPatientExportJobWithTheTypeParameter_WhenExecuted_ThenOn
true)
.Returns(x =>
{
string[] types = x.ArgAt<IReadOnlyList<Tuple<string, string>>>(3)[3].Item2.Split(',');
string[] types = x.ArgAt<IReadOnlyList<Tuple<string, string>>>(3)
.Where(x => x.Item1 == KnownQueryParameterNames.Type)
.Select(x => x.Item2).First().Split(',');
SearchResultEntry[] entries = new SearchResultEntry[types.Length];

for (int index = 0; index < types.Length; index++)
Expand Down Expand Up @@ -1107,7 +1145,9 @@ public async Task GivenAGroupExportJob_WhenExecuted_ThenAllPatientResourcesInThe
true)
.Returns(x =>
{
string[] ids = x.ArgAt<IReadOnlyList<Tuple<string, string>>>(1)[2].Item2.Split(',');
string[] ids = x.ArgAt<IReadOnlyList<Tuple<string, string>>>(1)
.Where(x => x.Item1 == Core.Features.KnownQueryParameterNames.Id)
.Select(x => x.Item2).First().Split(',');
SearchResultEntry[] entries = new SearchResultEntry[ids.Length];

for (int index = 0; index < ids.Length; index++)
Expand Down Expand Up @@ -1175,7 +1215,9 @@ public async Task GivenAGroupExportJobWithMultiplePagesOfPatients_WhenExecuted_T
true)
.Returns(x =>
{
string[] ids = x.ArgAt<IReadOnlyList<Tuple<string, string>>>(1)[2].Item2.Split(',');
string[] ids = x.ArgAt<IReadOnlyList<Tuple<string, string>>>(1)
.Where(x => x.Item1 == KnownQueryParameterNames.Id)
.Select(x => x.Item2).First().Split(',');

countOfSearches++;

Expand Down Expand Up @@ -1250,7 +1292,9 @@ public async Task GivenAGroupExportJobToResume_WhenExecuted_ThenAllPatientResour

if (countOfSearches == 1)
{
ids = x.ArgAt<IReadOnlyList<Tuple<string, string>>>(1)[2].Item2.Split(',');
ids = x.ArgAt<IReadOnlyList<Tuple<string, string>>>(1)
.Where(x => x.Item1 == Core.Features.KnownQueryParameterNames.Id)
.Select(x => x.Item2).First().Split(',');
continuationTokenIndex = 0;
}
else if (countOfSearches == 2)
Expand All @@ -1261,7 +1305,10 @@ public async Task GivenAGroupExportJobToResume_WhenExecuted_ThenAllPatientResour
{
// The ids aren't in the query parameters because of the reset
ids = new string[] { "1", "2", "3" };
continuationTokenIndex = int.Parse(ContinuationTokenConverter.Decode(x.ArgAt<IReadOnlyList<Tuple<string, string>>>(1)[2].Item2).Substring(2));
continuationTokenIndex = int.Parse(ContinuationTokenConverter.Decode(
x.ArgAt<IReadOnlyList<Tuple<string, string>>>(1)
.Where(x => x.Item1 == Core.Features.KnownQueryParameterNames.ContinuationToken)
.Select(x => x.Item2).First())[2..]);
}

return CreateSearchResult(
Expand Down Expand Up @@ -1342,7 +1389,10 @@ public async Task GivenAGroupExportJobWithTheTypeParameter_WhenExecuted_ThenAllP
true)
.Returns(x =>
{
string[] ids = x.ArgAt<IReadOnlyList<Tuple<string, string>>>(1)[2].Item2.Split(',');
string[] ids = x.ArgAt<IReadOnlyList<Tuple<string, string>>>(1)
.Where(x => x.Item1 == KnownQueryParameterNames.Id)
.Select(x => x.Item2).First().Split(',');

SearchResultEntry[] entries = new SearchResultEntry[ids.Length];

for (int index = 0; index < ids.Length; index++)
Expand All @@ -1363,7 +1413,9 @@ public async Task GivenAGroupExportJobWithTheTypeParameter_WhenExecuted_ThenAllP
.Returns(x =>
{
string parentId = x.ArgAt<string>(1);
string[] resourceTypes = x.ArgAt<IReadOnlyList<Tuple<string, string>>>(3)[2].Item2.Split(',');
string[] resourceTypes = x.ArgAt<IReadOnlyList<Tuple<string, string>>>(3)
.Where(x => x.Item1 == KnownQueryParameterNames.Type)
.Select(x => x.Item2).First().Split(',');

SearchResultEntry[] entries = new SearchResultEntry[resourceTypes.Length];

Expand Down Expand Up @@ -2076,7 +2128,9 @@ private ExportJobRecord CreateExportJobRecord(
uint numberOfPagesPerCommit = 0,
string containerName = null,
string anonymizationConfigurationLocation = null,
string anonymizationConfigurationFileEtag = null)
string anonymizationConfigurationFileEtag = null,
bool includeHistory = false,
bool includeDeleted = false)
{
return new ExportJobRecord(
new Uri(requestEndpoint),
Expand All @@ -2094,7 +2148,9 @@ private ExportJobRecord CreateExportJobRecord(
numberOfPagesPerCommit: numberOfPagesPerCommit == 0 ? _exportJobConfiguration.NumberOfPagesPerCommit : numberOfPagesPerCommit,
storageAccountContainerName: containerName,
anonymizationConfigurationLocation: anonymizationConfigurationLocation,
anonymizationConfigurationFileETag: anonymizationConfigurationFileEtag);
anonymizationConfigurationFileETag: anonymizationConfigurationFileEtag,
includeHistory: includeHistory,
includeDeleted: includeDeleted);
}

private ExportJobTask CreateExportJobTask(
Expand Down
10 changes: 10 additions & 0 deletions src/Microsoft.Health.Fhir.Core/Configs/ExportJobConfiguration.cs
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,16 @@ public class ExportJobConfiguration
/// </summary>
public uint MaximumNumberOfResourcesPerQuery { get; set; } = 10000;

/// <summary>
/// For SQL export, controls the number of parallel id ranges to gather to be used for parallel export.
/// </summary>
public int NumberOfParallelRecordRanges { get; set; } = 100;

/// <summary>
/// For SQL export, controls the DOP (degree of parallelism) used by the coordinator to build sub-jobs.
/// </summary>
public int CoordinatorMaxDegreeOfParallelization { get; set; } = 4;

/// <summary>
/// Number of pages to be iterated before committing the export progress.
/// </summary>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,8 @@ public static async Task<CreateExportResponse> ExportAsync(
string containerName,
string formatName,
bool isParallel,
bool includeDeleted,
bool includeHistory,
uint maxCount,
string anonymizationConfigurationCollectionReference,
string anonymizationConfigLocation,
Expand All @@ -37,7 +39,23 @@ public static async Task<CreateExportResponse> ExportAsync(
EnsureArg.IsNotNull(mediator, nameof(mediator));
EnsureArg.IsNotNull(requestUri, nameof(requestUri));

var request = new CreateExportRequest(requestUri, requestType, resourceType, since, till, filters, groupId, containerName, formatName, isParallel, maxCount, anonymizationConfigurationCollectionReference, anonymizationConfigLocation, anonymizationConfigFileETag);
var request = new CreateExportRequest(
requestUri: requestUri,
requestType: requestType,
resourceType: resourceType,
since: since,
till: till,
filters: filters,
groupId: groupId,
containerName: containerName,
formatName: formatName,
isParallel: isParallel,
maxCount: maxCount,
includeDeleted: includeDeleted,
includeHistory: includeHistory,
anonymizationConfigurationCollectionReference: anonymizationConfigurationCollectionReference,
anonymizationConfigurationLocation: anonymizationConfigLocation,
anonymizationConfigurationFileETag: anonymizationConfigFileETag);

CreateExportResponse response = await mediator.Send(request, cancellationToken);
return response;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -99,6 +99,14 @@ public static class KnownQueryParameterNames

public const string PurgeHistory = "_purgeHistory";

/// <summary>
/// Used by $export as a comma-separated list of parameters instructing which initial data should be included.
/// </summary>
public const string IncludeAssociatedData = "includeAssociatedData";

/// <summary>
/// Used by export to specify the number of resources to be processed by the search engine.
/// </summary>
public const string MaxCount = "_maxCount";
}
}
Loading

0 comments on commit a938337

Please sign in to comment.