...
The value of PI can be calculated in a number of ways. This example estimates PI by random sampling, using the method described below.
...
Consider the following method of estimating PI:
- Inscribe a circle in a square
- Randomly generate points in the square
- Determine the number of points in the square that are also in the circle
- Let r be the number of points in the circle divided by the number of points in the square
- PI ~ 4 r
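The last step follows from comparing areas. As a short sketch, assume the circle has radius R (a symbol introduced here, not used elsewhere on this page), so the enclosing square has side 2R. For uniformly distributed points, the ratio r approaches the ratio of the two areas:

```latex
\frac{A_{\text{circle}}}{A_{\text{square}}}
  = \frac{\pi R^{2}}{(2R)^{2}}
  = \frac{\pi}{4}
\quad\Longrightarrow\quad
r \approx \frac{\pi}{4}
\quad\Longrightarrow\quad
\pi \approx 4r
```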
The serial pseudocode for this procedure is as follows:
{{{
iterations = 10000
circle_count = 0

do j = 1,iterations
  generate 2 random numbers between 0 and 1
  xcoordinate = random1
  ycoordinate = random2
  if (xcoordinate, ycoordinate) inside circle
  then circle_count = circle_count + 1
end do

PI = 4.0*circle_count/iterations
}}}
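The serial pseudocode above could be written in plain Java roughly as follows. This is a minimal sketch, not part of the Hama example: the class name `SerialPiEstimator`, the method `estimatePi`, and the `seed` parameter are hypothetical. It also assumes the "circle" is the quarter of the unit circle that lies in the square [0, 1) x [0, 1), which gives the same area ratio of PI / 4.

```java
import java.util.Random;

// Serial Monte Carlo estimate of PI, following the pseudocode above.
// Class, method, and parameter names are illustrative assumptions.
class SerialPiEstimator {

  // Returns 4 * (points inside the circle) / (total points).
  static double estimatePi(int iterations, long seed) {
    Random random = new Random(seed); // seeded so that runs are repeatable
    int circleCount = 0;
    for (int j = 0; j < iterations; j++) {
      // Two random numbers in [0, 1): one point in the unit square.
      double x = random.nextDouble();
      double y = random.nextDouble();
      // Assumption: test against the quarter circle of radius 1 at the origin.
      if (x * x + y * y < 1.0) {
        circleCount++;
      }
    }
    return 4.0 * circleCount / iterations;
  }

  public static void main(String[] args) {
    System.out.println("Estimated value of PI is " + estimatePi(10000, 42L));
  }
}
```

With 10000 iterations the estimate is typically only accurate to one or two decimal places; more iterations narrow the error slowly, in proportion to the square root of the sample count.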
...
The BSP implementation for Pi
The parallel strategy is to break the loop into portions that can be executed by the tasks.
- For the task of estimating PI:
  - Each task executes its portion of the loop a number of times.
  - Each task can do its work without requiring any information from the other tasks (there are no data dependencies).
  - One task acts as master and collects the results.
{{{
PI = pi_sum / n_processes
}}}
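The strategy above can be illustrated without any BSP framework. The sketch below is an assumption-laden illustration, not Hama code: the names `ParallelPiSketch`, `localEstimate`, `combine`, and `estimatePi` are hypothetical, and the tasks are simulated one after another in a single thread. Each simulated task computes its own estimate with no shared state, and the master step divides the sum of the local estimates by the number of tasks.

```java
import java.util.Random;

// Simulates the parallel strategy in one process. All names are hypothetical.
class ParallelPiSketch {

  // Work done by one task: its own portion of the loop, no data dependencies.
  static double localEstimate(int iterations, long seed) {
    Random random = new Random(seed);
    int in = 0;
    for (int i = 0; i < iterations; i++) {
      // Random point in the square [-1, 1) x [-1, 1).
      double x = 2.0 * random.nextDouble() - 1.0;
      double y = 2.0 * random.nextDouble() - 1.0;
      if (x * x + y * y < 1.0) {
        in++;
      }
    }
    return 4.0 * in / iterations;
  }

  // Work done by the master: sum of the local estimates / number of tasks.
  static double combine(double[] estimates) {
    double piSum = 0.0;
    for (double estimate : estimates) {
      piSum += estimate;
    }
    return piSum / estimates.length;
  }

  static double estimatePi(int numTasks, int iterationsPerTask) {
    double[] estimates = new double[numTasks];
    for (int task = 0; task < numTasks; task++) {
      // Assumption: the task index doubles as a distinct seed per task.
      estimates[task] = localEstimate(iterationsPerTask, task);
    }
    return combine(estimates);
  }

  public static void main(String[] args) {
    System.out.println("Estimated value of PI is " + estimatePi(4, 10000));
  }
}
```

Because every task runs the same number of iterations, the plain average of the local estimates equals the estimate that a single loop over all points would produce.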
1) Each process computes the value of PI locally, and 2) sends it to the master task using the send() function. Then, 3) once sync() completes, the master task can read the received messages with getCurrentMessage(). Finally, the master calculates the average of the PI values from all peers, as below:
{{{
    private String masterTask;
    private static final int iterations = 10000;

    @Override
    public void bsp(
        BSPPeer<NullWritable, NullWritable, Text, DoubleWritable> peer)
        throws IOException, SyncException, InterruptedException {

      // 1) Compute a local estimate of PI.
      int in = 0, out = 0;
      for (int i = 0; i < iterations; i++) {
        double x = 2.0 * Math.random() - 1.0, y = 2.0 * Math.random() - 1.0;
        if ((Math.sqrt(x * x + y * y) < 1.0)) {
          in++;
        } else {
          out++;
        }
      }

      // 2) Send the local estimate to the master task.
      double data = 4.0 * (double) in / (double) iterations;
      DoubleMessage estimate = new DoubleMessage(peer.getPeerName(), data);

      peer.send(masterTask, estimate);
      peer.sync();
    }

    @Override
    public void setup(
        BSPPeer<NullWritable, NullWritable, Text, DoubleWritable> peer)
        throws IOException {
      // Choose one as a master
      this.masterTask = peer.getPeerName(peer.getNumPeers() / 2);
    }

    public void cleanup(
        BSPPeer<NullWritable, NullWritable, Text, DoubleWritable> peer)
        throws IOException {
      // 3) Only the master averages the received estimates.
      if (peer.getPeerName().equals(masterTask)) {
        double pi = 0.0;
        int numPeers = peer.getNumCurrentMessages();
        DoubleMessage received;
        while ((received = (DoubleMessage) peer.getCurrentMessage()) != null) {
          pi += received.getData();
        }

        pi = pi / numPeers;
        peer.write(new Text("Estimated value of PI is"), new DoubleWritable(pi));
      }
    }
}}}