Versions Compared

Key

  • This line was added.
  • This line was removed.
  • Formatting was changed.

...

  • Each task locally executes its portion of the loop a number of times.
  • One task acts as master and collects the results through the BSP communication interface.
No Format
public class PiEstimatorSerializePrinting {
  private static String MASTERTMP_TASKOUTPUT = "master.task./tmp/test-example/";

  public static class MyEstimatorHelloBSP extends BSP {
    public static final Log LOG = LogFactory.getLog(MyEstimatorHelloBSP.class);
    private Configuration conf;
    private String masterTask;
    private final static final int iterationsPRINT_INTERVAL = 100005000;

    @Override
    public void bsp(BSPPeerBSPPeerProtocol bspPeer) throws IOException,
 KeeperException,
       KeeperException, InterruptedException {
      int innum = 0, out = 0Integer.parseInt(conf.get("bsp.peers.num"));
      forFileSystem (intfileSys i = 0; i < iterations; i++) {FileSystem.get(conf);

      int  double xi = 2.0;
  * Math.random() - 1.0, yfor = 2.0 * Math.random() - 1.0;(String otherPeer : bspPeer.getAllPeerNames()) {
        if ((MathbspPeer.sqrt(x * x + y * y) < 1.0getPeerName().equals(otherPeer)) {
          in++;
        } else {
          out++;
        }LOG.info("Hello BSP from " + (i + 1) + " of " + num + ": "
      }

      byte[] tagName =+ Bytes.toBytes(bspPeer.getPeerName());

       byte[] myData   SequenceFile.Writer writer = BytesSequenceFile.toBytes(4.0 * (double) in / (double) iterations);
createWriter(fileSys, conf,
           BSPMessage estimate = new BSPMessage(tagName, myData);

      bspPeer.send(masterTask, estimate);Path(TMP_OUTPUT + i), LongWritable.class, Text.class,
      bspPeer.sync();

      double pi = 0CompressionType.0NONE);
      BSPMessage received;
      while ((received = bspPeer.getCurrentMessagewriter.append(new LongWritable(System.currentTimeMillis()), != null) {new Text(
        LOG.info("Receives messages:" + Bytes.toDouble(received.getData()));
   "Hello BSP from " + if(pii == 0.0+ 1) {
+ " of " + num +    pi = Bytes.toDouble(received.getData());
  ": "
      } else {
          pi = (pi + BytesbspPeer.toDoublegetPeerName(received.getData()));
 / 2;
        }writer.close();

        }

      if (pi != 0.0) { Thread.sleep(PRINT_INTERVAL);
        LOGbspPeer.info("\nEstimated value of PI is " + pi)sync();
        i++;
      }
    }

    @Override
    public Configuration getConf() {
      return conf;
    }

    @Override
    public void setConf(Configuration conf) {
      this.conf = conf;
      this.masterTask = conf.get(MASTER_TASK);
    }

  }

  public static void main(String[] args) throws InterruptedException,
      IOException {
    // BSP job configuration
    HamaConfiguration conf = new HamaConfiguration();
    // Execute locally
    // conf.set("bsp.master.address", "local");

    BSPJob bsp = new BSPJob(conf, PiEstimatorSerializePrinting.class);
    // Set the job name
    bsp.setJobName("piserialize estimation exampleprinting");
    bsp.setBspClass(MyEstimatorHelloBSP.class);

    // Set the task size as a number of GroomServer
    BSPJobClient jobClient = new BSPJobClient(conf);
    ClusterStatus cluster = jobClient.getClusterStatus(false);
    bsp.setNumBspTask(cluster.getGroomServers());

    FileSystem fileSys = FileSystem.get(conf);
    if (fileSys.exists(new Path(TMP_OUTPUT))) {
      fileSys.delete(new Path(TMP_OUTPUT), true);
    }
    // Choose one as a masterBSPJobClient.runJob(bsp);

    System.out.println("Each task printed the \"Hello World\" as below:");
    for (String peerName :int i = 0; i < cluster.getActiveGroomNamesgetGroomServers().values()); i++) {
      SequenceFile.Reader reader = new conf.set(MASTER_TASK, peerName);
      break;
    }

    BSPJobClient.runJob(bsp);SequenceFile.Reader(fileSys, new Path(
          TMP_OUTPUT + i), conf);
      LongWritable timestamp = new LongWritable();
      Text message = new Text();
      reader.next(timestamp, message);
      System.out.println(new Date(timestamp.get()) + ": " + message);
      reader.close();
    }
  }
}