...
No Format |
---|
@Override public final void bsp( BSPPeer<LongWritable, Text, KEYOUT, VALUEOUT, MESSAGE_TYPE> peer) throws IOException, InterruptedException, SyncException { /* this method reads the next key-value record from the input file */ KeyValuePair<LongWritable, Text> pair = peer.readNext(); /* the following lines do the same: */ LongWritable key = new LongWritable(); Text value = new Text(); peer.readNext(key, value); } |
...
No Format |
---|
@Override public void bsp( BSPPeer<NullWritable, NullWritable, Text, DoubleWritable, LongMessage> peer) throws IOException, SyncException, InterruptedException { for (String peerName : peer.getAllPeerNames()) { peer.send(peerName, new LongMessage("Hello from " + peer.getPeerName(), System.currentTimeMillis())); } peer.sync(); } |
...
No Format |
---|
@Override public void bsp( BSPPeer<NullWritable, NullWritable, Text, DoubleWritable, Writable> peer) throws IOException, SyncException, InterruptedException { for (int i = 0; i < 100; i++) { /* send some messages */ peer.sync(); } } |
...
No Format |
---|
/* enum definition */ enum LoopCounter { LOOPS } @Override public void bsp( BSPPeer<NullWritable, NullWritable, Text, DoubleWritable, DoubleWritable> peer) throws IOException, SyncException, InterruptedException { for (int i = 0; i < iterations; i++) { /* details omitted */ peer.getCounter(LoopCounter.LOOPS).increment(1L); } /* rest omitted */ } |
In 0.4.0, counters cannot be used for flow control because they are not synchronized during the sync phase. See HAMA-515 for details.
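The limitation above can be illustrated without any Hama code. The following is a minimal plain-Java sketch, not Hama's implementation: `TaskCounter`, `localViews`, and `globalTotal` are hypothetical names, and the sketch assumes that counter values are only merged once all tasks have finished. It shows why a task that reads its own counter cannot learn what the other tasks have done.

```java
import java.util.ArrayList;
import java.util.List;

/** Plain-Java illustration; none of these names belong to the Hama API. */
public class LocalCounterSketch {

    /** Hypothetical stand-in for the counter that one task keeps privately. */
    static final class TaskCounter {
        private long value;

        void increment(long delta) {
            value += delta;
        }

        long get() {
            return value;
        }
    }

    /**
     * Simulates `tasks` tasks that each loop `loops` times and returns the
     * counter value that every task can observe locally.
     */
    static List<Long> localViews(int tasks, int loops) {
        List<TaskCounter> counters = new ArrayList<>();
        for (int t = 0; t < tasks; t++) {
            counters.add(new TaskCounter());
        }
        for (int i = 0; i < loops; i++) {
            for (TaskCounter counter : counters) {
                counter.increment(1L);
            }
            // A sync barrier would sit here. Assumption: counters are not
            // exchanged at this point, so nothing is merged between tasks.
        }
        List<Long> views = new ArrayList<>();
        for (TaskCounter counter : counters) {
            views.add(counter.get());
        }
        return views;
    }

    /** Total that only becomes available once every task has finished. */
    static long globalTotal(List<Long> views) {
        long total = 0;
        for (long view : views) {
            total += view;
        }
        return total;
    }

    public static void main(String[] args) {
        List<Long> views = localViews(4, 10);
        System.out.println("local views: " + views);
        System.out.println("global total: " + globalTotal(views));
    }
}
```

With 4 simulated tasks and 10 loops, every task observes a local value of 10 while the merged total is 40, so a loop condition based on the local value would never see the global figure.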
Setup and Cleanup
Since 0.4.0, you can use Setup and Cleanup methods in your BSP code. They can be inherited from the BSP class like this:
...
No Format |
---|
public void bsp(BSPPeer<NullWritable, NullWritable, NullWritable, NullWritable, IntegerMessage> peer) throws IOException, SyncException, InterruptedException { for (int i = 0; i < 1000; i++) { peer.send(masterTask, new IntegerMessage(peer.getPeerName(), i)); } peer.sync(); if (peer.getPeerName().equals(masterTask)) { IntegerMessage received; while ((received = (IntegerMessage) peer.getCurrentMessage()) != null) { sum += received.getData(); } } } |
...
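To make the flow of the last example easier to follow, here is a minimal plain-Java simulation of the same pattern: every task sends the numbers 0 to 999 to a master task, all tasks pass a barrier, and the master sums what it received. It does not use the Hama API. `SimulatedTask`, `sendPhase`, `receivePhase`, and `run` are hypothetical names, and picking the master in `setup` as the smallest peer name is an assumption, since the page does not show how `masterTask` is chosen.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Queue;

/** Single-threaded simulation of setup, one superstep, and cleanup. */
public class MasterSumSketch {

    /** Hypothetical stand-in for one BSP task; not a Hama class. */
    static final class SimulatedTask {
        private final String name;
        private String masterTask;
        private long sum;

        SimulatedTask(String name) {
            this.name = name;
        }

        /** Assumption: every task derives the same master from the peer list. */
        void setup(List<String> allPeerNames) {
            masterTask = Collections.min(allPeerNames);
        }

        /** Before the barrier: queue 1000 integers for the master. */
        void sendPhase(Map<String, Queue<Integer>> inboxes) {
            for (int i = 0; i < 1000; i++) {
                inboxes.get(masterTask).add(i);
            }
        }

        /** After the barrier: only the master drains its inbox. */
        void receivePhase(Map<String, Queue<Integer>> inboxes) {
            if (!name.equals(masterTask)) {
                return;
            }
            Integer received;
            while ((received = inboxes.get(name).poll()) != null) {
                sum += received;
            }
        }

        /** Stand-in for cleanup: report the locally accumulated sum. */
        long cleanup() {
            return sum;
        }
    }

    /** Runs the lifecycle for all peers and returns the master's sum. */
    static long run(List<String> peerNames) {
        Map<String, Queue<Integer>> inboxes = new HashMap<>();
        List<SimulatedTask> tasks = new ArrayList<>();
        for (String peerName : peerNames) {
            inboxes.put(peerName, new ArrayDeque<>());
            tasks.add(new SimulatedTask(peerName));
        }
        for (SimulatedTask task : tasks) {
            task.setup(peerNames);
        }
        for (SimulatedTask task : tasks) {
            task.sendPhase(inboxes);
        }
        // Finishing the loop above plays the role of the sync barrier:
        // all messages are delivered before any task starts receiving.
        for (SimulatedTask task : tasks) {
            task.receivePhase(inboxes);
        }
        long result = 0;
        for (SimulatedTask task : tasks) {
            result += task.cleanup(); // non-master tasks contribute 0
        }
        return result;
    }

    public static void main(String[] args) {
        List<String> peers = new ArrayList<>();
        peers.add("peer-b");
        peers.add("peer-a");
        peers.add("peer-c");
        System.out.println("sum at master: " + run(peers));
    }
}
```

Each simulated peer contributes 0 + 1 + ... + 999 = 499500, so three peers yield 1498500 at the master. The point of the split into `sendPhase` and `receivePhase` is that messages sent before the barrier only become readable after it.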