Thursday, 26 December 2013

In-mapper Combiner Program to Calculate Average

In my previous post I described how we can use an in-mapper combiner to make our M/R program more efficient. That post also presented the M/R algorithms for calculating an average both with and without the in-mapper combiner optimization.

In this post I am posting the code for both algorithms.

The M/R program to calculate the average without an in-mapper combiner is given below:
package com.hadoop.imcdp;

import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.FloatWritable;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
import org.apache.hadoop.util.GenericOptionsParser;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;

public class AvgDriver extends Configured implements Tool {

    public static class ImcdpMap extends Mapper<LongWritable, Text, IntWritable, IntWritable> {
        @Override
        protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
            // each input record has the form s_id,c_id,marks
            String[] fields = value.toString().split(",");
            int s_id = Integer.parseInt(fields[0]);
            int marks = Integer.parseInt(fields[2]);
            context.write(new IntWritable(s_id), new IntWritable(marks));
        } // end of map method
    } // end of mapper class

    public static class ImcdpReduce extends Reducer<IntWritable, IntWritable, IntWritable, FloatWritable> {
        @Override
        protected void reduce(IntWritable key, Iterable<IntWritable> values, Context context) throws IOException, InterruptedException {
            int sum = 0;
            int cnt = 0;
            for (IntWritable value : values) {
                sum = sum + value.get();
                cnt = cnt + 1;
            }
            // divide once, after all marks for this student have been added up
            float avg_m = (float) sum / cnt;
            context.write(key, new FloatWritable(avg_m));
        } // end of reduce method
    } // end of reducer class

    public int run(String[] args) throws Exception {
        Configuration conf = getConf();
        args = new GenericOptionsParser(conf, args).getRemainingArgs();
        String input = args[0];
        String output = args[1];
        Job job = new Job(conf, "Avg");
        // wire the mapper, reducer and key/value types into the job
        job.setJarByClass(AvgDriver.class);
        job.setMapperClass(ImcdpMap.class);
        job.setReducerClass(ImcdpReduce.class);
        job.setInputFormatClass(TextInputFormat.class);
        job.setOutputFormatClass(TextOutputFormat.class);
        job.setMapOutputKeyClass(IntWritable.class);
        job.setMapOutputValueClass(IntWritable.class);
        job.setOutputKeyClass(IntWritable.class);
        job.setOutputValueClass(FloatWritable.class);
        FileInputFormat.setInputPaths(job, new Path(input));
        Path outPath = new Path(output);
        FileOutputFormat.setOutputPath(job, outPath);
        // remove any output left over from a previous run
        outPath.getFileSystem(conf).delete(outPath, true);
        return (job.waitForCompletion(true) ? 0 : 1);
    }

    public static void main(String[] args) throws Exception {
        int exitCode = ToolRunner.run(new AvgDriver(), args);
        System.exit(exitCode);
    }
}

The M/R program to calculate the average with an in-mapper combiner is given below:

package com.hadoop.imcdp;

import java.io.IOException;
import java.util.HashMap;
import java.util.Iterator;
import java.util.Map;
import java.util.Map.Entry;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.FloatWritable;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
import org.apache.hadoop.util.GenericOptionsParser;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;

// IntPair is the custom Writable described in the post "Implementing Custom Writable"
// (assumed here to live in the same package)
public class ImcdpAvgDriver extends Configured implements Tool {

    public static class ImcdpMap extends Mapper<LongWritable, Text, IntWritable, IntPair> {
        // per-mapper state: running sum of marks and record count for each s_id
        private final Map<Integer, Integer> partial_sum = new HashMap<Integer, Integer>();
        private final Map<Integer, Integer> record_count = new HashMap<Integer, Integer>();

        @Override
        protected void map(LongWritable key, Text value, Context context) {
            // each input record has the form s_id,c_id,marks
            String[] fields = value.toString().split(",");
            Integer s_id = Integer.valueOf(fields[0]);
            Integer marks = Integer.valueOf(fields[2]);
            if (partial_sum.containsKey(s_id)) {
                partial_sum.put(s_id, partial_sum.get(s_id) + marks);
            } else {
                partial_sum.put(s_id, marks);
            }
            if (record_count.containsKey(s_id)) {
                record_count.put(s_id, record_count.get(s_id) + 1);
            } else {
                record_count.put(s_id, 1);
            }
        } // end of map method

        @Override
        protected void cleanup(Context context) throws IOException, InterruptedException {
            // emit one (partial sum, record count) pair per s_id seen by this mapper
            Iterator<Map.Entry<Integer, Integer>> itr1 = partial_sum.entrySet().iterator();
            while (itr1.hasNext()) {
                Entry<Integer, Integer> entry1 = itr1.next();
                Integer s_id_1 = entry1.getKey();
                Integer partial_sum_1 = entry1.getValue();
                Integer record_count_1 = record_count.get(s_id_1);
                context.write(new IntWritable(s_id_1), new IntPair(partial_sum_1, record_count_1));
            }
        } // end of cleanup
    } // end of mapper class

    public static class ImcdpReduce extends Reducer<IntWritable, IntPair, IntWritable, FloatWritable> {
        @Override
        protected void reduce(IntWritable key, Iterable<IntPair> values, Context context) throws IOException, InterruptedException {
            int sum = 0;
            int cnt = 0;
            for (IntPair value : values) {
                sum = sum + value.getFirstInt();
                cnt = cnt + value.getSecondInt();
            }
            float avg_m = (float) sum / cnt;
            context.write(key, new FloatWritable(avg_m));
        } // end of reduce method
    } // end of reducer class

    public int run(String[] args) throws Exception {
        Configuration conf = getConf();
        args = new GenericOptionsParser(conf, args).getRemainingArgs();
        String input = args[0];
        String output = args[1];
        Job job = new Job(conf, "IMCDP");
        // wire the mapper, reducer and key/value types into the job
        job.setJarByClass(ImcdpAvgDriver.class);
        job.setMapperClass(ImcdpMap.class);
        job.setReducerClass(ImcdpReduce.class);
        job.setInputFormatClass(TextInputFormat.class);
        job.setOutputFormatClass(TextOutputFormat.class);
        job.setMapOutputKeyClass(IntWritable.class);
        job.setMapOutputValueClass(IntPair.class);
        job.setOutputKeyClass(IntWritable.class);
        job.setOutputValueClass(FloatWritable.class);
        FileInputFormat.setInputPaths(job, new Path(input));
        Path outPath = new Path(output);
        FileOutputFormat.setOutputPath(job, outPath);
        // remove any output left over from a previous run
        outPath.getFileSystem(conf).delete(outPath, true);
        return (job.waitForCompletion(true) ? 0 : 1);
    }

    public static void main(String[] args) throws Exception {
        int exitCode = ToolRunner.run(new ImcdpAvgDriver(), args);
        System.exit(exitCode);
    }
}

The programs took 56 sec and 42 sec respectively to run on my laptop for 10 million records. So the in-mapper combiner program cut the run time by 25% (14 of 56 seconds), which means it ran about 1.33 times as fast.

In-Mapper Combiner

Recently I read a book on Map/Reduce algorithms by Lin and Dyer. This book gives a deep insight into designing efficient M/R algorithms. In this post I will describe the in-mapper combining algorithm and a sample M/R program using this algorithm.

Advantages of in-mapper combiner over traditional combiner:

When a mapper with a traditional combiner (the mini-reducer) emits key-value pairs, they are collected in a memory buffer, and the combiner then aggregates a batch of these key-value pairs before they are sent to the reducer. The drawbacks of this approach are:
  1. The execution of the combiner is not guaranteed, so MapReduce jobs cannot depend on the combiner being run.
  2. Hadoop may store the key-value pairs in the local filesystem and run the combiner later, which causes expensive disk I/O.
  3. A combiner only combines data in the same buffer. Thus, we may still generate a lot of network traffic during the shuffle phase even if most of the keys from a single mapper are the same. To see this, consider the word count example, assuming that the buffer size is 3 and that one mapper emits <key, value> = <Stanford, 3>, <Berkeley, 1>, <Stanford, 7>, <Berkeley, 7>, and <Stanford, 2>. The first three items will be in one buffer and the last two in another; as a result, the combiner will emit <Stanford, 10>, <Berkeley, 1>, <Berkeley, 7>, <Stanford, 2>. If we use an in-mapper combiner, we will get <Stanford, 12>, <Berkeley, 8>.
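
The buffer example in point 3 can be replayed in plain Java without a cluster. The sketch below is only an illustration: the class and method names (CombinerDemo, bufferedCombine, inMapperCombine) are made up for this post, and the "buffer" is simplified to a fixed number of pairs, which is not how Hadoop actually sizes its spill buffer.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

class CombinerDemo {

    // Sums the values per key for the pairs in [from, to), keeping first-seen key order.
    static Map<String, Integer> combine(List<String> keys, List<Integer> values, int from, int to) {
        Map<String, Integer> sums = new LinkedHashMap<>();
        for (int i = from; i < to; i++) {
            sums.merge(keys.get(i), values.get(i), Integer::sum);
        }
        return sums;
    }

    // Simplified traditional combiner: every buffer of bufferSize pairs is combined on its own.
    static List<String> bufferedCombine(List<String> keys, List<Integer> values, int bufferSize) {
        List<String> emitted = new ArrayList<>();
        for (int start = 0; start < keys.size(); start += bufferSize) {
            int end = Math.min(start + bufferSize, keys.size());
            combine(keys, values, start, end)
                    .forEach((k, v) -> emitted.add("<" + k + ", " + v + ">"));
        }
        return emitted;
    }

    // In-mapper combining: a single associative array covers the whole map task.
    static List<String> inMapperCombine(List<String> keys, List<Integer> values) {
        List<String> emitted = new ArrayList<>();
        combine(keys, values, 0, keys.size())
                .forEach((k, v) -> emitted.add("<" + k + ", " + v + ">"));
        return emitted;
    }

    public static void main(String[] args) {
        List<String> keys = Arrays.asList("Stanford", "Berkeley", "Stanford", "Berkeley", "Stanford");
        List<Integer> values = Arrays.asList(3, 1, 7, 7, 2);
        System.out.println(bufferedCombine(keys, values, 3));
        // [<Stanford, 10>, <Berkeley, 1>, <Berkeley, 7>, <Stanford, 2>]
        System.out.println(inMapperCombine(keys, values));
        // [<Stanford, 12>, <Berkeley, 8>]
    }
}
```

The buffered version sends four pairs to the shuffle, the in-mapper version only two, which is the saving described above.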

Consider the case of calculating the average marks for each student. Suppose we have the dataset below:

s_id    c_id    marks
8001    101    78
8001    102    88
8002    101    56
8002    102    77

The pseudo code for a basic M/R algorithm which computes the average marks is given below:
class Mapper
    method Map(integer s_id, integer m)
        Emit(integer s_id, integer m)

class Reducer
    method Reduce(integer s_id, integer [m1 , m2 , . . .])
        sum ← 0
        cnt ← 0
        for all integer m ∈ integer [m1 , m2 , . . .] do
            sum ← sum + m
            cnt ← cnt + 1
        avg_m ← sum/cnt
        Emit(integer s_id, float avg_m )

If we have a large number of input records, the same number of records is emitted from the map tasks and has to be shuffled and sorted before being passed on to the reducer. This large amount of data transfer can slow down the execution of the overall M/R job.

We can make this algorithm faster by decreasing the number of records emitted by the mapper. To achieve this we can use one associative array to store the partial sums of marks and another to store the count of marks, and finally emit these values in the close method (cleanup in Hadoop's Mapper API). The pseudo code for the in-mapper combiner is shown below:

class Mapper
    method Initialize
        S ← new AssociativeArray
        C ← new AssociativeArray
    method Map(integer s_id, integer m)
        S{s_id} ← S{s_id} + m
        C{s_id} ← C{s_id} + 1
    method Close
        for all integer s_id ∈ S do
            Emit(integer s_id, pair (S{s_id}, C{s_id}))

class Reducer
    method Reduce(integer s_id, pairs [(s1 , c1 ), (s2 , c2 ) . . .])
        sum ← 0
        cnt ← 0
        for all pair (s, c) ∈ pairs [(s1 , c1 ), (s2 , c2 ) . . .] do
            sum ← sum + s
            cnt ← cnt + c
        avg_m ← sum/cnt
        Emit(integer s_id, float avg_m )

Using this algorithm we can improve the performance of the M/R job by reducing the number of intermediate key-value pairs sent from the mappers to the reducers.
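
The in-mapper combiner pseudo code can be walked through with the four sample records in plain Java. This is a minimal sketch and not Hadoop code: the class InMapperAverageDemo and its methods mapTask and reduce are hypothetical stand-ins for the Mapper (Map plus Close) and the Reducer.

```java
import java.util.LinkedHashMap;
import java.util.Map;

class InMapperAverageDemo {

    // One "map task": keeps a running {sum, count} per s_id and returns the pairs
    // that the Close method would emit. Each record is {s_id, c_id, marks}.
    static Map<Integer, int[]> mapTask(int[][] records) {
        Map<Integer, int[]> partial = new LinkedHashMap<>();
        for (int[] r : records) {
            int[] sumAndCount = partial.computeIfAbsent(r[0], k -> new int[2]);
            sumAndCount[0] += r[2];   // S{s_id} <- S{s_id} + m
            sumAndCount[1] += 1;      // C{s_id} <- C{s_id} + 1
        }
        return partial;
    }

    // The "reducer": adds up all partial sums and counts for one s_id, then divides once.
    static float reduce(int[]... pairs) {
        int sum = 0;
        int cnt = 0;
        for (int[] p : pairs) {
            sum += p[0];
            cnt += p[1];
        }
        return (float) sum / cnt;
    }

    public static void main(String[] args) {
        int[][] records = { {8001, 101, 78}, {8001, 102, 88}, {8002, 101, 56}, {8002, 102, 77} };
        Map<Integer, int[]> emitted = mapTask(records);
        System.out.println("records read: " + records.length + ", pairs emitted: " + emitted.size());
        for (Map.Entry<Integer, int[]> e : emitted.entrySet()) {
            System.out.println(e.getKey() + "\t" + reduce(e.getValue()));
        }
        // 8001 -> 83.0, 8002 -> 66.5
    }
}
```

Emitting (sum, count) pairs rather than per-mapper averages matters: the reducer can add pairs coming from several mappers and still divide exactly once, whereas an average of averages would be wrong whenever the mappers saw different numbers of records for a student.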

In my next post I shall post the M/R program using the in-mapper combiner and compare its performance with the M/R program that uses no optimization.

Friday, 20 December 2013

Hive Data Types 

Hive supports different data types for table columns. The data types supported by Hive can be broadly classified into primitive and complex data types.

The primitive data types supported by Hive are listed below:

1. Numeric Types

  • TINYINT (1-byte signed integer, from -128 to 127)
  • SMALLINT (2-byte signed integer, from -32,768 to 32,767)
  • INT (4-byte signed integer, from -2,147,483,648 to 2,147,483,647)
  • BIGINT (8-byte signed integer, from -9,223,372,036,854,775,808 to 9,223,372,036,854,775,807)
  • FLOAT (4-byte single precision floating point number)
  • DOUBLE (8-byte double precision floating point number)
  • DECIMAL (Hive 0.13.0 introduced user definable precision and scale)

2. Date/Time Types

  • TIMESTAMP
  • DATE

3. String Types

  • STRING
  • VARCHAR
  • CHAR

4. Misc Types

  • BOOLEAN
  • BINARY

Apart from these primitive data types, Hive offers some complex data types, which are listed below:

5. Complex Types

  • arrays: ARRAY<data_type>
  • maps: MAP<primitive_type, data_type>
  • structs: STRUCT<col_name : data_type [COMMENT col_comment], ...>
  • union: UNIONTYPE<data_type, data_type, ...>

Wednesday, 18 December 2013

Logstash, ElasticSearch and Kibana Integration for Clickstream Weblog Ingestion

In this blog I am going to showcase how we can develop a quick and easy demo application for clickstream weblog ingestion, search and visualization. We will use Logstash for log ingestion, store the events in ElasticSearch and build a dashboard with Kibana. For the clickstream weblog I am using log data from the ECML/PKDD 2005 Discovery Challenge. You can download the complete weblogs after registering there. These weblogs are delimited by semicolons (;) and have the fields below, in order:
  • shop_id
  • unixtime
  • client ip
  • session
  • visited page
  • referrer
Here are some sample log lines:
For creating this demo we need a logstash configuration file (let's name it clickstream.conf), which specifies the inputs, filters and outputs. The clickstream.conf file looks like this:
input {
  file {
    # path for clickstream log
    path => "/home/rishav.rohit/Desktop/clickstream/_2004_02_01_19_click_stream.log"
    # define a type for all events handled by this input
    type => "weblog"
    start_position => "beginning"
    # the clickstream log is in character set ISO-8859-1
    codec => plain {charset => "ISO-8859-1"}
  }
}

filter {
  csv {
    # define columns present in weblog
    columns => ["shop_id", "unixtime", "client_ip", "session", "page", "referrer"]
    separator => ";"
  }
  grok {
    # get visited page and page parameters
    match => ["page", "%{URIPATH:page_visited}(?:%{URIPARAM:page_params})?"]
    remove_field => ["page"]
  }
  date {
    # as we are getting unixtime field in epoch seconds we will convert it to normal timestamp
    match => [ "unixtime", "UNIX" ]
  }
  geoip {
    # this will convert ip to longitude-latitude using GeoLiteCity database from Maxmind
    source => "client_ip"
    fields => ["latitude","longitude"]
    target => "geoip"
    add_field => [ "[geoip][coordinates]", "%{[geoip][longitude]}" ]
    add_field => [ "[geoip][coordinates]", "%{[geoip][latitude]}"  ]
  }
  mutate {
    # this will convert geoip.coordinates to float values
    convert => [ "[geoip][coordinates]", "float" ]
  }
}

output {
  # store output in local elasticsearch cluster
  elasticsearch {
    host => ""
  }
}

To start the logstash agent we run the command below:
java -jar logstash-1.2.2-flatjar.jar agent -f clickstream.conf
A sample record in ElasticSearch looks like this:

    _index: logstash-2004.02.01
    _type: logs
    _id: I1N0MboUR0O1O3RZ-qXqnw
    _version: 1
    _score: 1
    _source: {
        message: [
        @timestamp: 2004-02-01T18:00:07.000Z
        @version: 1
        type: weblog
        path: /home/rishav.rohit/Desktop/clickstream/_2004_02_01_19_click_stream.log
        shop_id: 14
        unixtime: 1075658407
        session: f07f39ec63abf67f965684f3fa5729c4
        page_visited: /findp/
        page_params: ?&id=63&view=1,2,3,14,20,15&p_14=nerez
        geoip: {
            latitude: 50.08330000000001
            longitude: 14.466700000000003
            coordinates: [

So we have parsed a complex log message into simpler components, converted fields like unixtime to a datetime and the client IP to latitude-longitude, and extracted the page visited by the client. Using Kibana we can now quickly build a dashboard with panels on top of these fields.
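
To make the csv, grok and date steps concrete, here is a rough plain-Java equivalent of what those three filters do to a single line. It is only a sketch: the class ClickstreamLineDemo is hypothetical, the sample line is assembled from the record shown above with a made-up client IP (192.0.2.1) and referrer, and the geoip lookup is left out.

```java
import java.time.Instant;
import java.util.LinkedHashMap;
import java.util.Map;

class ClickstreamLineDemo {

    static final String[] COLUMNS = {"shop_id", "unixtime", "client_ip", "session", "page", "referrer"};

    static Map<String, String> parse(String line) {
        // csv filter: split on ';' (limit -1 keeps trailing empty fields)
        String[] parts = line.split(";", -1);
        Map<String, String> event = new LinkedHashMap<>();
        for (int i = 0; i < COLUMNS.length && i < parts.length; i++) {
            event.put(COLUMNS[i], parts[i]);
        }
        // date filter: epoch seconds -> UTC timestamp
        long epochSeconds = Long.parseLong(event.get("unixtime"));
        event.put("@timestamp", Instant.ofEpochSecond(epochSeconds).toString());
        // grok filter: split the page into its path and optional query string
        String page = event.remove("page");
        int q = page.indexOf('?');
        event.put("page_visited", q < 0 ? page : page.substring(0, q));
        if (q >= 0) {
            event.put("page_params", page.substring(q));
        }
        return event;
    }

    public static void main(String[] args) {
        // hypothetical line: the IP and referrer are placeholders, not real log data
        String line = "14;1075658407;192.0.2.1;f07f39ec63abf67f965684f3fa5729c4;"
                + "/findp/?&id=63&view=1,2,3,14,20,15&p_14=nerez;http://www.example.com/";
        parse(line).forEach((k, v) -> System.out.println(k + ": " + v));
    }
}
```

For this line the sketch yields @timestamp 2004-02-01T18:00:07Z, page_visited /findp/ and page_params ?&id=63&view=1,2,3,14,20,15&p_14=nerez, in line with the sample record (Logstash additionally prints the milliseconds).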

Thursday, 31 October 2013

Hadoop MapReduce On MongoDB

MongoDB supports running Hadoop MapReduce programs.
In this post we shall use the Enron dataset (more information about the dataset can be found here) to build a list of the unique sender/recipient pairs, counting how many times each pair occurs.
Download the Enron dataset from the above link and load it into the enron.messages collection in MongoDB using mongorestore.

For this MapReduce program we shall use a custom key which is defined below.

MailPair WritableComparable Class

MailPair is just a simple "POJO" that contains Strings for the from and to values, and implements WritableComparable so that it can be serialized across Hadoop nodes and sorted.

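package com.example;

import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;

import org.apache.hadoop.io.WritableComparable;

public class MailPair implements WritableComparable {

    String from;
    String to;

    public MailPair() {
    }

    public MailPair(String from, String to) {
        this.from = from;
        this.to = to;
    }

    public void readFields(DataInput in) throws IOException {
        this.from = in.readUTF();
        this.to = in.readUTF();
    }

    public void write(DataOutput out) throws IOException {
        // fields must be written in the same order readFields reads them
        out.writeUTF(this.from);
        out.writeUTF(this.to);
    }

    public boolean equals(Object o) {
        if (o instanceof MailPair) {
            MailPair mp = (MailPair) o;
            // compare string contents, not references
            return from.equals(mp.from) && to.equals(mp.to);
        }
        return false;
    }

    public int hashCode() {
        // equal pairs must hash alike so the default partitioner sends them to the same reducer
        return 31 * from.hashCode() + to.hashCode();
    }

    public int compareTo(Object o) {
        if (!(o instanceof MailPair)) {
            return -1;
        }
        MailPair mp = (MailPair) o;
        int first = from.compareTo(mp.from);
        if (first != 0) return first;
        return to.compareTo(mp.to);
    }
}
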


Mapper Class

The mapper class will get the headers field from each document, parse out the sender from the From field and the recipients from the To field, and construct a MailPair object containing each pair which will act as the key. Then we emit the value 1 for each key.

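package com.example;

import java.io.IOException;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.mapreduce.Mapper;
import org.bson.BSONObject;

public class EnronMailMapper
        extends Mapper<Object, BSONObject, MailPair, IntWritable> {

    private static final Log LOG = LogFactory.getLog( EnronMailMapper.class );

    public void map(Object key, BSONObject val, final Context context)
            throws IOException, InterruptedException {
        BSONObject headers = (BSONObject) val.get("headers");
        if (headers.containsKey("From") && headers.containsKey("To")) {
            String from = (String) headers.get("From");
            String to = (String) headers.get("To");
            // the To header may hold several comma-separated recipients
            String[] recips = to.split(",");
            for (int i = 0; i < recips.length; i++) {
                String recip = recips[i].trim();
                if (recip.length() > 0) {
                    context.write(new MailPair(from, recip), new IntWritable(1));
                }
            }
        }
    }
}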
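
The pairing and counting logic can be tried without Hadoop or MongoDB. The following is a plain-Java sketch with hypothetical names (MailPairCountDemo, countPairs) and made-up example.com addresses; a map keyed by "from -> to" stands in for the shuffle that groups the emitted 1s by MailPair and for the summing done in the reducer.

```java
import java.util.Map;
import java.util.TreeMap;

class MailPairCountDemo {

    // Adds one count per (from, recipient) pair found in a single message's headers.
    static void countPairs(String from, String to, Map<String, Integer> counts) {
        for (String recip : to.split(",")) {
            recip = recip.trim();
            if (recip.length() > 0) {
                counts.merge(from + " -> " + recip, 1, Integer::sum);
            }
        }
    }

    public static void main(String[] args) {
        Map<String, Integer> counts = new TreeMap<>();
        // two made-up messages; the second repeats one of the pairs
        countPairs("alice@example.com", "bob@example.com, carol@example.com", counts);
        countPairs("alice@example.com", "bob@example.com", counts);
        System.out.println(counts);
        // {alice@example.com -> bob@example.com=2, alice@example.com -> carol@example.com=1}
    }
}
```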


Reducer Class

The reduce class will take the collected values for each key, sum them together, and record the output.

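package com.example;

import java.io.IOException;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.mapreduce.Reducer;
import org.bson.BSONObject;
import com.mongodb.BasicDBObjectBuilder;
import com.mongodb.hadoop.io.BSONWritable;

public class EnronMailReducer
        extends Reducer<MailPair, IntWritable, BSONWritable, IntWritable> {

    private static final Log LOG = LogFactory.getLog( EnronMailReducer.class );

    public void reduce( final MailPair pKey,
                        final Iterable<IntWritable> pValues,
                        final Context pContext )
            throws IOException, InterruptedException{
        int sum = 0;
        for ( final IntWritable value : pValues ){
            sum += value.get();
        }
        // the output key is a document holding the sender ("f") and the recipient ("t")
        BSONObject outDoc = BasicDBObjectBuilder.start().add( "f" , pKey.from ).add( "t" , pKey.to ).get();
        BSONWritable pkeyOut = new BSONWritable(outDoc);

        pContext.write( pkeyOut, new IntWritable(sum) );
    }
}
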

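Driver Class

package com.example;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.util.ToolRunner;
import com.mongodb.hadoop.MongoInputFormat;
import com.mongodb.hadoop.MongoOutputFormat;
import com.mongodb.hadoop.io.BSONWritable;
import com.mongodb.hadoop.util.MongoTool;

public class EnronMail extends MongoTool {
    public int run( String[] args ) throws Exception{
        final Configuration conf = getConf();
        final Job job = new Job( conf, "enron-messages" );
        // assumed job wiring, based on the mapper, reducer and key classes defined above
        job.setJarByClass( EnronMail.class );
        job.setMapperClass( EnronMailMapper.class );
        job.setReducerClass( EnronMailReducer.class );
        job.setMapOutputKeyClass( MailPair.class );
        job.setMapOutputValueClass( IntWritable.class );
        job.setOutputKeyClass( BSONWritable.class );
        job.setOutputValueClass( IntWritable.class );
        job.setInputFormatClass( MongoInputFormat.class );
        job.setOutputFormatClass( MongoOutputFormat.class );
        return job.waitForCompletion( true ) ? 0 : 1;
    }
    public static void main( final String[] pArgs ) throws Exception{
        System.exit( ToolRunner.run( new EnronMail(), pArgs ) );
    }
}

Compile the jar and save it as enron.jar. To run this MapReduce job, use the command below: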

hadoop jar enron.jar com.example.EnronMail -Dmongo.input.uri=mongodb://localhost:27017/enron.messages -Dmongo.output.uri=mongodb://localhost:27017/enron.messages_out
After running the MapReduce program we can query enron.messages_out collection to see if it is populated properly.

use enron
 "_id" : {
  "f" : "",
  "t" : ""
 "value" : 8

Implementing Custom Writable

Hadoop MapReduce uses implementations of Writables for interacting with user-provided Mappers and Reducers. Hadoop provides a lot of implementations of Writables which are listed here, but sometimes we need to pass custom objects and these custom objects should implement Hadoop's Writable interface. In this post we are going to describe a custom class IntPair. To implement the Writable interface we require two methods:

public interface Writable {
    void readFields(DataInput in) throws IOException;
    void write(DataOutput out) throws IOException;
}

The code for IntPair is given below:

import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Writable;

public class IntPair implements Writable {
    private IntWritable first;
    private IntWritable second;

    public IntPair() {
        set(new IntWritable(), new IntWritable());
    }
    public IntPair(Integer first, Integer second) {
        set(new IntWritable(first), new IntWritable(second));
    }
    public void set(IntWritable first, IntWritable second) {
        this.first = first;
        this.second = second;
    }
    public IntWritable getFirst() {
        return first;
    }
    public Integer getFirstInt() {
        return first.get();
    }
    public Integer getSecondInt() {
        return second.get();
    }
    public IntWritable getSecond() {
        return second;
    }
    // serialize both fields, in a fixed order
    public void write(DataOutput out) throws IOException {
        first.write(out);
        second.write(out);
    }
    // read the fields back in the same order they were written
    public void readFields(DataInput in) throws IOException {
        first.readFields(in);
        second.readFields(in);
    }
}

Now we can use this IntPair class in Hadoop MapReduce as value type. If we want to use IntPair as key in MapReduce then it needs to implement WritableComparable, which we shall cover in a different post.
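
The write/readFields contract can be exercised with nothing but java.io. The sketch below uses a hypothetical stand-in class, PlainIntPair, that stores two plain ints instead of IntWritables so that it runs without the Hadoop jars; IntWritable also serializes itself as a single 4-byte int, so the byte layout should be the same.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInput;
import java.io.DataInputStream;
import java.io.DataOutput;
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.UncheckedIOException;

class PlainIntPair {
    private int first;
    private int second;

    PlainIntPair() { }

    PlainIntPair(int first, int second) {
        this.first = first;
        this.second = second;
    }

    int getFirst() { return first; }
    int getSecond() { return second; }

    // Same shape as Writable.write: both fields, in a fixed order.
    void write(DataOutput out) throws IOException {
        out.writeInt(first);
        out.writeInt(second);
    }

    // Same shape as Writable.readFields: read back in the order written.
    void readFields(DataInput in) throws IOException {
        first = in.readInt();
        second = in.readInt();
    }

    static byte[] serialize(PlainIntPair pair) {
        try {
            ByteArrayOutputStream bytes = new ByteArrayOutputStream();
            pair.write(new DataOutputStream(bytes));
            return bytes.toByteArray();
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    static PlainIntPair deserialize(byte[] data) {
        try {
            PlainIntPair pair = new PlainIntPair();   // the no-arg constructor is needed here
            pair.readFields(new DataInputStream(new ByteArrayInputStream(data)));
            return pair;
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    public static void main(String[] args) {
        byte[] wire = serialize(new PlainIntPair(166, 2));
        PlainIntPair copy = deserialize(wire);
        System.out.println(wire.length + " bytes -> (" + copy.getFirst() + ", " + copy.getSecond() + ")");
    }
}
```

The round trip also shows why the no-argument constructor exists: the receiving side first creates an empty object and only then fills it through readFields.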

Wednesday, 16 October 2013

MapReduce on HBase Table

In my last post, HBase Table MapReduce Basics, I explained some basic guidelines to follow while writing a MapReduce program on HBase tables. In this post I will give a sample MapReduce program which reads data from an HBase table, does some aggregation and writes the output to another HBase table.

For our input we shall create an HBase table with the data below:

create 'test1', 'cf1'
put 'test1', '20130101#1', 'cf1:sales', '100'
put 'test1', '20130101#2', 'cf1:sales', '110'
put 'test1', '20130102#1', 'cf1:sales', '200'
put 'test1', '20130102#2', 'cf1:sales', '210'

create 'test2', 'cf1'

For the table "test1", the rowkey is composed of the date and the store number, separated by '#', and we have a sales value for each store.
We shall aggregate on date to get the total sales per day.
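
Before looking at the HBase classes, the aggregation itself can be checked on the four sample rows in plain Java. This is a minimal sketch with a hypothetical class name (SalesByDateDemo); a sorted map plays the role of the shuffle and the reducer together.

```java
import java.util.Map;
import java.util.TreeMap;

class SalesByDateDemo {

    // Each row is {rowkey, sales}; the rowkey has the form date#store.
    static Map<String, Integer> totalByDate(String[][] rows) {
        Map<String, Integer> totals = new TreeMap<>();
        for (String[] row : rows) {
            String date = row[0].split("#")[0];          // keep only the date part of the rowkey
            int sales = Integer.parseInt(row[1]);        // sales were stored as strings from the hbase shell
            totals.merge(date, sales, Integer::sum);
        }
        return totals;
    }

    public static void main(String[] args) {
        String[][] rows = {
            {"20130101#1", "100"}, {"20130101#2", "110"},
            {"20130102#1", "200"}, {"20130102#2", "210"}
        };
        System.out.println(totalByDate(rows));   // {20130101=210, 20130102=410}
    }
}
```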

The mapper class is:

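import java.io.IOException;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.mapreduce.TableMapper;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;

public class testMapper extends TableMapper<Text, IntWritable> {
    public void map(ImmutableBytesWritable rowKey, Result columns, Context context)
            throws IOException, InterruptedException {
        try {
            // get rowKey and convert it to string
            String inKey = Bytes.toString(rowKey.get());
            // set new key having only date
            String oKey = inKey.split("#")[0];
            // get sales column in byte format first and then convert it to string (as it is stored as string from hbase shell)
            byte[] bSales = columns.getValue(Bytes.toBytes("cf1"), Bytes.toBytes("sales"));
            String sSales = Bytes.toString(bSales);
            int sales = Integer.parseInt(sSales);
            // emit date and sales values
            context.write(new Text(oKey), new IntWritable(sales));
        } catch (RuntimeException e) {
            e.printStackTrace();
        }
    }
}
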
The reducer class is:

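import java.io.IOException;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.mapreduce.TableReducer;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;

public class testReducer extends TableReducer<Text, IntWritable, ImmutableBytesWritable> {
    public void reduce(Text key, Iterable<IntWritable> values, Context context)
            throws IOException, InterruptedException {
        try {
            int sum = 0;
            // loop through the different sales values and add them to sum
            for (IntWritable sales : values) {
                sum += sales.get();
            }
            // create hbase put with rowkey as date
            // (Text.getBytes() can return a padded backing array, so convert via the string)
            Put insHBase = new Put(Bytes.toBytes(key.toString()));
            // insert sum value to hbase
            insHBase.add(Bytes.toBytes("cf1"), Bytes.toBytes("sum"), Bytes.toBytes(sum));
            // write data to Hbase table
            context.write(null, insHBase);
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
}
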
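The driver class is:

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.mapreduce.TableMapReduceUtil;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;

public class testDriver {
    public static void main(String[] args) throws Exception {
        Configuration conf = HBaseConfiguration.create();
        // define scan and define column families to scan
        Scan scan = new Scan();
        scan.addFamily(Bytes.toBytes("cf1"));
        Job job = new Job(conf);
        job.setJarByClass(testDriver.class);
        // define input hbase table
        TableMapReduceUtil.initTableMapperJob("test1", scan, testMapper.class, Text.class, IntWritable.class, job);
        // define output table
        TableMapReduceUtil.initTableReducerJob("test2", testReducer.class, job);
        System.exit(job.waitForCompletion(true) ? 0 : 1);
    }
}
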
We shall build the jar and run the hadoop jar command for the testDriver class. This will populate the output HBase table test2.

hbase(main):013:0> scan 'test2'
ROW                                COLUMN+CELL
 20130101                          column=cf1:sum, timestamp=1381869476363, value=\x00\x00\x00\xD2
 20130102                          column=cf1:sum, timestamp=1381869476363, value=\x00\x00\x01\x9A
2 row(s) in 0.0240 seconds

The sum values are displayed as bytes (HBase stores everything as bytes); we can convert them to a readable integer format in the HBase shell.

hbase(main):014:0> org.apache.hadoop.hbase.util.Bytes.toInt("\x00\x00\x00\xD2".to_java_bytes)
=> 210
hbase(main):015:0> org.apache.hadoop.hbase.util.Bytes.toInt("\x00\x00\x01\x9A".to_java_bytes)
=> 410
So we are getting correct aggregated values for sales in output.
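
The same decoding can be done in plain Java, which also shows where the four bytes come from: the reducer stored the sum as a 4-byte big-endian integer. This is a small sketch using java.nio.ByteBuffer (big-endian by default) in place of the HBase Bytes utility; the class name SumBytesDemo is made up.

```java
import java.nio.ByteBuffer;

class SumBytesDemo {

    // int -> 4 bytes, most significant byte first
    static byte[] toBytes(int value) {
        return ByteBuffer.allocate(4).putInt(value).array();
    }

    // 4 bytes, most significant byte first -> int
    static int toInt(byte[] fourBytes) {
        return ByteBuffer.wrap(fourBytes).getInt();
    }

    public static void main(String[] args) {
        System.out.println(toInt(new byte[] {0x00, 0x00, 0x00, (byte) 0xD2}));   // 210
        System.out.println(toInt(new byte[] {0x00, 0x00, 0x01, (byte) 0x9A}));   // 410
        System.out.println(toInt(toBytes(410)));                                 // 410
    }
}
```

Here 0xD2 is 210, and 0x01 0x9A is 1 * 256 + 154 = 410.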

HBase Table MapReduce Basics

In this post I am going to explain how we can use HBase tables as the source and target of a MapReduce program.

When writing MapReduce programs on HBase tables we should follow the guidelines below:

1. Mapper Class

  • Mapper class should extend TableMapper.
  • Input key to mapper is ImmutableBytesWritable object which has rowkey of HBase table.
  • Input value is Result object (org.apache.hadoop.hbase.client.Result) which contains the requested column families (define the required columns/column families in Scan) from HBase table.

2. Reducer Class

  • Reducer class should extend TableReducer.
  • Output key is NULL.
  • Output value is Put (org.apache.hadoop.hbase.client.Put) object.

3. MapReduce Driver

  • Configure a Scan (org.apache.hadoop.hbase.client.Scan) object. For this scan object we can define many parameters like:
    • Start row.
    • Stop row.
    • Row filter.
    • Column family(s) to retrieve.
    • Column(s) to retrieve.
  • Define input table using TableMapReduceUtil.initTableMapperJob. In this method we can define input table, Mapper, MapOutputKey, MapOutputValue, etc.
  • Define output table using TableMapReduceUtil.initTableReducerJob. In this method we can define output table, Reducer and Partitioner.

In my next post I shall give an example MapReduce program using HBase tables as input and output.

Monday, 14 October 2013

Introduction To Hive's Partitioning

A simple query in Hive reads the entire dataset even if it has a where clause filter. This becomes a bottleneck when running MapReduce jobs over a large table. We can overcome this issue by implementing partitions in Hive. Hive makes it very easy to implement partitions by using an automatic partition scheme when the table is created.

In Hive’s implementation of partitioning, data within a table is split across multiple partitions. Each partition corresponds to particular value(s) of the partition column(s) and is stored as a sub-directory within the table’s directory on HDFS. When the table is queried, where applicable, only the required partitions of the table are read, thereby reducing the I/O and time required by the query.

Today we are going to see how we can load a csv file to a partitioned table. For this we are going to use the Airline OnTime dataset. Loading csv data to a partitioned table involves the two steps below:
  1. Load csv file to a non-partitioned table.
  2. Load non-partitioned table data to partitioned table.
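
The sub-directory layout mentioned earlier is what makes partition pruning cheap: a filter on the partition columns becomes a choice of directories. The plain-Java sketch below illustrates this for a table partitioned by year and month. The class PartitionPathDemo is hypothetical, and the table directory is only an assumed example of a default warehouse location, not a path taken from this setup.

```java
import java.util.ArrayList;
import java.util.List;

class PartitionPathDemo {

    // Hive names each partition directory column=value, nested in partition-column order.
    static String partitionDir(String tableDir, int year, int month) {
        return tableDir + "/year=" + year + "/month=" + month;
    }

    // "Pruning": keep only the directories a query filtered on year and month has to read.
    static List<String> prune(List<String> partitionDirs, int year, int month) {
        String wanted = "/year=" + year + "/month=" + month;
        List<String> kept = new ArrayList<>();
        for (String dir : partitionDirs) {
            if (dir.endsWith(wanted)) {
                kept.add(dir);
            }
        }
        return kept;
    }

    public static void main(String[] args) {
        String tableDir = "/user/hive/warehouse/airline.db/ontimeperf";   // assumed location
        List<String> dirs = new ArrayList<>();
        for (int month = 10; month <= 12; month++) {
            dirs.add(partitionDir(tableDir, 1987, month));
        }
        System.out.println(dirs);
        System.out.println(prune(dirs, 1987, 10));
    }
}
```

A query with where year = 1987 and month = 10 then only needs the files under the single year=1987/month=10 directory instead of the whole table.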
We shall partition the Airline OnTime data on two columns: year and month.

1. Load csv file to a non-partitioned table.

We shall create a staging table to hold the data from the csv file. The Hive commands to create the schema and table are given below:
create schema stg_airline;
use stg_airline;

create table stg_airline.onTimePerf
(Year INT ,
Month INT ,
DayofMonth INT ,
DayOfWeek INT ,
DepTime INT ,
CRSDepTime INT ,
ArrTime INT ,
CRSArrTime INT ,
UniqueCarrier STRING ,
FlightNum INT ,
TailNum STRING ,
ActualElapsedTime INT ,
CRSElapsedTime INT ,
AirTime STRING ,
ArrDelay INT ,
DepDelay INT ,
Origin STRING ,
Dest STRING ,
Distance INT ,
TaxiIn STRING ,
TaxiOut STRING ,
Cancelled INT ,
CancellationCode STRING ,
Diverted INT ,
CarrierDelay STRING ,
WeatherDelay STRING ,
NASDelay STRING ,
SecurityDelay STRING ,
LateAircraftDelay STRING)
ROW FORMAT DELIMITED FIELDS TERMINATED BY ','
STORED AS TEXTFILE;

After creating the table, load the csv data into it (note: delete the header line from the csv first) using the Hive command below:
LOAD DATA LOCAL INPATH "1987.csv" OVERWRITE INTO TABLE stg_airline.onTimePerf;

2. Load non-partitioned table data to partitioned table.

We shall now create a table partitioned by the year and month columns. The commands for this are given below:
create schema airline;
use airline;

create table airline.onTimePerf
(DayofMonth INT ,
DayOfWeek INT ,
DepTime INT ,
CRSDepTime INT ,
ArrTime INT ,
CRSArrTime INT ,
UniqueCarrier STRING ,
FlightNum INT ,
TailNum STRING ,
ActualElapsedTime INT ,
CRSElapsedTime INT ,
AirTime STRING ,
ArrDelay INT ,
DepDelay INT ,
Origin STRING ,
Dest STRING ,
Distance INT ,
TaxiIn STRING ,
TaxiOut STRING ,
Cancelled INT ,
CancellationCode STRING ,
Diverted INT ,
CarrierDelay STRING ,
WeatherDelay STRING ,
NASDelay STRING ,
SecurityDelay STRING ,
LateAircraftDelay STRING)
PARTITIONED BY (Year INT, Month INT);

To load the partitioned table we use the commands below:
SET hive.exec.dynamic.partition = true;
SET hive.exec.dynamic.partition.mode = nonstrict;
INSERT OVERWRITE TABLE airline.onTimePerf PARTITION(Year, Month) SELECT DayofMonth, DayOfWeek, DepTime, CRSDepTime, ArrTime, CRSArrTime, UniqueCarrier, FlightNum, TailNum, ActualElapsedTime, CRSElapsedTime, AirTime, ArrDelay, DepDelay, Origin, Dest, Distance, TaxiIn, TaxiOut, Cancelled, CancellationCode, Diverted, CarrierDelay, WeatherDelay, NASDelay, SecurityDelay, LateAircraftDelay, Year, Month FROM stg_airline.onTimePerf;
While writing the insert statement for a partitioned table, make sure that you specify the partition columns last in the select clause. The two SET commands enable dynamic partitioning, so that Hive creates the partitions from the Year and Month values returned by the query. If you don't execute the two SET commands above, you will get the error below:
FAILED: SemanticException [Error 10096]: Dynamic partition strict mode requires at least one static partition column. To turn this off set hive.exec.dynamic.partition.mode=nonstrict
In my next blog I shall describe how to use HCatalog in a MapReduce program.

Sunday, 13 October 2013

Connecting Hive to MongoDB Using MongoStorageHandler

Mongo-hadoop connector supports the creation of MongoDB-based Hive tables and BSON-based Hive tables. Both MongoDB-based Hive tables and BSON-based Hive tables can be:
  • Queried just like HDFS-based Hive tables.
  • Combined with HDFS-based Hive tables in joins and sub-queries.
Note - The current mongo-hadoop jars present in the mongo-hadoop github repository don't contain the MongoStorageHandler class, so you need to build the jars yourself.

In this post we shall use MongoStorageHandler to create a MongoDB-based Hive table. Let's create a test.users collection in MongoDB with the following data:
use test
db.users.insert({ "_id": 1, "name": "Tom", "age": 28 })
db.users.insert({ "_id": 2, "name": "Alice", "age": 18 })
db.users.insert({ "_id": 3, "name": "Bob", "age": 29 })
Now we shall create the test.users table in Hive. Only the metadata is created in Hive; the actual data stays in MongoDB. The commands for this are given below:
create schema test;
use test;

CREATE TABLE users (id int, name string, age int)
STORED BY "com.mongodb.hadoop.hive.MongoStorageHandler"
WITH SERDEPROPERTIES('mongo.columns.mapping'='{"id":"_id","name":"name","age":"age"}')
TBLPROPERTIES ( "mongo.uri" = "mongodb://localhost:27017/test.users");
Note - Specifying BSONSerDe mappings is optional. Without them, all the field names in the Hive table are expected to be the same as in the underlying MongoDB collection. If the exact field names in Hive cannot be found in a MongoDB document, their values in the Hive table will be null.

Now we can query the MongoDB collection from Hive using HQL. We shall now try the INSERT INTO TABLE command in HQL to see if we can load the MongoDB collection from Hive. Let's create a temporary table in Hive, say users2, with the same schema as the users table, and insert some data into it from a flat file.
cat users.txt

CREATE TABLE users2 (id int , name string , age int)

Now we shall use the INSERT INTO TABLE command to insert data from the Hive table users2 into the users table.
INSERT INTO TABLE test.users SELECT id,name,age from test.users2;
There are some limitations of using MongoStorageHandler which are given below:
  • Using a StorageHandler will mark the table as non-native. Data cannot be loaded into the table directory from a "LOAD DATA" command. Instead, the data can only come from the result of a query.
  • Files are only stored as a single output file, not one per reduce call. This can cause a lot of lock-leasing problems.
  • INSERT INTO a StorageHandler table behaves as INSERT OVERWRITE.
In some other post I shall write about using BSON files as storage format for Hive tables.

Saturday, 12 October 2013

MapReduce On Hive Tables Using HCatalog

In my last post, Introduction To Hive's Partitioning, I described how we can load csv data to a partitioned Hive table. Today we shall see how we can use HCatalog to run MapReduce on a Hive table and store the output in another Hive table.

HCatalog makes Hive metadata available to users of other Hadoop tools like Pig, MapReduce and Hive. It provides connectors for MapReduce and Pig so that users of those tools can read data from and write data to Hive’s warehouse.
HCatalog’s table abstraction presents users with a relational view of data in the Hadoop Distributed File System (HDFS) and ensures that users need not worry about where or in what format their data is stored, so users don't need to know whether data is stored in RCFile format, text files, or sequence files.
It also provides a notification service so that workflow tools, such as Oozie, can be notified when new data becomes available in the warehouse.

HCatalog provides HCatInputFormat/HCatOutputFormat to enable MapReduce users to read/write data in Hive’s data warehouse. It allows users to read only the partitions of tables and columns that they need. And it provides the records in a convenient list format so that users do not need to parse them.

We shall see how we can use HCatalog to do a count of records for each year-month combination.
In mapper class we get table schema and use this schema information to get the required columns and their values.
Note - In context.write I am using a custom writable IntPair which I will describe in a separate post.

public class onTimeMapper extends Mapper<WritableComparable, HCatRecord, IntPair, IntWritable> {
    @Override
    protected void map(WritableComparable key, HCatRecord value, Context context)
     throws IOException, InterruptedException {

     // Get table schema
     HCatSchema schema = HCatBaseInputFormat.getTableSchema(context);

     // year and month are read as strings, dayofmonth as an integer
     Integer year = new Integer(value.getString("year", schema));
     Integer month = new Integer(value.getString("month", schema));
     Integer DayofMonth = value.getInteger("dayofmonth", schema);

     // key is the (year, month) pair, value is the day of month
     context.write(new IntPair(year, month), new IntWritable(DayofMonth));
    }
}
In reducer class we create a record schema for holding the columns and their values, which is written to target Hive table.

public class onTimeReducer extends Reducer<IntPair, IntWritable, WritableComparable, HCatRecord> {
 public void reduce (IntPair key, Iterable<IntWritable> value, Context context) 
  throws IOException, InterruptedException{
  int count = 0; // records counter for particular year-month
  for (IntWritable s:value) {
   count++;
  }
  // define output record schema
  List<HCatFieldSchema> columns = new ArrayList<HCatFieldSchema>(3);
  columns.add(new HCatFieldSchema("year", HCatFieldSchema.Type.INT, ""));
  columns.add(new HCatFieldSchema("month", HCatFieldSchema.Type.INT, ""));
  columns.add(new HCatFieldSchema("flightCount", HCatFieldSchema.Type.INT,""));
  HCatSchema schema = new HCatSchema(columns);
  HCatRecord record = new DefaultHCatRecord(3);
  record.setInteger("year", schema, key.getFirstInt()); 
  record.set("month", schema, key.getSecondInt());
  record.set("flightCount", schema, count);
  // the key is ignored by HCatOutputFormat, so null is written
  context.write(null, record);
 }
}
Finally we write driver class with input/output schema and table details:

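public class onTimeDriver extends Configured implements Tool{
    private static final Log log = LogFactory.getLog( onTimeDriver.class );

    public int run( String[] args ) throws Exception{
     Configuration conf = new Configuration();
     Job job = new Job(conf, "OnTimeCount");
     job.setJarByClass(onTimeDriver.class);
     job.setMapperClass(onTimeMapper.class);
     job.setReducerClass(onTimeReducer.class);

     // input: Hive table airline.ontimeperf
     HCatInputFormat.setInput(job, "airline", "ontimeperf");
     job.setInputFormatClass(HCatInputFormat.class);
     job.setMapOutputKeyClass(IntPair.class);
     job.setMapOutputValueClass(IntWritable.class);

     // output: Hive table airline.flight_count, using the table's own schema
     job.setOutputKeyClass(WritableComparable.class);
     job.setOutputValueClass(DefaultHCatRecord.class);
     job.setOutputFormatClass(HCatOutputFormat.class);
     HCatOutputFormat.setOutput(job, OutputJobInfo.create("airline", "flight_count", null));
     HCatSchema s = HCatOutputFormat.getTableSchema(job);
     HCatOutputFormat.setSchema(job, s);
     return (job.waitForCompletion(true)? 0:1);
    }

    public static void main(String[] args) throws Exception{
     int exitCode = ToolRunner.run(new onTimeDriver(), args);
     System.exit(exitCode);
    }
}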
Before running this code we need to create the output table in Hive using below commands:
create table airline.flight_count
(Year INT ,
Month INT ,
flightCount INT)
After compiling and making jar file we can run this code using hadoop jar command without any argument. You can check output using HQL by querying airline.flight_count table.

Note - With the Apache setup I faced some Hive errors when running the jar from any folder other than $HIVE_HOME, so if you face any issue it is worth trying to place and run the jar from the $HIVE_HOME folder.
To verify that our MapReduce program is independent of the output table schema, we shall alter airline.flight_count to add a dummy column:
ALTER TABLE airline.flight_count ADD COLUMNS (dummy STRING);
After truncating this table we run our MapReduce program again and can see that it runs in the same manner.

Monday, 8 July 2013

Bulk Loading ElasticSearch from HBase

In my previous post, Starting with ElasticSearch, I wrote about indexing, retrieving and searching for documents in ElasticSearch. To index a very large number of documents into ES we can use the BulkProcessor API from ES, which batches index requests and is fast and efficient at this scale. The Java program I wrote for reading data from HBase and indexing it into ES is given below:



import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.client.HTable;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.ResultScanner;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.util.Bytes;
import org.elasticsearch.client.Client;
import org.elasticsearch.client.transport.TransportClient;
import org.elasticsearch.common.transport.InetSocketTransportAddress;
import org.elasticsearch.action.bulk.BulkResponse;
import org.elasticsearch.action.bulk.BulkRequest;
import org.elasticsearch.action.bulk.BulkProcessor; 
import org.elasticsearch.action.index.IndexRequest;
import static org.elasticsearch.common.xcontent.XContentFactory.jsonBuilder;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import com.rishav.xml.xmlDef;

public class esBulkLoad {
 public static void main(String[] args) throws IOException {

  // HBase configuration
  Configuration config = HBaseConfiguration.create();
  HTable htable = new HTable(config, "xmlTest");
  Scan scan1 = new Scan();
  ResultScanner scanner1 = htable.getScanner(scan1); 
  //ElasticSearch configuration
  Client client = new TransportClient().addTransportAddress(
    new InetSocketTransportAddress("localhost",9300));
  final Logger logger = LoggerFactory.getLogger(esBulkLoad.class);
  // define ES bulkprocessor
  BulkProcessor bulkProcessor = BulkProcessor.builder(client, new BulkProcessor.Listener() {
      public void beforeBulk(long executionId, BulkRequest request) {
          logger.info("Going to execute new bulk composed of {} actions", request.numberOfActions());
      }

      public void afterBulk(long executionId, BulkRequest request, BulkResponse response) {
          logger.info("Executed bulk composed of {} actions", request.numberOfActions());
      }

      public void afterBulk(long executionId, BulkRequest request, Throwable failure) {
          logger.warn("Error executing bulk", failure);
      }
  }).build();
  // read Hbase records
  for (Result res : scanner1) {
   byte[] b;
   int col = 0;
   // copy each HBase cell (family, qualifier) into the value row of xmlDef
   for (String xmlTag : xmlDef.xmlDef[3]) {
    b = res.getValue(Bytes.toBytes(xmlDef.xmlDef[2][col]), Bytes.toBytes(xmlDef.xmlDef[3][col]));
    xmlDef.xmlDef[5][col] = Bytes.toString(b);
    col++;
   }
   // build ES IndexRequest and add it to bulkProcessor
   IndexRequest iRequest = new IndexRequest("xml", "response", xmlDef.xmlDef[5][0]);
   iRequest.source(jsonBuilder().startObject()
                          .field(xmlDef.xmlDef[3][0], xmlDef.xmlDef[5][0])
                          .field(xmlDef.xmlDef[3][1], xmlDef.xmlDef[5][1])
                          .field(xmlDef.xmlDef[3][2], xmlDef.xmlDef[5][2])
                          .field(xmlDef.xmlDef[3][3], xmlDef.xmlDef[5][3])
                          .field(xmlDef.xmlDef[3][4], xmlDef.xmlDef[5][4])
                          .endObject());
   bulkProcessor.add(iRequest);
  }
  scanner1.close();
  htable.close();
  // flush pending requests, then shutdown Elasticsearch client
  bulkProcessor.close();
  client.close();
 }
}

Wednesday, 3 July 2013

Starting with ElasticSearch

ElasticSearch is a distributed, RESTful search server based on Apache Lucene. The important features of it are listed below:
  • Real time data
  • RESTful API
  • Distributed
  • Multi-tenancy
  • Document oriented
  • Schema free
  • Per-operation persistence
  • Built on top of Apache Lucene
  • High availability
  • Full text search
  • Conflict management
More detailed information is available in the ElasticSearch documentation. A small example program to add, retrieve and search for documents in ES is given below:


import static org.elasticsearch.node.NodeBuilder.*;

import java.io.IOException;

import org.elasticsearch.node.Node;
import org.elasticsearch.client.Client;
import org.elasticsearch.action.get.GetResponse;
import org.elasticsearch.action.index.IndexResponse;
import org.elasticsearch.action.search.SearchResponse;

import static org.elasticsearch.common.xcontent.XContentFactory.*;
import org.elasticsearch.common.xcontent.XContentBuilder;
import org.elasticsearch.common.settings.ImmutableSettings;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.transport.InetSocketTransportAddress;
import org.elasticsearch.client.transport.TransportClient;
import org.elasticsearch.index.query.FilterBuilder;
import org.elasticsearch.index.query.FilterBuilders;
import org.elasticsearch.index.query.QueryBuilder;
import org.elasticsearch.index.query.QueryBuilders;
import org.elasticsearch.index.query.FilterBuilders.*;
import org.elasticsearch.index.query.QueryBuilders.*;

public class esConnect {

 // on startup

 public static void main (String args[]) throws IOException{

  // create a client for ElasticSearch running on localhost listening to port 9300
  Node node = nodeBuilder().node();
  Client client = new TransportClient().addTransportAddress(
      new InetSocketTransportAddress("localhost",9300));
  // add some record
  IndexResponse iresponse = client.prepareIndex("testindex", "tweets", "1")
        .setSource(jsonBuilder()
                    .startObject()
                     .field("user", "rishav")
                     .field("postDate", "2013-01-30")
                     .field("message", "trying out Elastic Search")
                    .endObject())
        .execute()
        .actionGet();
  // retrieve record based on id
  GetResponse gresponse = client.prepareGet("testindex", "tweets", "1")
        .execute()
        .actionGet();
  // search for a record based on below filters
  SearchResponse sresponse = client.prepareSearch("testindex").
         setQuery(QueryBuilders.termQuery("user", "rishav")).  // query
         setFilter(FilterBuilders.termFilter("message", "out")).  // filter
         execute().
         actionGet();
  // shutdown client
  client.close();
  node.close();
 }
}

Monday, 1 July 2013

XML parsing with Mapreduce

Recently I worked with XML data stored in HDFS and wrote MapReduce code to write that data to HBase. To work with XML input data we can use the XmlInputFormat class from Mahout (there is no need to have Mahout installed, we just need the class from the mahout-integration jar). The XML file had the structure below:


To hold this xml record we created xmlDef class:
package com.rishav.xml;

public class xmlDef {
 public static String xmlDef[][] = new String[][]{
      {"xmlTest", "xmlTest", "xmlTest", "xmlTest", "xmlTest", "xmlTest"},     //HBase table name
      {"Y", "N", "N","N","N","N"},                                            //is column a key column?
      {"cf1", "cf1","cf2","cf2","cf2","cf2"},                                 //column family
      {"RowID", "ResponseID", "IPAddress", "Status", "StartDate", "EndDate"}, //column name in HBase
      {"RowID", "ResponseID", "IPAddress", "Status", "StartDate", "EndDate"}, //xml tag
      {"", "", "", "", "", ""}                                                // place holder for xml value
 };
}

The mapper class is configured to read a complete XML record enclosed in start and end tags, and these tags are defined in the driver class. Each map call reads one XML record at a time as input, which we can then parse in the normal manner. In this case we have used XMLStreamReader. The code for the mapper class is given below:

package com.rishav.hbase;

import com.rishav.xml.xmlDef;

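import static javax.xml.stream.XMLStreamConstants.CHARACTERS;
import static javax.xml.stream.XMLStreamConstants.START_ELEMENT;

import java.io.ByteArrayInputStream;
import java.io.IOException;

import javax.xml.stream.XMLInputFactory;
import javax.xml.stream.XMLStreamReader;

import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;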


import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.client.HTable;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Mapper.Context;
import org.apache.hadoop.mapreduce.lib.input.FileSplit;

public class commonTableMapper 
 // output key/value types are assumed here; the map method below emits nothing
 extends org.apache.hadoop.mapreduce.Mapper<LongWritable, Text, Text, Text> {
  private HTable htable;
 // create HBase connection
 protected void setup(Context context) 
    throws IOException, InterruptedException {
    Configuration conf = HBaseConfiguration.create();
    htable = new HTable(conf, xmlDef.xmlDef[0][0]);
    htable.setWriteBufferSize(1024 * 1024 * 12);
 }

 @Override
 public void map(LongWritable key, Text value, Context context) 
  throws IOException, InterruptedException {
   String currLine = value.toString();
   try {
    XMLStreamReader reader = XMLInputFactory.newInstance()
      .createXMLStreamReader(
      new ByteArrayInputStream(currLine.getBytes()));
    String currentElement = "";
    int col = 0;
    // initialize all xml value to blank string
    for (String xmlTag : xmlDef.xmlDef[3]) {
     xmlDef.xmlDef[5][col] = "";
     col++;
    }
    // read xml tags and store values in xmlDef
    while (reader.hasNext()) {
     int code = reader.next();
     switch (code) {
     case START_ELEMENT:
      currentElement = reader.getLocalName();
      break;
     case CHARACTERS:
      col = 0;
      for (String xmlTag : xmlDef.xmlDef[3]) {
       if (currentElement.equalsIgnoreCase(xmlTag)) {
        xmlDef.xmlDef[5][col] += reader.getText().trim(); 
       }
       col++;
      }
      break;
     }
    }
    // writing values to mapper output file
    // can remove this context.write
    // put record in HBase
    Put insHBase = new Put(Bytes.toBytes(xmlDef.xmlDef[5][0]));
    col = 0;
    for (String xmlTag : xmlDef.xmlDef[3]) {
     insHBase.add(Bytes.toBytes(xmlDef.xmlDef[2][col]), Bytes.toBytes(xmlDef.xmlDef[3][col]), Bytes.toBytes(xmlDef.xmlDef[5][col]));
     col++;
    }
    htable.put(insHBase);
  } catch (Exception e) {
    e.printStackTrace();
  }
 }

 @Override
 public void cleanup(Context context) 
   throws IOException, InterruptedException {
   // flush buffered puts and release the HBase connection
   htable.flushCommits();
   htable.close();
 }
}
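
The tag-by-tag parsing used in the mapper can be tried outside Hadoop and HBase. The sketch below is a minimal standalone version that only uses the JDK's StAX classes. The class name XmlRecordParser, the sample values in main and the case-sensitive tag match are assumptions made for illustration; only the tag names and the response record boundary come from this post.

```java
import java.io.ByteArrayInputStream;
import java.nio.charset.StandardCharsets;
import java.util.LinkedHashMap;
import java.util.Map;
import javax.xml.stream.XMLInputFactory;
import javax.xml.stream.XMLStreamConstants;
import javax.xml.stream.XMLStreamException;
import javax.xml.stream.XMLStreamReader;

// Hypothetical helper, not part of the original job.
class XmlRecordParser {
    // Tag names taken from the xmlDef class in this post.
    static final String[] TAGS =
        {"RowID", "ResponseID", "IPAddress", "Status", "StartDate", "EndDate"};

    // Parses one complete record and returns tag -> text value.
    static Map<String, String> parse(String record) {
        Map<String, String> values = new LinkedHashMap<>();
        for (String tag : TAGS) {
            values.put(tag, ""); // start every value as a blank string
        }
        try {
            XMLStreamReader reader = XMLInputFactory.newInstance().createXMLStreamReader(
                new ByteArrayInputStream(record.getBytes(StandardCharsets.UTF_8)));
            String current = "";
            while (reader.hasNext()) {
                int event = reader.next();
                if (event == XMLStreamConstants.START_ELEMENT) {
                    current = reader.getLocalName();
                } else if (event == XMLStreamConstants.CHARACTERS
                        && values.containsKey(current)) {
                    // text may arrive in several chunks, so append
                    values.put(current, values.get(current) + reader.getText().trim());
                }
            }
            reader.close();
        } catch (XMLStreamException e) {
            throw new IllegalArgumentException("not a well-formed record", e);
        }
        return values;
    }

    public static void main(String[] args) {
        // Made-up sample record, for illustration only.
        String record = "<response><RowID>1</RowID><ResponseID>R_1</ResponseID>"
            + "<IPAddress>10.0.0.1</IPAddress><Status>0</Status>"
            + "<StartDate>2013-06-01</StartDate><EndDate>2013-06-02</EndDate></response>";
        System.out.println(parse(record));
    }
}
```

Returning a map instead of writing into a shared static array keeps each record's values separate, which makes the parsing step easy to unit test before wiring it into a mapper.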

In the driver we set the boundaries of an XML record using xmlinput.start and xmlinput.end. The driver class is:

package com.rishav.hbase;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.client.HTable;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.mapreduce.HFileOutputFormat;
import org.apache.hadoop.hbase.mapreduce.PutSortReducer;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

import org.apache.mahout.text.wikipedia.XmlInputFormat;

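public final class commonTableDriver {
 public static void main(String[] args) throws Exception {
  commonRunJob(args[0], args[1]);
 }

 public static void commonRunJob(String input,String output) throws Exception  {
  Configuration conf = new Configuration();
  // record boundaries used by XmlInputFormat
  conf.set("xmlinput.start", "<response>");
  conf.set("xmlinput.end", "</response>");
  Job job = new Job(conf);
  job.setJarByClass(commonTableMapper.class);
  job.setInputFormatClass(XmlInputFormat.class);
  job.setMapperClass(commonTableMapper.class);
  FileInputFormat.setInputPaths(job, new Path(input));
  Path outPath = new Path(output);
  FileOutputFormat.setOutputPath(job, outPath);
  // remove any previous output so the job can be re-run
  outPath.getFileSystem(conf).delete(outPath, true);
  job.waitForCompletion(true);
 }
}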

We have exported the project from Eclipse as a jar package. Let's call it hbasetest.jar. To run this jar in Hadoop, use the command below:

hadoop jar hbasetest.jar com.rishav.hbase.commonTableDriver input/sample.xml out1

Note: Before running this map reduce job create xmlTest table in HBase with column families cf1 and cf2.

After executing this job we can check data in HBase table xmlTest.

Friday, 15 March 2013

MapReduce Fundamentals

MapReduce is a programming model for efficient distributed computing.
The core concept of MapReduce in Hadoop is that input may be split into logical chunks, and each chunk may be initially processed independently, by a map task. The results of these individual processing chunks can be physically partitioned into distinct sets, which are then sorted. Each sorted chunk is passed to a reduce task.
Every MapReduce program must specify a Mapper and typically a Reducer.
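
The flow described above (independent map calls, grouping and sorting by key, then one reduce call per key) can be sketched in a single process. This is only a toy word count under stated assumptions: the class MiniMapReduce and its methods are hypothetical names, nothing here is Hadoop API, and a sorted in-memory map stands in for the partition and sort phase.

```java
import java.util.AbstractMap;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Toy, single-process illustration of map -> shuffle/sort -> reduce.
class MiniMapReduce {

    // Map: one input line in, any number of (word, 1) pairs out.
    static List<Map.Entry<String, Integer>> map(String line) {
        List<Map.Entry<String, Integer>> out = new ArrayList<>();
        for (String word : line.trim().split("\\s+")) {
            if (!word.isEmpty()) {
                out.add(new AbstractMap.SimpleEntry<>(word, 1));
            }
        }
        return out;
    }

    // Reduce: one key with all of its values in, one total out.
    static int reduce(String key, List<Integer> values) {
        int sum = 0;
        for (int v : values) {
            sum += v;
        }
        return sum;
    }

    static Map<String, Integer> run(List<String> chunks) {
        // Shuffle and sort: group intermediate values by key.
        // TreeMap keeps the keys sorted, as the reduce input would be.
        Map<String, List<Integer>> grouped = new TreeMap<>();
        for (String chunk : chunks) {
            for (Map.Entry<String, Integer> kv : map(chunk)) {
                grouped.computeIfAbsent(kv.getKey(), k -> new ArrayList<>())
                       .add(kv.getValue());
            }
        }
        Map<String, Integer> result = new TreeMap<>();
        for (Map.Entry<String, List<Integer>> e : grouped.entrySet()) {
            result.put(e.getKey(), reduce(e.getKey(), e.getValue()));
        }
        return result;
    }

    public static void main(String[] args) {
        // prints {a=2, b=2, c=1}
        System.out.println(run(Arrays.asList("a b a", "b c")));
    }
}
```

In a real cluster each chunk would go to a different map task and the grouped keys would be spread over several reduce tasks; the data flow is the same.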

Input and Output Formats

A MapReduce job may specify how its input is to be read by specifying an InputFormat, which in turn supplies:
– InputSplit
– RecordReader
A MapReduce job may specify how its output is to be written by specifying an OutputFormat.
These default to TextInputFormat and TextOutputFormat, which process line-based text data.
For SequenceFile data, SequenceFileInputFormat and SequenceFileOutputFormat are used.
These formats are file-based, but input and output formats are not required to be.

How many Maps and Reduces

Maps
– Usually as many as the number of HDFS blocks being processed; this is the default.
– Otherwise the number of maps can be specified as a hint.
– The number of maps can also be controlled by specifying the minimum split size.
– The actual sizes of the map inputs are computed by:
• max(min(block_size, data/#maps), min_split_size)

Reduces
– Unless the amount of data being processed is small, use:
• 0.95*num_nodes*mapred.tasktracker.reduce.tasks.maximum
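
As a worked example of the two formulas above, here is a small sketch. The class TaskSizing, its method names and the numbers in main are made up for illustration, and rounding the reduce count down is an assumption, since the formula itself does not say how to round.

```java
// Hypothetical helper that evaluates the two sizing formulas.
class TaskSizing {

    // max(min(block_size, data/#maps), min_split_size), all sizes in bytes
    static long mapInputSize(long blockSize, long dataSize, int requestedMaps,
                             long minSplitSize) {
        return Math.max(Math.min(blockSize, dataSize / requestedMaps), minSplitSize);
    }

    // 0.95 * num_nodes * mapred.tasktracker.reduce.tasks.maximum, rounded down
    static int suggestedReduces(int numNodes, int reduceSlotsPerNode) {
        return (int) Math.floor(0.95 * numNodes * reduceSlotsPerNode);
    }

    public static void main(String[] args) {
        long mb = 1024L * 1024L;
        // 1 GB of data, 64 MB blocks, hint of 4 maps: the block size wins (64 MB)
        System.out.println(mapInputSize(64 * mb, 1024 * mb, 4, mb) / mb);
        // same data with a hint of 32 maps: data/#maps wins (32 MB)
        System.out.println(mapInputSize(64 * mb, 1024 * mb, 32, mb) / mb);
        // 10 nodes with 2 reduce slots each: 19 reduces
        System.out.println(suggestedReduces(10, 2));
    }
}
```

The first two calls show why the map count is only a hint: asking for fewer maps than there are blocks does not make the splits larger than a block.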

vi/vim Cheatsheet

There are many ways to edit files in Unix and for me one of the best ways is using screen-oriented text editor vi. There is an improved version of vi also called vim (vi improved).

Starting the vi editor:

vi filename: Creates a new file if it does not already exist, otherwise opens the existing file.
vi -R filename: Opens an existing file in read-only mode.

Operation Modes:
vi has two modes of operation
  • Command mode: This mode enables you to perform administrative tasks such as saving files, executing commands, moving the cursor, cutting (yanking) and pasting lines or words, and finding and replacing. In this mode, whatever you type is interpreted as a command.
  • Insert mode: This mode enables you to insert text into the file. Everything that's typed in this mode is interpreted as input and finally it is put in the file .
vi always starts in command mode. To enter text, you must be in insert mode. To enter insert mode simply type i. To leave insert mode, press the Esc key, which puts you back into command mode.
Hint: If you are not sure which mode you are in, press the Esc key twice and you will be in command mode. To understand the difference, open a file in vi, type some characters in insert mode and then return to command mode.

vi cheatsheet:

Wednesday, 13 March 2013

MapReduce in AWK

In my last post Lets MapReduce with Pentaho I had written a MapReduce program in Pentaho Data Integration. Now I am writing some MapReduce code in AWK to:
  • Calculate number/percentage of flights delayed by over 15 minutes aggregated at day level (on airline dataset).
I will be using hadoop streaming package to run my AWK MapReduce task.

Any MapReduce code should follow the common basics listed below:
  • The Mapper has a map method that transforms input (key, value) pairs into any number of intermediate (key’, value’) pairs.
  • The Reducer has a reduce method that transforms intermediate (key’, value’*) aggregates into any number of output (key’’, value’’) pairs.

Number/percentage of flights delayed by over 15 minutes aggregated at day level

I write a Map code to emit date in yyyyMMdd format, constant value 1 for each flight and a boolean value for flights with DepDelay (column#16) > 15.
#!/usr/bin/awk -f
BEGIN { FS = "," }      # input is comma-separated
{
        if ($16 > 15)
        {       printf("%4s%02s%02s,%d,%d\n", $1,$2,$3,1,1);}
        else
        {       printf("%4s%02s%02s,%d,%d\n", $1,$2,$3,1,0);}
}
The intermediate output of Map code will look like this:
Note that the output of mapper is sorted.
The reduce code will keep a count of flights per day and delayed flights.
To aggregate data (sum) by day level I will use array in awk.
#!/bin/awk -f

BEGIN {FS=",";
}
{       A[$1]+=$2;      # Add $2 to the array A having index of $1
        B[$1]+=$3;      # Add $3 to the array B having index of $1
}
END {
        for(i in A)    # Get all the indexes of array A in i
        {printf "%s,%d,%d,%5.2f\n", i,A[i],B[i],B[i]*100/A[i]}
}
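
For readers more at home in Java, the same map and reduce logic can be sketched as plain methods and tried locally before submitting a streaming job. This is an illustration, not the code that was run: the class DelayedFlights and its method names are hypothetical, and treating a non-numeric DepDelay (such as NA) as not delayed is an assumption of this sketch.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Locale;
import java.util.Map;
import java.util.TreeMap;

// Hypothetical local equivalent of the two AWK scripts.
class DelayedFlights {

    // Map step: emit "yyyyMMdd,1,delayedFlag" for one CSV line.
    static String mapLine(String csvLine) {
        String[] f = csvLine.split(",");
        int year = Integer.parseInt(f[0].trim());
        int month = Integer.parseInt(f[1].trim());
        int day = Integer.parseInt(f[2].trim());
        int delayed = 0;
        try {
            // column 16 (index 15) is DepDelay
            if (Double.parseDouble(f[15].trim()) > 15) {
                delayed = 1;
            }
        } catch (NumberFormatException e) {
            // assumption: a non-numeric delay counts as not delayed
        }
        return String.format(Locale.ROOT, "%04d%02d%02d,%d,%d",
                             year, month, day, 1, delayed);
    }

    // Reduce step: sum flights and delayed flights per day, add the percentage.
    static List<String> reduce(List<String> mapped) {
        Map<String, long[]> totals = new TreeMap<>();
        for (String line : mapped) {
            String[] f = line.split(",");
            long[] t = totals.computeIfAbsent(f[0], k -> new long[2]);
            t[0] += Long.parseLong(f[1]); // flights
            t[1] += Long.parseLong(f[2]); // delayed flights
        }
        List<String> out = new ArrayList<>();
        for (Map.Entry<String, long[]> e : totals.entrySet()) {
            long[] t = e.getValue();
            out.add(String.format(Locale.ROOT, "%s,%d,%d,%5.2f",
                                  e.getKey(), t[0], t[1], t[1] * 100.0 / t[0]));
        }
        return out;
    }

    public static void main(String[] args) {
        // Two made-up flights on the same day, one delayed by 27 minutes.
        String onTime = "1987,10,14,3,741,730,912,849,PS,1451,NA,91,79,NA,23,11";
        String late = "1987,10,14,3,741,730,912,849,PS,1451,NA,91,79,NA,23,27";
        // prints [19871014,2,1,50.00]
        System.out.println(reduce(Arrays.asList(mapLine(onTime), mapLine(late))));
    }
}
```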
Run this MapReduce code with hadoop streaming:
hadoop jar /usr/lib/hadoop-0.20-mapreduce/contrib/streaming/hadoop-streaming-2.0.0-mr1-cdh4.0.0.jar \
  -input /airline/nh_1987.csv \
  -output /airline/output \
  -mapper path_to/airline2_m.awk \
  -reducer path_to/airline2_r.awk \
  -inputformat org.apache.hadoop.mapred.TextInputFormat \
  -outputformat org.apache.hadoop.mapred.TextOutputFormat \
  -file path_to/airline2_m.awk \
  -file path_to/airline2_r.awk
We can check the output of above code by using cat command or getting the file from hdfs to local filesystem:
hadoop fs -cat /airline/output/part-00000
hadoop fs -get /airline/output/part-00000

Tuesday, 26 February 2013

Lets MapReduce with Pentaho

Lets MapReduce with Pentaho Data Integrator

I had been exploring Pentaho Data Integrator for quite some time and always wanted to see how to work with Big Data using Pentaho. Today I got a chance to do some simple MapReduce with Pentaho on the Airline dataset which I had dumped into my Cloudera Hadoop cluster (Sample Datasets).
Before doing MR in Pentaho, we need to configure it to work with Cloudera (by default it is configured to work with Apache). This can be done in about 5 minutes by following this link:

Configure Pentaho for Cloudera CDH4

After doing all the listed steps make sure you are able to connect to HDFS through Pentaho. For this:

1. Create a new transformation.

2. Add a Hadoop File Input step.

3. Browse HDFS, give address of your namenode, port as 8020 , user id as hdfs and by default there is no password.

4. If you are able to browse HDFS then you have successfully configured Pentaho.

5. You can discard this transformation; it was only for checking the connectivity.

To do a simple MapReduce in Pentaho we need to create:

1. Mapper transformation

2. Combiner transformation (optional)

3. Reducer transformation

4. MapReduce job

I am going to create a simple MR task which counts the number of flights for each month across our entire airline dataset.

First create a Mapper transformation:

1. Create a new transformation in Pentaho, let us call it “airline_mapper”.

2. Add a MapReduce Input step and configure it as shown:

3. Add a Split Fields step to split the csv file into separate fields, connect it to the previous step, and configure it as shown:

4. Add a String Operations step to pad the month field with a leading 0 (so that every month is a 2-digit value) and connect it to the previous step. It should be configured as shown:


5. Add a User Defined Java Expression step to concatenate year with month and connect it to the previous step. It should be configured as shown:

6. Add a MapReduce Output step and connect it to the previous step. It should be configured as shown:


I have used the Carrier field to count the number of flights for each month; any other field would work equally well.

7. Now save this transformation. It should look like this:


Now create a Reducer transformation:

1. Create a new transformation in Pentaho, let us call it “airline_reducer”.

2. Add a MapReduce Input step and configure it as shown:

3. Add a Group By step and connect it to the previous step. It should be configured as shown:

4. Add a MapReduce Output step and connect it to the previous step. It should be configured as shown:

5. Now save this transformation. It should look like this:

Our mapper and reducer transformations are complete; now let us combine them in an MR job.
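
For reference, the logic of the two transformations can be written out as a few lines of plain Java. This is a hedged sketch rather than anything Pentaho generates: the class MonthlyFlightCount and its methods are hypothetical, and it assumes year and month are the first two csv fields, as in the airline dataset used in the AWK post.

```java
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Hypothetical plain-Java equivalent of airline_mapper and airline_reducer.
class MonthlyFlightCount {

    // Mapper side: split fields, pad month to 2 digits, concatenate year and month.
    static String monthKey(String csvLine) {
        String[] f = csvLine.split(",");
        String year = f[0].trim();
        String month = f[1].trim();
        if (month.length() < 2) {
            month = "0" + month;
        }
        return year + month;
    }

    // Reducer side: group by the year-month key and count rows.
    static Map<String, Long> count(List<String> lines) {
        Map<String, Long> counts = new TreeMap<>();
        for (String line : lines) {
            counts.merge(monthKey(line), 1L, Long::sum);
        }
        return counts;
    }

    public static void main(String[] args) {
        // prints {198710=2, 198801=1}
        System.out.println(count(Arrays.asList("1987,10,14", "1987,10,15", "1988,1,3")));
    }
}
```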

Create a MapReduce job:

1. Create a new job in Pentaho, let us call it “airline_job”.

2. Add a Start step and a Pentaho MapReduce step and connect them. It should look like:

 3. Configure Pentaho MapReduce as shown in below screenshots:

 You can give any name in Hadoop Job Name, let us call our job as airline_agg.
Mapper transformation should point to airline_mapper transformation.
Mapper Input/Output Step Name should match the step names in our mapper transformation.

4. We don’t have combiner step, so leave next tab empty.

5. Configure Reducer tab like this:

Again, Reducer Input/Output Step Name should match the step names in our reducer transformation.

6. Configure Job Setup step as shown:

 7. Configure Cluster tab like this:

 8. Save and run this job.

While the job is running we can see its progress in Pentaho and also in JobTracker UI (http://jobtracker_ip:50030/).

 You can also see the details of a running job by clicking on it:

 On my Cloudera setup it took around 9 minutes to execute this job.

I shall try to do some complex analysis on this dataset using Pentaho and post it very soon.