Commit
[#17] Add pattern matching benchmark (#20)
closes #17
ChrizZz110 authored May 10, 2021
1 parent 2749dac commit 32412ec
Showing 7 changed files with 213 additions and 27 deletions.
88 changes: 84 additions & 4 deletions src/main/java/org/gradoop/benchmarks/AbstractRunner.java
@@ -29,6 +29,14 @@
import org.gradoop.flink.io.impl.csv.indexed.IndexedCSVDataSource;
import org.gradoop.flink.model.impl.epgm.LogicalGraph;
import org.gradoop.flink.util.GradoopFlinkConfig;
import org.gradoop.temporal.io.api.TemporalDataSink;
import org.gradoop.temporal.io.api.TemporalDataSource;
import org.gradoop.temporal.io.impl.csv.TemporalCSVDataSink;
import org.gradoop.temporal.io.impl.csv.TemporalCSVDataSource;
import org.gradoop.temporal.io.impl.csv.indexed.TemporalIndexedCSVDataSink;
import org.gradoop.temporal.io.impl.csv.indexed.TemporalIndexedCSVDataSource;
import org.gradoop.temporal.model.impl.TemporalGraph;
import org.gradoop.temporal.util.TemporalGradoopConfig;

import java.io.File;
import java.io.IOException;
@@ -87,11 +95,24 @@ protected static LogicalGraph readLogicalGraph(String directory) throws IOExcept
* @return EPGM logical graph
* @throws IOException on failure
*/
protected static LogicalGraph readLogicalGraph(String directory, String format)
throws IOException {
protected static LogicalGraph readLogicalGraph(String directory, String format) throws IOException {
return getDataSource(directory, format).getLogicalGraph();
}

/**
* Reads a TPGM graph from a given directory. Currently there are two supported formats: {@code csv} which
* uses a {@link TemporalCSVDataSource} and {@code indexed} which uses a
* {@link TemporalIndexedCSVDataSource}.
*
* @param directory path to the TPGM database
* @param format format in which the graph is stored (csv, indexed)
* @return a TPGM graph instance
* @throws IOException in case of an error
*/
protected static TemporalGraph readTemporalGraph(String directory, String format) throws IOException {
return getTemporalDataSource(directory, format).getTemporalGraph();
}

/**
* Writes a logical graph into the specified directory using a {@link CSVDataSink}.
*
@@ -108,7 +129,7 @@ protected static void writeLogicalGraph(LogicalGraph graph, String directory) th
*
* @param graph logical graph
* @param directory output path
* @param format output format (csv, indexed)
* @param format output format
* @throws Exception on failure
*/
protected static void writeLogicalGraph(LogicalGraph graph, String directory, String format)
@@ -117,6 +138,20 @@ protected static void writeLogicalGraph(LogicalGraph graph, String directory, St
getExecutionEnvironment().execute();
}

/**
* Writes a temporal graph into a given directory.
*
* @param graph the temporal graph to write
* @param directory the target directory
* @param format the output format (csv, indexed)
* @throws Exception in case of an error
*/
protected static void writeTemporalGraph(TemporalGraph graph, String directory, String format)
throws Exception {
graph.writeTo(getTemporalDataSink(directory, format, graph.getConfig()), true);
getExecutionEnvironment().execute();
}

/**
* Returns a Flink execution environment.
*
@@ -177,7 +212,30 @@ private static DataSource getDataSource(String directory, String format) {
}

/**
* Returns an EPGM DataSink for a given directory and format.
* Creates a TPGM data source for a given directory and format. The format string {@code csv} creates a
* {@link TemporalCSVDataSource} whereas {@code indexed} creates a {@link TemporalIndexedCSVDataSource}.
*
* @param directory the input path to the TPGM database
* @param format the input format
* @return a data source instance
*/
private static TemporalDataSource getTemporalDataSource(String directory, String format) {
directory = appendSeparator(directory);
TemporalGradoopConfig config = TemporalGradoopConfig.createConfig(getExecutionEnvironment());
format = format.toLowerCase();

switch (format) {
case "csv":
return new TemporalCSVDataSource(directory, config);
case "indexed":
return new TemporalIndexedCSVDataSource(directory, config);
default:
throw new IllegalArgumentException("Unsupported format: " + format);
}
}

/**
* Returns an EPGM DataSink
*
* @param directory output path
* @param format output format (csv, indexed)
@@ -197,4 +255,26 @@ private static DataSink getDataSink(String directory, String format, GradoopFlin
throw new IllegalArgumentException("Unsupported format: " + format);
}
}

/**
* Returns a TPGM data sink for a given directory and format.
*
* @param directory the directory where the graph will be stored
* @param format the output format (csv, indexed)
* @param config the temporal config
* @return a temporal data sink instance
*/
private static TemporalDataSink getTemporalDataSink(String directory, String format, TemporalGradoopConfig config) {
directory = appendSeparator(directory);
format = format.toLowerCase();

switch (format) {
case "csv":
return new TemporalCSVDataSink(directory, config);
case "indexed":
return new TemporalIndexedCSVDataSink(directory, config);
default:
throw new IllegalArgumentException("Unsupported format: " + format);
}
}
}
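Usage sketch (not part of this commit): the new TPGM helpers mirror the existing EPGM read/write pair, so a runner subclass can load and persist a temporal graph with one call each. The class name, paths and the omitted operator chain below are placeholders.

import org.gradoop.benchmarks.AbstractRunner;
import org.gradoop.temporal.model.impl.TemporalGraph;

public class TemporalRoundTripSketch extends AbstractRunner {
  public static void main(String[] args) throws Exception {
    // Read a TPGM graph stored as indexed temporal CSV (hypothetical path).
    TemporalGraph graph = readTemporalGraph("hdfs:///input/graph", "indexed");
    // ... temporal operators would be applied here ...
    // Write the result as plain temporal CSV; writeTemporalGraph also triggers job execution.
    writeTemporalGraph(graph, "hdfs:///output/graph", "csv");
  }
}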
@@ -1,5 +1,5 @@
/*
* Copyright © 2014 - 2020 Leipzig University (Database Research Group)
* Copyright © 2014 - 2021 Leipzig University (Database Research Group)
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
11 changes: 10 additions & 1 deletion src/main/java/org/gradoop/benchmarks/tpgm/BaseTpgmBenchmark.java
@@ -44,6 +44,10 @@ abstract class BaseTpgmBenchmark extends AbstractRunner {
* Option to declare path to indexed input graph
*/
private static final String OPTION_INPUT_PATH = "i";
/**
* Option to declare the graph input format (csv or indexed).
*/
private static final String OPTION_INPUT_FORMAT = "f";
/**
* Option to declare path to output graph
*/
@@ -61,6 +65,10 @@ abstract class BaseTpgmBenchmark extends AbstractRunner {
* Used input path
*/
static String INPUT_PATH;
/**
* Used input format (csv or indexed)
*/
static String INPUT_FORMAT;
/**
* Used output path
*/
@@ -76,6 +84,7 @@ abstract class BaseTpgmBenchmark extends AbstractRunner {

static {
OPTIONS.addRequiredOption(OPTION_INPUT_PATH, "input", true, "Path to source files.");
OPTIONS.addOption(OPTION_INPUT_FORMAT, "format", true, "Input graph format (csv (default), indexed).");
OPTIONS.addRequiredOption(OPTION_OUTPUT_PATH, "output", true, "Path to output file.");
OPTIONS.addRequiredOption(OPTION_CSV_PATH, "csv", true,
"Path to csv result file (will be created if not available).");
@@ -122,6 +131,7 @@ static void writeOrCountGraph(TemporalGraph temporalGraph, GradoopFlinkConfig co
*/
static void readBaseCMDArguments(CommandLine cmd) {
INPUT_PATH = cmd.getOptionValue(OPTION_INPUT_PATH);
INPUT_FORMAT = cmd.getOptionValue(OPTION_INPUT_FORMAT, "csv");
OUTPUT_PATH = cmd.getOptionValue(OPTION_OUTPUT_PATH);
CSV_PATH = cmd.getOptionValue(OPTION_CSV_PATH);
COUNT_RESULT = cmd.hasOption(OPTION_COUNT_RESULT);
@@ -147,5 +157,4 @@ static void writeToCSVFile(String csvHead, String csvTail) throws IOException {
Files.write(path, linesToWrite, StandardCharsets.UTF_8, StandardOpenOption.CREATE,
StandardOpenOption.APPEND);
}

}
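Sketch of how the new format option flows through a concrete TPGM benchmark (illustrative only, not part of this commit; the benchmark class itself is hypothetical). Passing -f indexed on the command line makes readTemporalGraph pick the TemporalIndexedCSVDataSource.

package org.gradoop.benchmarks.tpgm;

import org.apache.commons.cli.CommandLine;
import org.gradoop.temporal.model.impl.TemporalGraph;

public class ExampleTpgmBenchmark extends BaseTpgmBenchmark {
  public static void main(String[] args) throws Exception {
    CommandLine cmd = parseArguments(args, ExampleTpgmBenchmark.class.getName());
    if (cmd == null) {
      return;
    }
    // Fills INPUT_PATH, INPUT_FORMAT, OUTPUT_PATH, CSV_PATH and COUNT_RESULT from the CLI.
    readBaseCMDArguments(cmd);
    // INPUT_FORMAT is "csv" or "indexed" and selects the temporal data source.
    TemporalGraph graph = readTemporalGraph(INPUT_PATH, INPUT_FORMAT);
    // Write the graph or only count its elements, depending on the base class's count option.
    writeOrCountGraph(graph, graph.getConfig());
    getExecutionEnvironment().execute(ExampleTpgmBenchmark.class.getSimpleName());
  }
}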
31 changes: 13 additions & 18 deletions src/main/java/org/gradoop/benchmarks/tpgm/CitibikeBenchmark.java
@@ -1,5 +1,5 @@
/*
* Copyright © 2014 - 2020 Leipzig University (Database Research Group)
* Copyright © 2014 - 2021 Leipzig University (Database Research Group)
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
@@ -23,14 +23,12 @@
import org.gradoop.flink.model.impl.operators.combination.ReduceCombination;
import org.gradoop.flink.model.impl.operators.keyedgrouping.GroupingKeys;
import org.gradoop.flink.model.impl.operators.keyedgrouping.KeyedGrouping;
import org.gradoop.temporal.io.impl.csv.TemporalCSVDataSource;
import org.gradoop.temporal.model.api.TimeDimension;
import org.gradoop.temporal.model.impl.TemporalGraph;
import org.gradoop.temporal.model.impl.functions.predicates.Overlaps;
import org.gradoop.temporal.model.impl.operators.aggregation.functions.AverageDuration;
import org.gradoop.temporal.model.impl.operators.keyedgrouping.TemporalGroupingKeys;
import org.gradoop.temporal.model.impl.pojo.TemporalVertex;
import org.gradoop.temporal.util.TemporalGradoopConfig;

import java.io.IOException;
import java.time.LocalDateTime;
@@ -45,8 +43,6 @@
* The benchmark is expected to be executed on the Citibike data set.
*/
public class CitibikeBenchmark extends BaseTpgmBenchmark {


/**
* Main program to run the benchmark.
* <p>
@@ -65,13 +61,7 @@ public static void main(String[] args) throws Exception {

readBaseCMDArguments(cmd);

ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
TemporalGradoopConfig cfg = TemporalGradoopConfig.createConfig(env);

TemporalCSVDataSource source = new TemporalCSVDataSource(INPUT_PATH, cfg);

TemporalGraph citibikeGraph = source
.getTemporalGraph()
TemporalGraph citibikeGraph = readTemporalGraph(INPUT_PATH, INPUT_FORMAT)
// Snapshot
.snapshot(new Overlaps(LocalDateTime.of(2017,1,1,0,0), LocalDateTime.of(2019,1,1,0,0)), VALID_TIME)
// Transformation
@@ -86,7 +76,7 @@ public static void main(String[] args) throws Exception {
"AND v2.id != v3.id " +
"AND v3.id != v1.id " +
// A performance optimization trick
"AND v1.tx_to > 1970-01-01" +
//"AND v1.tx_to > 1970-01-01" +
//"AND t1.bike_id = t2.bike_id " +
"AND t1.val.precedes(t2.val) " +
"AND t1.val.lengthAtLeast(Minutes(30)) " +
@@ -97,23 +87,29 @@ public static void main(String[] args) throws Exception {
.callForGraph(
new KeyedGrouping<>(
// Vertex grouping key functions
Arrays.asList(GroupingKeys.label(), GroupingKeys.property("name"), GroupingKeys.property("id"),
Arrays.asList(
GroupingKeys.label(),
GroupingKeys.property("name"),
GroupingKeys.property("cellId")),
// Vertex aggregates
null,
// Edge grouping key functions
Arrays.asList(GroupingKeys.label(),
Arrays.asList(
GroupingKeys.label(),
TemporalGroupingKeys.timeStamp(VALID_TIME, TimeDimension.Field.FROM, ChronoField.MONTH_OF_YEAR)),
// Edge aggregates
Arrays.asList(new Count("countTripsOfMonth"),
Arrays.asList(
new Count("countTripsOfMonth"),
new AverageDuration("avgTripDurationOfMonth", VALID_TIME))))
// Subgraph
.subgraph(
v -> true,
e -> e.getPropertyValue("countTripsOfMonth").getLong() >= 1)
.verify();

writeOrCountGraph(citibikeGraph, cfg);
writeOrCountGraph(citibikeGraph, citibikeGraph.getConfig());

ExecutionEnvironment env = citibikeGraph.getConfig().getExecutionEnvironment();

env.execute(CitibikeBenchmark.class.getSimpleName() + " - P: " + env.getParallelism());
writeCSV(env);
@@ -133,5 +129,4 @@ private static void writeCSV(ExecutionEnvironment env) throws IOException {

writeToCSVFile(head, tail);
}

}
95 changes: 95 additions & 0 deletions src/main/java/org/gradoop/benchmarks/tpgm/PatternMatchingBenchmark.java
@@ -0,0 +1,95 @@
/*
* Copyright © 2014 - 2021 Leipzig University (Database Research Group)
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.gradoop.benchmarks.tpgm;

import org.apache.commons.cli.CommandLine;
import org.apache.flink.api.common.typeinfo.TypeHint;
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.core.fs.FileSystem;
import org.gradoop.temporal.model.impl.TemporalGraph;
import org.gradoop.temporal.model.impl.TemporalGraphCollection;

import java.io.IOException;
import java.util.concurrent.TimeUnit;

/**
* Dedicated program to benchmark the query operator on temporal data.
* The benchmark is expected to be executed on the LDBC data set.
*/
public class PatternMatchingBenchmark extends BaseTpgmBenchmark {
/**
* Main program to run the benchmark. Arguments are the available options.
* Example: {@code /path/to/flink run -c org.gradoop.benchmarks.tpgm.PatternMatchingBenchmark
* path/to/gradoop-benchmarks.jar -i hdfs:///graph -f indexed -o hdfs:///output -c results.csv}
* <p>
* It is advisable to use the {@link org.gradoop.temporal.io.impl.csv.indexed.TemporalIndexedCSVDataSource}
* for a better performance by using parameter {@code -f indexed}.
*
* @param args program arguments
* @throws Exception in case of an error
*/
public static void main(String[] args) throws Exception {
CommandLine cmd = parseArguments(args, PatternMatchingBenchmark.class.getName());

if (cmd == null) {
return;
}

readBaseCMDArguments(cmd);

TemporalGraph graph = readTemporalGraph(INPUT_PATH, INPUT_FORMAT);

ExecutionEnvironment env = graph.getConfig().getExecutionEnvironment();

String query = "MATCH (p:person)-[l:likes]->(c:comment), (c)-[r:replyOf]->(po:post) " +
"WHERE l.val_from.after(Timestamp(2012-06-01)) AND " +
" l.val_from.before(Timestamp(2012-06-02)) AND " +
" c.val_from.after(Timestamp(2012-05-30)) AND " +
" c.val_from.before(Timestamp(2012-06-02)) AND " +
" po.val_from.after(Timestamp(2012-05-30)) AND " +
" po.val_from.before(Timestamp(2012-06-02))";

TemporalGraphCollection results = graph.temporalQuery(query);

// only count the results and write it to a csv file
DataSet<Tuple2<String, Long>> sum = results.getGraphHeads()
.map(g -> new Tuple2<>("G", 1L)).returns(new TypeHint<Tuple2<String, Long>>() {})
// group by the constant key ("G"), so all graph heads end up in one group
.groupBy(0)
// sum the values
.sum(1);

sum.writeAsCsv(appendSeparator(OUTPUT_PATH) + "count.csv", FileSystem.WriteMode.OVERWRITE);

env.execute(PatternMatchingBenchmark.class.getSimpleName() + " - P: " + env.getParallelism());
writeCSV(env);
}

/**
* Method to create and add lines to a csv-file
*
* @param env given ExecutionEnvironment
* @throws IOException exception during file writing
*/
private static void writeCSV(ExecutionEnvironment env) throws IOException {
String head = String.format("%s|%s|%s|%s", "Parallelism", "dataset", "format", "Runtime(s)");
String tail = String.format("%s|%s|%s|%s", env.getParallelism(), INPUT_PATH, INPUT_FORMAT,
env.getLastJobExecutionResult().getNetRuntime(TimeUnit.SECONDS));
writeToCSVFile(head, tail);
}
}
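For a quick local check of the temporal query operator outside the benchmark harness, the number of matches can also be fetched eagerly. A hedged sketch follows (the local path and the reduced pattern are placeholders, not part of this commit).

import org.gradoop.benchmarks.AbstractRunner;
import org.gradoop.temporal.model.impl.TemporalGraph;
import org.gradoop.temporal.model.impl.TemporalGraphCollection;

public class PatternMatchingSmokeTest extends AbstractRunner {
  public static void main(String[] args) throws Exception {
    // Small TPGM sample in plain temporal CSV (hypothetical path).
    TemporalGraph graph = readTemporalGraph("/tmp/ldbc-sample", "csv");
    // A reduced variant of the benchmark pattern above.
    TemporalGraphCollection matches = graph.temporalQuery(
      "MATCH (p:person)-[l:likes]->(c:comment) " +
      "WHERE l.val_from.after(Timestamp(2012-06-01))");
    // DataSet#count() triggers its own Flink job and returns the number of embeddings.
    System.out.println("matches: " + matches.getGraphHeads().count());
  }
}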