Skip to content

Commit ed68615

Browse files
committed
Improved performance of lineage tracking
1 parent 34eba21 commit ed68615

File tree

50 files changed

+240
-235
lines changed

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

50 files changed

+240
-235
lines changed

vtl-api/src/main/java/it/bancaditalia/oss/vtl/model/data/DataSet.java

Lines changed: 10 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -79,9 +79,11 @@ public default Iterator<DataPoint> iterator()
7979
/**
8080
* Creates a new dataset retaining the specified component along with all identifiers of this dataset
8181
* @param alias The alias of the component to retain.
82+
* @param lineageOp TODO
83+
* @param lineageOperator The lineage enricher of this datapoint.
8284
* @return The projected dataset
8385
*/
84-
public DataSet membership(VTLAlias alias);
86+
public DataSet membership(VTLAlias alias, SerUnaryOperator<Lineage> lineageOp);
8587

8688
/**
8789
* Finds a component with given name
@@ -228,7 +230,7 @@ public <A, T, TT> Stream<T> streamByKeys(Set<DataStructureComponent<Identifier,
228230
*/
229231
public <T extends Map<DataStructureComponent<?, ?, ?>, ScalarValue<?, ?, ?, ?>>, TT> VTLValue aggregate(VTLValueMetadata resultMetadata,
230232
Set<DataStructureComponent<Identifier, ?, ?>> keys, SerCollector<DataPoint, ?, T> groupCollector,
231-
SerTriFunction<? super T, ? super Lineage[], ? super Map<DataStructureComponent<Identifier, ?, ?>, ScalarValue<?, ?, ?, ?>>, TT> finisher);
233+
SerTriFunction<? super T, ? super List<Lineage>, ? super Map<DataStructureComponent<Identifier, ?, ?>, ScalarValue<?, ?, ?, ?>>, TT> finisher);
232234

233235
/**
234236
* Creates a new DataSet by applying a window function over a component of this DataSet.
@@ -258,21 +260,24 @@ public <T, TT> DataSet analytic(SerUnaryOperator<Lineage> lineageOp, DataStructu
258260
* The datasets must have the same structure, and duplicated datapoints are taken from the leftmost operand
259261
*
260262
* @param others The datasets to perform the union with
263+
* @param lineageOp The lineage operator
261264
* @return The result of the union.
262265
*/
263-
public default DataSet union(SerFunction<DataPoint, Lineage> lineageOp, List<DataSet> others)
266+
public default DataSet union(List<DataSet> others, SerUnaryOperator<Lineage> lineageOp)
264267
{
265-
return union(lineageOp, others, true);
268+
return union(others, lineageOp, true);
266269
}
267270

268271
/**
269272
* Creates a new DataSet as the union of this and other datasets. All the datasets must have the same structure as this DataSet.
270273
* If <code>check</code> is true, the duplicated datapoints are taken only from the leftmost operand.
271274
*
272275
* @param others The datasets to perform the union with
276+
* @param lineageOp The lineage operator
277+
* @param check True if a check of uniqueness of datapoints must be performed.
273278
* @return The result of the union.
274279
*/
275-
public DataSet union(SerFunction<DataPoint, Lineage> lineageOp, List<DataSet> others, boolean check);
280+
public DataSet union(List<DataSet> others, SerUnaryOperator<Lineage> lineageOp, boolean check);
276281

277282
/**
278283
* <b>NOTE</b>: The default implementation traverses this DataSet entirely.

vtl-api/src/main/java/it/bancaditalia/oss/vtl/util/SerCollectors.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -341,7 +341,7 @@ private static <T, K, V, M extends Map<K, V>> SerBiConsumer<M, T> throwingPutter
341341
return (m, t) -> {
342342
final K key = keyMapper.apply(t);
343343
final V value = valueMapper.apply(t);
344-
if (m.putIfAbsent(key, requireNonNull(value)) != null)
344+
if (m.putIfAbsent(requireNonNull(key), requireNonNull(value)) != null)
345345
throw new IllegalStateException(String.format("Duplicate key %s with values %s and %s", key, value, m.get(key)));
346346
};
347347
}

vtl-envs/vtl-spark/src/main/java/it/bancaditalia/oss/vtl/impl/environment/spark/SparkDataSet.java

Lines changed: 7 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -110,7 +110,6 @@
110110
import it.bancaditalia.oss.vtl.impl.types.dataset.AbstractDataSet;
111111
import it.bancaditalia.oss.vtl.impl.types.dataset.DataStructureBuilder;
112112
import it.bancaditalia.oss.vtl.impl.types.lineage.LineageExternal;
113-
import it.bancaditalia.oss.vtl.impl.types.lineage.LineageNode;
114113
import it.bancaditalia.oss.vtl.model.data.Component.Identifier;
115114
import it.bancaditalia.oss.vtl.model.data.Component.Measure;
116115
import it.bancaditalia.oss.vtl.model.data.DataPoint;
@@ -141,7 +140,7 @@ public class SparkDataSet extends AbstractDataSet
141140
{
142141
private static final long serialVersionUID = 1L;
143142
private static final Logger LOGGER = LoggerFactory.getLogger(SparkDataSet.class);
144-
private static final Lineage IGNORED_LINEAGE = LineageNode.of("ignored");
143+
private static final Lineage IGNORED_LINEAGE = LineageExternal.of("ignored");
145144

146145
private final SparkSession session;
147146
private final DataPointEncoder encoder;
@@ -222,7 +221,7 @@ public Dataset<Row> getDataFrame()
222221
}
223222

224223
@Override
225-
public DataSet membership(VTLAlias alias)
224+
public DataSet membership(VTLAlias alias, SerUnaryOperator<Lineage> lineageOp)
226225
{
227226
DataSetMetadata membershipStructure = getMetadata().membership(alias);
228227
LOGGER.debug("Creating dataset by membership on {} from {} to {}", alias, getMetadata(), membershipStructure);
@@ -282,7 +281,7 @@ public DataSet subspace(Map<? extends DataStructureComponent<? extends Identifie
282281

283282
Dataset<Row> filterKeys = broadcast(session.createDataFrame(singletonList(RowFactory.create(values)), createStructType(fields)));
284283
Dataset<Row> joinDF = dataFrame.join(filterKeys, names)
285-
.withColumn("$lineage$", udf((Lineage lineage) -> LineageNode.of("sub " + keyValues, lineage), LineageSparkUDT)
284+
.withColumn("$lineage$", udf((Lineage lineage) -> lineageOperator.apply(lineage), LineageSparkUDT)
286285
.apply(dataFrame.col("$lineage$")))
287286
.drop(names);
288287

@@ -500,7 +499,7 @@ public <T, TT> DataSet analytic(SerUnaryOperator<Lineage> lineageOp, DataStructu
500499
@Override
501500
public <T extends Map<DataStructureComponent<?, ?, ?>, ScalarValue<?, ?, ?, ?>>, TT> VTLValue aggregate(VTLValueMetadata metadata,
502501
Set<DataStructureComponent<Identifier, ?, ?>> keys, SerCollector<DataPoint, ?, T> groupCollector,
503-
SerTriFunction<? super T, ? super Lineage[], ? super Map<DataStructureComponent<Identifier, ?, ?>, ScalarValue<?, ?, ?, ?>>, TT> finisher)
502+
SerTriFunction<? super T, ? super List<Lineage>, ? super Map<DataStructureComponent<Identifier, ?, ?>, ScalarValue<?, ?, ?, ?>>, TT> finisher)
504503
{
505504
DataSetMetadata structure;
506505
if (metadata.isDataSet())
@@ -516,7 +515,7 @@ public <T, TT> DataSet analytic(SerUnaryOperator<Lineage> lineageOp, DataStructu
516515
MapGroupsFunction<Integer, Row, Row> aggregator = (i, s) -> StreamSupport.stream(new SparkSpliterator(s, bufferSize), !SEQUENTIAL)
517516
.map(encoder::decode)
518517
.collect(teeing(mapping(DataPoint::getLineage, toList()), groupCollector, (l, aggr) -> {
519-
TT finished = finisher.apply(aggr, l.toArray(Lineage[]::new), emptyMap());
518+
TT finished = finisher.apply(aggr, l, emptyMap());
520519
if (finished instanceof DataPoint)
521520
return resultEncoder.encode((DataPoint) finished);
522521

@@ -545,7 +544,7 @@ public <T, TT> DataSet analytic(SerUnaryOperator<Lineage> lineageOp, DataStructu
545544

546545
return StreamSupport.stream(new SparkSpliterator(s, bufferSize), !Utils.SEQUENTIAL).map(encoder::decode)
547546
.collect(teeing(mapping(DataPoint::getLineage, toList()), groupCollector, (l, aggr) ->
548-
resultEncoder.encode((DataPoint) finisher.apply(aggr, l.toArray(Lineage[]::new), keyValues))));
547+
resultEncoder.encode((DataPoint) finisher.apply(aggr, l, keyValues))));
549548
};
550549

551550
aggred = dataFrame.groupBy(keyNames)
@@ -666,7 +665,7 @@ private <A, TT, T> Stream<T> processAsBoolMap(SerCollector<DataPoint, A, TT> gro
666665
}
667666

668667
@Override
669-
public DataSet union(SerFunction<DataPoint, Lineage> lineageOp, List<DataSet> others)
668+
public DataSet union(List<DataSet> others, SerUnaryOperator<Lineage> lineageOp)
670669
{
671670
List<Dataset<Row>> allSparkDs = Stream.concat(Stream.of(this), others.stream())
672671
.map(other -> other instanceof SparkDataSet ? ((SparkDataSet) other) : new SparkDataSet(session, other.getMetadata(), other))

vtl-samples/src/main/java/it/bancaditalia/oss/vtl/impl/data/samples/SampleDataSets.java

Lines changed: 10 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -58,7 +58,7 @@
5858
import it.bancaditalia.oss.vtl.impl.types.dataset.DataPointBuilder;
5959
import it.bancaditalia.oss.vtl.impl.types.dataset.DataStructureBuilder;
6060
import it.bancaditalia.oss.vtl.impl.types.dataset.StreamWrapperDataSet;
61-
import it.bancaditalia.oss.vtl.impl.types.lineage.LineageNode;
61+
import it.bancaditalia.oss.vtl.impl.types.lineage.LineageExternal;
6262
import it.bancaditalia.oss.vtl.model.data.Component.Identifier;
6363
import it.bancaditalia.oss.vtl.model.data.DataPoint;
6464
import it.bancaditalia.oss.vtl.model.data.DataSet;
@@ -133,7 +133,7 @@ private static DataSet createSample(SampleVariables variables[])
133133
.map(toEntry(
134134
var -> var.getComponent(counts.get(var.getType()).getAndIncrement()),
135135
var -> SampleValues.getValues(var.getType(), var.getIndex()).get(dpIdx)
136-
)).collect(toDataPoint(LineageNode.of("sample"), structure));
136+
)).collect(toDataPoint(LineageExternal.of("sample"), structure));
137137
}));
138138
}
139139

@@ -151,7 +151,7 @@ protected Stream<DataPoint> streamDataPoints()
151151
{
152152
return dataset.stream()
153153
.filter(dp -> dp.matches(keyValues))
154-
.map(dp -> new DataPointBuilder(dp).delete(keyValues.keySet()).build(LineageNode.of("SUB" + keyValues, dp.getLineage()), newMetadata));
154+
.map(dp -> new DataPointBuilder(dp).delete(keyValues.keySet()).build(lineageOperator.apply(dp.getLineage()), newMetadata));
155155
}
156156
};
157157
}
@@ -171,9 +171,9 @@ public DataSetMetadata getMetadata()
171171
return dataset.getMetadata();
172172
}
173173

174-
public DataSet membership(VTLAlias component)
174+
public DataSet membership(VTLAlias component, SerUnaryOperator<Lineage> lineageOp)
175175
{
176-
return dataset.membership(component);
176+
return dataset.membership(component, lineageOp);
177177
}
178178

179179
public Optional<DataStructureComponent<?, ?, ?>> getComponent(VTLAlias name)
@@ -213,8 +213,9 @@ public <A, T, TT> Stream<T> streamByKeys(Set<DataStructureComponent<Identifier,
213213
}
214214

215215
@Override
216-
public <T extends Map<DataStructureComponent<?, ?, ?>, ScalarValue<?, ?, ?, ?>>, TT> VTLValue aggregate(VTLValueMetadata metadata, Set<DataStructureComponent<Identifier, ?, ?>> keys,
217-
SerCollector<DataPoint, ?, T> groupCollector, SerTriFunction<? super T, ? super Lineage[], ? super Map<DataStructureComponent<Identifier, ?, ?>, ScalarValue<?, ?, ?, ?>>, TT> finisher)
216+
public <T extends Map<DataStructureComponent<?, ?, ?>, ScalarValue<?, ?, ?, ?>>, TT> VTLValue aggregate(VTLValueMetadata metadata,
217+
Set<DataStructureComponent<Identifier, ?, ?>> keys, SerCollector<DataPoint, ?, T> groupCollector,
218+
SerTriFunction<? super T, ? super List<Lineage>, ? super Map<DataStructureComponent<Identifier, ?, ?>, ScalarValue<?, ?, ?, ?>>, TT> finisher)
218219
{
219220
return dataset.aggregate(metadata, keys, groupCollector, finisher);
220221
}
@@ -227,9 +228,9 @@ public <T, TT> DataSet analytic(SerUnaryOperator<Lineage> lineageOp, DataStructu
227228
}
228229

229230
@Override
230-
public DataSet union(SerFunction<DataPoint, Lineage> lineageOp, List<DataSet> others, boolean check)
231+
public DataSet union(List<DataSet> others, SerUnaryOperator<Lineage> lineageOp, boolean check)
231232
{
232-
return dataset.union(lineageOp, others);
233+
return dataset.union(others, lineageOp);
233234
}
234235

235236
@Override

vtl-transform/src/main/java/it/bancaditalia/oss/vtl/impl/transform/VarIDOperand.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -19,10 +19,10 @@
1919
*/
2020
package it.bancaditalia.oss.vtl.impl.transform;
2121

22+
import static it.bancaditalia.oss.vtl.impl.types.lineage.LineageNode.lineageEnricher;
2223
import static it.bancaditalia.oss.vtl.util.SerUnaryOperator.identity;
2324
import static java.util.Objects.requireNonNull;
2425

25-
import it.bancaditalia.oss.vtl.impl.types.lineage.LineageNode;
2626
import it.bancaditalia.oss.vtl.model.data.DataSet;
2727
import it.bancaditalia.oss.vtl.model.data.VTLAlias;
2828
import it.bancaditalia.oss.vtl.model.data.VTLValue;
@@ -49,7 +49,7 @@ public VTLValue eval(TransformationScheme session)
4949
if (vtlValue.isDataSet())
5050
{
5151
DataSet dataset = (DataSet) vtlValue;
52-
vtlValue = dataset.mapKeepingKeys(dataset.getMetadata(), lineage -> LineageNode.of(this, lineage), identity());
52+
vtlValue = dataset.mapKeepingKeys(dataset.getMetadata(), lineageEnricher(this), identity());
5353
}
5454

5555
return vtlValue;

vtl-transform/src/main/java/it/bancaditalia/oss/vtl/impl/transform/aggregation/AggregateTransformation.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -74,6 +74,7 @@
7474
import it.bancaditalia.oss.vtl.impl.types.dataset.FunctionDataSet;
7575
import it.bancaditalia.oss.vtl.impl.types.domain.EntireBooleanDomainSubset;
7676
import it.bancaditalia.oss.vtl.impl.types.domain.EntireIntegerDomainSubset;
77+
import it.bancaditalia.oss.vtl.impl.types.lineage.LineageCall;
7778
import it.bancaditalia.oss.vtl.impl.types.lineage.LineageNode;
7879
import it.bancaditalia.oss.vtl.impl.types.operators.AggregateOperator;
7980
import it.bancaditalia.oss.vtl.model.data.Component;
@@ -207,7 +208,7 @@ else if (measuresMap.size() == 1)
207208
else
208209
throw new IllegalStateException();
209210

210-
return builder.addAll(viralsMap).build(LineageNode.of(this, lineages), structure);
211+
return builder.addAll(viralsMap).build(LineageNode.of(this, LineageCall.of(lineages)), structure);
211212
});
212213

213214
if (having != null)

vtl-transform/src/main/java/it/bancaditalia/oss/vtl/impl/transform/aggregation/HierarchyTransformation.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -66,6 +66,7 @@
6666
import it.bancaditalia.oss.vtl.impl.types.dataset.DataPointBuilder;
6767
import it.bancaditalia.oss.vtl.impl.types.dataset.DataStructureBuilder;
6868
import it.bancaditalia.oss.vtl.impl.types.dataset.FunctionDataSet;
69+
import it.bancaditalia.oss.vtl.impl.types.lineage.LineageCall;
6970
import it.bancaditalia.oss.vtl.impl.types.lineage.LineageExternal;
7071
import it.bancaditalia.oss.vtl.impl.types.lineage.LineageNode;
7172
import it.bancaditalia.oss.vtl.model.data.CodeItem;
@@ -285,7 +286,7 @@ else if (input == RULE)
285286
for (DataStructureComponent<ViralAttribute, ?, ?> viral: structure.getComponents(ViralAttribute.class))
286287
builder = builder.add(viral, computeViral(virals.get(viral)));
287288

288-
DataPoint dp = builder.build(LineageNode.of(this, lineages), structure);
289+
DataPoint dp = builder.build(LineageNode.of(this, LineageCall.of(lineages)), structure);
289290

290291
// Depending on mode, store the computed dp for use by other rules
291292
if (mode == NON_NULL && !aggResult.isNull()

vtl-transform/src/main/java/it/bancaditalia/oss/vtl/impl/transform/aggregation/RankTransformation.java

Lines changed: 3 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,7 @@
2222
import static it.bancaditalia.oss.vtl.impl.transform.scope.ThisScope.THIS;
2323
import static it.bancaditalia.oss.vtl.impl.transform.util.WindowCriterionImpl.DATAPOINTS_UNBOUNDED_PRECEDING_TO_UNBOUNDED_FOLLOWING;
2424
import static it.bancaditalia.oss.vtl.impl.types.domain.Domains.INTEGERDS;
25+
import static it.bancaditalia.oss.vtl.impl.types.lineage.LineageNode.lineageEnricher;
2526
import static it.bancaditalia.oss.vtl.model.transform.analytic.SortCriterion.SortingMethod.DESC;
2627
import static it.bancaditalia.oss.vtl.util.SerCollectors.collectingAndThen;
2728
import static it.bancaditalia.oss.vtl.util.SerCollectors.toList;
@@ -53,7 +54,6 @@
5354
import it.bancaditalia.oss.vtl.impl.types.data.IntegerValue;
5455
import it.bancaditalia.oss.vtl.impl.types.dataset.DataStructureBuilder;
5556
import it.bancaditalia.oss.vtl.impl.types.domain.EntireIntegerDomainSubset;
56-
import it.bancaditalia.oss.vtl.impl.types.lineage.LineageNode;
5757
import it.bancaditalia.oss.vtl.impl.types.operators.PartitionToRank;
5858
import it.bancaditalia.oss.vtl.impl.types.window.RankedPartition;
5959
import it.bancaditalia.oss.vtl.model.data.Component.Identifier;
@@ -81,13 +81,11 @@ public class RankTransformation extends TransformationImpl implements AnalyticTr
8181

8282
private final List<VTLAlias> partitionBy;
8383
private final List<OrderByItem> orderByClause;
84-
private final String lineageDescriptor;
8584

8685
public RankTransformation(List<VTLAlias> partitionBy, List<OrderByItem> orderByClause)
8786
{
8887
this.partitionBy = coalesce(partitionBy, emptyList());
8988
this.orderByClause = coalesce(orderByClause, emptyList());
90-
this.lineageDescriptor = "rank by " + orderByClause.stream().map(OrderByItem::getAlias).map(VTLAlias::toString).collect(joining(" "));
9189
}
9290

9391
@Override
@@ -120,9 +118,9 @@ public VTLValue eval(TransformationScheme scheme)
120118
SerCollector<DataPoint, ?, RankedPartition> collector =
121119
collectingAndThen(toList(PartitionToRank::new), l -> rankPartition(dataset.getMetadata(), orderByAliases, l));
122120

123-
return dataset.analytic(lineage -> LineageNode.of(lineageDescriptor, lineage), measure,
121+
return dataset.analytic(lineageEnricher(this), measure,
124122
INTEGERDS.getDefaultVariable().as(Measure.class), window, identity(), collector, (r, dp) -> finisher(r, dp))
125-
.membership(INTEGERDS.getDefaultVariable().getAlias());
123+
.membership(INTEGERDS.getDefaultVariable().getAlias(), identity());
126124
}
127125

128126
private Collection<ScalarValue<?, ?, ?, ?>> finisher(RankedPartition ranks, DataPoint dp)

vtl-transform/src/main/java/it/bancaditalia/oss/vtl/impl/transform/aggregation/RatioToReportTransformation.java

Lines changed: 2 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,7 @@
2222
import static it.bancaditalia.oss.vtl.impl.transform.scope.ThisScope.THIS;
2323
import static it.bancaditalia.oss.vtl.impl.transform.util.WindowCriterionImpl.DATAPOINTS_UNBOUNDED_PRECEDING_TO_UNBOUNDED_FOLLOWING;
2424
import static it.bancaditalia.oss.vtl.impl.types.data.NumberValueImpl.createNumberValue;
25+
import static it.bancaditalia.oss.vtl.impl.types.lineage.LineageNode.lineageEnricher;
2526
import static it.bancaditalia.oss.vtl.impl.types.operators.AnalyticOperator.SUM;
2627
import static it.bancaditalia.oss.vtl.util.Utils.coalesce;
2728
import static java.util.Collections.emptyList;
@@ -39,13 +40,11 @@
3940
import it.bancaditalia.oss.vtl.impl.transform.UnaryTransformation;
4041
import it.bancaditalia.oss.vtl.impl.transform.util.WindowClauseImpl;
4142
import it.bancaditalia.oss.vtl.impl.types.dataset.DataStructureBuilder;
42-
import it.bancaditalia.oss.vtl.impl.types.lineage.LineageNode;
4343
import it.bancaditalia.oss.vtl.model.data.Component.Identifier;
4444
import it.bancaditalia.oss.vtl.model.data.Component.Measure;
4545
import it.bancaditalia.oss.vtl.model.data.DataSet;
4646
import it.bancaditalia.oss.vtl.model.data.DataSetMetadata;
4747
import it.bancaditalia.oss.vtl.model.data.DataStructureComponent;
48-
import it.bancaditalia.oss.vtl.model.data.Lineage;
4948
import it.bancaditalia.oss.vtl.model.data.NumberValue;
5049
import it.bancaditalia.oss.vtl.model.data.ScalarValue;
5150
import it.bancaditalia.oss.vtl.model.data.VTLAlias;
@@ -56,7 +55,6 @@
5655
import it.bancaditalia.oss.vtl.model.transform.analytic.WindowClause;
5756
import it.bancaditalia.oss.vtl.session.MetadataRepository;
5857
import it.bancaditalia.oss.vtl.util.SerBiFunction;
59-
import it.bancaditalia.oss.vtl.util.SerUnaryOperator;
6058

6159
public class RatioToReportTransformation extends UnaryTransformation implements AnalyticTransformation
6260
{
@@ -85,7 +83,6 @@ protected VTLValue evalOnDataset(MetadataRepository repo, DataSet dataset, VTLVa
8583
Set<DataStructureComponent<Identifier, ?, ?>> partitionIDs = dataset.getMetadata().matchIdComponents(partitionBy, "partition by");
8684
Set<DataStructureComponent<Measure, ?, ?>> measures = dataset.getMetadata().getMeasures();
8785

88-
SerUnaryOperator<Lineage> lineageOp = lineage -> LineageNode.of(this, lineage);
8986
WindowClause clause = new WindowClauseImpl(partitionIDs, null, DATAPOINTS_UNBOUNDED_PRECEDING_TO_UNBOUNDED_FOLLOWING);
9087
SerBiFunction<ScalarValue<?, ?, ?, ?>, ScalarValue<?, ?, ?, ?>, Collection<? extends ScalarValue<?, ?, ?, ?>>> finisher = (newV, oldV) -> {
9188
if (newV.isNull() || oldV.isNull())
@@ -97,7 +94,7 @@ else if (newV instanceof NumberValue && oldV instanceof NumberValue)
9794
};
9895

9996
for (DataStructureComponent<Measure, ?, ?> measure: measures)
100-
dataset = dataset.analytic(lineageOp, measure, measure, clause, null, SUM.getReducer(measure.getVariable().getDomain()), finisher);
97+
dataset = dataset.analytic(lineageEnricher(this), measure, measure, clause, null, SUM.getReducer(measure.getVariable().getDomain()), finisher);
10198

10299
return dataset;
103100
}

vtl-transform/src/main/java/it/bancaditalia/oss/vtl/impl/transform/bool/BetweenTransformation.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,7 @@
2121

2222
import static it.bancaditalia.oss.vtl.impl.types.domain.Domains.BOOLEAN;
2323
import static it.bancaditalia.oss.vtl.impl.types.domain.Domains.BOOLEANDS;
24+
import static it.bancaditalia.oss.vtl.impl.types.lineage.LineageNode.lineageEnricher;
2425
import static java.util.Collections.singletonMap;
2526

2627
import java.util.Set;
@@ -33,7 +34,6 @@
3334
import it.bancaditalia.oss.vtl.impl.types.data.NullValue;
3435
import it.bancaditalia.oss.vtl.impl.types.dataset.DataStructureBuilder;
3536
import it.bancaditalia.oss.vtl.impl.types.domain.EntireBooleanDomainSubset;
36-
import it.bancaditalia.oss.vtl.impl.types.lineage.LineageNode;
3737
import it.bancaditalia.oss.vtl.model.data.Component.Measure;
3838
import it.bancaditalia.oss.vtl.model.data.DataSet;
3939
import it.bancaditalia.oss.vtl.model.data.DataSetMetadata;
@@ -114,7 +114,7 @@ public VTLValueMetadata computeMetadata(TransformationScheme scheme)
114114
protected VTLValue evalOnDataset(MetadataRepository repo, DataSet dataset, VTLValueMetadata metadata)
115115
{
116116
DataStructureComponent<? extends Measure, ?, ?> measure = dataset.getMetadata().getMeasures().iterator().next();
117-
return dataset.mapKeepingKeys((DataSetMetadata) metadata, lineage -> LineageNode.of(this, lineage), dp -> singletonMap(BOOL_VAR, evalOnScalar(repo, dp.get(measure), metadata)));
117+
return dataset.mapKeepingKeys((DataSetMetadata) metadata, lineageEnricher(this), dp -> singletonMap(BOOL_VAR, evalOnScalar(repo, dp.get(measure), metadata)));
118118
}
119119

120120
@Override

0 commit comments

Comments
 (0)