Versions Compared

Key

  • This line was added.
  • This line was removed.
  • Formatting was changed.

...

  • Each task executes locally its portion of the loop a number of times.
  • One task acts as master and collects the results through the BSP communication interface.
No Format
public class SerializePrinting {
  private static String TMP_OUTPUT = "/tmp/test-example/";

  public static class HelloBSP extends BSP {
    public static final Log LOG = LogFactory.getLog(HelloBSP.class);
    private Configuration conf;
    private final static int PRINT_INTERVAL = 5000;

    public void bsp(BSPPeerProtocol bspPeer) throws IOException,
        KeeperException, InterruptedException {
      int numin = Integer.parseInt(conf.get("bsp.peers.num"));
      FileSystem fileSys = FileSystem.get(conf);

0, out = 0;
      for (int i = 0;
 i <    for (String otherPeer : bspPeer.getAllPeerNames()iterations; i++) {
        if (bspPeer.getPeerName().equals(otherPeer)) {
          LOG.info("Hello BSP from " + (i + 1) + " of " + num + ": "
              + bspPeer.getPeerName());

          SequenceFile.Writer writer = SequenceFile.createWriter(fileSys, conf,
              new Path(TMP_OUTPUT + i), LongWritable.class, Text.class,
              CompressionType.NONE)double x = 2.0 * Math.random() - 1.0, y = 2.0 * Math.random() - 1.0;
         if writer.append(new LongWritable(SystemMath.currentTimeMillissqrt()),x new Text(
              "Hello BSP from "* x + (iy +* 1y) + " of " + num + ": "
                  + bspPeer.getPeerName()));< 1.0)) {
          writer.close();
in++;
        }

        Thread.sleep(PRINT_INTERVAL);else {
        bspPeer.sync();
        iout++;
      }
    }

    public Configuration getConf() {
      return conf;}

    }

  byte[] tagName public void setConf(Configuration conf) {
      this.conf = conf= Bytes.toBytes(bspPeer.getPeerName());
    }

  }

  public static void main(String[] args) throws InterruptedException,
      IOException {
    // BSP job configuration
    HamaConfiguration conf = new HamaConfiguration();

    BSPJob bsp = new BSPJob(conf, SerializePrinting.class);
    // Set the job name
    bsp.setJobName("serialize printing");
    bsp.setBspClass(HelloBSP.class);

    // Set the task size as a number of GroomServer
    BSPJobClient jobClient = new BSPJobClient(conf);
    ClusterStatus cluster = jobClient.getClusterStatus(falsebyte[] myData = Bytes.toBytes(4.0 * (double) in / (double) iterations);
      BSPMessage estimate = new BSPMessage(tagName, myData);

      bspPeer.send(masterTask, estimate);
    bsp  bspPeer.setNumBspTasksync(cluster.getGroomServers());

     FileSystem double fileSyspi = FileSystem.get(conf)0.0;
    if (fileSys.exists(new Path(TMP_OUTPUT))) {
      fileSys.delete(new Path(TMP_OUTPUT), true int numPeers = bspPeer.getNumCurrentMessages();
    }
    BSPJobClient.runJob(bsp);
BSPMessage received;
    System.out.println("Each task printed the \"Hello World\" as below:");
    for (int i = 0; i < cluster.getGroomServers(); i++while ((received = bspPeer.getCurrentMessage()) != null) {
       SequenceFile.Reader readerpi += new SequenceFileBytes.Reader(fileSys, new Path(
          TMP_OUTPUT + i), conftoDouble(received.getData());
      LongWritable timestamp = new LongWritable();}

      Text message = new Text();
      reader.next(timestamp, message);if (bspPeer.getPeerName().equals(masterTask)) {
      System.out.println(new Date(timestamp.get()) + ": " + message);
 pi = pi / numPeers;
        reader.closewriteResult(pi);
      }
  }
  }