Skip to content

Commit

Permalink
Merge pull request #49 from DataSketches/core-11-tuple-api-change
Browse files Browse the repository at this point in the history
Core 11 tuple api change
  • Loading branch information
leerho committed Apr 6, 2018
2 parents 15b038d + 693669e commit 7ef7184
Show file tree
Hide file tree
Showing 17 changed files with 182 additions and 117 deletions.
2 changes: 1 addition & 1 deletion pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -145,7 +145,7 @@
<dependency>
<groupId>com.yahoo.datasketches</groupId>
<artifactId>sketches-core</artifactId>
<version>0.10.3</version>
<version>0.11.0</version>
</dependency>

<!-- Pig -->
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,9 @@ public Tuple exec(final Tuple input) throws IOException {
}
splitPoints[i - 1] = (double) input.get(i);
}
return Util.doubleArrayToTuple(sketch.getPMF(splitPoints));
final double[] pmf = sketch.getPMF(splitPoints);
if (pmf == null) { return null; }
return Util.doubleArrayToTuple(pmf);
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,9 @@ public Tuple exec(final Tuple input) throws IOException {
}
splitPoints[i - 1] = (String) input.get(i);
}
return Util.doubleArrayToTuple(sketch.getPMF(splitPoints));
final double[] pmf = sketch.getPMF(splitPoints);
if (pmf == null) { return null; }
return Util.doubleArrayToTuple(pmf);
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,9 @@
import org.apache.pig.Algebraic;

import com.yahoo.sketches.tuple.DoubleSummary;
import com.yahoo.sketches.tuple.DoubleSummaryDeserializer;
import com.yahoo.sketches.tuple.DoubleSummaryFactory;
import com.yahoo.sketches.tuple.DoubleSummarySetOperations;

/**
* This UDF creates a Sketch&lt;DoubleSummary&gt; from raw data.
Expand Down Expand Up @@ -86,7 +88,7 @@ public static class IntermediateFinal
* Default sketch size and default mode
*/
public IntermediateFinal() {
super(new DoubleSummaryFactory());
super(new DoubleSummaryFactory(), new DoubleSummarySetOperations(), new DoubleSummaryDeserializer());
}

/**
Expand All @@ -95,7 +97,8 @@ public IntermediateFinal() {
* @param sketchSize String representation of sketch size
*/
public IntermediateFinal(final String sketchSize) {
super(Integer.parseInt(sketchSize), new DoubleSummaryFactory());
super(Integer.parseInt(sketchSize), new DoubleSummaryFactory(), new DoubleSummarySetOperations(),
new DoubleSummaryDeserializer());
}

/**
Expand All @@ -105,7 +108,9 @@ public IntermediateFinal(final String sketchSize) {
* @param summaryMode String representation of mode (sum, min or max)
*/
public IntermediateFinal(final String sketchSize, final String summaryMode) {
super(Integer.parseInt(sketchSize), new DoubleSummaryFactory(DoubleSummary.Mode.valueOf(summaryMode)));
super(Integer.parseInt(sketchSize), new DoubleSummaryFactory(DoubleSummary.Mode.valueOf(summaryMode)),
new DoubleSummarySetOperations(DoubleSummary.Mode.valueOf(summaryMode)),
new DoubleSummaryDeserializer());
}

}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,9 @@
import org.apache.pig.data.Tuple;

import com.yahoo.sketches.tuple.Sketch;
import com.yahoo.sketches.tuple.SummaryDeserializer;
import com.yahoo.sketches.tuple.SummaryFactory;
import com.yahoo.sketches.tuple.SummarySetOperations;
import com.yahoo.sketches.tuple.Union;
import com.yahoo.sketches.tuple.UpdatableSketch;
import com.yahoo.sketches.tuple.UpdatableSketchBuilder;
Expand All @@ -34,44 +36,56 @@
public abstract class DataToSketchAlgebraicIntermediateFinal<U, S extends UpdatableSummary<U>>
extends EvalFunc<Tuple> {
private final int sketchSize_;
private final SummaryFactory<S> summaryFactory_;
private final SummarySetOperations<S> summarySetOps_;
private final SummaryDeserializer<S> summaryDeserializer_;
private final UpdatableSketchBuilder<U, S> sketchBuilder_;
private boolean isFirstCall_ = true;

/**
* Constructs a function given a summary factory, default sketch size and default
* sampling probability of 1.
* Constructs a function given a summary factory, summary set operations, summary deserializer,
* default sketch size and default sampling probability of 1.
* @param summaryFactory an instance of SummaryFactory
* @param summarySetOps an instance of SummarySetOperaions
* @param summaryDeserializer an instance of SummaryDeserializer
*/
public DataToSketchAlgebraicIntermediateFinal(final SummaryFactory<S> summaryFactory) {
this(DEFAULT_NOMINAL_ENTRIES, 1f, summaryFactory);
public DataToSketchAlgebraicIntermediateFinal(final SummaryFactory<S> summaryFactory,
final SummarySetOperations<S> summarySetOps, final SummaryDeserializer<S> summaryDeserializer) {
this(DEFAULT_NOMINAL_ENTRIES, 1f, summaryFactory, summarySetOps, summaryDeserializer);
}

/**
* Constructs a function given a sketch size, summary factory and default
* sampling probability of 1.
* Constructs a function given a sketch size, summary factory, summary set operations,
* summary deserializer and default sampling probability of 1.
* @param sketchSize parameter controlling the size of the sketch and the accuracy.
* It represents nominal number of entries in the sketch. Forced to the nearest power of 2
* greater than given value.
* @param summaryFactory an instance of SummaryFactory
* @param summarySetOps an instance of SummarySetOperaions
* @param summaryDeserializer an instance of SummaryDeserializer
*/
public DataToSketchAlgebraicIntermediateFinal(final int sketchSize,
final SummaryFactory<S> summaryFactory) {
this(sketchSize, 1f, summaryFactory);
final SummaryFactory<S> summaryFactory, final SummarySetOperations<S> summarySetOps,
final SummaryDeserializer<S> summaryDeserializer) {
this(sketchSize, 1f, summaryFactory, summarySetOps, summaryDeserializer);
}

/**
* Constructs a function given a sketch size, sampling probability and summary factory
* Constructs a function given a sketch size, sampling probability, summary factory,
* summary set operations and summary deserializer
* @param sketchSize parameter controlling the size of the sketch and the accuracy.
* It represents nominal number of entries in the sketch. Forced to the nearest power of 2
* greater than given value.
* @param samplingProbability parameter from 0 to 1 inclusive
* @param summaryFactory an instance of SummaryFactory
* @param summarySetOps an instance of SummarySetOperaions
* @param summaryDeserializer an instance of SummaryDeserializer
*/
public DataToSketchAlgebraicIntermediateFinal(final int sketchSize,
final float samplingProbability, final SummaryFactory<S> summaryFactory) {
public DataToSketchAlgebraicIntermediateFinal(final int sketchSize, final float samplingProbability,
final SummaryFactory<S> summaryFactory, final SummarySetOperations<S> summarySetOps,
final SummaryDeserializer<S> summaryDeserializer) {
sketchSize_ = sketchSize;
summaryFactory_ = summaryFactory;
summarySetOps_ = summarySetOps;
summaryDeserializer_ = summaryDeserializer;
sketchBuilder_ = new UpdatableSketchBuilder<U, S>(summaryFactory)
.setNominalEntries(sketchSize).setSamplingProbability(samplingProbability);
}
Expand All @@ -83,7 +97,7 @@ public Tuple exec(final Tuple inputTuple) throws IOException {
Logger.getLogger(getClass()).info("algebraic is used");
isFirstCall_ = false;
}
final Union<S> union = new Union<S>(sketchSize_, summaryFactory_);
final Union<S> union = new Union<S>(sketchSize_, summarySetOps_);

final DataBag bag = (DataBag) inputTuple.get(0);
if (bag == null) {
Expand All @@ -102,7 +116,7 @@ public Tuple exec(final Tuple inputTuple) throws IOException {
// This is a sketch from a prior call to the
// Intermediate function. merge it with the
// current sketch.
final Sketch<S> incomingSketch = Util.deserializeSketchFromTuple(dataTuple);
final Sketch<S> incomingSketch = Util.deserializeSketchFromTuple(dataTuple, summaryDeserializer_);
union.update(incomingSketch);
} else {
// we should never get here.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,9 +14,11 @@

import com.yahoo.memory.Memory;
import com.yahoo.sketches.tuple.DoubleSummary;
import com.yahoo.sketches.tuple.DoubleSummaryDeserializer;
import com.yahoo.sketches.tuple.Sketch;
import com.yahoo.sketches.tuple.SketchIterator;
import com.yahoo.sketches.tuple.Sketches;
import com.yahoo.sketches.tuple.SummaryDeserializer;

/**
* This UDF converts a Sketch&lt;DoubleSummary&gt; to estimates.
Expand All @@ -29,14 +31,18 @@
*/
public class DoubleSummarySketchToEstimates extends EvalFunc<Tuple> {

private static final SummaryDeserializer<DoubleSummary> SUMMARY_DESERIALIZER =
new DoubleSummaryDeserializer();

@Override
public Tuple exec(final Tuple input) throws IOException {
if ((input == null) || (input.size() == 0)) {
return null;
}

final DataByteArray dba = (DataByteArray) input.get(0);
final Sketch<DoubleSummary> sketch = Sketches.heapifySketch(Memory.wrap(dba.get()));
final Sketch<DoubleSummary> sketch = Sketches.heapifySketch(
Memory.wrap(dba.get()), SUMMARY_DESERIALIZER);

final Tuple output = TupleFactory.getInstance().newTuple(2);
output.set(0, sketch.getEstimate());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,9 +15,11 @@
import com.yahoo.sketches.quantiles.DoublesSketch;
import com.yahoo.sketches.quantiles.UpdateDoublesSketch;
import com.yahoo.sketches.tuple.DoubleSummary;
import com.yahoo.sketches.tuple.DoubleSummaryDeserializer;
import com.yahoo.sketches.tuple.Sketch;
import com.yahoo.sketches.tuple.SketchIterator;
import com.yahoo.sketches.tuple.Sketches;
import com.yahoo.sketches.tuple.SummaryDeserializer;

/**
* This UDF is to get a percentile value from a Sketch&lt;DoubleSummary&gt;.
Expand All @@ -29,6 +31,8 @@
*/
public class DoubleSummarySketchToPercentile extends EvalFunc<Double> {

private static final SummaryDeserializer<DoubleSummary> SUMMARY_DESERIALIZER =
new DoubleSummaryDeserializer();
private static final int QUANTILES_SKETCH_SIZE = 1024;

@Override
Expand All @@ -38,7 +42,8 @@ public Double exec(final Tuple input) throws IOException {
}

final DataByteArray dba = (DataByteArray) input.get(0);
final Sketch<DoubleSummary> sketch = Sketches.heapifySketch(Memory.wrap(dba.get()));
final Sketch<DoubleSummary> sketch = Sketches.heapifySketch(
Memory.wrap(dba.get()), SUMMARY_DESERIALIZER);

final double percentile = (double) input.get(1);
if ((percentile < 0) || (percentile > 100)) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,8 @@
import org.apache.pig.Algebraic;

import com.yahoo.sketches.tuple.DoubleSummary;
import com.yahoo.sketches.tuple.DoubleSummaryFactory;
import com.yahoo.sketches.tuple.DoubleSummaryDeserializer;
import com.yahoo.sketches.tuple.DoubleSummarySetOperations;

/**
* This is to union Sketch&lt;DoubleSummary&gt;.
Expand All @@ -20,15 +21,15 @@ public class UnionDoubleSummarySketch extends UnionSketch<DoubleSummary> impleme
* Constructor with default sketch size and default mode (sum)
*/
public UnionDoubleSummarySketch() {
super(new DoubleSummaryFactory());
super(new DoubleSummarySetOperations(), new DoubleSummaryDeserializer());
}

/**
* Constructor with default mode (sum)
* @param sketchSize String representation of sketch size
*/
public UnionDoubleSummarySketch(final String sketchSize) {
super(Integer.parseInt(sketchSize), new DoubleSummaryFactory());
super(Integer.parseInt(sketchSize), new DoubleSummarySetOperations(), new DoubleSummaryDeserializer());
}

/**
Expand All @@ -37,7 +38,9 @@ public UnionDoubleSummarySketch(final String sketchSize) {
* @param summaryMode String representation of mode (sum, min or max)
*/
public UnionDoubleSummarySketch(final String sketchSize, final String summaryMode) {
super(Integer.parseInt(sketchSize), new DoubleSummaryFactory(DoubleSummary.Mode.valueOf(summaryMode)));
super(Integer.parseInt(sketchSize),
new DoubleSummarySetOperations(DoubleSummary.Mode.valueOf(summaryMode)),
new DoubleSummaryDeserializer());
}

@Override
Expand Down Expand Up @@ -84,7 +87,7 @@ public static class IntermediateFinal extends UnionSketchAlgebraicIntermediateFi
* Default sketch size and default mode.
*/
public IntermediateFinal() {
super(new DoubleSummaryFactory());
super(new DoubleSummarySetOperations(), new DoubleSummaryDeserializer());
}

/**
Expand All @@ -93,7 +96,7 @@ public IntermediateFinal() {
* @param sketchSize String representation of sketch size
*/
public IntermediateFinal(final String sketchSize) {
super(Integer.parseInt(sketchSize), new DoubleSummaryFactory());
super(Integer.parseInt(sketchSize), new DoubleSummarySetOperations(), new DoubleSummaryDeserializer());
}

/**
Expand All @@ -103,7 +106,9 @@ public IntermediateFinal(final String sketchSize) {
* @param summaryMode String representation of mode (sum, min or max)
*/
public IntermediateFinal(final String sketchSize, final String summaryMode) {
super(Integer.parseInt(sketchSize), new DoubleSummaryFactory(DoubleSummary.Mode.valueOf(summaryMode)));
super(Integer.parseInt(sketchSize),
new DoubleSummarySetOperations(DoubleSummary.Mode.valueOf(summaryMode)),
new DoubleSummaryDeserializer());
}
}

Expand Down
43 changes: 25 additions & 18 deletions src/main/java/com/yahoo/sketches/pig/tuple/UnionSketch.java
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,8 @@
import com.yahoo.sketches.tuple.Sketch;
import com.yahoo.sketches.tuple.Sketches;
import com.yahoo.sketches.tuple.Summary;
import com.yahoo.sketches.tuple.SummaryFactory;
import com.yahoo.sketches.tuple.SummaryDeserializer;
import com.yahoo.sketches.tuple.SummarySetOperations;
import com.yahoo.sketches.tuple.Union;

/**
Expand All @@ -29,29 +30,35 @@
*/
public abstract class UnionSketch<S extends Summary> extends EvalFunc<Tuple> implements Accumulator<Tuple> {
private final int sketchSize_;
private final SummaryFactory<S> summaryFactory_;
private final SummarySetOperations<S> summarySetOps_;
private final SummaryDeserializer<S> summaryDeserializer_;
private Union<S> union_;
private boolean isFirstCall_ = true;

/**
* Constructs a function given a summary factory and default sketch size
* @param summaryFactory an instance of SummaryFactory
* Constructs a function given a summary set operations, summary deserializer and default sketch size
* @param summarySetOps an instance of SummarySetOperations
* @param summaryDeserializer an instance of SummaryDeserializer
*/
public UnionSketch(final SummaryFactory<S> summaryFactory) {
this(DEFAULT_NOMINAL_ENTRIES, summaryFactory);
public UnionSketch(final SummarySetOperations<S> summarySetOps,
final SummaryDeserializer<S> summaryDeserializer) {
this(DEFAULT_NOMINAL_ENTRIES, summarySetOps, summaryDeserializer);
}

/**
* Constructs a function given a sketch size and summary factory
* Constructs a function given a sketch size, summary set operations and summary deserializer
* @param sketchSize parameter controlling the size of the sketch and the accuracy.
* It represents nominal number of entries in the sketch. Forced to the nearest power of 2
* greater than given value.
* @param summaryFactory an instance of SummaryFactory
* @param summarySetOps an instance of SummarySetOperations
* @param summaryDeserializer an instance of SummaryDeserializer
*/
public UnionSketch(final int sketchSize, final SummaryFactory<S> summaryFactory) {
public UnionSketch(final int sketchSize, final SummarySetOperations<S> summarySetOps,
final SummaryDeserializer<S> summaryDeserializer) {
super();
this.sketchSize_ = sketchSize;
this.summaryFactory_ = summaryFactory;
sketchSize_ = sketchSize;
summarySetOps_ = summarySetOps;
summaryDeserializer_ = summaryDeserializer;
}

@Override
Expand All @@ -65,8 +72,8 @@ public Tuple exec(final Tuple inputTuple) throws IOException {
return null;
}
final DataBag bag = (DataBag) inputTuple.get(0);
final Union<S> union = new Union<S>(sketchSize_, summaryFactory_);
updateUnion(bag, union);
final Union<S> union = new Union<S>(sketchSize_, summarySetOps_);
updateUnion(bag, union, summaryDeserializer_);
return Util.tupleFactory.newTuple(new DataByteArray(union.getResult().toByteArray()));
}

Expand All @@ -83,9 +90,9 @@ public void accumulate(final Tuple inputTuple) throws IOException {
final DataBag bag = (DataBag) inputTuple.get(0);
if (bag == null || bag.size() == 0) { return; }
if (union_ == null) {
union_ = new Union<S>(sketchSize_, summaryFactory_);
union_ = new Union<S>(sketchSize_, summarySetOps_);
}
updateUnion(bag, union_);
updateUnion(bag, union_, summaryDeserializer_);
}

@Override
Expand All @@ -101,13 +108,13 @@ public void cleanup() {
if (union_ != null) { union_.reset(); }
}

private static <S extends Summary> void updateUnion(final DataBag bag, final Union<S> union)
throws ExecException {
private static <S extends Summary> void updateUnion(final DataBag bag, final Union<S> union,
final SummaryDeserializer<S> summaryDeserializer) throws ExecException {
for (final Tuple innerTuple: bag) {
if ((innerTuple.size() != 1) || (innerTuple.get(0) == null)) {
continue;
}
final Sketch<S> incomingSketch = Util.deserializeSketchFromTuple(innerTuple);
final Sketch<S> incomingSketch = Util.deserializeSketchFromTuple(innerTuple, summaryDeserializer);
union.update(incomingSketch);
}
}
Expand Down
Loading

0 comments on commit 7ef7184

Please sign in to comment.