Versions Compared

Key

  • This line was added.
  • This line was removed.
  • Formatting was changed.
Comment: Added MapReduceDriver example.

...

The corresponding Mapper code is

Code Block
java
java

public class SMSCDRMapper extends Mapper<LongWritable, Text, Text, IntWritable> {

  private Text status = new Text();
  private final static IntWritable addOne = new IntWritable(1);

  /**
   * Returns the SMS status code and its count
   */
  protected void map(LongWritable key, Text value, Context context)
      throws java.io.IOException, InterruptedException {

    //655209;1;796764372490213;804422938115889;6 is the Sample record format
    String[] line = value.toString().split(";");
    // If record is of SMS CDR
    if (Integer.parseInt(line[1]) == 1) {
      status.set(line[4]);
      context.write(status, addOne);
    }
  }
}

The corresponding Reducer code is

Code Block
java
java

public class SMSCDRReducer extends
  Reducer<Text, IntWritable, Text, IntWritable> {

  protected void reduce(Text key, Iterable<IntWritable> values, Context context) throws java.io.IOException, InterruptedException {
    int sum = 0;
    for (IntWritable value : values) {
      sum += value.get();
    }
    context.write(key, new IntWritable(sum));
  }
}

The MRUnit test class for the Mapper, the Reducer and the full MapReduce job is

Code Block
java
java

import java.util.ArrayList;
import java.util.List;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mrunit.mapreduce.MapDriver;
import org.apache.hadoop.mrunit.mapreduce.MapReduceDriver;
import org.apache.hadoop.mrunit.mapreduce.ReduceDriver;
import org.junit.Before;
import org.junit.Test;

/**
 * MRUnit tests for {@code SMSCDRMapper} and {@code SMSCDRReducer}: the mapper
 * alone, the reducer alone, and both chained through a MapReduceDriver.
 */
public class SMSCDRMapperReducerTest {

  private MapDriver<LongWritable, Text, Text, IntWritable> mapDriver;
  private ReduceDriver<Text, IntWritable, Text, IntWritable> reduceDriver;
  private MapReduceDriver<LongWritable, Text, Text, IntWritable, Text, IntWritable> mapReduceDriver;

  /** Creates fresh drivers around a new mapper and reducer before every test. */
  @Before
  public void setUp() {
    SMSCDRMapper mapper = new SMSCDRMapper();
    SMSCDRReducer reducer = new SMSCDRReducer();
    mapDriver = MapDriver.newMapDriver(mapper);
    reduceDriver = ReduceDriver.newReduceDriver(reducer);
    mapReduceDriver = MapReduceDriver.newMapReduceDriver(mapper, reducer);
  }

  /** An SMS record (type 1) with status 6 must be mapped to ("6", 1). */
  @Test
  public void testMapper() {
    mapDriver.withInput(new LongWritable(), new Text(
        "655209;1;796764372490213;804422938115889;6"));
    mapDriver.withOutput(new Text("6"), new IntWritable(1));
    mapDriver.runTest();
  }

  /** Two counts of 1 for key "6" must be reduced to ("6", 2). */
  @Test
  public void testReducer() {
    List<IntWritable> values = new ArrayList<IntWritable>();
    values.add(new IntWritable(1));
    values.add(new IntWritable(1));
    reduceDriver.withInput(new Text("6"), values);
    reduceDriver.withOutput(new Text("6"), new IntWritable(2));
    reduceDriver.runTest();
  }

  /** Two SMS records with status 6 must flow through map and reduce to ("6", 2). */
  @Test
  public void testMapReduce() {
    // Feed two records with the same status: a single record would only
    // ever reduce to a count of 1, not the expected 2.
    mapReduceDriver.withInput(new LongWritable(), new Text(
        "655209;1;796764372490213;804422938115889;6"));
    mapReduceDriver.withInput(new LongWritable(), new Text(
        "353415;1;356857119806206;287572231184798;6"));
    mapReduceDriver.withOutput(new Text("6"), new IntWritable(2));
    mapReduceDriver.runTest();
  }
}

Run the test class as a JUnit test. Each test passes or fails depending on whether the mapper and reducer are written correctly.

...

The revised Mapper with Counter is shown below.

Code Block
java
java

public class SMSCDRMapper extends Mapper<LongWritable, Text, Text, IntWritable> {

  private Text status = new Text();
  private final static IntWritable addOne = new IntWritable(1);

  static enum CDRCounter {
    NonSMSCDR;
  };

  /**
   * Returns the SMS status code and its count
   */
  protected void map(LongWritable key, Text value, Context context) throws java.io.IOException, InterruptedException {

    String[] line = value.toString().split(";");
    // If record is of SMS CDR
    if (Integer.parseInt(line[1]) == 1) {
      status.set(line[4]);
      context.write(status, addOne);
    } else {// CDR record is not of type SMS so increment the counter
      context.getCounter(CDRCounter.NonSMSCDR).increment(1);
    }
  }
}

The revised testMapper() method:

Code Block
java
java

public void testMapper() {
  // The type field is 0, so this is not an SMS record: no output is
  // registered with the driver and only the counter is checked.
  Text nonSmsRecord = new Text("655209;0;796764372490213;804422938115889;6");
  mapDriver.withInput(new LongWritable(), nonSmsRecord);
  mapDriver.runTest();

  long nonSmsCount =
      mapDriver.getCounters().findCounter(CDRCounter.NonSMSCDR).getValue();
  assertEquals("Expected 1 counter increment", 1, nonSmsCount);
}

...

Declare a new Configuration object in your test class:

Code Block

// Configuration instance for the test class; setUp() passes it to the map driver.
Configuration conf = new Configuration();

In the setUp() method, add the following:

Code Block

// Fill in the job parameters, then attach the configuration to the map driver.
conf.set("myParameter1", "20");
conf.set("myParameter2", "23");
mapDriver.setConfiguration(conf);

...