diff --git a/pom.xml b/pom.xml index ae8a25f..860c2a2 100644 --- a/pom.xml +++ b/pom.xml @@ -145,7 +145,7 @@ com.yahoo.datasketches sketches-core - 0.10.3 + 0.11.0 diff --git a/src/main/java/com/yahoo/sketches/pig/quantiles/GetPmfFromDoublesSketch.java b/src/main/java/com/yahoo/sketches/pig/quantiles/GetPmfFromDoublesSketch.java index 31b5c6c..222f9bf 100644 --- a/src/main/java/com/yahoo/sketches/pig/quantiles/GetPmfFromDoublesSketch.java +++ b/src/main/java/com/yahoo/sketches/pig/quantiles/GetPmfFromDoublesSketch.java @@ -46,7 +46,9 @@ public Tuple exec(final Tuple input) throws IOException { } splitPoints[i - 1] = (double) input.get(i); } - return Util.doubleArrayToTuple(sketch.getPMF(splitPoints)); + final double[] pmf = sketch.getPMF(splitPoints); + if (pmf == null) { return null; } + return Util.doubleArrayToTuple(pmf); } } diff --git a/src/main/java/com/yahoo/sketches/pig/quantiles/GetPmfFromStringsSketch.java b/src/main/java/com/yahoo/sketches/pig/quantiles/GetPmfFromStringsSketch.java index 8515f60..8707453 100644 --- a/src/main/java/com/yahoo/sketches/pig/quantiles/GetPmfFromStringsSketch.java +++ b/src/main/java/com/yahoo/sketches/pig/quantiles/GetPmfFromStringsSketch.java @@ -49,7 +49,9 @@ public Tuple exec(final Tuple input) throws IOException { } splitPoints[i - 1] = (String) input.get(i); } - return Util.doubleArrayToTuple(sketch.getPMF(splitPoints)); + final double[] pmf = sketch.getPMF(splitPoints); + if (pmf == null) { return null; } + return Util.doubleArrayToTuple(pmf); } } diff --git a/src/main/java/com/yahoo/sketches/pig/tuple/DataToDoubleSummarySketch.java b/src/main/java/com/yahoo/sketches/pig/tuple/DataToDoubleSummarySketch.java index e5f9c7c..ecb934b 100644 --- a/src/main/java/com/yahoo/sketches/pig/tuple/DataToDoubleSummarySketch.java +++ b/src/main/java/com/yahoo/sketches/pig/tuple/DataToDoubleSummarySketch.java @@ -8,7 +8,9 @@ import org.apache.pig.Algebraic; import com.yahoo.sketches.tuple.DoubleSummary; +import 
com.yahoo.sketches.tuple.DoubleSummaryDeserializer; import com.yahoo.sketches.tuple.DoubleSummaryFactory; +import com.yahoo.sketches.tuple.DoubleSummarySetOperations; /** * This UDF creates a Sketch<DoubleSummary> from raw data. @@ -86,7 +88,7 @@ public static class IntermediateFinal * Default sketch size and default mode */ public IntermediateFinal() { - super(new DoubleSummaryFactory()); + super(new DoubleSummaryFactory(), new DoubleSummarySetOperations(), new DoubleSummaryDeserializer()); } /** @@ -95,7 +97,8 @@ public IntermediateFinal() { * @param sketchSize String representation of sketch size */ public IntermediateFinal(final String sketchSize) { - super(Integer.parseInt(sketchSize), new DoubleSummaryFactory()); + super(Integer.parseInt(sketchSize), new DoubleSummaryFactory(), new DoubleSummarySetOperations(), + new DoubleSummaryDeserializer()); } /** @@ -105,7 +108,9 @@ public IntermediateFinal(final String sketchSize) { * @param summaryMode String representation of mode (sum, min or max) */ public IntermediateFinal(final String sketchSize, final String summaryMode) { - super(Integer.parseInt(sketchSize), new DoubleSummaryFactory(DoubleSummary.Mode.valueOf(summaryMode))); + super(Integer.parseInt(sketchSize), new DoubleSummaryFactory(DoubleSummary.Mode.valueOf(summaryMode)), + new DoubleSummarySetOperations(DoubleSummary.Mode.valueOf(summaryMode)), + new DoubleSummaryDeserializer()); } } diff --git a/src/main/java/com/yahoo/sketches/pig/tuple/DataToSketchAlgebraicIntermediateFinal.java b/src/main/java/com/yahoo/sketches/pig/tuple/DataToSketchAlgebraicIntermediateFinal.java index a25434e..878d9d3 100644 --- a/src/main/java/com/yahoo/sketches/pig/tuple/DataToSketchAlgebraicIntermediateFinal.java +++ b/src/main/java/com/yahoo/sketches/pig/tuple/DataToSketchAlgebraicIntermediateFinal.java @@ -16,7 +16,9 @@ import org.apache.pig.data.Tuple; import com.yahoo.sketches.tuple.Sketch; +import com.yahoo.sketches.tuple.SummaryDeserializer; import 
com.yahoo.sketches.tuple.SummaryFactory; +import com.yahoo.sketches.tuple.SummarySetOperations; import com.yahoo.sketches.tuple.Union; import com.yahoo.sketches.tuple.UpdatableSketch; import com.yahoo.sketches.tuple.UpdatableSketchBuilder; @@ -34,44 +36,56 @@ public abstract class DataToSketchAlgebraicIntermediateFinal> extends EvalFunc { private final int sketchSize_; - private final SummaryFactory summaryFactory_; + private final SummarySetOperations summarySetOps_; + private final SummaryDeserializer summaryDeserializer_; private final UpdatableSketchBuilder sketchBuilder_; private boolean isFirstCall_ = true; /** - * Constructs a function given a summary factory, default sketch size and default - * sampling probability of 1. + * Constructs a function given a summary factory, summary set operations, summary deserializer, + * default sketch size and default sampling probability of 1. * @param summaryFactory an instance of SummaryFactory + * @param summarySetOps an instance of SummarySetOperations + * @param summaryDeserializer an instance of SummaryDeserializer */ - public DataToSketchAlgebraicIntermediateFinal(final SummaryFactory summaryFactory) { - this(DEFAULT_NOMINAL_ENTRIES, 1f, summaryFactory); + public DataToSketchAlgebraicIntermediateFinal(final SummaryFactory summaryFactory, + final SummarySetOperations summarySetOps, final SummaryDeserializer summaryDeserializer) { + this(DEFAULT_NOMINAL_ENTRIES, 1f, summaryFactory, summarySetOps, summaryDeserializer); } /** - * Constructs a function given a sketch size, summary factory and default - * sampling probability of 1. + * Constructs a function given a sketch size, summary factory, summary set operations, + * summary deserializer and default sampling probability of 1. * @param sketchSize parameter controlling the size of the sketch and the accuracy. * It represents nominal number of entries in the sketch. Forced to the nearest power of 2 * greater than given value. 
* @param summaryFactory an instance of SummaryFactory + * @param summarySetOps an instance of SummarySetOperations + * @param summaryDeserializer an instance of SummaryDeserializer */ public DataToSketchAlgebraicIntermediateFinal(final int sketchSize, - final SummaryFactory summaryFactory) { - this(sketchSize, 1f, summaryFactory); + final SummaryFactory summaryFactory, final SummarySetOperations summarySetOps, + final SummaryDeserializer summaryDeserializer) { + this(sketchSize, 1f, summaryFactory, summarySetOps, summaryDeserializer); } /** - * Constructs a function given a sketch size, sampling probability and summary factory + * Constructs a function given a sketch size, sampling probability, summary factory, + * summary set operations and summary deserializer * @param sketchSize parameter controlling the size of the sketch and the accuracy. * It represents nominal number of entries in the sketch. Forced to the nearest power of 2 * greater than given value. * @param samplingProbability parameter from 0 to 1 inclusive * @param summaryFactory an instance of SummaryFactory + * @param summarySetOps an instance of SummarySetOperations + * @param summaryDeserializer an instance of SummaryDeserializer */ - public DataToSketchAlgebraicIntermediateFinal(final int sketchSize, - final float samplingProbability, final SummaryFactory summaryFactory) { + public DataToSketchAlgebraicIntermediateFinal(final int sketchSize, final float samplingProbability, + final SummaryFactory summaryFactory, final SummarySetOperations summarySetOps, + final SummaryDeserializer summaryDeserializer) { sketchSize_ = sketchSize; - summaryFactory_ = summaryFactory; + summarySetOps_ = summarySetOps; + summaryDeserializer_ = summaryDeserializer; sketchBuilder_ = new UpdatableSketchBuilder(summaryFactory) .setNominalEntries(sketchSize).setSamplingProbability(samplingProbability); } @@ -83,7 +97,7 @@ public Tuple exec(final Tuple inputTuple) throws IOException { 
Logger.getLogger(getClass()).info("algebraic is used"); isFirstCall_ = false; } - final Union union = new Union(sketchSize_, summaryFactory_); + final Union union = new Union(sketchSize_, summarySetOps_); final DataBag bag = (DataBag) inputTuple.get(0); if (bag == null) { @@ -102,7 +116,7 @@ public Tuple exec(final Tuple inputTuple) throws IOException { // This is a sketch from a prior call to the // Intermediate function. merge it with the // current sketch. - final Sketch incomingSketch = Util.deserializeSketchFromTuple(dataTuple); + final Sketch incomingSketch = Util.deserializeSketchFromTuple(dataTuple, summaryDeserializer_); union.update(incomingSketch); } else { // we should never get here. diff --git a/src/main/java/com/yahoo/sketches/pig/tuple/DoubleSummarySketchToEstimates.java b/src/main/java/com/yahoo/sketches/pig/tuple/DoubleSummarySketchToEstimates.java index 5ff299f..c3b61d3 100644 --- a/src/main/java/com/yahoo/sketches/pig/tuple/DoubleSummarySketchToEstimates.java +++ b/src/main/java/com/yahoo/sketches/pig/tuple/DoubleSummarySketchToEstimates.java @@ -14,9 +14,11 @@ import com.yahoo.memory.Memory; import com.yahoo.sketches.tuple.DoubleSummary; +import com.yahoo.sketches.tuple.DoubleSummaryDeserializer; import com.yahoo.sketches.tuple.Sketch; import com.yahoo.sketches.tuple.SketchIterator; import com.yahoo.sketches.tuple.Sketches; +import com.yahoo.sketches.tuple.SummaryDeserializer; /** * This UDF converts a Sketch<DoubleSummary> to estimates. 
@@ -29,6 +31,9 @@ */ public class DoubleSummarySketchToEstimates extends EvalFunc { + private static final SummaryDeserializer SUMMARY_DESERIALIZER = + new DoubleSummaryDeserializer(); + @Override public Tuple exec(final Tuple input) throws IOException { if ((input == null) || (input.size() == 0)) { @@ -36,7 +41,8 @@ public Tuple exec(final Tuple input) throws IOException { } final DataByteArray dba = (DataByteArray) input.get(0); - final Sketch sketch = Sketches.heapifySketch(Memory.wrap(dba.get())); + final Sketch sketch = Sketches.heapifySketch( + Memory.wrap(dba.get()), SUMMARY_DESERIALIZER); final Tuple output = TupleFactory.getInstance().newTuple(2); output.set(0, sketch.getEstimate()); diff --git a/src/main/java/com/yahoo/sketches/pig/tuple/DoubleSummarySketchToPercentile.java b/src/main/java/com/yahoo/sketches/pig/tuple/DoubleSummarySketchToPercentile.java index 821d18e..5b6c9be 100644 --- a/src/main/java/com/yahoo/sketches/pig/tuple/DoubleSummarySketchToPercentile.java +++ b/src/main/java/com/yahoo/sketches/pig/tuple/DoubleSummarySketchToPercentile.java @@ -15,9 +15,11 @@ import com.yahoo.sketches.quantiles.DoublesSketch; import com.yahoo.sketches.quantiles.UpdateDoublesSketch; import com.yahoo.sketches.tuple.DoubleSummary; +import com.yahoo.sketches.tuple.DoubleSummaryDeserializer; import com.yahoo.sketches.tuple.Sketch; import com.yahoo.sketches.tuple.SketchIterator; import com.yahoo.sketches.tuple.Sketches; +import com.yahoo.sketches.tuple.SummaryDeserializer; /** * This UDF is to get a percentile value from a Sketch<DoubleSummary>. 
@@ -29,6 +31,8 @@ */ public class DoubleSummarySketchToPercentile extends EvalFunc { + private static final SummaryDeserializer SUMMARY_DESERIALIZER = + new DoubleSummaryDeserializer(); private static final int QUANTILES_SKETCH_SIZE = 1024; @Override @@ -38,7 +42,8 @@ public Double exec(final Tuple input) throws IOException { } final DataByteArray dba = (DataByteArray) input.get(0); - final Sketch sketch = Sketches.heapifySketch(Memory.wrap(dba.get())); + final Sketch sketch = Sketches.heapifySketch( + Memory.wrap(dba.get()), SUMMARY_DESERIALIZER); final double percentile = (double) input.get(1); if ((percentile < 0) || (percentile > 100)) { diff --git a/src/main/java/com/yahoo/sketches/pig/tuple/UnionDoubleSummarySketch.java b/src/main/java/com/yahoo/sketches/pig/tuple/UnionDoubleSummarySketch.java index 96454ae..09206c2 100644 --- a/src/main/java/com/yahoo/sketches/pig/tuple/UnionDoubleSummarySketch.java +++ b/src/main/java/com/yahoo/sketches/pig/tuple/UnionDoubleSummarySketch.java @@ -8,7 +8,8 @@ import org.apache.pig.Algebraic; import com.yahoo.sketches.tuple.DoubleSummary; -import com.yahoo.sketches.tuple.DoubleSummaryFactory; +import com.yahoo.sketches.tuple.DoubleSummaryDeserializer; +import com.yahoo.sketches.tuple.DoubleSummarySetOperations; /** * This is to union Sketch<DoubleSummary>. 
@@ -20,7 +21,7 @@ public class UnionDoubleSummarySketch extends UnionSketch impleme * Constructor with default sketch size and default mode (sum) */ public UnionDoubleSummarySketch() { - super(new DoubleSummaryFactory()); + super(new DoubleSummarySetOperations(), new DoubleSummaryDeserializer()); } /** @@ -28,7 +29,7 @@ public UnionDoubleSummarySketch() { * @param sketchSize String representation of sketch size */ public UnionDoubleSummarySketch(final String sketchSize) { - super(Integer.parseInt(sketchSize), new DoubleSummaryFactory()); + super(Integer.parseInt(sketchSize), new DoubleSummarySetOperations(), new DoubleSummaryDeserializer()); } /** @@ -37,7 +38,9 @@ public UnionDoubleSummarySketch(final String sketchSize) { * @param summaryMode String representation of mode (sum, min or max) */ public UnionDoubleSummarySketch(final String sketchSize, final String summaryMode) { - super(Integer.parseInt(sketchSize), new DoubleSummaryFactory(DoubleSummary.Mode.valueOf(summaryMode))); + super(Integer.parseInt(sketchSize), + new DoubleSummarySetOperations(DoubleSummary.Mode.valueOf(summaryMode)), + new DoubleSummaryDeserializer()); } @Override @@ -84,7 +87,7 @@ public static class IntermediateFinal extends UnionSketchAlgebraicIntermediateFi * Default sketch size and default mode. 
*/ public IntermediateFinal() { - super(new DoubleSummaryFactory()); + super(new DoubleSummarySetOperations(), new DoubleSummaryDeserializer()); } /** @@ -93,7 +96,7 @@ public IntermediateFinal() { * @param sketchSize String representation of sketch size */ public IntermediateFinal(final String sketchSize) { - super(Integer.parseInt(sketchSize), new DoubleSummaryFactory()); + super(Integer.parseInt(sketchSize), new DoubleSummarySetOperations(), new DoubleSummaryDeserializer()); } /** @@ -103,7 +106,9 @@ public IntermediateFinal(final String sketchSize) { * @param summaryMode String representation of mode (sum, min or max) */ public IntermediateFinal(final String sketchSize, final String summaryMode) { - super(Integer.parseInt(sketchSize), new DoubleSummaryFactory(DoubleSummary.Mode.valueOf(summaryMode))); + super(Integer.parseInt(sketchSize), + new DoubleSummarySetOperations(DoubleSummary.Mode.valueOf(summaryMode)), + new DoubleSummaryDeserializer()); } } diff --git a/src/main/java/com/yahoo/sketches/pig/tuple/UnionSketch.java b/src/main/java/com/yahoo/sketches/pig/tuple/UnionSketch.java index d42582f..b136c9d 100644 --- a/src/main/java/com/yahoo/sketches/pig/tuple/UnionSketch.java +++ b/src/main/java/com/yahoo/sketches/pig/tuple/UnionSketch.java @@ -20,7 +20,8 @@ import com.yahoo.sketches.tuple.Sketch; import com.yahoo.sketches.tuple.Sketches; import com.yahoo.sketches.tuple.Summary; -import com.yahoo.sketches.tuple.SummaryFactory; +import com.yahoo.sketches.tuple.SummaryDeserializer; +import com.yahoo.sketches.tuple.SummarySetOperations; import com.yahoo.sketches.tuple.Union; /** @@ -29,29 +30,35 @@ */ public abstract class UnionSketch extends EvalFunc implements Accumulator { private final int sketchSize_; - private final SummaryFactory summaryFactory_; + private final SummarySetOperations summarySetOps_; + private final SummaryDeserializer summaryDeserializer_; private Union union_; private boolean isFirstCall_ = true; /** - * Constructs a function given a 
summary factory and default sketch size - * @param summaryFactory an instance of SummaryFactory + * Constructs a function given a summary set operations, summary deserializer and default sketch size + * @param summarySetOps an instance of SummarySetOperations + * @param summaryDeserializer an instance of SummaryDeserializer */ - public UnionSketch(final SummaryFactory summaryFactory) { - this(DEFAULT_NOMINAL_ENTRIES, summaryFactory); + public UnionSketch(final SummarySetOperations summarySetOps, + final SummaryDeserializer summaryDeserializer) { + this(DEFAULT_NOMINAL_ENTRIES, summarySetOps, summaryDeserializer); } /** - * Constructs a function given a sketch size and summary factory + * Constructs a function given a sketch size, summary set operations and summary deserializer * @param sketchSize parameter controlling the size of the sketch and the accuracy. * It represents nominal number of entries in the sketch. Forced to the nearest power of 2 * greater than given value. - * @param summaryFactory an instance of SummaryFactory + * @param summarySetOps an instance of SummarySetOperations + * @param summaryDeserializer an instance of SummaryDeserializer */ - public UnionSketch(final int sketchSize, final SummaryFactory summaryFactory) { + public UnionSketch(final int sketchSize, final SummarySetOperations summarySetOps, + final SummaryDeserializer summaryDeserializer) { super(); - this.sketchSize_ = sketchSize; - this.summaryFactory_ = summaryFactory; + sketchSize_ = sketchSize; + summarySetOps_ = summarySetOps; + summaryDeserializer_ = summaryDeserializer; } @Override @@ -65,8 +72,8 @@ public Tuple exec(final Tuple inputTuple) throws IOException { return null; } final DataBag bag = (DataBag) inputTuple.get(0); - final Union union = new Union(sketchSize_, summaryFactory_); - updateUnion(bag, union); + final Union union = new Union(sketchSize_, summarySetOps_); + updateUnion(bag, union, summaryDeserializer_); return Util.tupleFactory.newTuple(new 
DataByteArray(union.getResult().toByteArray())); } @@ -83,9 +90,9 @@ public void accumulate(final Tuple inputTuple) throws IOException { final DataBag bag = (DataBag) inputTuple.get(0); if (bag == null || bag.size() == 0) { return; } if (union_ == null) { - union_ = new Union(sketchSize_, summaryFactory_); + union_ = new Union(sketchSize_, summarySetOps_); } - updateUnion(bag, union_); + updateUnion(bag, union_, summaryDeserializer_); } @Override @@ -101,13 +108,13 @@ public void cleanup() { if (union_ != null) { union_.reset(); } } - private static void updateUnion(final DataBag bag, final Union union) - throws ExecException { + private static void updateUnion(final DataBag bag, final Union union, + final SummaryDeserializer summaryDeserializer) throws ExecException { for (final Tuple innerTuple: bag) { if ((innerTuple.size() != 1) || (innerTuple.get(0) == null)) { continue; } - final Sketch incomingSketch = Util.deserializeSketchFromTuple(innerTuple); + final Sketch incomingSketch = Util.deserializeSketchFromTuple(innerTuple, summaryDeserializer); union.update(incomingSketch); } } diff --git a/src/main/java/com/yahoo/sketches/pig/tuple/UnionSketchAlgebraicIntermediateFinal.java b/src/main/java/com/yahoo/sketches/pig/tuple/UnionSketchAlgebraicIntermediateFinal.java index df73e5c..c545254 100644 --- a/src/main/java/com/yahoo/sketches/pig/tuple/UnionSketchAlgebraicIntermediateFinal.java +++ b/src/main/java/com/yahoo/sketches/pig/tuple/UnionSketchAlgebraicIntermediateFinal.java @@ -17,7 +17,8 @@ import com.yahoo.sketches.tuple.Sketch; import com.yahoo.sketches.tuple.Summary; -import com.yahoo.sketches.tuple.SummaryFactory; +import com.yahoo.sketches.tuple.SummaryDeserializer; +import com.yahoo.sketches.tuple.SummarySetOperations; import com.yahoo.sketches.tuple.Union; /** @@ -31,15 +32,18 @@ */ public abstract class UnionSketchAlgebraicIntermediateFinal extends EvalFunc { private final int sketchSize_; - private final SummaryFactory summaryFactory_; + private final 
SummarySetOperations summarySetOps_; + private final SummaryDeserializer summaryDeserializer_; private boolean isFirstCall_ = true; /** * Constructs a function given a summary factory and default sketch size - * @param summaryFactory an instance of SummaryFactory + * @param summarySetOps an instance of SummarySetOperations + * @param summaryDeserializer an instance of SummaryDeserializer */ - public UnionSketchAlgebraicIntermediateFinal(final SummaryFactory summaryFactory) { - this(DEFAULT_NOMINAL_ENTRIES, summaryFactory); + public UnionSketchAlgebraicIntermediateFinal(final SummarySetOperations summarySetOps, + final SummaryDeserializer summaryDeserializer) { + this(DEFAULT_NOMINAL_ENTRIES, summarySetOps, summaryDeserializer); } /** @@ -47,11 +51,14 @@ public UnionSketchAlgebraicIntermediateFinal(final SummaryFactory summaryFact * @param sketchSize parameter controlling the size of the sketch and the accuracy. * It represents nominal number of entries in the sketch. Forced to the nearest power of 2 * greater than given value. 
- * @param summaryFactory an instance of SummaryFactory + * @param summarySetOps an instance of SummarySetOperations + * @param summaryDeserializer an instance of SummaryDeserializer */ - public UnionSketchAlgebraicIntermediateFinal(final int sketchSize, final SummaryFactory summaryFactory) { - this.sketchSize_ = sketchSize; - this.summaryFactory_ = summaryFactory; + public UnionSketchAlgebraicIntermediateFinal(final int sketchSize, + final SummarySetOperations summarySetOps, final SummaryDeserializer summaryDeserializer) { + sketchSize_ = sketchSize; + summarySetOps_ = summarySetOps; + summaryDeserializer_ = summaryDeserializer; } @Override @@ -61,7 +68,7 @@ public Tuple exec(final Tuple inputTuple) throws IOException { Logger.getLogger(getClass()).info("algebraic is used"); isFirstCall_ = false; } - final Union union = new Union(sketchSize_, summaryFactory_); + final Union union = new Union(sketchSize_, summarySetOps_); final DataBag bag = (DataBag) inputTuple.get(0); if (bag == null) { @@ -73,13 +80,13 @@ public Tuple exec(final Tuple inputTuple) throws IOException { if (item instanceof DataBag) { // this is from a prior call to the initial function, so there is a nested bag. for (Tuple innerTuple: (DataBag) item) { - final Sketch incomingSketch = Util.deserializeSketchFromTuple(innerTuple); + final Sketch incomingSketch = Util.deserializeSketchFromTuple(innerTuple, summaryDeserializer_); union.update(incomingSketch); } } else if (item instanceof DataByteArray) { // This is a sketch from a call to the Intermediate function // Add it to the current union. - final Sketch incomingSketch = Util.deserializeSketchFromTuple(dataTuple); + final Sketch incomingSketch = Util.deserializeSketchFromTuple(dataTuple, summaryDeserializer_); union.update(incomingSketch); } else { // we should never get here. 
diff --git a/src/main/java/com/yahoo/sketches/pig/tuple/Util.java b/src/main/java/com/yahoo/sketches/pig/tuple/Util.java index 50500be..7a557a9 100644 --- a/src/main/java/com/yahoo/sketches/pig/tuple/Util.java +++ b/src/main/java/com/yahoo/sketches/pig/tuple/Util.java @@ -14,6 +14,7 @@ import com.yahoo.sketches.tuple.Sketch; import com.yahoo.sketches.tuple.Sketches; import com.yahoo.sketches.tuple.Summary; +import com.yahoo.sketches.tuple.SummaryDeserializer; final class Util { @@ -27,9 +28,10 @@ static Tuple doubleArrayToTuple(final double[] array) throws ExecException { return tuple; } - static Sketch deserializeSketchFromTuple(final Tuple tuple) throws ExecException { + static Sketch deserializeSketchFromTuple(final Tuple tuple, + final SummaryDeserializer summaryDeserializer) throws ExecException { final byte[] bytes = ((DataByteArray) tuple.get(0)).get(); - return Sketches.heapifySketch(Memory.wrap(bytes)); + return Sketches.heapifySketch(Memory.wrap(bytes), summaryDeserializer); } } diff --git a/src/test/java/com/yahoo/sketches/pig/quantiles/GetPmfFromDoublesSketchTest.java b/src/test/java/com/yahoo/sketches/pig/quantiles/GetPmfFromDoublesSketchTest.java index f064dd2..44d468b 100644 --- a/src/test/java/com/yahoo/sketches/pig/quantiles/GetPmfFromDoublesSketchTest.java +++ b/src/test/java/com/yahoo/sketches/pig/quantiles/GetPmfFromDoublesSketchTest.java @@ -25,10 +25,7 @@ public void emptySketch() throws Exception { EvalFunc func = new GetPmfFromDoublesSketch(); DoublesSketch sketch = DoublesSketch.builder().build(); Tuple resultTuple = func.exec(tupleFactory.newTuple(Arrays.asList(new DataByteArray(sketch.toByteArray()), 0.5))); - Assert.assertNotNull(resultTuple); - Assert.assertEquals(resultTuple.size(), 2); - Assert.assertEquals(((double) resultTuple.get(0)), Double.NaN); - Assert.assertEquals(((double) resultTuple.get(1)), Double.NaN); + Assert.assertNull(resultTuple); } @Test diff --git 
a/src/test/java/com/yahoo/sketches/pig/quantiles/GetPmfFromStringsSketchTest.java b/src/test/java/com/yahoo/sketches/pig/quantiles/GetPmfFromStringsSketchTest.java index ffb2b1c..ff87598 100644 --- a/src/test/java/com/yahoo/sketches/pig/quantiles/GetPmfFromStringsSketchTest.java +++ b/src/test/java/com/yahoo/sketches/pig/quantiles/GetPmfFromStringsSketchTest.java @@ -30,10 +30,7 @@ public void emptySketch() throws Exception { EvalFunc func = new GetPmfFromStringsSketch(); ItemsSketch sketch = ItemsSketch.getInstance(COMPARATOR); Tuple resultTuple = func.exec(TUPLE_FACTORY.newTuple(Arrays.asList(new DataByteArray(sketch.toByteArray(SER_DE)), "a"))); - Assert.assertNotNull(resultTuple); - Assert.assertEquals(resultTuple.size(), 2); - Assert.assertEquals(((double) resultTuple.get(0)), Double.NaN); - Assert.assertEquals(((double) resultTuple.get(1)), Double.NaN); + Assert.assertNull(resultTuple); } @Test diff --git a/src/test/java/com/yahoo/sketches/pig/quantiles/GetQuantileFromDoublesSketchTest.java b/src/test/java/com/yahoo/sketches/pig/quantiles/GetQuantileFromDoublesSketchTest.java index 7a2d74c..4179703 100644 --- a/src/test/java/com/yahoo/sketches/pig/quantiles/GetQuantileFromDoublesSketchTest.java +++ b/src/test/java/com/yahoo/sketches/pig/quantiles/GetQuantileFromDoublesSketchTest.java @@ -24,7 +24,7 @@ public void emptySketch() throws Exception { EvalFunc func = new GetQuantileFromDoublesSketch(); DoublesSketch sketch = DoublesSketch.builder().build(); Double result = func.exec(tupleFactory.newTuple(Arrays.asList(new DataByteArray(sketch.toByteArray()), 0.0))); - Assert.assertEquals(result, Double.POSITIVE_INFINITY); + Assert.assertEquals(result, Double.NaN); } @Test diff --git a/src/test/java/com/yahoo/sketches/pig/tuple/DataToDoubleSummarySketchTest.java b/src/test/java/com/yahoo/sketches/pig/tuple/DataToDoubleSummarySketchTest.java index f0e4d00..e637c1b 100644 --- a/src/test/java/com/yahoo/sketches/pig/tuple/DataToDoubleSummarySketchTest.java +++ 
b/src/test/java/com/yahoo/sketches/pig/tuple/DataToDoubleSummarySketchTest.java @@ -17,8 +17,10 @@ import com.yahoo.memory.Memory; import com.yahoo.sketches.tuple.DoubleSummary; +import com.yahoo.sketches.tuple.DoubleSummaryDeserializer; import com.yahoo.sketches.tuple.DoubleSummaryFactory; import com.yahoo.sketches.tuple.Sketch; +import com.yahoo.sketches.tuple.SketchIterator; import com.yahoo.sketches.tuple.Sketches; import com.yahoo.sketches.tuple.UpdatableSketch; import com.yahoo.sketches.tuple.UpdatableSketchBuilder; @@ -48,7 +50,7 @@ public void execEmptyBag() throws Exception { Assert.assertEquals(resultTuple.size(), 1); DataByteArray bytes = (DataByteArray) resultTuple.get(0); Assert.assertTrue(bytes.size() > 0); - Sketch sketch = Sketches.heapifySketch(Memory.wrap(bytes.get())); + Sketch sketch = Sketches.heapifySketch(Memory.wrap(bytes.get()), new DoubleSummaryDeserializer()); Assert.assertEquals(sketch.getEstimate(), 0.0); } @@ -92,11 +94,12 @@ public void execAllInputTypes() throws Exception { Assert.assertEquals(resultTuple.size(), 1); DataByteArray bytes = (DataByteArray) resultTuple.get(0); Assert.assertTrue(bytes.size() > 0); - Sketch sketch = Sketches.heapifySketch(Memory.wrap(bytes.get())); + Sketch sketch = Sketches.heapifySketch(Memory.wrap(bytes.get()), new DoubleSummaryDeserializer()); Assert.assertEquals(sketch.getEstimate(), 8.0, 0.0); - for (DoubleSummary summary: sketch.getSummaries()) { - Assert.assertEquals(summary.getValue(), 3.0); + SketchIterator it = sketch.iterator(); + while (it.next()) { + Assert.assertEquals(it.getSummary().getValue(), 3.0); } } @@ -115,11 +118,12 @@ public void execMinMode() throws Exception { Assert.assertEquals(resultTuple.size(), 1); DataByteArray bytes = (DataByteArray) resultTuple.get(0); Assert.assertTrue(bytes.size() > 0); - Sketch sketch = Sketches.heapifySketch(Memory.wrap(bytes.get())); + Sketch sketch = Sketches.heapifySketch(Memory.wrap(bytes.get()), new DoubleSummaryDeserializer()); 
Assert.assertEquals(sketch.getEstimate(), 2.0, 0.0); - for (DoubleSummary summary: sketch.getSummaries()) { - Assert.assertEquals(summary.getValue(), 1.0); + SketchIterator it = sketch.iterator(); + while (it.next()) { + Assert.assertEquals(it.getSummary().getValue(), 1.0); } } @@ -148,11 +152,12 @@ public void accumulator() throws Exception { Assert.assertEquals(resultTuple.size(), 1); DataByteArray bytes = (DataByteArray) resultTuple.get(0); Assert.assertTrue(bytes.size() > 0); - Sketch sketch = Sketches.heapifySketch(Memory.wrap(bytes.get())); + Sketch sketch = Sketches.heapifySketch(Memory.wrap(bytes.get()), new DoubleSummaryDeserializer()); Assert.assertEquals(sketch.getEstimate(), 2.0, 0.0); - for (DoubleSummary summary: sketch.getSummaries()) { - Assert.assertEquals(summary.getValue(), 3.0); + SketchIterator it = sketch.iterator(); + while (it.next()) { + Assert.assertEquals(it.getSummary().getValue(), 3.0); } // after cleanup, the value should always be 0 @@ -162,7 +167,7 @@ public void accumulator() throws Exception { Assert.assertEquals(resultTuple.size(), 1); bytes = (DataByteArray) resultTuple.get(0); Assert.assertTrue(bytes.size() > 0); - Sketch sketch2 = Sketches.heapifySketch(Memory.wrap(bytes.get())); + Sketch sketch2 = Sketches.heapifySketch(Memory.wrap(bytes.get()), new DoubleSummaryDeserializer()); Assert.assertEquals(sketch2.getEstimate(), 0.0, 0.0); } @@ -223,11 +228,12 @@ public void algebraicIntermediateFinal() throws Exception { Assert.assertEquals(resultTuple.size(), 1); DataByteArray bytes = (DataByteArray) resultTuple.get(0); Assert.assertTrue(bytes.size() > 0); - Sketch sketch = Sketches.heapifySketch(Memory.wrap(bytes.get())); + Sketch sketch = Sketches.heapifySketch(Memory.wrap(bytes.get()), new DoubleSummaryDeserializer()); Assert.assertEquals(sketch.getEstimate(), 2.0, 0.0); - for (DoubleSummary summary: sketch.getSummaries()) { - Assert.assertEquals(summary.getValue(), 3.0); + SketchIterator it = sketch.iterator(); + while 
(it.next()) { + Assert.assertEquals(it.getSummary().getValue(), 3.0); } } @@ -249,11 +255,12 @@ public void algebraicIntermediateFinalMaxMode() throws Exception { Assert.assertEquals(resultTuple.size(), 1); DataByteArray bytes = (DataByteArray) resultTuple.get(0); Assert.assertTrue(bytes.size() > 0); - Sketch sketch = Sketches.heapifySketch(Memory.wrap(bytes.get())); + Sketch sketch = Sketches.heapifySketch(Memory.wrap(bytes.get()), new DoubleSummaryDeserializer()); Assert.assertEquals(sketch.getEstimate(), 2.0, 0.0); - for (DoubleSummary summary: sketch.getSummaries()) { - Assert.assertEquals(summary.getValue(), 2.0); + SketchIterator it = sketch.iterator(); + while (it.next()) { + Assert.assertEquals(it.getSummary().getValue(), 2.0); } } } diff --git a/src/test/java/com/yahoo/sketches/pig/tuple/DoubleSummarySketchToPercentileTest.java b/src/test/java/com/yahoo/sketches/pig/tuple/DoubleSummarySketchToPercentileTest.java index 2935cfd..639fa11 100644 --- a/src/test/java/com/yahoo/sketches/pig/tuple/DoubleSummarySketchToPercentileTest.java +++ b/src/test/java/com/yahoo/sketches/pig/tuple/DoubleSummarySketchToPercentileTest.java @@ -26,7 +26,7 @@ public void emptySketch() throws Exception { UpdatableSketch sketch = new UpdatableSketchBuilder(new DoubleSummaryFactory()).build(); Tuple inputTuple = TupleFactory.getInstance().newTuple(Arrays.asList(new DataByteArray(sketch.compact().toByteArray()), 0.0)); double result = func.exec(inputTuple); - Assert.assertEquals(result, Double.POSITIVE_INFINITY); + Assert.assertEquals(result, Double.NaN); } @Test diff --git a/src/test/java/com/yahoo/sketches/pig/tuple/UnionDoubleSummarySketchTest.java b/src/test/java/com/yahoo/sketches/pig/tuple/UnionDoubleSummarySketchTest.java index 4a681c2..82d7ed5 100644 --- a/src/test/java/com/yahoo/sketches/pig/tuple/UnionDoubleSummarySketchTest.java +++ b/src/test/java/com/yahoo/sketches/pig/tuple/UnionDoubleSummarySketchTest.java @@ -18,8 +18,10 @@ import com.yahoo.memory.Memory; import 
com.yahoo.sketches.tuple.DoubleSummary; +import com.yahoo.sketches.tuple.DoubleSummaryDeserializer; import com.yahoo.sketches.tuple.DoubleSummaryFactory; import com.yahoo.sketches.tuple.Sketch; +import com.yahoo.sketches.tuple.SketchIterator; import com.yahoo.sketches.tuple.Sketches; import com.yahoo.sketches.tuple.UpdatableSketch; import com.yahoo.sketches.tuple.UpdatableSketchBuilder; @@ -61,10 +63,11 @@ public void exec() throws Exception { Assert.assertEquals(resultTuple.size(), 1); DataByteArray bytes = (DataByteArray) resultTuple.get(0); Assert.assertTrue(bytes.size() > 0); - Sketch sketch = Sketches.heapifySketch(Memory.wrap(bytes.get())); + Sketch sketch = Sketches.heapifySketch(Memory.wrap(bytes.get()), new DoubleSummaryDeserializer()); Assert.assertEquals(sketch.getEstimate(), 2.0, 0.0); - for (DoubleSummary summary: sketch.getSummaries()) { - Assert.assertEquals(summary.getValue(), 2.0, 0.0); + SketchIterator it = sketch.iterator(); + while (it.next()) { + Assert.assertEquals(it.getSummary().getValue(), 2.0, 0.0); } } @@ -89,10 +92,11 @@ public void execMaxMode() throws Exception { Assert.assertEquals(resultTuple.size(), 1); DataByteArray bytes = (DataByteArray) resultTuple.get(0); Assert.assertTrue(bytes.size() > 0); - Sketch sketch = Sketches.heapifySketch(Memory.wrap(bytes.get())); + Sketch sketch = Sketches.heapifySketch(Memory.wrap(bytes.get()), new DoubleSummaryDeserializer()); Assert.assertEquals(sketch.getEstimate(), 2.0, 0.0); - for (DoubleSummary summary: sketch.getSummaries()) { - Assert.assertEquals(summary.getValue(), 3.0, 0.0); + SketchIterator it = sketch.iterator(); + while (it.next()) { + Assert.assertEquals(it.getSummary().getValue(), 3.0, 0.0); } } @@ -105,7 +109,7 @@ public void accumulatorNullInput() throws Exception { Assert.assertEquals(resultTuple.size(), 1); DataByteArray bytes = (DataByteArray) resultTuple.get(0); Assert.assertTrue(bytes.size() > 0); - Sketch sketch = Sketches.heapifySketch(Memory.wrap(bytes.get())); + Sketch 
sketch = Sketches.heapifySketch(Memory.wrap(bytes.get()), new DoubleSummaryDeserializer()); Assert.assertEquals(sketch.getEstimate(), 0.0); } @@ -118,7 +122,7 @@ public void accumulatorEmptyInputTuple() throws Exception { Assert.assertEquals(resultTuple.size(), 1); DataByteArray bytes = (DataByteArray) resultTuple.get(0); Assert.assertTrue(bytes.size() > 0); - Sketch sketch = Sketches.heapifySketch(Memory.wrap(bytes.get())); + Sketch sketch = Sketches.heapifySketch(Memory.wrap(bytes.get()), new DoubleSummaryDeserializer()); Assert.assertEquals(sketch.getEstimate(), 0.0); } @@ -131,7 +135,7 @@ public void accumulatorNotABag() throws Exception { Assert.assertEquals(resultTuple.size(), 1); DataByteArray bytes = (DataByteArray) resultTuple.get(0); Assert.assertTrue(bytes.size() > 0); - Sketch sketch = Sketches.heapifySketch(Memory.wrap(bytes.get())); + Sketch sketch = Sketches.heapifySketch(Memory.wrap(bytes.get()), new DoubleSummaryDeserializer()); Assert.assertEquals(sketch.getEstimate(), 0.0); } @@ -144,7 +148,7 @@ public void accumulatorEmptyBag() throws Exception { Assert.assertEquals(resultTuple.size(), 1); DataByteArray bytes = (DataByteArray) resultTuple.get(0); Assert.assertTrue(bytes.size() > 0); - Sketch sketch = Sketches.heapifySketch(Memory.wrap(bytes.get())); + Sketch sketch = Sketches.heapifySketch(Memory.wrap(bytes.get()), new DoubleSummaryDeserializer()); Assert.assertEquals(sketch.getEstimate(), 0.0); } @@ -157,7 +161,7 @@ public void accumulatorEmptyInnerTuple() throws Exception { Assert.assertEquals(resultTuple.size(), 1); DataByteArray bytes = (DataByteArray) resultTuple.get(0); Assert.assertTrue(bytes.size() > 0); - Sketch sketch = Sketches.heapifySketch(Memory.wrap(bytes.get())); + Sketch sketch = Sketches.heapifySketch(Memory.wrap(bytes.get()), new DoubleSummaryDeserializer()); Assert.assertEquals(sketch.getEstimate(), 0.0); } @@ -170,7 +174,7 @@ public void accumulatorNullSketch() throws Exception { Assert.assertEquals(resultTuple.size(), 1); 
DataByteArray bytes = (DataByteArray) resultTuple.get(0); Assert.assertTrue(bytes.size() > 0); - Sketch sketch = Sketches.heapifySketch(Memory.wrap(bytes.get())); + Sketch sketch = Sketches.heapifySketch(Memory.wrap(bytes.get()), new DoubleSummaryDeserializer()); Assert.assertEquals(sketch.getEstimate(), 0.0); } @@ -188,7 +192,7 @@ public void accumulatorEmptySketch() throws Exception { Assert.assertEquals(resultTuple.size(), 1); DataByteArray bytes = (DataByteArray) resultTuple.get(0); Assert.assertTrue(bytes.size() > 0); - Sketch sketch = Sketches.heapifySketch(Memory.wrap(bytes.get())); + Sketch sketch = Sketches.heapifySketch(Memory.wrap(bytes.get()), new DoubleSummaryDeserializer()); Assert.assertEquals(sketch.getEstimate(), 0.0); } @@ -218,10 +222,11 @@ public void accumulator() throws Exception { Assert.assertEquals(resultTuple.size(), 1); DataByteArray bytes = (DataByteArray) resultTuple.get(0); Assert.assertTrue(bytes.size() > 0); - Sketch sketch = Sketches.heapifySketch(Memory.wrap(bytes.get())); + Sketch sketch = Sketches.heapifySketch(Memory.wrap(bytes.get()), new DoubleSummaryDeserializer()); Assert.assertEquals(sketch.getEstimate(), 2.0, 0.0); - for (DoubleSummary summary: sketch.getSummaries()) { - Assert.assertEquals(summary.getValue(), 2.0, 0.0); + SketchIterator it = sketch.iterator(); + while (it.next()) { + Assert.assertEquals(it.getSummary().getValue(), 2.0, 0.0); } } @@ -267,10 +272,11 @@ public void algebraicIntemediateFinalExactMinMode() throws Exception { Assert.assertEquals(resultTuple.size(), 1); DataByteArray bytes = (DataByteArray) resultTuple.get(0); Assert.assertTrue(bytes.size() > 0); - Sketch sketch = Sketches.heapifySketch(Memory.wrap(bytes.get())); + Sketch sketch = Sketches.heapifySketch(Memory.wrap(bytes.get()), new DoubleSummaryDeserializer()); Assert.assertEquals(sketch.getEstimate(), 2.0, 0.0); - for (DoubleSummary summary: sketch.getSummaries()) { - Assert.assertEquals(summary.getValue(), 1.0, 0.0); + SketchIterator it = 
sketch.iterator(); + while (it.next()) { + Assert.assertEquals(it.getSummary().getValue(), 1.0, 0.0); } } @@ -300,10 +306,11 @@ public void algebraicIntemediateFinalEstimation() throws Exception { Assert.assertEquals(resultTuple.size(), 1); DataByteArray bytes = (DataByteArray) resultTuple.get(0); Assert.assertTrue(bytes.size() > 0); - Sketch sketch = Sketches.heapifySketch(Memory.wrap(bytes.get())); + Sketch sketch = Sketches.heapifySketch(Memory.wrap(bytes.get()), new DoubleSummaryDeserializer()); Assert.assertEquals(sketch.getEstimate(), 40000.0, 40000.0 * 0.01); - for (DoubleSummary summary: sketch.getSummaries()) { - Assert.assertEquals(summary.getValue(), 1.0, 0.0); + SketchIterator it = sketch.iterator(); + while (it.next()) { + Assert.assertEquals(it.getSummary().getValue(), 1.0, 0.0); } } @@ -325,10 +332,11 @@ public void algebraicIntemediateFinalSingleCall() throws Exception { Assert.assertEquals(resultTuple.size(), 1); DataByteArray bytes = (DataByteArray) resultTuple.get(0); Assert.assertTrue(bytes.size() > 0); - Sketch sketch = Sketches.heapifySketch(Memory.wrap(bytes.get())); + Sketch sketch = Sketches.heapifySketch(Memory.wrap(bytes.get()), new DoubleSummaryDeserializer()); Assert.assertEquals(sketch.getEstimate(), 10000.0, 10000.0 * 0.02); - for (DoubleSummary summary: sketch.getSummaries()) { - Assert.assertEquals(summary.getValue(), 1.0, 0.0); + SketchIterator it = sketch.iterator(); + while (it.next()) { + Assert.assertEquals(it.getSummary().getValue(), 1.0, 0.0); } } @@ -401,11 +409,12 @@ public void algebraicIntemediateFinalRandomized() throws Exception { Assert.assertEquals(resultTuple.size(), 1); DataByteArray bytes = (DataByteArray) resultTuple.get(0); Assert.assertTrue(bytes.size() > 0); - Sketch sketch = Sketches.heapifySketch(Memory.wrap(bytes.get())); + Sketch sketch = Sketches.heapifySketch(Memory.wrap(bytes.get()), new DoubleSummaryDeserializer()); Assert.assertEquals(sketch.getEstimate(), uniques, uniques * 0.01); double sum = 0; - 
for (DoubleSummary summary: sketch.getSummaries()) { - sum += summary.getValue(); + SketchIterator it = sketch.iterator(); + while (it.next()) { + sum += it.getSummary().getValue(); } // each update added 10 to the total on average Assert.assertEquals(sum / sketch.getTheta(), updates * 10.0, updates * 10.0 * 0.02); // there is a slight chance of failing here