Skip to content

Commit

Permalink
Fixing issues with multi level unnesting in extract implementation an…
Browse files Browse the repository at this point in the history
…d adding a more legible representation of Projection for debugging purposes.
  • Loading branch information
piotrszul committed Jan 17, 2025
1 parent 405c01d commit 9fc7130
Show file tree
Hide file tree
Showing 12 changed files with 512 additions and 96 deletions.
2 changes: 1 addition & 1 deletion fhir-server/src/test/resources/logback-test.xml
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@
<pattern>[%level] %logger{36} - %msg%n</pattern>
</encoder>
</appender>
<logger level="WARN" name="au.csiro"/>
<logger level="DEBUG" name="au.csiro"/>
<logger level="ERROR" name="org.apache.hadoop.metrics2"/>
<logger level="ERROR" name="org.apache.spark.sql.execution.CacheManager"/>
<logger level="ERROR" name="org.apache.spark.sql.catalyst.util.SparkStringUtils"/>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@

import static au.csiro.pathling.utilities.Strings.randomAlias;
import static java.util.stream.Collectors.groupingBy;
import static java.util.stream.Collectors.mapping;
import static java.util.stream.Collectors.toList;

import au.csiro.pathling.QueryExecutor;
Expand Down Expand Up @@ -115,6 +116,8 @@ public Dataset<Row> buildQuery(@Nonnull final ExtractRequest query,
// Build a Projection from the ExtractRequest.
final Projection projection = buildProjection(query, constraint);

log.debug("Executing projection:\n {}", projection.toTreeString());

// Execute the Projection to get the result dataset.
Dataset<Row> result = projection.execute(executionContext);

Expand Down Expand Up @@ -156,7 +159,7 @@ private Projection buildProjection(@Nonnull final ExtractRequest query,
}

@Nonnull
private ProjectionClause buildSelectClause(@Nonnull final List<FhirPath> paths) {
static ProjectionClause buildSelectClause(@Nonnull final List<FhirPath> paths) {
if (paths.isEmpty()) {
throw new IllegalArgumentException("Empty column list");
}
Expand All @@ -171,19 +174,14 @@ private ProjectionClause buildSelectClause(@Nonnull final List<FhirPath> paths)

// Group the paths by their first element. We use a LinkedHashMap to preserve the order.
final Map<FhirPath, List<FhirPath>> groupedPaths = paths.stream()
.collect(groupingBy(FhirPath::first, LinkedHashMap::new, toList()));
.collect(
groupingBy(FhirPath::first, LinkedHashMap::new, mapping(FhirPath::suffix, toList())));

final List<ProjectionClause> selects = groupedPaths.entrySet().stream()
.map(entry -> {
// Take the suffix of each path and build a new ProjectionClause from it. The suffix
// is all the components of the traversal except the first one.
final List<ProjectionClause> tail = entry.getValue().stream()
.map(FhirPath::suffix)
.map(path -> buildSelectClause(List.of(path)))
.collect(toList());
// Create an UnnestingSelection with a base corresponding to the group, and the
// projections representing the suffixes as the components.
return new UnnestingSelection(entry.getKey(), tail, true);
// We need to split the suffixes into aggregated and non-aggregated ones.
ProjectionClause tail = buildSelectClause(entry.getValue());
return new UnnestingSelection(entry.getKey(), List.of(tail), true);
})
.collect(toList());

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,90 @@
package au.csiro.pathling.extract;

import static java.util.stream.Collectors.groupingBy;
import static java.util.stream.Collectors.mapping;
import static java.util.stream.Collectors.toList;

import au.csiro.pathling.fhirpath.FhirPath;
import au.csiro.pathling.fhirpath.path.Paths;
import au.csiro.pathling.fhirpath.path.Paths.ExternalConstantPath;
import jakarta.annotation.Nonnull;
import java.util.Collections;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.stream.Stream;
import lombok.Value;
import lombok.extern.slf4j.Slf4j;

/**
 * Arranges a flat list of {@link FhirPath}s into a {@link Tree} by grouping paths that share the
 * same first element.
 * <p>
 * Suffixes whose first element is a call to one of the functions named in
 * {@link #AGG_FUNCTIONS} are not placed under the grouping node; instead they are emitted as
 * sibling sub-trees whose top value is the group head followed by the suffix.
 */
@Value
@Slf4j
public class ImplicitUnnester {

  /**
   * Identifiers of the functions that are treated as aggregates by {@link #isAggregate}.
   */
  private static final Set<String> AGG_FUNCTIONS = Set.of("count", "sum", "first", "exists");

  /**
   * Builds the tree for the given paths, rooted at the {@code %resource} external constant.
   *
   * @param paths The paths to arrange.
   * @return The root of the resulting tree; a leaf when no sub-trees were produced.
   */
  @Nonnull
  Tree<FhirPath> unnestPaths(@Nonnull final List<FhirPath> paths) {
    final FhirPath root = new ExternalConstantPath("%resource");
    final List<Tree<FhirPath>> subTrees = unnestPathsInternal(paths);
    return Tree.node(root, subTrees);
  }

  /**
   * Renders each of the given paths as its expression string.
   *
   * @param paths The list of FhirPaths to convert.
   * @return The expressions, in the same order as the input.
   */
  @Nonnull
  static List<String> asExpressions(@Nonnull final List<FhirPath> paths) {
    final Stream<String> expressions = paths.stream().map(path -> path.toExpression());
    return expressions.toList();
  }

  /**
   * Recursively converts the given paths into a list of sub-trees.
   * <ul>
   *   <li>No paths, or a single path for which {@code isNull()} holds: an empty list.</li>
   *   <li>A single other path: one leaf holding that path.</li>
   *   <li>Otherwise: the paths are grouped by their first element (keeping first-seen order) and
   *   each group is expanded from the suffixes of its members.</li>
   * </ul>
   *
   * @param paths The paths to arrange.
   * @return The sub-trees representing the paths.
   */
  @Nonnull
  List<Tree<FhirPath>> unnestPathsInternal(@Nonnull final List<FhirPath> paths) {
    log.trace("Unnesting paths: {}", asExpressions(paths));
    if (paths.isEmpty()) {
      return Collections.emptyList();
    }
    if (paths.size() == 1) {
      final FhirPath only = paths.get(0);
      if (only.isNull()) {
        return Collections.emptyList();
      }
      return List.of(Tree.Leaf.of(only));
    }
    // A LinkedHashMap keeps the groups in the order in which their heads were first seen.
    final Map<FhirPath, List<FhirPath>> suffixesByHead = paths.stream()
        .collect(
            groupingBy(FhirPath::first, LinkedHashMap::new, mapping(FhirPath::suffix, toList())));
    return suffixesByHead.entrySet().stream()
        .flatMap(group -> expandGroup(group.getKey(), group.getValue()))
        .toList();
  }

  /**
   * Expands one group of suffixes sharing the same head into its sub-trees.
   *
   * @param head The common first element of the group.
   * @param suffixes The suffixes of the paths in the group.
   * @return The grouping node for the non-aggregate suffixes (if there are any), followed by the
   * sub-trees for the aggregate suffixes.
   */
  @Nonnull
  private Stream<Tree<FhirPath>> expandGroup(@Nonnull final FhirPath head,
      @Nonnull final List<FhirPath> suffixes) {
    // Aggregate suffixes are processed first; the head is prepended to the top value of each
    // of their sub-trees.
    final List<FhirPath> aggregateSuffixes = suffixes.stream()
        .filter(ImplicitUnnester::isAggregate)
        .toList();
    final List<Tree<FhirPath>> aggregateTrees = unnestPathsInternal(aggregateSuffixes)
        .stream()
        .map(subTree -> subTree.mapValue(value -> head.andThen(value)))
        .toList();

    // The remaining suffixes are nested beneath a single node holding the head.
    final List<FhirPath> nestedSuffixes = suffixes.stream()
        .filter(suffix -> !ImplicitUnnester.isAggregate(suffix))
        .toList();
    final List<Tree<FhirPath>> nestedTrees;
    if (nestedSuffixes.isEmpty()) {
      nestedTrees = List.of();
    } else {
      nestedTrees = List.of(Tree.node(head, unnestPathsInternal(nestedSuffixes)));
    }

    return Stream.concat(nestedTrees.stream(), aggregateTrees.stream());
  }

  /**
   * Tests whether the first element of the path is a function call whose identifier is listed in
   * {@link #AGG_FUNCTIONS}.
   *
   * @param path The path to test.
   * @return {@code true} if the path starts with one of the listed functions.
   */
  static boolean isAggregate(@Nonnull final FhirPath path) {
    if (path.first() instanceof Paths.EvalFunction evalFunction) {
      return AGG_FUNCTIONS.contains(evalFunction.getFunctionIdentifier());
    }
    return false;
  }

}
102 changes: 102 additions & 0 deletions fhirpath/src/main/java/au/csiro/pathling/extract/Tree.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,102 @@
package au.csiro.pathling.extract;

import jakarta.annotation.Nonnull;
import java.util.Collections;
import java.util.List;
import java.util.function.Function;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import lombok.Value;

/**
 * A simple immutable tree of values, made of {@link Node}s (a value plus child trees) and
 * {@link Leaf}s (a value only).
 *
 * @param <T> The type of the value held by each node.
 */
public interface Tree<T> {

  /**
   * @return The value held by this node.
   */
  @Nonnull
  T getValue();

  /**
   * @return The child trees of this node; empty for a leaf.
   */
  @Nonnull
  List<Tree<T>> getChildren();

  /**
   * @return A stream over the children of this node.
   */
  @Nonnull
  default Stream<Tree<T>> streamChildren() {
    return getChildren().stream();
  }

  /**
   * Renders this tree as text, one node per line, with each node prefixed by the indentation
   * unit repeated {@code level} times and children rendered at {@code level + 1}.
   *
   * @param level The indentation level of this node.
   * @param formatter Converts a node value to its textual form.
   * @return The textual representation, without a trailing line break.
   */
  @Nonnull
  String toTreeString(final int level,
      @Nonnull final Function<T, String> formatter);

  /**
   * Renders this tree starting at level 0, using {@code toString()} to format the values.
   *
   * @return The textual representation of the tree.
   */
  @Nonnull
  default String toTreeString() {
    return toTreeString(0, Object::toString);
  }

  /**
   * Renders this tree starting at level 0, using the given formatter for the values.
   *
   * @param formatter Converts a node value to its textual form.
   * @return The textual representation of the tree.
   */
  @Nonnull
  default String toTreeString(@Nonnull final Function<T, String> formatter) {
    return toTreeString(0, formatter);
  }

  /**
   * Applies the mapper to the value of this node and, recursively, to the values of all of its
   * descendants.
   *
   * @param mapper The mapping function.
   * @param <R> The value type of the resulting tree.
   * @return A new tree with the same shape and mapped values.
   */
  @Nonnull
  default <R> Tree<R> map(@Nonnull final Function<T, R> mapper) {
    return Tree.node(mapper.apply(getValue()),
        streamChildren().map(child -> child.map(mapper)).collect(Collectors.toList()));
  }

  /**
   * Map the value of this node only; the child trees are carried over unchanged.
   *
   * @param mapper The mapping function.
   * @return A new tree with the value of this node mapped.
   */
  @Nonnull
  default Tree<T> mapValue(@Nonnull final Function<T, T> mapper) {
    return Tree.node(mapper.apply(getValue()),
        streamChildren().toList());
  }


  /**
   * A tree node holding a value and a list of child trees.
   *
   * @param <T> The type of the value.
   */
  @Value(staticConstructor = "of")
  class Node<T> implements Tree<T> {

    T value;
    List<Tree<T>> children;

    @Override
    @Nonnull
    public String toTreeString(final int level, @Nonnull final Function<T, String> formatter) {
      final String ownLine = " ".repeat(level) + formatter.apply(value);
      // A node constructed directly with no children is rendered exactly like a leaf, so that
      // the output never ends with a dangling line break.
      if (children.isEmpty()) {
        return ownLine;
      }
      return ownLine + "\n" +
          streamChildren()
              .map(child -> child.toTreeString(level + 1, formatter))
              .collect(Collectors.joining("\n"));
    }
  }

  /**
   * A tree node holding only a value; it never has children.
   *
   * @param <T> The type of the value.
   */
  @Value(staticConstructor = "of")
  class Leaf<T> implements Tree<T> {

    T value;

    @Override
    @Nonnull
    public List<Tree<T>> getChildren() {
      return Collections.emptyList();
    }

    @Override
    @Nonnull
    public String toTreeString(final int level, @Nonnull final Function<T, String> formatter) {
      return " ".repeat(level) + formatter.apply(value);
    }
  }

  /**
   * Creates a tree from a value and its children, choosing the representation by the number of
   * children.
   *
   * @param value The value of the root of the new tree.
   * @param children The child trees.
   * @param <T> The type of the value.
   * @return A {@link Leaf} if {@code children} is empty, otherwise a {@link Node}.
   */
  @Nonnull
  static <T> Tree<T> node(@Nonnull final T value, @Nonnull final List<Tree<T>> children) {
    return children.isEmpty()
           ? Leaf.of(value)
           : Node.of(value, children);
  }

  /**
   * Varargs variant of {@link #node(Object, List)}.
   *
   * @param value The value of the root of the new tree.
   * @param children The child trees.
   * @param <T> The type of the value.
   * @return A {@link Leaf} if no children are given, otherwise a {@link Node}.
   */
  @SafeVarargs
  @Nonnull
  static <T> Tree<T> node(@Nonnull final T value, @Nonnull final Tree<T>... children) {
    return node(value, List.of(children));
  }
}
Loading

0 comments on commit 9fc7130

Please sign in to comment.