Versions Compared

Key

  • This line was added.
  • This line was removed.
  • Formatting was changed.

...

The value of PI can be calculated in a number of ways. Consider the following method of In this example, we are estimating PI

  • Inscribe a circle in a square
  • Randomly generate points in the square
  • Determine the number of points in the square that are also in the circle
  • Let r be the number of points in the circle divided by the number of points in the square
  • PI ~ 4 r

Serial pseudo code for this procedure as below:

...

using following way:

  • Each task executes locally its portion of the loop a number of times.
    {{{
    iterations = 10000
    circle_count = 0

do j = 1,iterations
generate 2 random numbers between 0 and 1
xcoordinate = random1
ycoordinate = random2
if (xcoordinate, ycoordinate) inside circle
then circle_count = circle_count + 1
end do

PI = 4.0*circle_count/iterations

...


}}}

The BSP implementation for Pi

A distributed strategy in HAMA with BSP programming model, is break the loop into portions that can be executed by the tasks.

  • Each task executes its portion of the loop a number of times.
  • Each task can do its work without requiring any information from the other tasks (there are no data dependencies).
  • One task acts as master and collects the results through the BSP communication interface.
No Format
public class PiEstimatorPI {
= pi_sum / n_processes

1) Each process computes the value of Pi locally, and 2) sends it to master task using send() function. Then, 3) the master task can recieve the messages using sync() function. Finally, we can calculate the average value of sum of PI values from each peers as below:

Code Block
languagejava

    @Overrideprivate static String MASTER_TASK = "master.task.";

  public static class MyEstimator extends BSP {
    public static final Log LOG = LogFactory.getLog(MyEstimator.class);void bsp(
    private Configuration conf;
  BSPPeer<NullWritable, NullWritable, privateText, StringDoubleWritable> masterTask;peer)
    private static final int iterations = 10000;

    @Override
    public void bsp(BSPPeer bspPeer) throws IOException, KeeperException,
       SyncException, InterruptedException {

      int in = 0, out = 0;
      for (int i = 0; i < iterations; i++) {
        double x = 2.0 * Math.random() - 1.0, y = 2.0 * Math.random() - 1.0;
        if ((Math.sqrt(x * x + y * y) < 1.0)) {
          in++;
        } else {
          out++;
        }
      }

      byte[] tagName = Bytes.toBytes(getName().toString());
      byte[] myDatadouble data = Bytes.toBytes(4.0 * (double) in / (double) iterations);
      BSPMessageDoubleMessage estimate = new BSPMessage(tagNameDoubleMessage(peer.getPeerName(), myDatadata);

      bspPeerpeer.send(bspPeer.getAddress(masterTask), estimate);
      bspPeerpeer.sync();

      double pi = 0.0;}

      BSPMessage received;@Override
    public void while setup((received
 = bspPeer.getCurrentMessage()) != null) {
   BSPPeer<NullWritable, NullWritable, Text,   LOG.info("Receives messages:" + Bytes.toDouble(received.getData()));DoubleWritable> peer)
        if(pi == 0.0) throws IOException {
      // Choose one as pi = Bytes.toDouble(received.getData());a master
        } else {
          pithis.masterTask = (pi + Bytes.toDouble(received.getData()))peer.getPeerName(peer.getNumPeers() / 2);
        }
      }

    public void if (pi != 0.0) {cleanup(
        LOG.info("\nEstimated value of PI is " + pi);BSPPeer<NullWritable, NullWritable, Text, DoubleWritable> peer)
      }
  throws IOException }{

    @Override
    public Configuration getConf() {
      return conf;
    }

    @Override
    public void setConf(Configuration confif (peer.getPeerName().equals(masterTask)) {
      this.conf = conf;
      this.masterTaskdouble pi = conf.get(MASTER_TASK)0.0;
    }

  }

  publicint staticnumPeers void= main(String[] args) throws InterruptedException,
peer.getNumCurrentMessages();
        IOExceptionDoubleMessage {received;
    // BSP job configuration
 while   HamaConfiguration conf ((received = new HamaConfiguration();
    // Execute locally
    // conf.set("bsp.master.address", "local");

(DoubleMessage) peer.getCurrentMessage()) != null) {
         BSPJob bsppi += new BSPJob(conf, PiEstimator.classreceived.getData();
    // Set the job name
    bsp.setJobName("pi estimation example");
    bsp.setBspClass(MyEstimator.class);

}

       BSPJobClient jobClientpi = pi new BSPJobClient(conf)/ numPeers;
     ClusterStatus cluster = jobClient.getClusterStatus(true);
    // Choose one as a master
 peer
        for (String name : cluster.getActiveGroomNameswrite()) {
      conf.set(MASTER_TASK, namenew Text("Estimated value of PI is"), new DoubleWritable(pi));
      break;}
    }

    BSPJobClient.runJob(bsp);
  }
}