Skip to content

Commit

Permalink
[dbs-leipzig#1571] Added interval calculation and changed tests to ch…
Browse files Browse the repository at this point in the history
…eck for intervals
  • Loading branch information
ChristopherLausch committed Jan 16, 2024
1 parent 8bd6763 commit 6c2d234
Show file tree
Hide file tree
Showing 4 changed files with 110 additions and 14 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -17,22 +17,20 @@

import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.api.java.tuple.Tuple3;
import org.gradoop.flink.model.api.operators.UnaryBaseGraphToValueOperator;
import org.gradoop.flink.model.impl.operators.sampling.functions.VertexDegree;
import org.gradoop.temporal.model.api.TimeDimension;
import org.gradoop.temporal.model.impl.TemporalGraph;
import org.gradoop.temporal.model.impl.operators.metric.functions.TransformDeltaToAbsoluteDegreeTree;
import org.gradoop.temporal.model.impl.operators.metric.functions.FlatMapVertexIdEdgeInterval;
import org.gradoop.temporal.model.impl.operators.metric.functions.BuildTemporalDegreeTree;
import org.gradoop.temporal.model.impl.operators.metric.functions.GroupDegreeTreesToVariance;
import org.gradoop.temporal.model.impl.operators.metric.functions.*;

import java.util.Objects;

/**
* Operator that calculates the degree variance evolution of a temporal graph for the
* whole lifetime of the graph.
*/
public class DegreeVarianceEvolution implements UnaryBaseGraphToValueOperator<TemporalGraph, DataSet<Tuple2<Long, Double>>> {
public class DegreeVarianceEvolution implements UnaryBaseGraphToValueOperator<TemporalGraph, DataSet<Tuple3<Long, Long, Double>>> {
/**
* The time dimension that will be considered.
*/
Expand All @@ -55,7 +53,7 @@ public DegreeVarianceEvolution(VertexDegree degreeType, TimeDimension dimension)
}

@Override
public DataSet<Tuple2<Long, Double>> execute(TemporalGraph graph) {
public DataSet<Tuple3<Long, Long, Double>> execute(TemporalGraph graph) {
return graph.getEdges()
// 1) Extract vertex id(s) and corresponding time intervals
.flatMap(new FlatMapVertexIdEdgeInterval(dimension, degreeType))
Expand All @@ -66,6 +64,7 @@ public DataSet<Tuple2<Long, Double>> execute(TemporalGraph graph) {
// 4) Transform each tree to aggregated evolution
.map(new TransformDeltaToAbsoluteDegreeTree())
// 6) Merge trees together and calculate aggregation
.reduceGroup(new GroupDegreeTreesToVariance());
.reduceGroup(new GroupDegreeTreesToVariance())
.mapPartition(new MapDegreesToInterval());
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -64,9 +64,9 @@ public void reduce(Iterable<Tuple2<GradoopId, TreeMap<Long, Integer>>> iterable,

for (Long timePoint : timePoints) {
// skip last default time
if (Long.MAX_VALUE == timePoint) {
/* if (Long.MAX_VALUE == timePoint) {
continue;
}
}*/
// Iterate over all vertices
for (Map.Entry<GradoopId, TreeMap<Long, Integer>> entry : degreeTrees.entrySet()) {
// Make sure the vertex is registered in the current vertexDegrees capture
Expand All @@ -85,12 +85,15 @@ public void reduce(Iterable<Tuple2<GradoopId, TreeMap<Long, Integer>>> iterable,
}
}

//sum of the degrees of all vertices
Optional<Integer> opt = vertexDegrees.values().stream().reduce(Math::addExact);
Optional<Double> opt2 = Optional.empty();

double mean;

//if we have a sum at the current timestamp
if (opt.isPresent()) {
//calculate the avg degree of the graph at the current timestamp
mean = (double) opt.get() / (double) numberOfVertices;
opt2 = Optional.of(vertexDegrees.values().stream()
.mapToDouble(val -> (val - mean) * (val - mean)).sum());
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,67 @@
/*
* Copyright © 2014 - 2021 Leipzig University (Database Research Group)
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.gradoop.temporal.model.impl.operators.metric.functions;

import org.apache.flink.api.common.functions.MapPartitionFunction;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.api.java.tuple.Tuple3;
import org.apache.flink.util.Collector;

/**
 * A map partition function that merges consecutive {@code (timestamp, value)} tuples carrying the
 * same value into {@code (from, to, value)} intervals.
 * <p>
 * Whenever the value changes, the run that just ended is emitted with the timestamp of the changing
 * tuple as its upper bound. A run that is still open after the last tuple is emitted with the
 * timestamp of that last tuple as its upper bound. If the very last tuple changes the value, it only
 * closes the previous interval; no interval is emitted for the last tuple itself.
 * <p>
 * NOTE(review): each tuple is only compared with its direct predecessor, so this presumably expects
 * the tuples of a partition to arrive ordered by timestamp - confirm against the caller.
 */
public class MapDegreesToInterval
  implements MapPartitionFunction<Tuple2<Long, Double>, Tuple3<Long, Long, Double>> {

  /**
   * Collapses runs of equal values into intervals.
   *
   * @param values the {@code (timestamp, value)} tuples of the partition
   * @param out    collector receiving one {@code (from, to, value)} tuple per interval; nothing is
   *               collected if {@code values} is empty
   */
  @Override
  public void mapPartition(Iterable<Tuple2<Long, Double>> values,
    Collector<Tuple3<Long, Long, Double>> out) {

    // Lower bound, current upper bound and value of the interval that is currently open.
    // All three stay null until the first tuple has been seen.
    Long startTimestamp = null;
    Long endTimestamp = null;
    Double value = null;
    // True if the most recently processed tuple closed an interval.
    boolean collected = false;

    for (Tuple2<Long, Double> tuple : values) {
      if (startTimestamp == null) {
        // First element of the partition opens the first interval
        startTimestamp = tuple.f0;
        endTimestamp = tuple.f0;
        value = tuple.f1;
      } else if (!tuple.f1.equals(value)) {
        // Value changed: emit the current interval, bounded by the timestamp of the change,
        // and open a new interval at that timestamp
        out.collect(new Tuple3<>(startTimestamp, tuple.f0, value));
        startTimestamp = tuple.f0;
        endTimestamp = tuple.f0;
        value = tuple.f1;
        collected = true;
      } else {
        // Same value: extend the current interval
        endTimestamp = tuple.f0;
        collected = false;
      }
    }

    // An empty partition has no open interval. Without this guard a (null, null, null) tuple
    // would be emitted, which is not a valid interval and cannot be serialized by Flink.
    if (startTimestamp == null) {
      return;
    }

    // Emit the trailing interval unless the last tuple already closed one.
    if (!collected) {
      out.collect(new Tuple3<>(startTimestamp, endTimestamp, value));
    }
  }
}
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.io.LocalCollectionOutputFormat;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.api.java.tuple.Tuple3;
import org.gradoop.common.model.impl.id.GradoopId;
import org.gradoop.flink.model.impl.operators.sampling.functions.VertexDegree;
import org.gradoop.temporal.model.api.TimeDimension;
Expand All @@ -42,40 +43,66 @@ public class DegreeVarianceEvolutionTest extends TemporalGradoopTestBase {
/**
* The expected in-degrees for each vertex label.
*/
private static final List<Tuple2<Long, Double>> EXPECTED_IN_DEGREES = new ArrayList<>();
private static final List<Tuple3<Long, Long, Double>> EXPECTED_IN_DEGREES = new ArrayList<>();
/**
* The expected out-degrees for each vertex label.
*/
private static final List<Tuple2<Long, Double>> EXPECTED_OUT_DEGREES = new ArrayList<>();
private static final List<Tuple3<Long, Long, Double>> EXPECTED_OUT_DEGREES = new ArrayList<>();
/**
* The expected degrees for each vertex label.
*/
private static final List<Tuple2<Long, Double>> EXPECTED_BOTH_DEGREES = new ArrayList<>();
private static final List<Tuple3<Long, Long, Double>> EXPECTED_BOTH_DEGREES = new ArrayList<>();

static {
// IN DEGREES
/*
EXPECTED_IN_DEGREES.add(new Tuple2<>(Long.MIN_VALUE, 0.0));
EXPECTED_IN_DEGREES.add(new Tuple2<>(0L, 0.1875));
EXPECTED_IN_DEGREES.add(new Tuple2<>(4L, 0.5));
EXPECTED_IN_DEGREES.add(new Tuple2<>(5L, 0.25));
EXPECTED_IN_DEGREES.add(new Tuple2<>(6L, 0.25));
EXPECTED_IN_DEGREES.add(new Tuple2<>(7L, 0.1875));
*/
EXPECTED_IN_DEGREES.add(new Tuple3<>(Long.MIN_VALUE, 0L, 0.0));
EXPECTED_IN_DEGREES.add(new Tuple3<>(0L, 4L, 0.1875));
EXPECTED_IN_DEGREES.add(new Tuple3<>(4L, 5L, 0.5));
EXPECTED_IN_DEGREES.add(new Tuple3<>(5L, 7L, 0.25));
EXPECTED_IN_DEGREES.add(new Tuple3<>(7L, Long.MAX_VALUE, 0.1875));



// OUT DEGREES
EXPECTED_OUT_DEGREES.add(new Tuple3<>(Long.MIN_VALUE, 0L, 0.0));
EXPECTED_OUT_DEGREES.add(new Tuple3<>(0L, 4L, 0.1875));
EXPECTED_OUT_DEGREES.add(new Tuple3<>(4L, 5L, 0.5));
EXPECTED_OUT_DEGREES.add(new Tuple3<>(5L, 7L, 0.25));
EXPECTED_OUT_DEGREES.add(new Tuple3<>(7L, Long.MAX_VALUE, 0.1875));

/*
EXPECTED_OUT_DEGREES.add(new Tuple2<>(Long.MIN_VALUE, 0.0));
EXPECTED_OUT_DEGREES.add(new Tuple2<>(0L, 0.1875));
EXPECTED_OUT_DEGREES.add(new Tuple2<>(4L, 0.5));
EXPECTED_OUT_DEGREES.add(new Tuple2<>(5L, 0.25));
EXPECTED_OUT_DEGREES.add(new Tuple2<>(6L, 0.25));
EXPECTED_OUT_DEGREES.add(new Tuple2<>(7L, 0.1875));
*/

// DEGREES
EXPECTED_BOTH_DEGREES.add(new Tuple3<>(Long.MIN_VALUE, 0L, 0.0));
EXPECTED_BOTH_DEGREES.add(new Tuple3<>(0L, 4L, 0.24000000000000005));
EXPECTED_BOTH_DEGREES.add(new Tuple3<>(4L, 5L, 0.64));
EXPECTED_BOTH_DEGREES.add(new Tuple3<>(5L, 6L, 0.16));
EXPECTED_BOTH_DEGREES.add(new Tuple3<>(6L, 7L, 0.56));
EXPECTED_BOTH_DEGREES.add(new Tuple3<>(7L, Long.MAX_VALUE, 0.24000000000000005));

/*
EXPECTED_BOTH_DEGREES.add(new Tuple2<>(Long.MIN_VALUE, 0.0));
EXPECTED_BOTH_DEGREES.add(new Tuple2<>(0L, 0.24000000000000005));
EXPECTED_BOTH_DEGREES.add(new Tuple2<>(4L, 0.64));
EXPECTED_BOTH_DEGREES.add(new Tuple2<>(5L, 0.16));
EXPECTED_BOTH_DEGREES.add(new Tuple2<>(6L, 0.56));
EXPECTED_BOTH_DEGREES.add(new Tuple2<>(7L, 0.24000000000000005));
*/
}

/**
Expand Down Expand Up @@ -130,9 +157,9 @@ public void setUp() throws Exception {
*/
@Test
public void testDegreeVariance() throws Exception {
Collection<Tuple2<Long, Double>> resultCollection = new ArrayList<>();
Collection<Tuple3<Long, Long, Double>> resultCollection = new ArrayList<>();

final DataSet<Tuple2<Long, Double>> resultDataSet = testGraph
final DataSet<Tuple3<Long, Long, Double>> resultDataSet = testGraph
.callForValue(new DegreeVarianceEvolution(degreeType, TimeDimension.VALID_TIME));

resultDataSet.output(new LocalCollectionOutputFormat<>(resultCollection));
Expand Down

0 comments on commit 6c2d234

Please sign in to comment.