...
- Each task executes locally its portion of the loop a number of times.
- One task acts as master and collects the results through the BSP communication interface.
No Format |
---|
public class PiEstimatorSerializePrinting { private static String MASTERTMP_TASKOUTPUT = "master.task./tmp/test-example/"; public static class MyEstimatorHelloBSP extends BSP { public static final Log LOG = LogFactory.getLog(MyEstimatorHelloBSP.class); private Configuration conf; private String masterTask; private final static final int iterationsPRINT_INTERVAL = 100005000; @Override public void bsp(BSPPeerBSPPeerProtocol bspPeer) throws IOException, KeeperException, KeeperException, InterruptedException { int innum = 0, out = 0Integer.parseInt(conf.get("bsp.peers.num")); forFileSystem (intfileSys i = 0; i < iterations; i++) {FileSystem.get(conf); int double xi = 2.0; * Math.random() - 1.0, yfor = 2.0 * Math.random() - 1.0;(String otherPeer : bspPeer.getAllPeerNames()) { if ((MathbspPeer.sqrt(x * x + y * y) < 1.0getPeerName().equals(otherPeer)) { in++; } else { out++; }LOG.info("Hello BSP from " + (i + 1) + " of " + num + ": " } byte[] tagName =+ Bytes.toBytes(bspPeer.getPeerName()); byte[] myData SequenceFile.Writer writer = BytesSequenceFile.toBytes(4.0 * (double) in / (double) iterations); createWriter(fileSys, conf, BSPMessage estimate = new BSPMessage(tagName, myData); bspPeer.send(masterTask, estimate);Path(TMP_OUTPUT + i), LongWritable.class, Text.class, bspPeer.sync(); double pi = 0CompressionType.0NONE); BSPMessage received; while ((received = bspPeer.getCurrentMessagewriter.append(new LongWritable(System.currentTimeMillis()), != null) {new Text( LOG.info("Receives messages:" + Bytes.toDouble(received.getData())); "Hello BSP from " + if(pii == 0.0+ 1) { + " of " + num + pi = Bytes.toDouble(received.getData()); ": " } else { pi = (pi + BytesbspPeer.toDoublegetPeerName(received.getData())); / 2; }writer.close(); } if (pi != 0.0) { Thread.sleep(PRINT_INTERVAL); LOGbspPeer.info("\nEstimated value of PI is " + pi)sync(); i++; } } @Override public Configuration getConf() { return conf; } @Override public void setConf(Configuration conf) { this.conf = conf; this.masterTask = conf.get(MASTER_TASK); } } public static void main(String[] args) throws InterruptedException, IOException { // BSP job configuration HamaConfiguration conf = new HamaConfiguration(); // Execute locally // conf.set("bsp.master.address", "local"); BSPJob bsp = new BSPJob(conf, PiEstimatorSerializePrinting.class); // Set the job name bsp.setJobName("piserialize estimation exampleprinting"); bsp.setBspClass(MyEstimatorHelloBSP.class); // Set the task size as a number of GroomServer BSPJobClient jobClient = new BSPJobClient(conf); ClusterStatus cluster = jobClient.getClusterStatus(false); bsp.setNumBspTask(cluster.getGroomServers()); FileSystem fileSys = FileSystem.get(conf); if (fileSys.exists(new Path(TMP_OUTPUT))) { fileSys.delete(new Path(TMP_OUTPUT), true); } // Choose one as a masterBSPJobClient.runJob(bsp); System.out.println("Each task printed the \"Hello World\" as below:"); for (String peerName :int i = 0; i < cluster.getActiveGroomNamesgetGroomServers().values()); i++) { SequenceFile.Reader reader = new conf.set(MASTER_TASK, peerName); break; } BSPJobClient.runJob(bsp);SequenceFile.Reader(fileSys, new Path( TMP_OUTPUT + i), conf); LongWritable timestamp = new LongWritable(); Text message = new Text(); reader.next(timestamp, message); System.out.println(new Date(timestamp.get()) + ": " + message); reader.close(); } } } |