Add Parquet and Delta import #2062

Merged 11 commits on Feb 9, 2025
@@ -29,6 +29,7 @@
import au.csiro.pathling.io.FileSystemPersistence;
import au.csiro.pathling.io.ImportMode;
import ca.uhn.fhir.rest.annotation.ResourceParam;
import io.delta.tables.DeltaTable;
import jakarta.annotation.Nonnull;
import java.net.URLDecoder;
import java.nio.charset.StandardCharsets;
@@ -39,6 +40,7 @@
import org.apache.spark.api.java.function.FilterFunction;
import org.apache.spark.api.java.function.MapFunction;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;
import org.apache.spark.sql.catalyst.encoders.ExpressionEncoder;
import org.hl7.fhir.instance.model.api.IBaseResource;
@@ -50,6 +52,7 @@
import org.hl7.fhir.r4.model.OperationOutcome.OperationOutcomeIssueComponent;
import org.hl7.fhir.r4.model.Parameters;
import org.hl7.fhir.r4.model.Parameters.ParametersParameterComponent;
import org.hl7.fhir.r4.model.StringType;
import org.hl7.fhir.r4.model.UrlType;
import org.springframework.context.annotation.Profile;
import org.springframework.stereotype.Component;
@@ -110,7 +113,7 @@ public ImportExecutor(@Nonnull final SparkSession spark,
public OperationOutcome execute(@Nonnull @ResourceParam final Parameters inParams) {
// Parse and validate the JSON request.
final List<ParametersParameterComponent> sourceParams = inParams.getParameter().stream()
.filter(param -> "source".equals(param.getName())).collect(Collectors.toList());
.filter(param -> "source".equals(param.getName())).toList();
if (sourceParams.isEmpty()) {
throw new InvalidUserInputError("Must provide at least one source parameter");
}
@@ -137,6 +140,21 @@ public OperationOutcome execute(@Nonnull @ResourceParam final Parameters inParams) {
.map(param -> ImportMode.fromCode(
((CodeType) param.getValue()).asStringValue()))
.orElse(ImportMode.OVERWRITE);

// Get the import format from the source parameter.
final ImportFormat format = sourceParam.getPart().stream()
.filter(param -> "format".equals(param.getName()))
.findFirst()
.map(param -> {
final String formatCode = ((StringType) param.getValue()).getValue();
try {
return ImportFormat.fromCode(formatCode);
} catch (final IllegalArgumentException e) {
throw new InvalidUserInputError("Unsupported format: " + formatCode);
}
})
.orElse(ImportFormat.NDJSON);

final String resourceCode = ((CodeType) resourceTypeParam.getValue()).getCode();
final ResourceType resourceType = ResourceType.fromCode(resourceCode);

@@ -149,17 +167,13 @@ public OperationOutcome execute(@Nonnull @ResourceParam final Parameters inParams) {
}

// Read the resources from the source URL into a dataset of strings.
final Dataset<String> jsonStrings = readStringsFromUrl(urlParam);

// Parse each line into a HAPI FHIR object, then encode to a Spark dataset.
final Dataset<IBaseResource> resources = jsonStrings.map(jsonToResourceConverter(),
fhirEncoder);
final Dataset<Row> rows = readRowsFromUrl(urlParam, format, fhirEncoder);

log.info("Importing {} resources (mode: {})", resourceType.toCode(), importMode.getCode());
if (importMode == ImportMode.OVERWRITE) {
database.overwrite(resourceType, resources.toDF());
database.overwrite(resourceType, rows);
} else {
database.merge(resourceType, resources.toDF());
database.merge(resourceType, rows);
}
}

@@ -177,22 +191,38 @@ public OperationOutcome execute(@Nonnull @ResourceParam final Parameters inParams) {
}

@Nonnull
private Dataset<String> readStringsFromUrl(@Nonnull final ParametersParameterComponent urlParam) {
private Dataset<Row> readRowsFromUrl(@Nonnull final ParametersParameterComponent urlParam,
final ImportFormat format, final ExpressionEncoder<IBaseResource> fhirEncoder) {
final String url = ((UrlType) urlParam.getValue()).getValueAsString();
final String decodedUrl = URLDecoder.decode(url, StandardCharsets.UTF_8);
final String convertedUrl = FileSystemPersistence.convertS3ToS3aUrl(decodedUrl);
final Dataset<String> jsonStrings;
final Dataset<Row> rowDataset;
try {
// Check that the user is authorized to execute the operation.
accessRules.ifPresent(ar -> ar.checkCanImportFrom(convertedUrl));
final FilterFunction<String> nonBlanks = s -> !s.isBlank();
jsonStrings = spark.read().textFile(convertedUrl).filter(nonBlanks);

rowDataset = switch (format) {
case NDJSON ->
// Parse each line into a HAPI FHIR object, then encode to a Spark dataset.
spark.read()
.textFile(convertedUrl).filter(nonBlanks)
.map(jsonToResourceConverter(), fhirEncoder)
.toDF();
case PARQUET ->
// Use the Spark Parquet reader.
spark.read()
.parquet(convertedUrl);
case DELTA ->
// Use the Delta Lake reader.
DeltaTable.forPath(spark, convertedUrl).toDF();
};
} catch (final SecurityError e) {
throw new InvalidUserInputError("Not allowed to import from URL: " + convertedUrl, e);
} catch (final Exception e) {
throw new InvalidUserInputError("Error reading from URL: " + convertedUrl, e);
}
return jsonStrings;
return rowDataset;
}

@Nonnull
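The Delta branch in the switch above relies on the DeltaTable API from the delta-spark artifact. For comparison, a minimal sketch of an equivalent read through Spark's generic reader (an alternative, not what this change uses); both resolve the latest snapshot of the table:

// Assumes the delta-spark dependency and Delta session extensions are configured.
final Dataset<Row> rows = spark.read().format("delta").load(convertedUrl);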
@@ -0,0 +1,62 @@
/*
* Copyright 2025 Commonwealth Scientific and Industrial Research
* Organisation (CSIRO) ABN 41 687 119 230.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package au.csiro.pathling.update;

import lombok.Getter;

/**
* Represents the supported formats for resource import.
*/
@Getter
public enum ImportFormat {
/**
* Newline-delimited JSON (NDJSON) format.
*/
NDJSON("ndjson"),
/**
* Parquet format.
*/
PARQUET("parquet"),
/**
* Delta Lake format.
*/
DELTA("delta");

private final String code;

ImportFormat(final String code) {
this.code = code;
}

/**
* Resolve an ImportFormat enum from its string code.
*
* @param code The string code to resolve.
* @return An ImportFormat if a match is found.
* @throws IllegalArgumentException if no match can be found.
*/
public static ImportFormat fromCode(final String code) {
for (final ImportFormat format : ImportFormat.values()) {
if (format.getCode().equalsIgnoreCase(code)) {
return format;
}
}
throw new IllegalArgumentException("Unsupported format: " + code);
}

}
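A minimal usage sketch for the resolver; matching is case-insensitive, and the input codes here are illustrative:

final ImportFormat parquet = ImportFormat.fromCode("Parquet"); // resolves to PARQUET
final ImportFormat ndjson = ImportFormat.fromCode("ndjson");   // resolves to NDJSON
ImportFormat.fromCode("csv"); // throws IllegalArgumentException: Unsupported format: csv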
@@ -47,6 +47,14 @@
"max": "1",
"documentation": "A value of 'overwrite' will cause all existing resources of the specified type to be deleted and replaced with the contents of the source file. A value of 'merge' will match existing resources with updated resources in the source file based on their ID, and either update the existing resources or add new resources as appropriate. The default value is 'overwrite'.",
"type": "code"
},
{
"name": "format",
"use": "in",
"min": 0,
"max": "1",
"documentation": "Indicates the format of the source file. Possible values are 'ndjson', 'parquet' and 'delta'. The default value is 'ndjson'.",
"type": "code"
}
]
}
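For illustration, a client-side sketch of invoking the $import operation with the new format parameter via the HAPI FHIR generic client; the server endpoint and source URL here are assumptions, not values from this change:

import ca.uhn.fhir.context.FhirContext;
import ca.uhn.fhir.rest.client.api.IGenericClient;
import org.hl7.fhir.r4.model.CodeType;
import org.hl7.fhir.r4.model.Parameters;
import org.hl7.fhir.r4.model.UrlType;

final FhirContext ctx = FhirContext.forR4();
// Hypothetical server endpoint.
final IGenericClient client = ctx.newRestfulGenericClient("http://localhost:8080/fhir");

final Parameters inParams = new Parameters();
final Parameters.ParametersParameterComponent source =
    inParams.addParameter().setName("source");
source.addPart().setName("resourceType").setValue(new CodeType("Patient"));
// Hypothetical source location.
source.addPart().setName("url").setValue(new UrlType("s3://example-bucket/Patient.parquet"));
source.addPart().setName("format").setValue(new CodeType("parquet"));
source.addPart().setName("mode").setValue(new CodeType("merge"));

client.operation()
    .onServer()
    .named("$import")
    .withParameters(inParams)
    .execute();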
@@ -72,20 +72,33 @@ class ImportTest extends ModificationTest {

@SuppressWarnings("SameParameterValue")
@Nonnull
Parameters buildImportParameters(@Nonnull final URL jsonURL,
Parameters buildImportParameters(@Nonnull final URL url,
@Nonnull final ResourceType resourceType) {
final Parameters parameters = new Parameters();
final ParametersParameterComponent sourceParam = parameters.addParameter().setName("source");
sourceParam.addPart().setName("resourceType").setValue(new CodeType(resourceType.toCode()));
sourceParam.addPart().setName("url").setValue(new UrlType(jsonURL.toExternalForm()));
sourceParam.addPart().setName("url").setValue(new UrlType(url.toExternalForm()));
return parameters;
}

@SuppressWarnings("SameParameterValue")
@Nonnull
Parameters buildImportParameters(@Nonnull final URL jsonURL,
@Nonnull final ResourceType resourceType, @Nonnull final ImportMode mode) {
final Parameters parameters = buildImportParameters(jsonURL, resourceType);
Parameters buildImportParameters(@Nonnull final URL url,
@Nonnull final ResourceType resourceType, @Nonnull final String format) {
final Parameters parameters = new Parameters();
final ParametersParameterComponent sourceParam = parameters.addParameter().setName("source");
sourceParam.addPart().setName("resourceType").setValue(new CodeType(resourceType.toCode()));
sourceParam.addPart().setName("url").setValue(new UrlType(url.toExternalForm()));
sourceParam.addPart().setName("format").setValue(new CodeType(format));
return parameters;
}

@SuppressWarnings("SameParameterValue")
@Nonnull
Parameters buildImportParameters(@Nonnull final URL url,
@Nonnull final ResourceType resourceType, @Nonnull final String format,
@Nonnull final ImportMode mode) {
final Parameters parameters = buildImportParameters(url, resourceType, format);
final ParametersParameterComponent sourceParam = parameters.getParameter().stream()
.filter(p -> p.getName().equals("source")).findFirst()
.orElseThrow();
@@ -96,30 +109,26 @@ Parameters buildImportParameters(@Nonnull final URL jsonURL,
@Test
void importJsonFile() {
final URL jsonURL = getResourceAsUrl("import/Patient.ndjson");
importExecutor.execute(buildImportParameters(jsonURL, ResourceType.PATIENT));
importExecutor.execute(buildImportParameters(jsonURL, ResourceType.PATIENT, "ndjson"));

final Dataset<Row> result = database.read(ResourceType.PATIENT);
final Dataset<Row> expected = new DatasetBuilder(spark)
.withIdColumn()
.withRow("121503c8-9564-4b48-9086-a22df717948e")
.withRow("2b36c1e2-bbe1-45ae-8124-4adad2677702")
.withRow("7001ad9c-34d2-4eb5-8165-5fdc2147f469")
.withRow("8ee183e2-b3c0-4151-be94-b945d6aa8c6d")
.withRow("9360820c-8602-4335-8b50-c88d627a0c20")
.withRow("a7eb2ce7-1075-426c-addd-957b861b0e55")
.withRow("bbd33563-70d9-4f6d-a79a-dd1fc55f5ad9")
.withRow("beff242e-580b-47c0-9844-c1a68c36c5bf")
.withRow("e62e52ae-2d75-4070-a0ae-3cc78d35ed08")
.build();
assertPatientDatasetMatches(result);
}

DatasetAssert.of(result.select("id")).hasRows(expected);
@Test
void importJsonFileUsingDefault() {
final URL jsonURL = getResourceAsUrl("import/Patient.ndjson");
importExecutor.execute(buildImportParameters(jsonURL, ResourceType.PATIENT));

final Dataset<Row> result = database.read(ResourceType.PATIENT);
assertPatientDatasetMatches(result);
}

@Test
void mergeJsonFile() {
final URL jsonURL = getResourceAsUrl("import/Patient_updates.ndjson");
importExecutor.execute(
buildImportParameters(jsonURL, ResourceType.PATIENT, ImportMode.MERGE));
buildImportParameters(jsonURL, ResourceType.PATIENT, "ndjson", ImportMode.MERGE));

final Dataset<Row> result = database.read(ResourceType.PATIENT);
final Dataset<Row> expected = new DatasetBuilder(spark)
Expand All @@ -143,14 +152,14 @@ void mergeJsonFile() {
@Test
void importJsonFileWithBlankLines() {
final URL jsonURL = getResourceAsUrl("import/Patient_with_eol.ndjson");
importExecutor.execute(buildImportParameters(jsonURL, ResourceType.PATIENT));
importExecutor.execute(buildImportParameters(jsonURL, ResourceType.PATIENT, "ndjson"));
assertEquals(9, database.read(ResourceType.PATIENT).count());
}

@Test
void importJsonFileWithRecursiveDatatype() {
final URL jsonURL = getResourceAsUrl("import/Questionnaire.ndjson");
importExecutor.execute(buildImportParameters(jsonURL, ResourceType.QUESTIONNAIRE));
importExecutor.execute(buildImportParameters(jsonURL, ResourceType.QUESTIONNAIRE, "ndjson"));
final Dataset<Row> questionnaireDataset = database.read(ResourceType.QUESTIONNAIRE);
assertEquals(1, questionnaireDataset.count());

@@ -182,6 +191,24 @@ void importJsonFileWithRecursiveDatatype() {
DatasetAssert.of(expandedItemsDataset).hasRows(expectedDataset);
}

@Test
void importParquetFile() {
final URL parquetURL = getResourceAsUrl("import/Patient.parquet");
importExecutor.execute(buildImportParameters(parquetURL, ResourceType.PATIENT, "parquet"));

final Dataset<Row> result = database.read(ResourceType.PATIENT);
assertPatientDatasetMatches(result);
}

@Test
void importDeltaFile() {
final URL deltaURL = getResourceAsUrl("import/Patient.delta");
importExecutor.execute(buildImportParameters(deltaURL, ResourceType.PATIENT, "delta"));

final Dataset<Row> result = database.read(ResourceType.PATIENT);
assertPatientDatasetMatches(result);
}

@Test
void throwsOnUnsupportedResourceType() {
final List<ResourceType> resourceTypes = Arrays.asList(ResourceType.PARAMETERS,
@@ -190,8 +217,8 @@ void throwsOnUnsupportedResourceType() {
for (final ResourceType resourceType : resourceTypes) {
final InvalidUserInputError error = assertThrows(InvalidUserInputError.class,
() -> importExecutor.execute(
buildImportParameters(new URL("file://some/url"),
resourceType)), "Unsupported resource type: " + resourceType.toCode());
buildImportParameters(getResourceAsUrl("import/Patient.ndjson"),
resourceType, "ndjson")), "Unsupported resource type: " + resourceType.toCode());
assertEquals("Unsupported resource type: " + resourceType.toCode(), error.getMessage());
}
}
@@ -200,11 +227,37 @@ void throwsOnUnsupportedResourceType() {
void throwsOnMissingId() {
final URL jsonURL = getResourceAsUrl("import/Patient_missing_id.ndjson");
final Exception error = assertThrows(Exception.class,
() -> importExecutor.execute(buildImportParameters(jsonURL, ResourceType.PATIENT)));
() -> importExecutor.execute(
buildImportParameters(jsonURL, ResourceType.PATIENT, "ndjson")));
final BaseServerResponseException convertedError =
ErrorHandlingInterceptor.convertError(error);
assertInstanceOf(InvalidRequestException.class, convertedError);
assertEquals("Encountered a resource with no ID", convertedError.getMessage());
}

@Test
void throwsOnUnsupportedFormat() {
assertThrows(InvalidUserInputError.class,
() -> importExecutor.execute(
buildImportParameters(getResourceAsUrl("import/Patient.ndjson"),
ResourceType.PATIENT, "foo")), "Unsupported format: foo");
}

private void assertPatientDatasetMatches(@Nonnull final Dataset<Row> result) {
final Dataset<Row> expected = new DatasetBuilder(spark)
.withIdColumn()
.withRow("121503c8-9564-4b48-9086-a22df717948e")
.withRow("2b36c1e2-bbe1-45ae-8124-4adad2677702")
.withRow("7001ad9c-34d2-4eb5-8165-5fdc2147f469")
.withRow("8ee183e2-b3c0-4151-be94-b945d6aa8c6d")
.withRow("9360820c-8602-4335-8b50-c88d627a0c20")
.withRow("a7eb2ce7-1075-426c-addd-957b861b0e55")
.withRow("bbd33563-70d9-4f6d-a79a-dd1fc55f5ad9")
.withRow("beff242e-580b-47c0-9844-c1a68c36c5bf")
.withRow("e62e52ae-2d75-4070-a0ae-3cc78d35ed08")
.build();

DatasetAssert.of(result.select("id")).hasRows(expected);
}

}
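The new importParquetFile and importDeltaFile tests read binary fixtures at import/Patient.parquet and import/Patient.delta. A hedged sketch of how such fixtures could be generated from the existing NDJSON fixture using the Pathling FHIR encoders; the paths, Spark configuration, and per-record parser creation are assumptions for illustration, not part of this change:

import au.csiro.pathling.encoders.FhirEncoders;
import ca.uhn.fhir.context.FhirContext;
import org.apache.spark.api.java.function.FilterFunction;
import org.apache.spark.api.java.function.MapFunction;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;
import org.apache.spark.sql.catalyst.encoders.ExpressionEncoder;
import org.hl7.fhir.r4.model.Patient;

final SparkSession spark = SparkSession.builder()
    .master("local[*]")
    // Delta writes require the Delta Lake session extensions.
    .config("spark.sql.extensions", "io.delta.sql.DeltaSparkSessionExtension")
    .config("spark.sql.catalog.spark_catalog",
        "org.apache.spark.sql.delta.catalog.DeltaCatalog")
    .getOrCreate();

final ExpressionEncoder<Patient> encoder =
    FhirEncoders.forR4().getOrCreate().of(Patient.class);

// Parse each NDJSON line with HAPI and encode it into a Spark dataset.
// (Creating a FhirContext per record is inefficient; shown for brevity.)
final Dataset<Row> patients = spark.read()
    .textFile("src/test/resources/import/Patient.ndjson")
    .filter((FilterFunction<String>) s -> !s.isBlank())
    .map((MapFunction<String, Patient>) json ->
        (Patient) FhirContext.forR4().newJsonParser().parseResource(json), encoder)
    .toDF();

patients.write().mode("overwrite")
    .parquet("src/test/resources/import/Patient.parquet");
patients.write().format("delta").mode("overwrite")
    .save("src/test/resources/import/Patient.delta");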
@@ -0,0 +1,3 @@
{"commitInfo":{"timestamp":1736902211723,"operation":"WRITE","operationParameters":{"mode":"Overwrite","partitionBy":"[]"},"readVersion":0,"isolationLevel":"Serializable","isBlindAppend":false,"operationMetrics":{"numFiles":"1","numOutputRows":"9","numOutputBytes":"131068"},"engineInfo":"Apache-Spark/3.5.0 Delta-Lake/3.2.0","txnId":"35a6f069-9b81-4eba-84ca-49caf72a4fab"}}
{"add":{"path":"part-00000-672fe062-8b0c-442b-868c-89803ac2dc1b-c000.snappy.parquet","partitionValues":{},"size":131068,"modificationTime":1736902211147,"dataChange":true,"stats":"{\"numRecords\":9,\"minValues\":{\"id\":\"121503c8-9564-4b48-9086-a22df717\",\"id_versioned\":\"Patient/121503c8-9564-4b48-9086-\",\"meta\":{},\"text\":{\"status\":\"generated\",\"div\":\"<div xmlns=\\\"http://www.w3.org/19\",\"_fid\":189809872},\"gender\":\"female\",\"birthDate\":\"1957-06-06\",\"deceasedDateTime\":\"1967-06-08T08:27:40+00:00\",\"maritalStatus\":{\"text\":\"M\",\"_fid\":93571182}},\"maxValues\":{\"id\":\"e62e52ae-2d75-4070-a0ae-3cc78d35�\",\"id_versioned\":\"Patient/e62e52ae-2d75-4070-a0ae-�\",\"meta\":{},\"text\":{\"status\":\"generated\",\"div\":\"<div xmlns=\\\"http://www.w3.org/19�\",\"_fid\":2104429834},\"gender\":\"male\",\"birthDate\":\"1998-12-26\",\"deceasedDateTime\":\"2018-02-17T09:11:55+00:00\",\"maritalStatus\":{\"text\":\"S\",\"_fid\":1857434360}},\"nullCount\":{\"id\":0,\"id_versioned\":0,\"meta\":{\"id\":9,\"versionId\":9,\"versionId_versioned\":9,\"lastUpdated\":9,\"source\":9,\"profile\":9,\"security\":9,\"tag\":9,\"_fid\":9},\"implicitRules\":9,\"language\":9,\"text\":{\"id\":9,\"status\":0,\"div\":0,\"_fid\":0},\"identifier\":0,\"active\":9,\"name\":0,\"telecom\":0,\"gender\":0,\"birthDate\":0,\"deceasedBoolean\":9,\"deceasedDateTime\":5,\"address\":0,\"maritalStatus\":{\"id\":9,\"coding\":0,\"text\":0,\"_fid\":0},\"multipleBirthBoolean\":0,\"multipleBirthInteger\":9}}"}}
{"remove":{"path":"part-00000-b3c5e8be-3b6c-4e9c-8d65-eb2ef157cd95-c000.snappy.parquet","deletionTimestamp":1736902211707,"dataChange":true,"extendedFileMetadata":true,"partitionValues":{},"size":132841}}