Thursday 31 October 2013

Hadoop MapReduce On MongoDB

MongoDB can act as both the input source and the output destination for Hadoop MapReduce programs, using the com.mongodb.hadoop classes shown in this post.
In this post we shall use the Enron dataset (more information about the dataset can be found here) to build a list of the unique sender/recipient pairs, counting how many times each pair occurs.
Download the Enron dataset from the link above and load it into the enron.messages collection in MongoDB using mongorestore.

For this MapReduce program we shall use a custom key which is defined below.

MailPair WritableComparable Class

MailPair is just a simple "POJO" that contains Strings for the from and to values, and implements WritableComparable so that it can be serialized across Hadoop nodes and sorted.

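package com.example;

import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;

import org.apache.hadoop.io.WritableComparable;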

public class MailPair implements WritableComparable{

    String from;
    String to;

    public MailPair(){
    }

    public MailPair(String from, String to){
        this.from = from;
        this.to = to;
    }

    public void readFields(DataInput in) throws IOException{
        this.from = in.readUTF();
        this.to = in.readUTF();
    }

    public void write(DataOutput out) throws IOException{
        out.writeUTF(this.from);
        out.writeUTF(this.to);
    }

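    @Override
    public boolean equals(Object o) {
        if (o instanceof MailPair) {
           MailPair mp = (MailPair) o;
           // Compare by content: deserialized keys hold distinct String objects.
           return from.equals(mp.from) && to.equals(mp.to);
        }
        return false;
    }

    // Keep hashCode consistent with equals; Hadoop's default HashPartitioner
    // uses it to pick the reducer for each key.
    @Override
    public int hashCode() {
        return 31 * from.hashCode() + to.hashCode();
    }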

    @Override
    public int compareTo(Object o) {
        if(!(o instanceof MailPair)){
            return -1;
        }
        MailPair mp = (MailPair)o;
        int first = from.compareTo(mp.from);
        if(first != 0) return first;
        int second = to.compareTo(mp.to); 
        if(second != 0) return second;
        return 0;
    }

}
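
To make the serialization step concrete, here is a minimal sketch of the same write/readFields round trip using only the JDK. PairSketch is a hypothetical stand-in for MailPair (it does not implement Hadoop's WritableComparable), and the addresses are made up for illustration.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInput;
import java.io.DataInputStream;
import java.io.DataOutput;
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.UncheckedIOException;

// Hypothetical stand-in for MailPair: same fields and wire format,
// but no Hadoop dependency, so it runs with the JDK alone.
class PairSketch implements Comparable<PairSketch> {
    String from;
    String to;

    PairSketch() {}

    PairSketch(String from, String to) {
        this.from = from;
        this.to = to;
    }

    void write(DataOutput out) throws IOException {
        out.writeUTF(from);
        out.writeUTF(to);
    }

    void readFields(DataInput in) throws IOException {
        from = in.readUTF();
        to = in.readUTF();
    }

    // Order by sender first, then by recipient.
    @Override
    public int compareTo(PairSketch other) {
        int first = from.compareTo(other.from);
        return first != 0 ? first : to.compareTo(other.to);
    }

    // Serialize a pair to bytes and read it back into a fresh instance,
    // roughly what happens to a key between the map and reduce phases.
    static PairSketch roundTrip(PairSketch original) {
        try {
            ByteArrayOutputStream bytes = new ByteArrayOutputStream();
            original.write(new DataOutputStream(bytes));
            PairSketch copy = new PairSketch();
            copy.readFields(new DataInputStream(
                    new ByteArrayInputStream(bytes.toByteArray())));
            return copy;
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    public static void main(String[] args) {
        PairSketch sent = new PairSketch("alice@example.com", "bob@example.com");
        PairSketch received = roundTrip(sent);
        System.out.println(received.from + " -> " + received.to);
        System.out.println("compareTo: " + sent.compareTo(received));
    }
}
```

The copy returned by roundTrip holds freshly allocated strings, so two keys for the same pair can only be recognized as equal by comparing their contents.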

Mapper Class

The mapper class gets the headers field from each document, parses the sender from the From field and the recipients from the To field, and constructs one MailPair object per sender/recipient pair to act as the key. It then emits the value 1 for each key.

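package com.example;

import java.io.IOException;

import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.mapreduce.Mapper;
import org.bson.BSONObject;

public class EnronMailMapper
        extends Mapper<Object, BSONObject, MailPair, IntWritable> {

    private static final Log LOG = LogFactory.getLog( EnronMailMapper.class );

    @Override
    public void map(Object key, BSONObject val, final Context context)
            throws IOException, InterruptedException {
        if (val.containsKey("headers")) {
            BSONObject headers = (BSONObject) val.get("headers");
            if (headers.containsKey("From") && headers.containsKey("To")) {
                String from = (String) headers.get("From");
                String to = (String) headers.get("To");
                // The To header may list several comma-separated recipients.
                String[] recips = to.split(",");
                for (int i = 0; i < recips.length; i++) {
                    String recip = recips[i].trim();
                    // Skip blank entries left by stray commas.
                    if (recip.length() > 0) {
                        context.write(new MailPair(from, recip), new IntWritable(1));
                    }
                }
            }
        }
    }

}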
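
The header parsing can be tried outside Hadoop. The sketch below assumes recipients are comma-separated and that blank entries should be skipped; RecipientSplitSketch and its pairs method are hypothetical names, and the addresses are made up.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical helper that isolates the parsing step of the mapper.
class RecipientSplitSketch {

    // Split a comma-separated To header into trimmed, non-empty recipients
    // and pair each one with the sender as {from, to}.
    static List<String[]> pairs(String from, String to) {
        List<String[]> result = new ArrayList<>();
        for (String recip : to.split(",")) {
            String trimmed = recip.trim();
            if (trimmed.length() > 0) {
                result.add(new String[] {from, trimmed});
            }
        }
        return result;
    }

    public static void main(String[] args) {
        String to = "bob@example.com, carol@example.com,,";
        for (String[] pair : pairs("alice@example.com", to)) {
            System.out.println(pair[0] + " -> " + pair[1]);
        }
    }
}
```

In the real job, each of these pairs would become one (MailPair, 1) record written to the context.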

Reducer Class

The reducer class takes the values collected for each key, sums them, and writes the total as the output for that sender/recipient pair.

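package com.example;

import java.io.IOException;

import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.mapreduce.Reducer;
import org.bson.BSONObject;

import com.mongodb.BasicDBObjectBuilder;
import com.mongodb.hadoop.io.BSONWritable;

public class EnronMailReducer
        extends Reducer<MailPair, IntWritable, BSONWritable, IntWritable> {

    private static final Log LOG = LogFactory.getLog( EnronMailReducer.class );

    @Override
    public void reduce( final MailPair pKey,
                        final Iterable<IntWritable> pValues,
                        final Context pContext )
            throws IOException, InterruptedException{
        // Add up the 1s emitted by the mapper for this sender/recipient pair.
        int sum = 0;
        for ( final IntWritable value : pValues ){
            sum += value.get();
        }
        // The output key is a BSON document of the form {f: <from>, t: <to>}.
        BSONObject outDoc = BasicDBObjectBuilder.start().add( "f" , pKey.from).add( "t" , pKey.to ).get();
        BSONWritable pkeyOut = new BSONWritable(outDoc);

        pContext.write( pkeyOut, new IntWritable(sum) );
    }
}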
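
As a rough illustration of what the grouping and summing achieve, the sketch below counts pairs in memory. It is not how Hadoop runs the job: PairCountSketch is a hypothetical name, and the "from -> to" string key stands in for MailPair.

```java
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Hypothetical in-memory imitation of the shuffle and reduce steps.
class PairCountSketch {

    // Group identical {from, to} pairs and sum the 1 emitted for each,
    // keeping keys sorted the way the framework sorts them before reducing.
    static Map<String, Integer> countPairs(List<String[]> emitted) {
        Map<String, Integer> counts = new TreeMap<>();
        for (String[] pair : emitted) {
            counts.merge(pair[0] + " -> " + pair[1], 1, Integer::sum);
        }
        return counts;
    }

    public static void main(String[] args) {
        List<String[]> emitted = Arrays.asList(
                new String[] {"alice@example.com", "bob@example.com"},
                new String[] {"alice@example.com", "carol@example.com"},
                new String[] {"alice@example.com", "bob@example.com"});
        for (Map.Entry<String, Integer> e : countPairs(emitted).entrySet()) {
            System.out.println(e.getKey() + " : " + e.getValue());
        }
    }
}
```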

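Driver Class

The driver class extends MongoTool, registers the mapper and reducer, and sets MongoDB as both the input and the output format of the job.
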
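package com.example;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.util.ToolRunner;

import com.mongodb.hadoop.util.MongoTool;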

public class EnronMail extends MongoTool {

    public int run( String[] args ) throws Exception{
        final Configuration conf = getConf();

        final Job job = new Job( conf, "enron-messages" );
        job.setJarByClass(EnronMail.class);
        job.setMapperClass(EnronMailMapper.class);
        job.setReducerClass(EnronMailReducer.class);

        job.setInputFormatClass(com.mongodb.hadoop.MongoInputFormat.class);
        job.setMapOutputKeyClass(MailPair.class);
        job.setMapOutputValueClass(IntWritable.class);

        job.setOutputFormatClass(com.mongodb.hadoop.MongoOutputFormat.class);
        // The reducer emits BSONWritable keys, so declare that type here.
        job.setOutputKeyClass(com.mongodb.hadoop.io.BSONWritable.class);
        job.setOutputValueClass(org.apache.hadoop.io.IntWritable.class);

        return job.waitForCompletion( true ) ? 0 : 1;
    }
 
    public static void main( final String[] pArgs ) throws Exception{
        System.exit( ToolRunner.run( new EnronMail(), pArgs ) );
    }
}

Compile the classes into a jar named enron.jar. To run the MapReduce job, use the following command:

hadoop jar enron.jar com.example.EnronMail -Dmongo.input.uri=mongodb://localhost:27017/enron.messages -Dmongo.output.uri=mongodb://localhost:27017/enron.messages_out
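
Both URIs in the command end in a database.collection path. The sketch below shows one way to read such a URI with java.net.URI; it only illustrates the layout and is not the connector's own parsing code. MongoUriSketch and parts are hypothetical names.

```java
import java.net.URI;

// Hypothetical helper that splits a mongodb:// URI of the form
// mongodb://host:port/database.collection into its parts.
class MongoUriSketch {

    // Returns {host, port, database, collection}.
    static String[] parts(String uri) {
        URI parsed = URI.create(uri);
        String path = parsed.getPath().substring(1); // drop the leading '/'
        int dot = path.indexOf('.');                 // first dot separates db from collection
        return new String[] {
            parsed.getHost(),
            String.valueOf(parsed.getPort()),
            path.substring(0, dot),
            path.substring(dot + 1)
        };
    }

    public static void main(String[] args) {
        String[] p = parts("mongodb://localhost:27017/enron.messages_out");
        System.out.println("host=" + p[0] + " port=" + p[1]
                + " db=" + p[2] + " collection=" + p[3]);
    }
}
```

Read this way, the job takes its input from the messages collection of the enron database and writes its results to messages_out in the same database.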

After the job finishes, query the enron.messages_out collection to check that it was populated properly:

use enron
db.messages_out.findOne()
{
 "_id" : {
  "f" : "-nikole@excite.com",
  "t" : "bill.williams@enron.com"
 },
 "value" : 8
}
