Skip to content

Commit

Permalink
GIRAPH-600: Create an option to do output during computation (majakab…
Browse files Browse the repository at this point in the history
…iljo)
  • Loading branch information
Maja Kabiljo committed Mar 30, 2013
1 parent da2a687 commit 67f5f74
Show file tree
Hide file tree
Showing 15 changed files with 521 additions and 15 deletions.
2 changes: 2 additions & 0 deletions CHANGELOG
Original file line number Diff line number Diff line change
@@ -1,6 +1,8 @@
Giraph Change Log

Release 0.2.0 - unreleased
GIRAPH-600: Create an option to do output during computation (majakabiljo)

GIRAPH-599: Hive IO dependency issues with some Hadoop profiles (nitay via majakabiljo)

GIRAPH-577: Create a testing framework that doesn't require I/O formats (ves via apresta)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
import org.apache.giraph.graph.GraphState;
import org.apache.giraph.graph.GraphTaskManager;
import org.apache.giraph.graph.VertexEdgeCount;
import org.apache.giraph.io.superstep_output.SuperstepOutput;
import org.apache.giraph.master.MasterInfo;
import org.apache.giraph.partition.PartitionOwner;
import org.apache.giraph.partition.PartitionStats;
Expand Down Expand Up @@ -222,6 +223,13 @@ void exchangeVertexPartitions(
*/
void prepareSuperstep();

/**
* Get the superstep output object, from which compute threads obtain
* (and to which they later return) their vertex writers.
*
* @return SuperstepOutput
*/
SuperstepOutput<I, V, E> getSuperstepOutput();

/**
* Clean up the service (no calls may be issued after this)
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -261,6 +261,46 @@ public final void setVertexOutputFormatClass(
VERTEX_OUTPUT_FORMAT_CLASS.set(this, vertexOutputFormatClass);
}

/**
 * Tell whether vertices should be written out while the computation is
 * still running.
 *
 * @return True iff output should be done during computation
 */
public final boolean doOutputDuringComputation() {
  boolean outputDuringComputation = DO_OUTPUT_DURING_COMPUTATION.get(this);
  return outputDuringComputation;
}

/**
 * Enable or disable writing output during computation.
 *
 * @param doOutputDuringComputation True iff we want output to happen
 *                                  during computation
 */
public final void setDoOutputDuringComputation(
    final boolean doOutputDuringComputation) {
  DO_OUTPUT_DURING_COMPUTATION.set(this, doOutputDuringComputation);
}

/**
 * Tell whether the configured VertexOutputFormat has been declared
 * thread-safe.
 *
 * @return True iff VertexOutputFormat is thread-safe
 */
public final boolean vertexOutputFormatThreadSafe() {
  boolean threadSafe = VERTEX_OUTPUT_FORMAT_THREAD_SAFE.get(this);
  return threadSafe;
}

/**
 * Declare whether the selected VertexOutputFormat is thread-safe.
 *
 * @param vertexOutputFormatThreadSafe True iff selected VertexOutputFormat
 *                                     is thread-safe
 */
public final void setVertexOutputFormatThreadSafe(
    final boolean vertexOutputFormatThreadSafe) {
  VERTEX_OUTPUT_FORMAT_THREAD_SAFE.set(this, vertexOutputFormatThreadSafe);
}

/**
* Set the vertex combiner class (optional)
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -124,6 +124,22 @@ public interface GiraphConstants {
ClassConfOption<VertexOutputFormat> VERTEX_OUTPUT_FORMAT_CLASS =
ClassConfOption.create("giraph.vertexOutputFormatClass", null,
VertexOutputFormat.class);
/**
* If you use this option, instead of saving vertices at the end of the
* application, the vertex writer's writeVertex will be called right after
* each vertex.compute() is called. Defaults to false.
* NOTE: This feature doesn't work well with checkpointing - if you restart
* from a checkpoint you won't have any output from previous supersteps.
*/
BooleanConfOption DO_OUTPUT_DURING_COMPUTATION =
new BooleanConfOption("giraph.doOutputDuringComputation", false);
/**
* Whether the vertex output format is thread-safe - if your
* VertexOutputFormat allows several vertex writers to be created and
* written to in parallel, you should set this to true.
* When output during computation is enabled, this flag selects between a
* multi-threaded and a synchronized superstep output. Defaults to false.
*/
BooleanConfOption VERTEX_OUTPUT_FORMAT_THREAD_SAFE =
new BooleanConfOption("giraph.vertexOutputFormatThreadSafe", false);

/** Output Format Path (for Giraph-on-YARN) */
String GIRAPH_OUTPUT_DIR = "giraph.output.dir";
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,10 @@
import org.apache.giraph.io.EdgeInputFormat;
import org.apache.giraph.io.VertexInputFormat;
import org.apache.giraph.io.VertexOutputFormat;
import org.apache.giraph.io.superstep_output.MultiThreadedSuperstepOutput;
import org.apache.giraph.io.superstep_output.NoOpSuperstepOutput;
import org.apache.giraph.io.superstep_output.SuperstepOutput;
import org.apache.giraph.io.superstep_output.SynchronizedSuperstepOutput;
import org.apache.giraph.job.GiraphJobObserver;
import org.apache.giraph.master.MasterCompute;
import org.apache.giraph.master.MasterObserver;
Expand All @@ -50,6 +54,7 @@
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.io.WritableComparable;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.util.Progressable;

import static org.apache.giraph.conf.GiraphConstants.USE_UNSAFE_SERIALIZATION;
Expand Down Expand Up @@ -200,6 +205,25 @@ public VertexOutputFormat<I, V, E> createVertexOutputFormat() {
return ReflectionUtils.newInstance(klass, this);
}

/**
 * Build the {@link SuperstepOutput} implementation that matches the
 * configuration: a no-op output when output during computation is disabled,
 * otherwise a multi-threaded or a synchronized output depending on whether
 * the vertex output format was declared thread-safe.
 *
 * @param context Mapper context
 * @return SuperstepOutput
 */
public SuperstepOutput<I, V, E> createSuperstepOutput(
    Mapper<?, ?, ?, ?>.Context context) {
  // Output during computation is off: nothing should be written
  if (!doOutputDuringComputation()) {
    return new NoOpSuperstepOutput<I, V, E>();
  }
  // Writers may be used in parallel only if the format is thread-safe
  if (vertexOutputFormatThreadSafe()) {
    return new MultiThreadedSuperstepOutput<I, V, E>(this, context);
  }
  return new SynchronizedSuperstepOutput<I, V, E>(this, context);
}

/**
* Does the job have an {@link EdgeInputFormat}?
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
import org.apache.giraph.comm.messages.MessageStoreByPartition;
import org.apache.giraph.comm.netty.NettyWorkerClientRequestProcessor;
import org.apache.giraph.conf.ImmutableClassesGiraphConfiguration;
import org.apache.giraph.io.SimpleVertexWriter;
import org.apache.giraph.metrics.GiraphMetrics;
import org.apache.giraph.metrics.MetricNames;
import org.apache.giraph.metrics.SuperstepMetricsRegistry;
Expand Down Expand Up @@ -89,6 +90,8 @@ public class ComputeCallable<I extends WritableComparable, V extends Writable,
/** Sends the messages (unique per Callable) */
private WorkerClientRequestProcessor<I, V, E, M>
workerClientRequestProcessor;
/** VertexWriter for this ComputeCallable */
private SimpleVertexWriter<I, V, E> vertexWriter;
/** Get the start time in nanos */
private final long startNanos = TIME.getNanoseconds();

Expand Down Expand Up @@ -143,6 +146,8 @@ public Collection<PartitionStats> call() {
context, graphState.getGraphTaskManager(), workerClientRequestProcessor,
aggregatorUsage);

vertexWriter = serviceWorker.getSuperstepOutput().getVertexWriter();

List<PartitionStats> partitionStatsList = Lists.newArrayList();
while (!partitionIdQueue.isEmpty()) {
Integer partitionId = partitionIdQueue.poll();
Expand All @@ -165,11 +170,17 @@ public Collection<PartitionStats> call() {
} catch (IOException e) {
throw new IllegalStateException("call: Caught unexpected IOException," +
" failing.", e);
} catch (InterruptedException e) {
throw new IllegalStateException("call: Caught unexpected " +
"InterruptedException, failing.", e);
} finally {
serviceWorker.getPartitionStore().putPartition(partition);
}
}

// Return VertexWriter after the usage
serviceWorker.getSuperstepOutput().returnVertexWriter(vertexWriter);

if (LOG.isInfoEnabled()) {
float seconds = Times.getNanosSince(TIME, startNanos) /
Time.NS_PER_SECOND_AS_FLOAT;
Expand All @@ -193,7 +204,7 @@ public Collection<PartitionStats> call() {
* @return Partition stats for this computed partition
*/
private PartitionStats computePartition(Partition<I, V, E, M> partition)
throws IOException {
throws IOException, InterruptedException {
PartitionStats partitionStats =
new PartitionStats(partition.getId(), 0, 0, 0, 0);
// Make sure this is thread-safe across runs
Expand Down Expand Up @@ -225,6 +236,8 @@ private PartitionStats computePartition(Partition<I, V, E, M> partition)
}
// Need to unwrap the mutated edges (possibly)
vertex.unwrapMutableEdges();
// Write vertex to superstep output (no-op if it is not used)
vertexWriter.writeVertex(vertex);
// Need to save the vertex changes (possibly)
partition.saveVertex(vertex);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -288,9 +288,10 @@ public void execute() throws IOException, InterruptedException {
/**
* Handle post-application callbacks.
*/
private void postApplication() {
private void postApplication() throws IOException, InterruptedException {
GiraphTimerContext postAppTimerContext = wcPostAppTimer.time();
serviceWorker.getWorkerContext().postApplication();
serviceWorker.getSuperstepOutput().postApplication();
postAppTimerContext.stop();
context.progress();

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,45 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.giraph.io;

import org.apache.giraph.graph.Vertex;
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.io.WritableComparable;

import java.io.IOException;

/**
* Minimal writer interface whose only capability is writing vertices.
* {@link VertexWriter} extends it with initialization and closing.
*
* @param <I> Vertex id
* @param <V> Vertex value
* @param <E> Edge value
*/
public interface SimpleVertexWriter<I extends WritableComparable,
V extends Writable, E extends Writable> {
/**
* Writes the next vertex and associated data
*
* @param vertex Vertex to write
* @throws IOException on an I/O failure while writing the vertex
* @throws InterruptedException if the write is interrupted
*/
void writeVertex(Vertex<I, V, E, ?> vertex) throws IOException,
InterruptedException;
}
13 changes: 1 addition & 12 deletions giraph-core/src/main/java/org/apache/giraph/io/VertexWriter.java
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,6 @@

import java.io.IOException;

import org.apache.giraph.graph.Vertex;
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.io.WritableComparable;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
Expand All @@ -34,7 +33,7 @@
*/
@SuppressWarnings("rawtypes")
public interface VertexWriter<I extends WritableComparable, V extends Writable,
E extends Writable> {
E extends Writable> extends SimpleVertexWriter<I, V, E> {
/**
* Use the context to setup writing the vertices.
* Guaranteed to be called prior to any other function.
Expand All @@ -46,16 +45,6 @@ public interface VertexWriter<I extends WritableComparable, V extends Writable,
void initialize(TaskAttemptContext context) throws IOException,
InterruptedException;

/**
* Writes the next vertex and associated data
*
* @param vertex set the properties of this vertex
* @throws IOException
* @throws InterruptedException
*/
void writeVertex(Vertex<I, V, E, ?> vertex)
throws IOException, InterruptedException;

/**
* Close this {@link VertexWriter} to future operations.
*
Expand Down
Loading

0 comments on commit 67f5f74

Please sign in to comment.