Shortest Paths Example

We are going to delve into a full Giraph example using the single-source shortest paths algorithm. The actual code is part of the examples included in Giraph, in the class SimpleShortestPathsVertex. We will also define a very simple input and output format for the graph.

Be aware of the difference between versions 0.1 and 0.2 - in the latter, the class SimpleShortestPathsVertex does not have a main() method, so you have to run it via the ToolRunner.

First, let’s start with the VertexInputFormat. In this example, we extend TextVertexInputFormat to produce our own simple JSON-based input format. VertexInputFormat is very similar to the Hadoop InputFormat. The relevant code snippet is below:

public static class SimpleShortestPathsVertexInputFormat extends
        TextVertexInputFormat<LongWritable, DoubleWritable, FloatWritable> {
    @Override
    public VertexReader<LongWritable, DoubleWritable, FloatWritable>
            createVertexReader(InputSplit split,
                               TaskAttemptContext context)
                               throws IOException {
        return new SimpleShortestPathsVertexReader(
            textInputFormat.createRecordReader(split, context));
    }
}

public static class SimpleShortestPathsVertexReader extends
        TextVertexReader<LongWritable, DoubleWritable, FloatWritable> {

    public SimpleShortestPathsVertexReader(
            RecordReader<LongWritable, Text> lineRecordReader) {
        super(lineRecordReader);
    }

    @Override
    public boolean next(MutableVertex<LongWritable,
                        DoubleWritable, FloatWritable, ?> vertex)
            throws IOException, InterruptedException {
        if (!getRecordReader().nextKeyValue()) {
            return false;
        }

        Text line = getRecordReader().getCurrentValue();
        try {
            JSONArray jsonVertex = new JSONArray(line.toString());
            vertex.setVertexId(
                new LongWritable(jsonVertex.getLong(0)));
            vertex.setVertexValue(
                new DoubleWritable(jsonVertex.getDouble(1)));
            JSONArray jsonEdgeArray = jsonVertex.getJSONArray(2);
            for (int i = 0; i < jsonEdgeArray.length(); ++i) {
                JSONArray jsonEdge = jsonEdgeArray.getJSONArray(i);
                Edge<LongWritable, FloatWritable> edge =
                    new Edge<LongWritable, FloatWritable>(
                        new LongWritable(jsonEdge.getLong(0)),
                        new FloatWritable((float) jsonEdge.getDouble(1)));
                vertex.addEdge(edge);
            }
        } catch (JSONException e) {
            throw new IllegalArgumentException(
                "next: Couldn't get vertex from line " + line, e);
        }
        return true;
    }
}

The idea is to split the graph into manageable parts to distribute across the Giraph workers. First, getSplits() is called by the master, and then the workers process their InputSplit objects with the VertexReader to load their portion of the graph into memory. In this example, we use composition: our format extends TextVertexInputFormat, which internally delegates to the Hadoop TextInputFormat (the textInputFormat field above) to do most of the work for us, generating one split per input file in the directory. SimpleShortestPathsVertexReader can then read line by line, one vertex per line. We implemented a very simple description of the graph: a JSON array that contains the vertex id, a vertex value, and then a JSON array of edges (each of which is a JSON array of destination vertex id and edge value). For example, [2,100,[[3,200]]] specifies vertex id = 2, vertex value = 100, and a single edge to destination vertex 3 with edge value 200. One more thing to note: we do load the vertex values here, but our application will override them on superstep 0.
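
To make the format concrete, a small input file (with hypothetical data, not the example graph used below) could look like this, one vertex per line:

[0,0,[[1,100],[2,200]]]
[1,1000,[[2,300]]]
[2,1000,[]]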

We also need an output format to store the resulting graph. Hence we have the SimpleShortestPathsVertexOutputFormat.

public static class SimpleShortestPathsVertexOutputFormat extends
        TextVertexOutputFormat<LongWritable, DoubleWritable,
        FloatWritable> {

    @Override
    public VertexWriter<LongWritable, DoubleWritable, FloatWritable>
            createVertexWriter(TaskAttemptContext context)
            throws IOException, InterruptedException {
        RecordWriter<Text, Text> recordWriter =
            textOutputFormat.getRecordWriter(context);
        return new SimpleShortestPathsVertexWriter(recordWriter);
    }
}

public static class SimpleShortestPathsVertexWriter extends
        TextVertexWriter<LongWritable, DoubleWritable, FloatWritable> {
    public SimpleShortestPathsVertexWriter(
            RecordWriter<Text, Text> lineRecordWriter) {
        super(lineRecordWriter);
    }

    @Override
    public void writeVertex(BasicVertex<LongWritable, DoubleWritable,
                            FloatWritable, ?> vertex)
            throws IOException, InterruptedException {
        JSONArray jsonVertex = new JSONArray();
        try {
            jsonVertex.put(vertex.getVertexId().get());
            jsonVertex.put(vertex.getVertexValue().get());
            JSONArray jsonEdgeArray = new JSONArray();
            for (Edge<LongWritable, FloatWritable> edge :
                    vertex.getOutEdgeMap().values()) {
                JSONArray jsonEdge = new JSONArray();
                jsonEdge.put(edge.getDestVertexId().get());
                jsonEdge.put(edge.getEdgeValue().get());
                jsonEdgeArray.put(jsonEdge);
            }
            jsonVertex.put(jsonEdgeArray);
        } catch (JSONException e) {
            throw new IllegalArgumentException(
                "writeVertex: Couldn't write vertex " + vertex, e);
        }
        getRecordWriter().write(new Text(jsonVertex.toString()), null);
    }
}

The output format is essentially the inverse of the input format, writing each vertex back out as a JSON array in the same layout.

The next part of the code we focus on is the actual computation executed by every vertex:

private boolean isSource() {
    return (getVertexId().get() ==
        getContext().getConfiguration().getLong(SOURCE_ID,
                                                SOURCE_ID_DEFAULT));
}

@Override
public void compute(Iterator<DoubleWritable> msgIterator) {
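    // On the first superstep, start every vertex off as unreachable;
    // the source vertex will lower its own value right below.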
    if (getSuperstep() == 0) {
        setVertexValue(new DoubleWritable(Double.MAX_VALUE));
    }
    double minDist = isSource() ? 0d : Double.MAX_VALUE;
    while (msgIterator.hasNext()) {
        minDist = Math.min(minDist, msgIterator.next().get());
    }
    if (LOG.isDebugEnabled()) {
        LOG.debug("Vertex " + getVertexId() + " got minDist = " + minDist +
                 " vertex value = " + getVertexValue());
    }
    if (minDist < getVertexValue().get()) {
        setVertexValue(new DoubleWritable(minDist));
        for (Edge<LongWritable, FloatWritable> edge :
                getOutEdgeMap().values()) {
            if (LOG.isDebugEnabled()) {
                LOG.debug("Vertex " + getVertexId() + " sent to " +
                          edge.getDestVertexId() + " = " +
                          (minDist + edge.getEdgeValue().get()));
            }
            sendMsg(edge.getDestVertexId(),
                    new DoubleWritable(minDist +
                                       edge.getEdgeValue().get()));
        }
    }
    voteToHalt();
}

In superstep 0, all the vertices initialize their vertex values to the maximum value (unreachable). Then, the source vertex propagates the cost of reaching its neighbors. In subsequent supersteps, all vertices propagate the minimum cost of reaching their neighbors until the application completes. At that point, every vertex value reflects the cost of reaching that vertex from the source.
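
To see the message flow concretely, take a hypothetical three-vertex chain 0 -> 1 -> 2 with edge values 100 and 200 and source vertex 0. In superstep 0, every vertex sets its value to Double.MAX_VALUE; the source computes minDist = 0, sets its value to 0, and sends 0 + 100 = 100 to vertex 1; all vertices vote to halt. In superstep 1, vertex 1 is woken by the message, lowers its value to 100, and sends 100 + 200 = 300 to vertex 2. In superstep 2, vertex 2 lowers its value to 300 and, having no outgoing edges, sends nothing. With no messages in flight and every vertex halted, the computation terminates.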

The final bit of code to discuss here is starting up the job:

@Override
public int run(String[] argArray) throws Exception {
    if (argArray.length != 4) {
        throw new IllegalArgumentException(
            "run: Must have 4 arguments <input path> <output path> " +
            "<source vertex id> <# of workers>");
    }
    GiraphJob job = new GiraphJob(getConf(), getClass().getName());
    job.setVertexClass(getClass());
    job.setVertexInputFormatClass(
        SimpleShortestPathsVertexInputFormat.class);
    job.setVertexOutputFormatClass(
        SimpleShortestPathsVertexOutputFormat.class);
    FileInputFormat.addInputPath(job, new Path(argArray[0]));
    FileOutputFormat.setOutputPath(job, new Path(argArray[1]));
    job.getConfiguration().setLong(SimpleShortestPathsVertex.SOURCE_ID,
                                   Long.parseLong(argArray[2]));
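    // Minimum workers, maximum workers (the same value here), and the
    // minimum percent of workers that must be available to proceed.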
    job.setWorkerConfiguration(Integer.parseInt(argArray[3]),
                               Integer.parseInt(argArray[3]),
                               100.0f);
    return job.run(true) ? 0 : -1;
}

public static void main(String[] args) throws Exception {
    System.exit(ToolRunner.run(new SimpleShortestPathsVertex(), args));
}

Implementing the Tool interface allows us to start Hadoop jobs from ToolRunner. The run() method sets up the Giraph job with the appropriate vertex class (the computation), the vertex input / output formats, the number of workers, and so on. These settings can also be supplied through generic Hadoop options. This is not the only way to start a Hadoop job, but it is a simple one.
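
For instance, because ToolRunner uses the GenericOptionsParser, standard Hadoop -D properties can be passed in front of the program arguments; the property below is only a hypothetical illustration:

hadoop jar giraph-0.1-jar-with-dependencies.jar org.apache.giraph.examples.SimpleShortestPathsVertex -D mapred.child.java.opts=-Xmx1g shortestPathsInputGraph shortestPathsOutputGraph 0 3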

Great! Now we need some input data. You can create your own, or you can download some example data from shortestPathsInputGraph. Untar the example data with the following command:

tar zxvf shortestPathsInputGraph.tar.gz 
x shortestPathsInputGraph/
x shortestPathsInputGraph/part-m-00001
x shortestPathsInputGraph/part-m-00002
x shortestPathsInputGraph/part-m-00003

Then upload the graph to HDFS:

hadoop fs -copyFromLocal shortestPathsInputGraph shortestPathsInputGraph

Running the example on a Hadoop instance:

hadoop jar giraph-0.1-jar-with-dependencies.jar org.apache.giraph.examples.SimpleShortestPathsVertex shortestPathsInputGraph shortestPathsOutputGraph 0 3
11/08/02 12:44:40 INFO mapred.JobClient: Running job: job_201108021212_0026
11/08/02 12:44:41 INFO mapred.JobClient:  map 0% reduce 0%
11/08/02 12:44:58 INFO mapred.JobClient:  map 25% reduce 0%
11/08/02 12:45:04 INFO mapred.JobClient:  map 75% reduce 0%
11/08/02 12:45:10 INFO mapred.JobClient:  map 100% reduce 0%
11/08/02 12:45:15 INFO mapred.JobClient: Job complete: job_201108021212_0026
11/08/02 12:45:15 INFO mapred.JobClient: Counters: 41
11/08/02 12:45:15 INFO mapred.JobClient:   Job Counters 
11/08/02 12:45:15 INFO mapred.JobClient:     SLOTS_MILLIS_MAPS=70529
11/08/02 12:45:15 INFO mapred.JobClient:     Total time spent by all reduces waiting after reserving slots (ms)=0
11/08/02 12:45:15 INFO mapred.JobClient:     Total time spent by all maps waiting after reserving slots (ms)=0
11/08/02 12:45:15 INFO mapred.JobClient:     Launched map tasks=4
11/08/02 12:45:15 INFO mapred.JobClient:     SLOTS_MILLIS_REDUCES=0
11/08/02 12:45:15 INFO mapred.JobClient:   Giraph Timers
11/08/02 12:45:15 INFO mapred.JobClient:     Total (milliseconds)=12462
11/08/02 12:45:15 INFO mapred.JobClient:     Superstep 15 (milliseconds)=116
11/08/02 12:45:15 INFO mapred.JobClient:     Superstep 3 (milliseconds)=72
11/08/02 12:45:15 INFO mapred.JobClient:     Vertex input superstep (milliseconds)=165
11/08/02 12:45:15 INFO mapred.JobClient:     Superstep 4 (milliseconds)=194
11/08/02 12:45:15 INFO mapred.JobClient:     Superstep 12 (milliseconds)=146
11/08/02 12:45:15 INFO mapred.JobClient:     Superstep 10 (milliseconds)=156
11/08/02 12:45:15 INFO mapred.JobClient:     Superstep 11 (milliseconds)=93
11/08/02 12:45:15 INFO mapred.JobClient:     Setup (milliseconds)=6046
11/08/02 12:45:15 INFO mapred.JobClient:     Shutdown (milliseconds)=73
11/08/02 12:45:15 INFO mapred.JobClient:     Superstep 7 (milliseconds)=78
11/08/02 12:45:15 INFO mapred.JobClient:     Superstep 9 (milliseconds)=78
11/08/02 12:45:15 INFO mapred.JobClient:     Superstep 0 (milliseconds)=2104
11/08/02 12:45:15 INFO mapred.JobClient:     Superstep 8 (milliseconds)=560
11/08/02 12:45:15 INFO mapred.JobClient:     Superstep 14 (milliseconds)=615
11/08/02 12:45:15 INFO mapred.JobClient:     Superstep 6 (milliseconds)=1308
11/08/02 12:45:15 INFO mapred.JobClient:     Superstep 5 (milliseconds)=140
11/08/02 12:45:15 INFO mapred.JobClient:     Superstep 13 (milliseconds)=146
11/08/02 12:45:15 INFO mapred.JobClient:     Superstep 2 (milliseconds)=187
11/08/02 12:45:15 INFO mapred.JobClient:     Superstep 1 (milliseconds)=179
11/08/02 12:45:15 INFO mapred.JobClient:   Giraph Stats
11/08/02 12:45:15 INFO mapred.JobClient:     Aggregate edges=15
11/08/02 12:45:15 INFO mapred.JobClient:     Superstep=17
11/08/02 12:45:15 INFO mapred.JobClient:     Current workers=3
11/08/02 12:45:15 INFO mapred.JobClient:     Sent messages=0
11/08/02 12:45:15 INFO mapred.JobClient:     Aggregate finished vertices=15
11/08/02 12:45:15 INFO mapred.JobClient:     Aggregate vertices=15
11/08/02 12:45:15 INFO mapred.JobClient:   File Output Format Counters 
11/08/02 12:45:15 INFO mapred.JobClient:     Bytes Written=0
11/08/02 12:45:15 INFO mapred.JobClient:   FileSystemCounters
11/08/02 12:45:15 INFO mapred.JobClient:     FILE_BYTES_READ=354
11/08/02 12:45:15 INFO mapred.JobClient:     HDFS_BYTES_READ=465
11/08/02 12:45:15 INFO mapred.JobClient:     FILE_BYTES_WRITTEN=86688
11/08/02 12:45:15 INFO mapred.JobClient:     HDFS_BYTES_WRITTEN=8554
11/08/02 12:45:15 INFO mapred.JobClient:   File Input Format Counters 
11/08/02 12:45:15 INFO mapred.JobClient:     Bytes Read=0
11/08/02 12:45:15 INFO mapred.JobClient:   Map-Reduce Framework
11/08/02 12:45:15 INFO mapred.JobClient:     Map input records=4
11/08/02 12:45:15 INFO mapred.JobClient:     Spilled Records=0
11/08/02 12:45:15 INFO mapred.JobClient:     Map output records=0
11/08/02 12:45:15 INFO mapred.JobClient:     SPLIT_RAW_BYTES=176

Now let’s check the results.

hadoop fs -ls shortestPathsOutputGraph
Found 5 items
-rw-r--r--   1 aching supergroup          0 2011-08-02 13:20 /user/aching/shortestPathsOutputGraph/_SUCCESS
drwxr-xr-x   - aching supergroup          0 2011-08-02 13:20 /user/aching/shortestPathsOutputGraph/_logs
-rw-r--r--   1 aching supergroup         96 2011-08-02 13:20 /user/aching/shortestPathsOutputGraph/part-m-00001
-rw-r--r--   1 aching supergroup        109 2011-08-02 13:20 /user/aching/shortestPathsOutputGraph/part-m-00002
-rw-r--r--   1 aching supergroup         84 2011-08-02 13:20 /user/aching/shortestPathsOutputGraph/part-m-00003

hadoop fs -cat shortestPathsOutputGraph/part*
[5, 1000, [ [6, 500] ] ]
[6, 1500, [ [7, 600] ] ]
[7, 2100, [ [8, 700] ] ]
[8, 2800, [ [9, 800] ] ]
[9, 3600, [ [10,900] ] ]
[10, 4500, [ [11, 1000] ] ]
[11, 5500, [ [12, 1100] ] ]
[12, 6600, [ [13, 1200] ] ]
[13, 7800, [ [14, 1300] ] ]
[14, 9100, [ [0, 1400] ] ]
[0, 0, [ [1, 0] ] ]
[1, 0, [ [2, 100] ] ]
[2, 100, [ [3, 200] ] ]
[3, 300, [ [4, 300] ] ]
[4, 600, [ [5, 400] ] ]