Pi Estimator
The value of PI can be calculated in a number of ways. Consider the following method of estimating PI:
- Inscribe a circle in a square
- Randomly generate points in the square
- Determine the number of points in the square that are also in the circle
- Let r be the number of points in the circle divided by the number of points in the square
- PI ~ 4 r
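The factor of 4 in the last step comes from comparing areas. A short derivation, where the radius a is a symbol introduced here only for illustration (chosen so it does not clash with the ratio r above): a circle of radius a inscribed in a square has side length 2a, so

```latex
\frac{A_{\text{circle}}}{A_{\text{square}}}
  = \frac{\pi a^{2}}{(2a)^{2}}
  = \frac{\pi}{4}
\qquad\Longrightarrow\qquad
\pi = 4\,\frac{A_{\text{circle}}}{A_{\text{square}}} \approx 4r
```

For uniformly distributed points, the fraction r that lands in the circle approaches the area ratio as the number of points grows, which is why the estimate improves with more iterations.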
Serial pseudocode for this procedure is as follows:
    iterations = 10000
    circle_count = 0
    do j = 1,iterations
      generate 2 random numbers between 0 and 1
      xcoordinate = random1
      ycoordinate = random2
      if (xcoordinate, ycoordinate) inside circle
      then circle_count = circle_count + 1
    end do
    PI = 4.0*circle_count/iterations
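The serial procedure can be sketched in plain Java as follows. This is a minimal illustration, not code from the Hama distribution: the class name `SerialPiEstimator`, the `estimate` method, and the seed parameter are assumptions made here, and the circle is taken to be the one inscribed in the unit square (centre (0.5, 0.5), radius 0.5).

```java
import java.util.Random;

// Hypothetical class name; a sketch of the serial pseudocode, not Hama code.
class SerialPiEstimator {

    /**
     * Estimates PI by sampling points uniformly in the unit square and
     * counting how many fall inside the inscribed circle.
     * The seed parameter is an addition made here so runs are reproducible.
     */
    static double estimate(int iterations, long seed) {
        Random random = new Random(seed);
        long circleCount = 0;
        for (int j = 0; j < iterations; j++) {
            // Two random numbers in [0, 1), measured from the circle's centre.
            double dx = random.nextDouble() - 0.5;
            double dy = random.nextDouble() - 0.5;
            // Inside the circle when the squared distance is at most radius^2 = 0.25.
            if (dx * dx + dy * dy <= 0.25) {
                circleCount++;
            }
        }
        return 4.0 * circleCount / iterations;
    }

    public static void main(String[] args) {
        System.out.println("Estimated PI: " + estimate(10000, 42L));
    }
}
```

With 10000 iterations the estimate is typically accurate to about two decimal places; the error shrinks roughly with the square root of the iteration count.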
The BSP implementation for Pi
A distributed strategy in HAMA, using the BSP programming model, is to break the loop into portions that can be executed by separate tasks:
- Each task executes its portion of the loop locally a number of times.
- One task acts as master and collects the results through the BSP communication interface.
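The partitioning idea above can be illustrated without a cluster. The sketch below uses plain Java threads in place of BSP tasks and the calling thread in place of the master; it deliberately does not use the Hama API, and the names `PartitionedPiEstimator`, `countInCircle`, and `estimate`, as well as the per-task seeding scheme, are assumptions made for this example.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Random;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

// Hypothetical class; threads stand in for BSP tasks, this is not Hama code.
class PartitionedPiEstimator {

    /** One task's portion of the loop: count sampled points inside the circle. */
    static long countInCircle(int iterations, long seed) {
        Random random = new Random(seed);
        long count = 0;
        for (int j = 0; j < iterations; j++) {
            double dx = random.nextDouble() - 0.5;
            double dy = random.nextDouble() - 0.5;
            if (dx * dx + dy * dy <= 0.25) {
                count++;
            }
        }
        return count;
    }

    /** The "master": starts the tasks, collects their counts, computes PI. */
    static double estimate(int numTasks, int iterationsPerTask, long baseSeed) {
        ExecutorService pool = Executors.newFixedThreadPool(numTasks);
        try {
            List<Future<Long>> partials = new ArrayList<>();
            for (int t = 0; t < numTasks; t++) {
                final long seed = baseSeed + t; // assumed scheme: one seed per task
                Callable<Long> task = () -> countInCircle(iterationsPerTask, seed);
                partials.add(pool.submit(task));
            }
            long total = 0;
            for (Future<Long> partial : partials) {
                try {
                    total += partial.get(); // blocks until that task has finished
                } catch (InterruptedException | ExecutionException e) {
                    throw new IllegalStateException("task failed", e);
                }
            }
            return 4.0 * total / ((long) numTasks * iterationsPerTask);
        } finally {
            pool.shutdown();
        }
    }

    public static void main(String[] args) {
        System.out.println("Estimated PI: " + estimate(4, 2500, 42L));
    }
}
```

In a real BSP job the local counts would presumably be sent to the master peer as messages and combined after a synchronization barrier, rather than returned through `Future` objects.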
    public class SerializePrinting {
      private static String TMP_OUTPUT = "/tmp/test-example/";

      public static class HelloBSP extends BSP {
        public static final Log LOG = LogFactory.getLog(HelloBSP.class);
        private Configuration conf;
        private final static int PRINT_INTERVAL = 5000;

        public void bsp(BSPPeerProtocol bspPeer) throws IOException,
            KeeperException, InterruptedException {
          int num = Integer.parseInt(conf.get("bsp.peers.num"));
          FileSystem fileSys = FileSystem.get(conf);

          // Peers take turns: in round i only the peer whose name matches writes.
          int i = 0;
          for (String otherPeer : bspPeer.getAllPeerNames()) {
            if (bspPeer.getPeerName().equals(otherPeer)) {
              LOG.info("Hello BSP from " + (i + 1) + " of " + num + ": "
                  + bspPeer.getPeerName());

              // Write a timestamped message to this peer's own output file.
              SequenceFile.Writer writer = SequenceFile.createWriter(fileSys, conf,
                  new Path(TMP_OUTPUT + i), LongWritable.class, Text.class,
                  CompressionType.NONE);
              writer.append(new LongWritable(System.currentTimeMillis()), new Text(
                  "Hello BSP from " + (i + 1) + " of " + num + ": "
                      + bspPeer.getPeerName()));
              writer.close();
            }

            // Every peer waits, then meets at the barrier before the next round.
            Thread.sleep(PRINT_INTERVAL);
            bspPeer.sync();
            i++;
          }
        }

        public Configuration getConf() {
          return conf;
        }

        public void setConf(Configuration conf) {
          this.conf = conf;
        }
      }

      public static void main(String[] args) throws InterruptedException,
          IOException {
        // BSP job configuration
        HamaConfiguration conf = new HamaConfiguration();

        BSPJob bsp = new BSPJob(conf, SerializePrinting.class);
        // Set the job name
        bsp.setJobName("serialize printing");
        bsp.setBspClass(HelloBSP.class);

        // Set the task size as a number of GroomServer
        BSPJobClient jobClient = new BSPJobClient(conf);
        ClusterStatus cluster = jobClient.getClusterStatus(false);
        bsp.setNumBspTask(cluster.getGroomServers());

        // Remove output left over from a previous run.
        FileSystem fileSys = FileSystem.get(conf);
        if (fileSys.exists(new Path(TMP_OUTPUT))) {
          fileSys.delete(new Path(TMP_OUTPUT), true);
        }
        BSPJobClient.runJob(bsp);

        // Read back the single record written by each task and print it.
        System.out.println("Each task printed the \"Hello World\" as below:");
        for (int i = 0; i < cluster.getGroomServers(); i++) {
          SequenceFile.Reader reader = new SequenceFile.Reader(fileSys, new Path(
              TMP_OUTPUT + i), conf);
          LongWritable timestamp = new LongWritable();
          Text message = new Text();
          reader.next(timestamp, message);
          System.out.println(new Date(timestamp.get()) + ": " + message);
          reader.close();
        }
      }
    }