Tuesday, 25 February 2014

Implementing Custom WritableComparable

In one of my previous posts I wrote about Implementing Custom Writable, which can be used for values in a MapReduce program. To use a custom type as a key in MapReduce, we need to implement the WritableComparable interface.

WritableComparable is simply a sub-interface of the Writable and java.lang.Comparable interfaces. To implement a WritableComparable we must provide a compareTo method in addition to the readFields and write methods, as shown below:
public interface WritableComparable<T> extends Writable, Comparable<T> {
    void readFields(DataInput in) throws IOException;
    void write(DataOutput out) throws IOException;
    int compareTo(T o);
}
Comparison of types is crucial for MapReduce, where there is a sorting phase during which keys are compared with one another.

The code for the IntPair class, which is used in the In-mapper Combiner Program to Calculate Average post, is given below:

package com.hadoop.imcdp;

import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.WritableComparable;

// A pair of ints that can be used as a key (or value) in MapReduce.
public class IntPair implements WritableComparable<IntPair> {

    private IntWritable first;
    private IntWritable second;

    public IntPair() {
        set(new IntWritable(), new IntWritable());
    }

    public IntPair(Integer first, Integer second) {
        set(new IntWritable(first), new IntWritable(second));
    }

    public void set(IntWritable first, IntWritable second) {
        this.first = first;
        this.second = second;
    }

    public IntWritable getFirst() {
        return first;
    }

    public IntWritable getSecond() {
        return second;
    }

    public Integer getFirstInt() {
        return first.get();
    }

    public Integer getSecondInt() {
        return second.get();
    }

    // Serialize the two fields in order.
    @Override
    public void write(DataOutput out) throws IOException {
        first.write(out);
        second.write(out);
    }

    // Deserialize the fields in the same order they were written.
    @Override
    public void readFields(DataInput in) throws IOException {
        first.readFields(in);
        second.readFields(in);
    }

    @Override
    public int hashCode() {
        return first.hashCode() * 163 + second.hashCode();
    }

    @Override
    public boolean equals(Object o) {
        if (o instanceof IntPair) {
            IntPair tp = (IntPair) o;
            return first.equals(tp.first) && second.equals(tp.second);
        }
        return false;
    }

    // Sort on the first field, breaking ties with the second.
    @Override
    public int compareTo(IntPair tp) {
        int cmp = first.compareTo(tp.first);
        if (cmp != 0) {
            return cmp;
        }
        return second.compareTo(tp.second);
    }
}
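
For context, here is a minimal, hypothetical sketch of how IntPair could be wired up as a map output key. The mapper, its input format, and the comma-separated parsing are assumptions for illustration only; they are not the code from the average-calculation post.

import java.io.IOException;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

// Hypothetical mapper that emits IntPair keys; assumes each input line
// contains two comma-separated integers.
public class IntPairMapper extends Mapper<LongWritable, Text, IntPair, IntWritable> {

    private static final IntWritable ONE = new IntWritable(1);

    @Override
    protected void map(LongWritable offset, Text line, Context context)
            throws IOException, InterruptedException {
        String[] parts = line.toString().split(",");
        IntPair pair = new IntPair(Integer.parseInt(parts[0].trim()),
                                   Integer.parseInt(parts[1].trim()));
        context.write(pair, ONE);
    }
}

In the driver, the key type would be declared with job.setMapOutputKeyClass(IntPair.class) so that the framework knows how to serialize and sort the keys.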

As you can see in compareTo(IntPair tp) above, the IntPair objects have to be deserialized before the comparison can take place. To avoid this cost we can implement a RawComparator, which compares two keys directly from their serialized byte representations. More on RawComparator is available in Hadoop: The Definitive Guide.
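
As an illustration, here is a minimal sketch of such a raw comparator for IntPair. The nested-class layout and the static registration block are my own choices for this sketch (these members would go inside the IntPair class); the sketch subclasses Hadoop's WritableComparator and compares the two ints directly in the serialized bytes, relying on the fact that each IntWritable is written as four bytes.

public static class Comparator extends WritableComparator {

    public Comparator() {
        super(IntPair.class);
    }

    @Override
    public int compare(byte[] b1, int s1, int l1,
                       byte[] b2, int s2, int l2) {
        // Each IntWritable is serialized as 4 bytes, so 'first' starts at
        // offset s and 'second' at offset s + 4 within each key's bytes.
        int firstA = readInt(b1, s1);
        int firstB = readInt(b2, s2);
        if (firstA != firstB) {
            return firstA < firstB ? -1 : 1;
        }
        int secondA = readInt(b1, s1 + 4);
        int secondB = readInt(b2, s2 + 4);
        return secondA < secondB ? -1 : (secondA == secondB ? 0 : 1);
    }
}

static {
    // Register the raw comparator so Hadoop uses it whenever IntPair keys
    // are sorted, instead of deserializing the objects and calling compareTo.
    WritableComparator.define(IntPair.class, new Comparator());
}

With the static block in place, the comparator is registered when the IntPair class is loaded, and keys are then compared entirely at the byte level during the sort and shuffle.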

1 comment:

  1. How to add a RawComparator or WritableComparator to this example?
    Could you please illustrate the implementation of WritableComparator for a slightly more complex custom data type, say one with four fields: IntWritable, Text, MapWritable, Text?
