Commit

Merge pull request #2064 from aehrc/release/7.1.0

v7.1.0

johngrimes authored Feb 9, 2025
2 parents 02243f6 + 66d1820 commit 0d177fc
Showing 24 changed files with 257 additions and 56 deletions.
5 changes: 0 additions & 5 deletions .github/dependabot.yml
@@ -15,11 +15,6 @@ updates:
schedule:
interval: daily
open-pull-requests-limit: 50
- package-ecosystem: npm
directory: "/site"
schedule:
interval: daily
open-pull-requests-limit: 50
- package-ecosystem: pip
directory: "/lib/python"
schedule:
13 changes: 7 additions & 6 deletions .github/workflows/test.yml
@@ -245,6 +245,7 @@ jobs:
with:
# This is required so that git-commit-id-plugin can find the latest tag.
fetch-depth: 0
submodules: recursive
- name: Set up JDK
uses: actions/setup-java@v4
with:
@@ -547,7 +548,7 @@ jobs:
restore-keys: |-
${{ runner.os }}-maven-
- name: Cache R packages
uses: actions/cache@v2
uses: actions/cache@v4
with:
path: ${{ runner.temp }}/Library
key: r-packages-${{ runner.os }}-${{ hashFiles('lib/R/DESCRIPTION.src') }}
@@ -560,7 +561,7 @@ jobs:
run: echo "HADOOP_VERSION=$(mvn help:evaluate -Dexpression=pathling.Rapi.hadoopMajorVersion -q -DforceStdout)" >> $GITHUB_ENV
- name: Cache Spark
id: cache-spark
uses: actions/cache@v2
uses: actions/cache@v4
with:
path: /home/runner/spark/spark-${{ env.SPARK_VERSION }}-bin-hadoop${{ env.HADOOP_VERSION }}
key: spark-${{ env.SPARK_VERSION }}-bin-hadoop${{ env.HADOOP_VERSION }}
@@ -591,7 +592,7 @@ jobs:
${{ env.PATHLING_OPTS }}
timeout-minutes: 60
- name: Upload check logs
uses: actions/upload-artifact@v2
uses: actions/upload-artifact@v4
if: always()
with:
name: r-check-logs
@@ -600,7 +601,7 @@ jobs:
lib/R/target/pathling.Rcheck/*.out
lib/R/target/pathling.Rcheck/*.fail
- name: Upload package as artifact
uses: actions/upload-artifact@v2
uses: actions/upload-artifact@v4
with:
name: r-package
path: lib/R/target/pathling_*.tar.gz
@@ -634,7 +635,7 @@ jobs:
restore-keys: |-
${{ runner.os }}-maven-
- name: Cache R packages
uses: actions/cache@v2
uses: actions/cache@v4
with:
path: ${{ runner.temp }}/Library
key: r-packages-${{ runner.os }}-${{ hashFiles('lib/R/DESCRIPTION.src') }}
@@ -647,7 +648,7 @@ jobs:
run: echo "HADOOP_VERSION=$(mvn help:evaluate -Dexpression=pathling.Rapi.hadoopMajorVersion -q -DforceStdout)" >> $GITHUB_ENV
- name: Cache Spark
id: cache-spark
uses: actions/cache@v2
uses: actions/cache@v4
with:
path: /home/runner/spark/spark-${{ env.SPARK_VERSION }}-bin-hadoop${{ env.HADOOP_VERSION }}
key: spark-${{ env.SPARK_VERSION }}-bin-hadoop${{ env.HADOOP_VERSION }}
9 changes: 9 additions & 0 deletions .trivyignore
@@ -64,3 +64,12 @@ CVE-2022-1471
# This vulnerability relates to privilege management using spark-submit, which is outside of the
# scope of this solution.
CVE-2023-22946


# This vulnerability relates to WebFlux applications, which are not used in Pathling.
CVE-2024-38821

# This vulnerability relates to Avro support in the Hadoop client runtime, which is not shipped
# with Pathling but with Spark. The server Docker image that uses this version does not use
# Avro.
CVE-2024-47561
ImportExecutor.java
@@ -29,6 +29,7 @@
import au.csiro.pathling.io.FileSystemPersistence;
import au.csiro.pathling.io.ImportMode;
import ca.uhn.fhir.rest.annotation.ResourceParam;
import io.delta.tables.DeltaTable;
import jakarta.annotation.Nonnull;
import java.net.URLDecoder;
import java.nio.charset.StandardCharsets;
@@ -39,6 +40,7 @@
import org.apache.spark.api.java.function.FilterFunction;
import org.apache.spark.api.java.function.MapFunction;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;
import org.apache.spark.sql.catalyst.encoders.ExpressionEncoder;
import org.hl7.fhir.instance.model.api.IBaseResource;
@@ -50,6 +52,7 @@
import org.hl7.fhir.r4.model.OperationOutcome.OperationOutcomeIssueComponent;
import org.hl7.fhir.r4.model.Parameters;
import org.hl7.fhir.r4.model.Parameters.ParametersParameterComponent;
import org.hl7.fhir.r4.model.StringType;
import org.hl7.fhir.r4.model.UrlType;
import org.springframework.context.annotation.Profile;
import org.springframework.stereotype.Component;
@@ -110,7 +113,7 @@ public ImportExecutor(@Nonnull final SparkSession spark,
public OperationOutcome execute(@Nonnull @ResourceParam final Parameters inParams) {
// Parse and validate the JSON request.
final List<ParametersParameterComponent> sourceParams = inParams.getParameter().stream()
.filter(param -> "source".equals(param.getName())).collect(Collectors.toList());
.filter(param -> "source".equals(param.getName())).toList();
if (sourceParams.isEmpty()) {
throw new InvalidUserInputError("Must provide at least one source parameter");
}
@@ -137,6 +140,21 @@ public OperationOutcome execute(@Nonnull @ResourceParam final Parameters inParam
.map(param -> ImportMode.fromCode(
((CodeType) param.getValue()).asStringValue()))
.orElse(ImportMode.OVERWRITE);

// Get the format of the source data from the source parameter.
final ImportFormat format = sourceParam.getPart().stream()
.filter(param -> "format".equals(param.getName()))
.findFirst()
.map(param -> {
final String formatCode = ((StringType) param.getValue()).getValue();
try {
return ImportFormat.fromCode(formatCode);
} catch (final IllegalArgumentException e) {
throw new InvalidUserInputError("Unsupported format: " + formatCode);
}
})
.orElse(ImportFormat.NDJSON);

final String resourceCode = ((CodeType) resourceTypeParam.getValue()).getCode();
final ResourceType resourceType = ResourceType.fromCode(resourceCode);

@@ -149,17 +167,13 @@ public OperationOutcome execute(@Nonnull @ResourceParam final Parameters inParam
}

// Read the resources from the source URL into a dataset of strings.
final Dataset<String> jsonStrings = readStringsFromUrl(urlParam);

// Parse each line into a HAPI FHIR object, then encode to a Spark dataset.
final Dataset<IBaseResource> resources = jsonStrings.map(jsonToResourceConverter(),
fhirEncoder);
final Dataset<Row> rows = readRowsFromUrl(urlParam, format, fhirEncoder);

log.info("Importing {} resources (mode: {})", resourceType.toCode(), importMode.getCode());
if (importMode == ImportMode.OVERWRITE) {
database.overwrite(resourceType, resources.toDF());
database.overwrite(resourceType, rows);
} else {
database.merge(resourceType, resources.toDF());
database.merge(resourceType, rows);
}
}

@@ -177,22 +191,38 @@ public OperationOutcome execute(@Nonnull @ResourceParam final Parameters inParam
}

@Nonnull
private Dataset<String> readStringsFromUrl(@Nonnull final ParametersParameterComponent urlParam) {
private Dataset<Row> readRowsFromUrl(@Nonnull final ParametersParameterComponent urlParam,
final ImportFormat format, final ExpressionEncoder<IBaseResource> fhirEncoder) {
final String url = ((UrlType) urlParam.getValue()).getValueAsString();
final String decodedUrl = URLDecoder.decode(url, StandardCharsets.UTF_8);
final String convertedUrl = FileSystemPersistence.convertS3ToS3aUrl(decodedUrl);
final Dataset<String> jsonStrings;
final Dataset<Row> rowDataset;
try {
// Check that the user is authorized to execute the operation.
accessRules.ifPresent(ar -> ar.checkCanImportFrom(convertedUrl));
final FilterFunction<String> nonBlanks = s -> !s.isBlank();
jsonStrings = spark.read().textFile(convertedUrl).filter(nonBlanks);

rowDataset = switch (format) {
case NDJSON ->
// Parse each line into a HAPI FHIR object, then encode to a Spark dataset.
spark.read()
.textFile(convertedUrl).filter(nonBlanks)
.map(jsonToResourceConverter(), fhirEncoder)
.toDF();
case PARQUET ->
// Use the Spark Parquet reader.
spark.read()
.parquet(convertedUrl);
case DELTA ->
// Use the Delta Lake reader.
DeltaTable.forPath(spark, convertedUrl).toDF();
};
} catch (final SecurityError e) {
throw new InvalidUserInputError("Not allowed to import from URL: " + convertedUrl, e);
} catch (final Exception e) {
throw new InvalidUserInputError("Error reading from URL: " + convertedUrl, e);
}
return jsonStrings;
return rowDataset;
}

@Nonnull
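
The new format part slots into the existing $import source parameter handled by this executor. Below is a minimal client-side sketch of such a request using the HAPI FHIR R4 model classes; the part names "resourceType", "url" and "mode" are assumed from the surrounding executor code ("source" and "format" appear in this diff), and the source URL is a placeholder.

import org.hl7.fhir.r4.model.CodeType;
import org.hl7.fhir.r4.model.Parameters;
import org.hl7.fhir.r4.model.Parameters.ParametersParameterComponent;
import org.hl7.fhir.r4.model.UrlType;

public class ImportRequestExample {

  public static Parameters buildRequest() {
    final Parameters request = new Parameters();

    // One "source" parameter per resource type to import.
    final ParametersParameterComponent source = request.addParameter().setName("source");
    source.addPart().setName("resourceType").setValue(new CodeType("Patient"));
    // Placeholder URL; s3:// URLs are converted to s3a:// by the executor.
    source.addPart().setName("url").setValue(new UrlType("s3://example-bucket/patients"));
    source.addPart().setName("mode").setValue(new CodeType("merge"));
    // The format part added in this release; defaults to "ndjson" when omitted.
    // Declared as type "code" in the OperationDefinition; CodeType extends StringType,
    // so it also satisfies the executor's StringType cast.
    source.addPart().setName("format").setValue(new CodeType("parquet"));

    return request;
  }
}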
ImportFormat.java
@@ -0,0 +1,62 @@
/*
* Copyright 2025 Commonwealth Scientific and Industrial Research
* Organisation (CSIRO) ABN 41 687 119 230.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package au.csiro.pathling.update;

import lombok.Getter;

/**
* Represents the supported formats for resource import.
*/
@Getter
public enum ImportFormat {
/**
* Newline-delimited JSON (NDJSON) format.
*/
NDJSON("ndjson"),
/**
* Parquet format.
*/
PARQUET("parquet"),
/**
* Delta Lake format.
*/
DELTA("delta");

private final String code;

ImportFormat(final String code) {
this.code = code;
}

/**
* Resolve an ImportFormat enum from its string code.
*
* @param code The string code to resolve.
* @return An ImportFormat if a match is found.
* @throws IllegalArgumentException if no match can be found.
*/
public static ImportFormat fromCode(final String code) {
for (final ImportFormat format : ImportFormat.values()) {
if (format.getCode().equalsIgnoreCase(code)) {
return format;
}
}
throw new IllegalArgumentException("Unsupported format: " + code);
}

}
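
A short usage sketch of the new enum: fromCode matches codes case-insensitively and raises IllegalArgumentException for anything outside ndjson, parquet and delta.

import au.csiro.pathling.update.ImportFormat;

public class ImportFormatExample {

  public static void main(final String[] args) {
    // Case-insensitive resolution.
    final ImportFormat format = ImportFormat.fromCode("DELTA");
    System.out.println(format.getCode());  // prints "delta"

    // Unknown codes are rejected.
    try {
      ImportFormat.fromCode("csv");
    } catch (final IllegalArgumentException e) {
      System.out.println(e.getMessage());  // "Unsupported format: csv"
    }
  }
}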
OperationDefinition for the import operation
@@ -47,6 +47,14 @@
"max": "1",
"documentation": "A value of 'overwrite' will cause all existing resources of the specified type to be deleted and replaced with the contents of the source file. A value of 'merge' will match existing resources with updated resources in the source file based on their ID, and either update the existing resources or add new resources as appropriate. The default value is 'overwrite'.",
"type": "code"
},
{
"name": "format",
"use": "in",
"min": 0,
"max": "1",
"documentation": "Indicates the format of the source file. Possible values are 'ndjson', 'parquet' and 'delta'. The default value is 'ndjson'.",
"type": "code"
}
]
}
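
To exercise the new parameter end to end, the operation can be invoked through HAPI's generic client. This is a sketch only: the server base URL is a placeholder, and the request is a Parameters resource shaped like the one sketched after the ImportExecutor changes above. The executor returns an OperationOutcome, so the response is read back as one here.

import ca.uhn.fhir.context.FhirContext;
import ca.uhn.fhir.rest.client.api.IGenericClient;
import org.hl7.fhir.r4.model.OperationOutcome;
import org.hl7.fhir.r4.model.Parameters;

public class ImportClientExample {

  public static OperationOutcome runImport(final Parameters request) {
    final FhirContext fhirContext = FhirContext.forR4();
    final IGenericClient client =
        fhirContext.newRestfulGenericClient("http://localhost:8080/fhir");

    // POST the Parameters resource to the server-level $import operation.
    return client.operation()
        .onServer()
        .named("$import")
        .withParameters(request)
        .returnResourceType(OperationOutcome.class)
        .execute();
  }
}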