Versions Compared

Key

  • This line was added.
  • This line was removed.
  • Formatting was changed.

...

The value of PI can be calculated in a number of ways. In this example, we are estimating PI using following way:

  • Each task executes locally its portion of the loop a number of times.
    {{{
    iterations = 10000
    circle_count = 0

do j = 1,iterations
generate 2 random numbers between 0 and 1
xcoordinate = random1
ycoordinate = random2
if (xcoordinate, ycoordinate) inside circle
then circle_count = circle_count + 1
end do

...

Consider the following method of estimating PI

  • Inscribe a circle in a square
  • Randomly generate points in the square
  • Determine the number of points in the square that are also in the circle
  • Let r be the number of points in the circle divided by the number of points in the square
  • PI ~ 4 r

Serial pseudo code for this procedure as below:

No Format

iterations = 10000
circle_count = 0

do j = 1,iterations
  generate 2 random numbers between 0 and 1
  xcoordinate = random1
  ycoordinate = random2
  if (xcoordinate, ycoordinate) inside circle
  then circle_count = circle_count + 1
end do

PI = 4.0*circle_count/iterations

...


The BSP implementation for Pi

Parallel strategy is break the loop into portions that can be executed by the tasks.

  • For the task of estimating PI:
    • Each task executes its portion of the loop a number of times.
    • Each task can do its work without requiring any information from the other tasks (there are no data dependencies).
    • One task acts as master and collects the results
    through the BSP communication interface
    • .
No Format

public class PiEstimator {
  private PIstatic =String piMASTER_sumTASK / n_processes

1) Each process computes the value of Pi locally, and 2) sends it to master task using send() function. Then, 3) the master task can recieve the messages using sync() function. Finally, we can calculate the average value of sum of PI values from each peers as below:

Code Block
languagejava

    @Override= "master.task.";

  public static class MyEstimator extends BSP {
    public void bsp(
   static final Log LOG = LogFactory.getLog(MyEstimator.class);
    private Configuration conf;
    private  BSPPeer<NullWritable, NullWritable, Text, DoubleWritable> peer)String masterTask;
    private static final int iterations = 10000;

    @Override
    public void bsp(BSPPeer bspPeer) throws IOException, SyncException,KeeperException,
        InterruptedException {

      int in = 0, out = 0;
      for (int i = 0; i < iterations; i++) {
        double x = 2.0 * Math.random() - 1.0, y = 2.0 * Math.random() - 1.0;
        if ((Math.sqrt(x * x + y * y) < 1.0)) {
          in++;
        } else {
          out++;
        }
      }

      byte[] tagName = Bytes.toBytes(getName().toString());
      doublebyte[] datamyData = Bytes.toBytes(4.0 * (double) in / (double) iterations);
      DoubleMessageBSPMessage estimate = new DoubleMessage(peer.getPeerName()BSPMessage(tagName, datamyData);

      peerbspPeer.send(bspPeer.getAddress(masterTask), estimate);
      peerbspPeer.sync();

      double pi = }

0.0;
      BSPMessage @Overridereceived;
      public void setup(while ((received = bspPeer.getCurrentMessage()) != null) {
        BSPPeer<NullWritable, NullWritable, Text, DoubleWritable> peer)LOG.info("Receives messages:" + Bytes.toDouble(received.getData()));
        throws IOExceptionif(pi == 0.0) {
      // Choose one as a master
    pi = Bytes.toDouble(received.getData());
        } else {
          this.masterTaskpi = (pi + peerBytes.getPeerNametoDouble(peerreceived.getNumPeersgetData())) / 2);
        }
      }

     public voidif cleanup(
(pi != 0.0) {
        LOG.info("\nEstimated value of BSPPeer<NullWritable,PI NullWritable,is Text," DoubleWritable>+ peerpi);
      }
  throws IOException {}

    @Override
   if (peer.getPeerName().equals(masterTask) public Configuration getConf() {
      return conf;
    }

    @Override
    public void setConf(Configuration conf) {
      this.conf  double pi= conf;
      this.masterTask = 0.0conf.get(MASTER_TASK);
    }

  }

  intpublic numPeersstatic =void peer.getNumCurrentMessages();
  main(String[] args) throws InterruptedException,
      DoubleMessageIOException received;{
    // BSP job configuration
    whileHamaConfiguration ((receivedconf = new HamaConfiguration(DoubleMessage) peer.getCurrentMessage()) != null) {
;
    // Execute locally
    // conf.set("bsp.master.address", "local");

    BSPJob pibsp += received.getData(new BSPJob(conf, PiEstimator.class);
    // Set the job }

   name
    bsp.setJobName("pi estimation example");
    bsp.setBspClass(MyEstimator.class);

    BSPJobClient pijobClient = pi / numPeersnew BSPJobClient(conf);
    ClusterStatus cluster =  peer
    jobClient.getClusterStatus(true);
    // Choose one as a master
    for (String name : cluster.writegetActiveGroomNames(new Text("Estimated value of PI is"), new DoubleWritable(pi))) {
      conf.set(MASTER_TASK, name);
      break;
    }

    }BSPJobClient.runJob(bsp);
  }
}