...
- Each task executes locally its portion of the loop a number of times.
- One task acts as master and collects the results through the BSP communication interface.
No Format |
---|
public class SerializePrinting { private static String TMP_OUTPUT = "/tmp/test-example/"; public static class HelloBSP extends BSP { public static final Log LOG = LogFactory.getLog(HelloBSP.class); private Configuration conf; private final static int PRINT_INTERVAL = 5000; public void bsp(BSPPeerProtocol bspPeer) throws IOException, KeeperException, InterruptedException { int numin = Integer.parseInt(conf.get("bsp.peers.num")); FileSystem fileSys = FileSystem.get(conf); 0, out = 0; for (int i = 0; i < for (String otherPeer : bspPeer.getAllPeerNames()iterations; i++) { if (bspPeer.getPeerName().equals(otherPeer)) { LOG.info("Hello BSP from " + (i + 1) + " of " + num + ": " + bspPeer.getPeerName()); SequenceFile.Writer writer = SequenceFile.createWriter(fileSys, conf, new Path(TMP_OUTPUT + i), LongWritable.class, Text.class, CompressionType.NONE)double x = 2.0 * Math.random() - 1.0, y = 2.0 * Math.random() - 1.0; if writer.append(new LongWritable(SystemMath.currentTimeMillissqrt()),x new Text( "Hello BSP from "* x + (iy +* 1y) + " of " + num + ": " + bspPeer.getPeerName()));< 1.0)) { writer.close(); in++; } Thread.sleep(PRINT_INTERVAL);else { bspPeer.sync(); iout++; } } public Configuration getConf() { return conf;} } byte[] tagName public void setConf(Configuration conf) { this.conf = conf= Bytes.toBytes(bspPeer.getPeerName()); } } public static void main(String[] args) throws InterruptedException, IOException { // BSP job configuration HamaConfiguration conf = new HamaConfiguration(); BSPJob bsp = new BSPJob(conf, SerializePrinting.class); // Set the job name bsp.setJobName("serialize printing"); bsp.setBspClass(HelloBSP.class); // Set the task size as a number of GroomServer BSPJobClient jobClient = new BSPJobClient(conf); ClusterStatus cluster = jobClient.getClusterStatus(falsebyte[] myData = Bytes.toBytes(4.0 * (double) in / (double) iterations); BSPMessage estimate = new BSPMessage(tagName, myData); bspPeer.send(masterTask, estimate); bsp bspPeer.setNumBspTasksync(cluster.getGroomServers()); FileSystem double fileSyspi = FileSystem.get(conf)0.0; if (fileSys.exists(new Path(TMP_OUTPUT))) { fileSys.delete(new Path(TMP_OUTPUT), true int numPeers = bspPeer.getNumCurrentMessages(); } BSPJobClient.runJob(bsp); BSPMessage received; System.out.println("Each task printed the \"Hello World\" as below:"); for (int i = 0; i < cluster.getGroomServers(); i++while ((received = bspPeer.getCurrentMessage()) != null) { SequenceFile.Reader readerpi += new SequenceFileBytes.Reader(fileSys, new Path( TMP_OUTPUT + i), conftoDouble(received.getData()); LongWritable timestamp = new LongWritable();} Text message = new Text(); reader.next(timestamp, message);if (bspPeer.getPeerName().equals(masterTask)) { System.out.println(new Date(timestamp.get()) + ": " + message); pi = pi / numPeers; reader.closewriteResult(pi); } } } |