...
The value of PI can be calculated in a number of ways. In this example, we estimate PI with the following method:
- Inscribe a circle in a square
- Randomly generate points in the square
- Determine the number of points in the square that are also in the circle
- Let r be the number of points in the circle divided by the number of points in the square
- PI ~ 4 r
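The factor of 4 comes from comparing the two areas. As a short worked derivation, assume the circle has radius R and the square has side 2R (the symbol R is introduced here only for illustration and does not appear in the original procedure):

```latex
\frac{A_{\text{circle}}}{A_{\text{square}}}
  = \frac{\pi R^{2}}{(2R)^{2}}
  = \frac{\pi}{4}
\qquad\Longrightarrow\qquad
\pi = 4\,\frac{A_{\text{circle}}}{A_{\text{square}}} \approx 4r
```

Because the points are uniformly distributed over the square, the fraction r of points that land inside the circle approximates the area ratio, and the approximation improves as more points are generated.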
Serial pseudocode for this procedure is shown below:
...
{{{
iterations = 10000
circle_count = 0
do j = 1,iterations
  generate 2 random numbers between 0 and 1
  xcoordinate = random1
  ycoordinate = random2
  if (xcoordinate, ycoordinate) inside circle
  then circle_count = circle_count + 1
end do
PI = 4.0*circle_count/iterations
...
}}}
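The pseudocode above can be turned into a small runnable Java program. The following is a minimal sketch, not part of the Hama code base: the class name `SerialPiEstimator`, the method `estimate`, and the explicit `seed` parameter are assumptions made for illustration. As in the pseudocode, the points are drawn from the unit square [0, 1) x [0, 1), which covers one quarter of a circle of radius 1; the area ratio is still PI / 4.

```java
import java.util.Random;

final class SerialPiEstimator {

  /**
   * Estimates PI by sampling `iterations` random points in the unit square
   * and counting how many fall inside the quarter circle of radius 1.
   * The seed parameter is an illustrative addition that makes runs repeatable.
   */
  static double estimate(int iterations, long seed) {
    Random random = new Random(seed);
    int circleCount = 0;
    for (int j = 0; j < iterations; j++) {
      double x = random.nextDouble();
      double y = random.nextDouble();
      // Inside the circle when the distance from the origin is at most 1.
      if (x * x + y * y <= 1.0) {
        circleCount++;
      }
    }
    return 4.0 * circleCount / iterations;
  }

  public static void main(String[] args) {
    System.out.println("PI is approximately " + estimate(10000, 42L));
  }
}
```

With 10000 iterations the result is typically accurate to one or two decimal places; the error shrinks roughly with the square root of the number of iterations.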
The BSP implementation for Pi
The parallel strategy is to break the loop into portions that can be executed by the tasks.
- For the task of estimating PI:
- Each task executes its portion of the loop a number of times.
- Each task can do its work without requiring any information from the other tasks (there are no data dependencies). One task acts as master and collects the results through the BSP communication interface.
1) Each process computes the value of PI locally, and 2) sends it to the master task using the send() function. Then, 3) after the sync() barrier, the master task can receive the messages. Finally, the master averages the PI values received from all peers:
{{{
PI = pi_sum / n_processes
}}}
The corresponding BSP code is as follows:
{{{
public static class MyEstimator extends
    BSP<NullWritable, NullWritable, Text, DoubleWritable> {

  private String masterTask;
  private static final int iterations = 10000;

  @Override
  public void bsp(
      BSPPeer<NullWritable, NullWritable, Text, DoubleWritable> peer)
      throws IOException, SyncException, InterruptedException {

    // 1) Compute a local estimate of PI.
    int in = 0, out = 0;
    for (int i = 0; i < iterations; i++) {
      double x = 2.0 * Math.random() - 1.0, y = 2.0 * Math.random() - 1.0;
      if ((Math.sqrt(x * x + y * y) < 1.0)) {
        in++;
      } else {
        out++;
      }
    }

    double data = 4.0 * (double) in / (double) iterations;
    DoubleMessage estimate = new DoubleMessage(peer.getPeerName(), data);

    // 2) Send the local estimate to the master task, then
    // 3) synchronize so that the messages are delivered.
    peer.send(masterTask, estimate);
    peer.sync();
  }

  @Override
  public void setup(
      BSPPeer<NullWritable, NullWritable, Text, DoubleWritable> peer)
      throws IOException {
    // Choose one as a master
    this.masterTask = peer.getPeerName(peer.getNumPeers() / 2);
  }

  @Override
  public void cleanup(
      BSPPeer<NullWritable, NullWritable, Text, DoubleWritable> peer)
      throws IOException {
    // Only the master averages the received estimates and writes the result.
    if (peer.getPeerName().equals(masterTask)) {
      double pi = 0.0;
      int numPeers = peer.getNumCurrentMessages();
      DoubleMessage received;
      while ((received = (DoubleMessage) peer.getCurrentMessage()) != null) {
        pi += received.getData();
      }

      pi = pi / numPeers;
      peer.write(new Text("Estimated value of PI is"), new DoubleWritable(pi));
    }
  }
}
}}}
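To experiment with the same compute, send, and average pattern without a Hama cluster, the tasks can be simulated with plain Java threads. This is a hedged sketch only: it does not use the Hama API, and the names `ParallelPiSketch`, `localEstimate`, and `estimate`, as well as the per-task seeding scheme, are assumptions made for illustration. Each thread plays the role of a peer, the list of futures stands in for the master's message queue, and waiting on the futures stands in for the sync() barrier.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Random;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

final class ParallelPiSketch {

  /** One task's portion of the loop: a local estimate of PI. */
  static double localEstimate(int iterations, long seed) {
    Random random = new Random(seed);
    int in = 0;
    for (int i = 0; i < iterations; i++) {
      double x = 2.0 * random.nextDouble() - 1.0;
      double y = 2.0 * random.nextDouble() - 1.0;
      if (x * x + y * y < 1.0) {
        in++;
      }
    }
    return 4.0 * in / iterations;
  }

  /** Runs numTasks independent tasks and averages their local estimates. */
  static double estimate(int numTasks, int iterationsPerTask, long baseSeed) {
    ExecutorService pool = Executors.newFixedThreadPool(numTasks);
    try {
      // Stand-in for the master's incoming messages.
      List<Future<Double>> inbox = new ArrayList<>();
      for (int t = 0; t < numTasks; t++) {
        // Illustrative seeding: a distinct seed per task, no shared state.
        final long seed = baseSeed + 1000003L * t;
        Callable<Double> task = () -> localEstimate(iterationsPerTask, seed);
        inbox.add(pool.submit(task));
      }
      // Stand-in for the barrier: get() blocks until each task has finished.
      double piSum = 0.0;
      for (Future<Double> message : inbox) {
        piSum += message.get();
      }
      return piSum / numTasks; // PI = pi_sum / n_processes
    } catch (InterruptedException | ExecutionException e) {
      throw new IllegalStateException("task failed", e);
    } finally {
      pool.shutdown();
    }
  }

  public static void main(String[] args) {
    System.out.println("Estimated value of PI is " + estimate(4, 10000, 1L));
  }
}
```

Because every task runs the same number of iterations, averaging the local estimates gives the same value as pooling all points into one estimate, which is why the master only needs one number from each peer.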