Hama Pipes is equivalent to Hadoop Pipes and offers the possibility to use Hama with C/C++.
Installation
You can compile Hama Pipes by executing the following commands:
cd $HAMA_HOME/c++/utils ./configure make install cd $HAMA_HOME/c++/pipes ./configure make install
Interface
Hama Pipes provides the following methods for C/C++ integration: (similar to the BSPModel)
Function |
Description |
|
Send a message to another peer. Messages sent by this method are not guaranteed to be received in a sent order. |
|
Returns a message from the peer's received messages queue (a FIFO). |
|
Returns the number of messages in the peer's received messages queue. |
|
Starts the barrier synchronization and sends all the messages in the outgoing message queues to the corresponding remote peers. |
|
Returns the count of current super-step. |
|
Returns the name of this peer in the format "hostname:port". |
|
Returns the name of n-th peer from sorted array by name. |
|
Returns the index of this peer from sorted array by name. |
|
Returns the names of all the peers executing tasks from the same job (including this peer). |
|
Returns the number of peers. |
|
Clears all queues entries. |
|
Writes a key/value pair to the output collector. |
|
Deserializes the next input key value into the given objects. |
|
Closes the input and opens it right away, so that the file pointer is at the beginning again. |
The following additional methods support access to SequenceFiles under C/C++:
Function |
Description |
|
Opens a SequenceFile with option "r" or "w", key/value type and returns the corresponding fileID. |
|
Reads the next key/value pair from the SequenceFile. |
|
Appends the next key/value pair to the SequenceFile. |
|
Closes a SequenceFile. |
C++ BSP example
Finally here is the Pi Estimator example implemented with Hama Pipes:
#include "hama/Pipes.hh" #include "hama/TemplateFactory.hh" #include "hadoop/StringUtils.hh" #include <time.h> #include <math.h> #include <string> #include <iostream> #include <cstdlib> using std::string; using std::cout; using HamaPipes::BSP; using HamaPipes::BSPContext; using namespace HadoopUtils; class PiCalculationBSP: public BSP { private: string masterTask; int iterations; public: PiCalculationBSP(BSPContext& context) { iterations = 10000; } inline double closed_interval_rand(double x0, double x1) { return x0 + (x1 - x0) * rand() / ((double) RAND_MAX); } void bsp(BSPContext& context) { // initialize random seed srand(time(NULL)); int in = 0; for (int i = 0; i < iterations; i++) { //rand() -> greater than or equal to 0.0 and less than 1.0. double x = 2.0 * closed_interval_rand(0, 1) - 1.0; double y = 2.0 * closed_interval_rand(0, 1) - 1.0; if (sqrt(x * x + y * y) < 1.0) { in++; } } double data = 4.0 * in / iterations; context.sendMessage(masterTask, toString(data)); context.sync(); } void setup(BSPContext& context) { // Choose one as a master masterTask = context.getPeerName(context.getNumPeers() / 2); } void cleanup(BSPContext& context) { if (context.getPeerName().compare(masterTask)==0) { double pi = 0.0; int msgCount = context.getNumCurrentMessages(); string received; for (int i=0; i<msgCount; i++) { string received = context.getCurrentMessage(); pi += toDouble(received); } pi = pi / msgCount; //msgCount = numPeers context.write("Estimated value of PI is", toString(pi)); } } }; int main(int argc, char *argv[]) { return HamaPipes::runTask(HamaPipes::TemplateFactory<PiCalculationBSP>()); }
Makefile for this example:
CC = g++ CPPFLAGS = -m64 -I$(HAMA_HOME)/c++/install/include PiCalculation: PiCalculation.cc $(CC) $(CPPFLAGS) $< -L$(HAMA_HOME)/c++/install/lib -lhamapipes -lhadooputils -lcrypto -lpthread -g -O2 -o $@ clean: rm -f PiCalculation
The corresponding job configuration PiCalculation_job.xml
looks like that:
<?xml version="1.0"?> <configuration> <property> // Set the binary path on DFS <name>hama.pipes.executable</name> <value>bin/PiCalculation</value> </property> <property> <name>hama.pipes.java.recordreader</name> <value>true</value> </property> <property> <name>hama.pipes.java.recordwriter</name> <value>true</value> </property> <property> <name>bsp.input.format.class</name> <value>org.apache.hama.bsp.NullInputFormat</value> </property> <property> <name>bsp.output.format.class</name> <value>org.apache.hama.bsp.TextOutputFormat</value> </property> <property> <name>hama.pipes.logging</name> <value>false</value> </property> <property> <name>bsp.peers.num</name> <value>10</value> </property> </configuration>
Finally the PiCalculation example can be submitted with these commands:
# delete output dir hadoop dfs -rmr output/PiCalculation # copy piCalculation binary to HDFS hadoop dfs -rmr bin/PiCalculation hadoop dfs -put PiCalculation bin/PiCalculation # submit job hama pipes -conf PiCalculation_job.xml -output output/PiCalculation