[dbs-leipzig#1571] Added a map partition step to combine multiple vertices into a "super-vertex" so the final group-reduce step gets a smaller input
ChristopherLausch committed Jan 23, 2024
1 parent bf44672 commit 6f154ab
Showing 5 changed files with 159 additions and 23 deletions.
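For orientation: the commit applies the usual Flink pre-aggregation pattern, where each partition is first collapsed locally by a mapPartition step so that the final, non-parallel reduceGroup only receives one record per partition instead of one per vertex. Below is a minimal, self-contained sketch of that pattern with plain integers (generic example code under assumed names, not part of this commit):

import org.apache.flink.api.common.functions.GroupReduceFunction;
import org.apache.flink.api.common.functions.MapPartitionFunction;
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.util.Collector;

public class PreAggregationSketch {

  public static void main(String[] args) throws Exception {
    ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

    // illustrative input: (vertexId, degree) pairs
    DataSet<Tuple2<Long, Integer>> degrees = env.fromElements(
        Tuple2.of(0L, 2), Tuple2.of(1L, 4), Tuple2.of(2L, 3));

    // phase 1: runs in parallel, once per partition; emits a single partial
    // (degreeSum, vertexCount) record per non-empty partition
    DataSet<Tuple2<Integer, Integer>> partials = degrees.mapPartition(
        new MapPartitionFunction<Tuple2<Long, Integer>, Tuple2<Integer, Integer>>() {
          @Override
          public void mapPartition(Iterable<Tuple2<Long, Integer>> values,
                                   Collector<Tuple2<Integer, Integer>> out) {
            int sum = 0;
            int count = 0;
            for (Tuple2<Long, Integer> value : values) {
              sum += value.f1;
              count++;
            }
            if (count > 0) {  // skip empty partitions, like the new combiner does
              out.collect(Tuple2.of(sum, count));
            }
          }
        });

    // phase 2: the single global reduce now only sees one record per partition
    DataSet<Tuple2<Integer, Float>> average = partials.reduceGroup(
        new GroupReduceFunction<Tuple2<Integer, Integer>, Tuple2<Integer, Float>>() {
          @Override
          public void reduce(Iterable<Tuple2<Integer, Integer>> values,
                             Collector<Tuple2<Integer, Float>> out) {
            int sum = 0;
            int count = 0;
            for (Tuple2<Integer, Integer> value : values) {
              sum += value.f0;
              count += value.f1;
            }
            out.collect(Tuple2.of(count, count == 0 ? 0f : (float) sum / count));
          }
        });

    average.print();
  }
}

The files below apply the same idea to degree trees: the combiner emits one "super-vertex" tree per partition together with the number of vertices it absorbed.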
AvgDegreeEvolution.java
@@ -15,12 +15,14 @@
*/
package org.gradoop.temporal.model.impl.operators.metric;

+ import org.apache.flink.api.common.operators.Order;
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.tuple.Tuple3;
import org.gradoop.flink.model.impl.operators.sampling.functions.VertexDegree;
import org.gradoop.temporal.model.api.TimeDimension;
import org.gradoop.temporal.model.impl.TemporalGraph;
import org.gradoop.temporal.model.impl.operators.metric.functions.AggregationType;
+ import org.gradoop.temporal.model.impl.operators.metric.functions.MapPartitionCombineDegreeTrees;
import org.gradoop.temporal.model.impl.operators.metric.functions.GroupDegreeTreesToAggregateDegrees;
import org.gradoop.temporal.model.impl.operators.metric.functions.MapDegreesToInterval;

@@ -53,7 +55,9 @@ public AvgDegreeEvolution(VertexDegree degreeType, TimeDimension dimension) {
@Override
public DataSet<Tuple3<Long, Long, Float>> execute(TemporalGraph graph) {
return preProcess(graph)
- .reduceGroup(new GroupDegreeTreesToAggregateDegrees(AggregationType.AVG))
- .mapPartition(new MapDegreesToInterval());
+ .mapPartition(new MapPartitionCombineDegreeTrees(AggregationType.AVG))
+ .reduceGroup(new GroupDegreeTreesToAggregateDegrees(AggregationType.AVG))
+ .sortPartition(0, Order.ASCENDING)
+ .mapPartition(new MapDegreesToInterval());
}
}
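For context, a call site of the changed operator would look roughly like this sketch (hypothetical usage, not part of the commit; it assumes an existing TemporalGraph and uses illustrative VertexDegree/TimeDimension values):

// Hypothetical usage sketch: average degree evolution over valid time.
// `graph` is assumed to be an existing TemporalGraph.
DataSet<Tuple3<Long, Long, Float>> avgDegrees =
    new AvgDegreeEvolution(VertexDegree.BOTH, TimeDimension.VALID_TIME).execute(graph);
avgDegrees.print();  // each tuple spans a time interval with its aggregated degree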
MaxDegreeEvolution.java
@@ -15,12 +15,14 @@
*/
package org.gradoop.temporal.model.impl.operators.metric;

+ import org.apache.flink.api.common.operators.Order;
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.tuple.Tuple3;
import org.gradoop.flink.model.impl.operators.sampling.functions.VertexDegree;
import org.gradoop.temporal.model.api.TimeDimension;
import org.gradoop.temporal.model.impl.TemporalGraph;
import org.gradoop.temporal.model.impl.operators.metric.functions.AggregationType;
+ import org.gradoop.temporal.model.impl.operators.metric.functions.MapPartitionCombineDegreeTrees;
import org.gradoop.temporal.model.impl.operators.metric.functions.GroupDegreeTreesToAggregateDegrees;
import org.gradoop.temporal.model.impl.operators.metric.functions.MapDegreesToInterval;

@@ -53,7 +55,11 @@ public MaxDegreeEvolution(VertexDegree degreeType, TimeDimension dimension) {
@Override
public DataSet<Tuple3<Long, Long, Float>> execute(TemporalGraph graph) {
return preProcess(graph)
- .reduceGroup(new GroupDegreeTreesToAggregateDegrees(AggregationType.MAX))
- .mapPartition(new MapDegreesToInterval());
+ .mapPartition(new MapPartitionCombineDegreeTrees(AggregationType.MAX))
+ .reduceGroup(new GroupDegreeTreesToAggregateDegrees(AggregationType.MAX))
+ .sortPartition(0, Order.ASCENDING)
+ .mapPartition(new MapDegreesToInterval());
+ //.reduceGroup(new GroupDegreeTreesToAggregateDegrees(AggregationType.MAX))
+ // .mapPartition(new MapDegreesToInterval());
}
}
MinDegreeEvolution.java
@@ -15,12 +15,14 @@
*/
package org.gradoop.temporal.model.impl.operators.metric;

+ import org.apache.flink.api.common.operators.Order;
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.tuple.Tuple3;
import org.gradoop.flink.model.impl.operators.sampling.functions.VertexDegree;
import org.gradoop.temporal.model.api.TimeDimension;
import org.gradoop.temporal.model.impl.TemporalGraph;
import org.gradoop.temporal.model.impl.operators.metric.functions.AggregationType;
+ import org.gradoop.temporal.model.impl.operators.metric.functions.MapPartitionCombineDegreeTrees;
import org.gradoop.temporal.model.impl.operators.metric.functions.GroupDegreeTreesToAggregateDegrees;
import org.gradoop.temporal.model.impl.operators.metric.functions.MapDegreesToInterval;

@@ -54,7 +56,9 @@ public MinDegreeEvolution(VertexDegree degreeType, TimeDimension dimension) {
@Override
public DataSet<Tuple3<Long, Long, Float>> execute(TemporalGraph graph) {
return preProcess(graph)
- .reduceGroup(new GroupDegreeTreesToAggregateDegrees(AggregationType.MIN))
- .mapPartition(new MapDegreesToInterval());
+ .mapPartition(new MapPartitionCombineDegreeTrees(AggregationType.MIN))
+ .reduceGroup(new GroupDegreeTreesToAggregateDegrees(AggregationType.MIN))
+ .sortPartition(0, Order.ASCENDING)
+ .mapPartition(new MapDegreesToInterval());
}
}
GroupDegreeTreesToAggregateDegrees.java
@@ -17,6 +17,7 @@

import org.apache.flink.api.common.functions.GroupReduceFunction;
import org.apache.flink.api.java.tuple.Tuple2;
+ import org.apache.flink.api.java.tuple.Tuple3;
import org.apache.flink.util.Collector;
import org.gradoop.common.model.impl.id.GradoopId;

@@ -27,11 +28,13 @@
import java.util.Map;

/**
- * A group reduce function that merges all Tuples (vId, degreeTree) to a dataset of tuples (time, aggDegree)
- * that represents the aggregated degree value for the whole graph at the given time.
+ * A group reduce function that merges all Tuples (vId, degreeTree, Integer)
+ * to a dataset of tuples (time, aggDegree) that represents the aggregated
+ * degree value for the whole graph at the given time.
*/
public class GroupDegreeTreesToAggregateDegrees
- implements GroupReduceFunction<Tuple2<GradoopId, TreeMap<Long, Integer>>, Tuple2<Long, Float>> {
+ implements GroupReduceFunction<Tuple3<GradoopId, TreeMap<Long, Integer>, Integer>,
+ Tuple2<Long, Float>> {

/**
* The aggregate type to use (min,max,avg).
@@ -46,40 +49,38 @@ public class GroupDegreeTreesToAggregateDegrees
public GroupDegreeTreesToAggregateDegrees(AggregationType aggregateType) {
this.aggregateType = aggregateType;
}

@Override
- public void reduce(Iterable<Tuple2<GradoopId, TreeMap<Long, Integer>>> iterable,
- Collector<Tuple2<Long, Float>> collector) throws Exception {

+ public void reduce(Iterable<Tuple3<GradoopId, TreeMap<Long, Integer>, Integer>> iterable,
+ Collector<Tuple2<Long, Float>> collector) throws Exception {

- // init necessary maps and set
+ // init necessary maps and set
HashMap<GradoopId, TreeMap<Long, Integer>> degreeTrees = new HashMap<>();
HashMap<GradoopId, Integer> vertexDegrees = new HashMap<>();
SortedSet<Long> timePoints = new TreeSet<>();
+ int numberOfVertices = 0;

// convert the iterables to a hashmap and remember all possible timestamps
- for (Tuple2<GradoopId, TreeMap<Long, Integer>> tuple : iterable) {
+ for (Tuple3<GradoopId, TreeMap<Long, Integer>, Integer> tuple : iterable) {
degreeTrees.put(tuple.f0, tuple.f1);
timePoints.addAll(tuple.f1.keySet());
+ numberOfVertices = numberOfVertices + tuple.f2;
}

- int numberOfVertices = degreeTrees.size();
- long lastTimestamp = Long.MIN_VALUE;
float degreeValue = 0f;

// Add default times
timePoints.add(Long.MIN_VALUE);

for (Long timePoint : timePoints) {

- // Iterate over all vertices
+ // Iterate over all vertices
for (Map.Entry<GradoopId, TreeMap<Long, Integer>> entry : degreeTrees.entrySet()) {
// Make sure the vertex is registered in the current vertexDegrees capture
if (!vertexDegrees.containsKey(entry.getKey())) {
vertexDegrees.put(entry.getKey(), 0);
}

- // Check if timestamp is in tree, if not, take the lower key
+ // Check if timestamp is in tree, if not, take the lower key
if (entry.getValue().containsKey(timePoint)) {
vertexDegrees.put(entry.getKey(), entry.getValue().get(timePoint));
} else {
@@ -90,7 +91,7 @@ public void reduce(Iterable<Tuple2<GradoopId, TreeMap<Long, Integer>>> iterable,
}
}

- lastTimestamp = timePoint;

// Here, every tree with this time point is iterated. Now we need to aggregate for the current time.
switch (aggregateType) {
case MIN:
@@ -106,10 +107,8 @@ public void reduce(Iterable<Tuple2<GradoopId, TreeMap<Long, Integer>>> iterable,
default:
throw new IllegalArgumentException("Aggregate type not specified.");
}

- //collect the results
+ //collect the results
collector.collect(new Tuple2<>(timePoint, degreeValue));

}
}
}
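The lookup inside the reduce loop leans on TreeMap semantics: a degree tree records, per vertex, the degree valid from each timestamp onwards, and lowerKey falls back to the closest earlier entry when a global time point does not appear in that particular tree. A standalone illustration with made-up values:

import java.util.TreeMap;

public class DegreeTreeLookupSketch {
  public static void main(String[] args) {
    // example degree tree of one vertex: timestamp -> degree valid from that time on
    TreeMap<Long, Integer> degreeTree = new TreeMap<>();
    degreeTree.put(Long.MIN_VALUE, 0); // default degree before any edge exists
    degreeTree.put(10L, 2);            // degree becomes 2 at t = 10
    degreeTree.put(25L, 1);            // degree drops to 1 at t = 25

    long timePoint = 17L;              // a time point taken from some other vertex's tree
    Integer degree = degreeTree.containsKey(timePoint)
        ? degreeTree.get(timePoint)
        : degreeTree.get(degreeTree.lowerKey(timePoint)); // fall back to the lower key

    System.out.println(degree);        // prints 2: the degree still valid at t = 17
  }
}

The production code additionally null-checks the result of lowerKey; in this sketch the Long.MIN_VALUE entry guarantees a lower key exists.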
MapPartitionCombineDegreeTrees.java (new file)
@@ -0,0 +1,123 @@
/*
* Copyright © 2014 - 2021 Leipzig University (Database Research Group)
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.gradoop.temporal.model.impl.operators.metric.functions;

import org.apache.flink.api.common.functions.MapPartitionFunction;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.api.java.tuple.Tuple3;
import org.apache.flink.util.Collector;
import org.gradoop.common.model.impl.id.GradoopId;

import java.util.TreeSet;
import java.util.TreeMap;
import java.util.SortedSet;
import java.util.HashMap;
import java.util.Map;

/**
* A map partition function that merges/aggregates all Tuples (vId, degreeTree) on a partition into a
* new Tuple (GradoopId, degreeTree, Integer) that represents a new "super" vertex which is the aggregated
* combination of all vertices on that partition. The Integer represents the count of
* degreeTrees that were used to create this new one.
*/
public class MapPartitionCombineDegreeTrees
implements MapPartitionFunction<Tuple2<GradoopId, TreeMap<Long, Integer>>,
Tuple3<GradoopId, TreeMap<Long, Integer>, Integer>> {

/**
* The aggregate type to use (min,max,avg).
*/
private final AggregationType aggregateType;

/**
* Creates an instance of this map partition function.
*
* @param aggregateType the aggregate type to use (min,max,avg).
*/
public MapPartitionCombineDegreeTrees(AggregationType aggregateType) {
this.aggregateType = aggregateType;
}

@Override
public void mapPartition(Iterable<Tuple2<GradoopId, TreeMap<Long, Integer>>> iterable,
Collector<Tuple3<GradoopId, TreeMap<Long, Integer>, Integer>> collector)
throws Exception {

// init necessary maps and set
HashMap<GradoopId, TreeMap<Long, Integer>> degreeTrees = new HashMap<>();
HashMap<GradoopId, Integer> vertexDegrees = new HashMap<>();
TreeMap<Long, Integer> values = new TreeMap<>();
SortedSet<Long> timePoints = new TreeSet<>();

// convert the iterables to a hashmap and remember all possible timestamps
for (Tuple2<GradoopId, TreeMap<Long, Integer>> tuple : iterable) {
degreeTrees.put(tuple.f0, tuple.f1);
timePoints.addAll(tuple.f1.keySet());
}

int numberOfVertices = degreeTrees.size();

//check if partition is empty, if yes, collect nothing and return
if (numberOfVertices == 0) {
return;
}

int degreeValue = 0;

// Add default times
timePoints.add(Long.MIN_VALUE);

//create new GradoopId for the new "super-vertex"
GradoopId key = GradoopId.get();

for (Long timePoint : timePoints) {
// Iterate over all vertices
for (Map.Entry<GradoopId, TreeMap<Long, Integer>> entry : degreeTrees.entrySet()) {
// Make sure the vertex is registered in the current vertexDegrees capture
if (!vertexDegrees.containsKey(entry.getKey())) {
vertexDegrees.put(entry.getKey(), 0);
}

// Check if timestamp is in tree, if not, take the lower key
if (entry.getValue().containsKey(timePoint)) {
vertexDegrees.put(entry.getKey(), entry.getValue().get(timePoint));
} else {
Long lowerKey = entry.getValue().lowerKey(timePoint);
if (lowerKey != null) {
vertexDegrees.put(entry.getKey(), entry.getValue().get(lowerKey));
}
}
}

switch (aggregateType) {
case MIN:
degreeValue = vertexDegrees.values().stream().reduce(Math::min).orElse(0);
break;
case MAX:
degreeValue = vertexDegrees.values().stream().reduce(Math::max).orElse(0);
break;
case AVG:
degreeValue = vertexDegrees.values().stream().reduce(Math::addExact).orElse(0);
break;
default:
throw new IllegalArgumentException("Aggregate type not specified.");
}
values.put(timePoint, degreeValue);
}
//collect the results
collector.collect(new Tuple3<>(key, values, numberOfVertices));
}
}
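To make the combiner's contract concrete for AVG: each partition emits one tree of summed degrees plus the number of vertices it absorbed (the Integer field), and the final reduce can then add the partial sums and divide by the total vertex count. The division itself lives in the AVG branch of GroupDegreeTreesToAggregateDegrees, outside the hunks shown above, so that part is an assumption. A hand-worked sketch with made-up numbers:

import java.util.TreeMap;

public class SuperVertexMergeSketch {
  public static void main(String[] args) {
    // partition 1 combined 3 vertices; its tree holds the summed degrees per timestamp
    TreeMap<Long, Integer> partition1 = new TreeMap<>();
    partition1.put(Long.MIN_VALUE, 0);
    partition1.put(10L, 6);
    int partition1Count = 3;

    // partition 2 combined 2 vertices
    TreeMap<Long, Integer> partition2 = new TreeMap<>();
    partition2.put(Long.MIN_VALUE, 0);
    partition2.put(10L, 2);
    int partition2Count = 2;

    // at t = 10 the global average degree is (6 + 2) / (3 + 2) = 1.6
    long timePoint = 10L;
    float avgDegree = (float) (partition1.get(timePoint) + partition2.get(timePoint))
        / (partition1Count + partition2Count);
    System.out.println(avgDegree); // prints 1.6
  }
}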
