Thursday, 26 December 2013

In-mapper Combiner Program to Calculate Average

In my previous post I described how we can use an in-mapper combiner to make our M/R programs more efficient. In that post we also saw the M/R algorithm for average calculation, both with and without the in-mapper combiner optimization.

In this post I am posting the code for both algorithms.

The M/R program to calculate the average without an in-mapper combiner is given below:
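This version is a minimal sketch: the class names are mine and it assumes tab-separated s_id, c_id and marks fields, as in the sample dataset from my earlier post.

package com.example;

import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.DoubleWritable;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

public class AverageMarks {

 // Mapper: emits one (s_id, marks) pair for every input record.
 public static class AvgMapper extends Mapper<LongWritable, Text, Text, IntWritable> {
  @Override
  public void map(LongWritable key, Text value, Context context)
    throws IOException, InterruptedException {
   // input line: s_id <tab> c_id <tab> marks
   String[] fields = value.toString().split("\t");
   context.write(new Text(fields[0]), new IntWritable(Integer.parseInt(fields[2])));
  }
 }

 // Reducer: averages all marks received for a student.
 public static class AvgReducer extends Reducer<Text, IntWritable, Text, DoubleWritable> {
  @Override
  public void reduce(Text key, Iterable<IntWritable> values, Context context)
    throws IOException, InterruptedException {
   int sum = 0;
   int cnt = 0;
   for (IntWritable m : values) {
    sum += m.get();
    cnt++;
   }
   context.write(key, new DoubleWritable((double) sum / cnt));
  }
 }

 public static void main(String[] args) throws Exception {
  Job job = new Job(new Configuration(), "average-marks");
  job.setJarByClass(AverageMarks.class);
  job.setMapperClass(AvgMapper.class);
  job.setReducerClass(AvgReducer.class);
  job.setMapOutputKeyClass(Text.class);
  job.setMapOutputValueClass(IntWritable.class);
  job.setOutputKeyClass(Text.class);
  job.setOutputValueClass(DoubleWritable.class);
  FileInputFormat.addInputPath(job, new Path(args[0]));
  FileOutputFormat.setOutputPath(job, new Path(args[1]));
  System.exit(job.waitForCompletion(true) ? 0 : 1);
 }
}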

The M/R program to calculate the average with the in-mapper combiner is given below:
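Again a minimal sketch with the same assumed input format; here each mapper buffers per-student partial sums and counts in memory and emits them only once, from cleanup().

package com.example;

import java.io.IOException;
import java.util.HashMap;
import java.util.Map;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.DoubleWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

public class AverageMarksInMapperCombiner {

 // Mapper: accumulates per-student sum and count in memory and emits
 // one (s_id, "sum,count") pair per student from cleanup().
 public static class ImcMapper extends Mapper<LongWritable, Text, Text, Text> {
  private Map<String, long[]> partials;

  @Override
  protected void setup(Context context) {
   partials = new HashMap<String, long[]>();
  }

  @Override
  public void map(LongWritable key, Text value, Context context) {
   String[] fields = value.toString().split("\t");
   long[] p = partials.get(fields[0]);
   if (p == null) {
    p = new long[2];           // p[0] = partial sum, p[1] = count
    partials.put(fields[0], p);
   }
   p[0] += Long.parseLong(fields[2]);
   p[1] += 1;
  }

  @Override
  protected void cleanup(Context context) throws IOException, InterruptedException {
   for (Map.Entry<String, long[]> e : partials.entrySet()) {
    context.write(new Text(e.getKey()), new Text(e.getValue()[0] + "," + e.getValue()[1]));
   }
  }
 }

 // Reducer: adds up the partial sums and counts, then emits the average.
 public static class ImcReducer extends Reducer<Text, Text, Text, DoubleWritable> {
  @Override
  public void reduce(Text key, Iterable<Text> values, Context context)
    throws IOException, InterruptedException {
   long sum = 0;
   long cnt = 0;
   for (Text v : values) {
    String[] parts = v.toString().split(",");
    sum += Long.parseLong(parts[0]);
    cnt += Long.parseLong(parts[1]);
   }
   context.write(key, new DoubleWritable((double) sum / cnt));
  }
 }

 public static void main(String[] args) throws Exception {
  Job job = new Job(new Configuration(), "average-marks-imc");
  job.setJarByClass(AverageMarksInMapperCombiner.class);
  job.setMapperClass(ImcMapper.class);
  job.setReducerClass(ImcReducer.class);
  job.setMapOutputKeyClass(Text.class);
  job.setMapOutputValueClass(Text.class);
  job.setOutputKeyClass(Text.class);
  job.setOutputValueClass(DoubleWritable.class);
  FileInputFormat.addInputPath(job, new Path(args[0]));
  FileOutputFormat.setOutputPath(job, new Path(args[1]));
  System.exit(job.waitForCompletion(true) ? 0 : 1);
 }
}

The only structural difference from the first version is that the mapper emits at most one record per student instead of one per input line, which is where the time saving comes from.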


The programs took 56 sec and 42 sec respectively on my laptop for 10 million records, so the in-mapper combiner version ran about 25% faster (equivalently, the version without the combiner took roughly 33% longer).

In-Mapper Combiner

Recently I read a book on MapReduce algorithms by Lin and Dyer. The book gives deep insight into designing efficient M/R algorithms. In this post I will describe the in-mapper combining algorithm and a sample M/R program using this algorithm.

Advantages of in-mapper combiner over traditional combiner:

When a mapper with a traditional combiner (the mini-reducer) emits key-value pairs, they are collected in a memory buffer, and the combiner then aggregates a batch of these key-value pairs before sending them to the reducer. The drawbacks of this approach are:
  1. The execution of the combiner is not guaranteed, so MapReduce jobs cannot depend on it being run.
  2. Hadoop may store the key-value pairs in the local filesystem and run the combiner later, which causes expensive disk I/O.
  3. A combiner only combines data in the same buffer, so we may still generate a lot of network traffic during the shuffle phase even if most of the keys from a single mapper are the same. To see this, consider the word count example and assume the buffer size is 3, with <key, value> pairs <Stanford, 3>, <Berkeley, 1>, <Stanford, 7>, <Berkeley, 7>, and <Stanford, 2> emitted from one mapper. The first three items will be in one buffer and the last two in the other buffer; as a result, the combiner will emit <Stanford, 10>, <Berkeley, 1>, <Berkeley, 7>, <Stanford, 2>. If we use an in-mapper combiner, we will get just <Stanford, 12>, <Berkeley, 8> (a minimal word-count mapper illustrating this is sketched below).
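To make the last point concrete, here is a minimal word-count mapper written with the in-mapper combining pattern (the class name is mine; the driver and reducer would be the standard word-count ones):

package com.example;

import java.io.IOException;
import java.util.HashMap;
import java.util.Map;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

// Counts words in an in-memory map and emits one pair per distinct word from
// cleanup(), e.g. <Stanford, 12> instead of <Stanford, 3>, <Stanford, 7>, <Stanford, 2>.
public class InMapperCombinerWordCountMapper
  extends Mapper<LongWritable, Text, Text, IntWritable> {

 private Map<String, Integer> counts;

 @Override
 protected void setup(Context context) {
  counts = new HashMap<String, Integer>();
 }

 @Override
 public void map(LongWritable key, Text value, Context context) {
  for (String word : value.toString().split("\\s+")) {
   if (word.length() > 0) {
    Integer c = counts.get(word);
    counts.put(word, c == null ? 1 : c + 1);
   }
  }
 }

 @Override
 protected void cleanup(Context context) throws IOException, InterruptedException {
  for (Map.Entry<String, Integer> e : counts.entrySet()) {
   context.write(new Text(e.getKey()), new IntWritable(e.getValue()));
  }
 }
}

One caveat with this pattern: the in-memory map must fit in the mapper's heap, so for very high-cardinality keys it needs to be flushed periodically instead of only at the end.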



Consider the case of calculating average marks for students. Suppose we have the dataset below:

s_id    c_id    marks
8001    101    78
8001    102    88
8002    101    56
8002    102    77

The pseudocode for a basic M/R algorithm that computes average marks is given below:
class Mapper
    method Map(integer s_id, integer m)
        Emit(integer s_id, integer m)

class Reducer
    method Reduce(integer s_id, integer [m1 , m2 , . . .])
        sum ← 0
        cnt ← 0
        for all integer m ∈ integer [m1 , m2 , . . .] do
            sum ← sum + m
            cnt ← cnt + 1
        avg_m ← sum/cnt
        Emit(integer s_id, float avg_m )

If we have a large number of input records, the same number of records emitted from the map tasks will be shuffled and sorted before being passed on to the reducer. This large amount of data transfer can slow down the overall M/R job.

We can make this algorithm faster by decreasing the number of records emitted by the mapper. To achieve this we can use an associative array to store partial sums of marks, and another associative array to store the count of marks, and finally emit these values in the close method. The pseudocode for the in-mapper combiner version is shown below:

class Mapper
    method Initialize
        S ← new AssociativeArray
        C ← new AssociativeArray
    method Map(integer s_id, integer m)
        S{s_id} ← S{s_id} + m
        C{s_id} ← C{s_id} + 1
    method Close
        for all integer s_id ∈ S do
            Emit(integer s_id, pair (S{s_id}, C{s_id}))

class Reducer
    method Reduce(integer s_id, pairs [(s1 , c1 ), (s2 , c2 ) . . .])
        sum ← 0
        cnt ← 0
        for all pair (s, c) ∈ pairs [(s1 , c1 ), (s2 , c2 ) . . .] do
            sum ← sum + s
            cnt ← cnt + c
        avg_m ← sum/cnt
        Emit(integer s_id, float avg_m)

Using this algorithm we can improve the performance of the M/R job by reducing the number of intermediate key-value pairs sent from the mappers to the reducers.

In my next post I shall share the M/R program using the in-mapper combiner and compare its performance with the M/R program that does not use this optimization.

Friday, 20 December 2013

Hive Data Types

Hive supports different data types for table columns. The data types supported by Hive can be broadly classified into primitive and complex data types.

The primitive data types supported by Hive are listed below:

1. Numeric Types

  • TINYINT (1-byte signed integer, from -128 to 127)
  • SMALLINT (2-byte signed integer, from -32,768 to 32,767)
  • INT (4-byte signed integer, from -2,147,483,648 to 2,147,483,647)
  • BIGINT (8-byte signed integer, from -9,223,372,036,854,775,808 to 9,223,372,036,854,775,807)
  • FLOAT (4-byte single precision floating point number)
  • DOUBLE (8-byte double precision floating point number)
  • DECIMAL (Hive 0.13.0 introduced user definable precision and scale)

2. Date/Time Types

  • TIMESTAMP
  • DATE

3. String Types

  • STRING
  • VARCHAR
  • CHAR

4. Misc Types

  • BOOLEAN
  • BINARY

Apart from these primitive data types, Hive offers some complex data types, which are listed below:

5. Complex Types

  • arrays: ARRAY<data_type>
  • maps: MAP<primitive_type, data_type>
  • structs: STRUCT<col_name : data_type [COMMENT col_comment], ...>
  • union: UNIONTYPE<data_type, data_type, ...>

Wednesday, 18 December 2013

Logstash, ElasticSearch and Kibana Integration for Clickstream Weblog Ingestion

In this blog I am going to showcase how we can develop a quick and easy demo application for clickstream weblog ingestion, search and visualization. We will achieve this by using Logstash for log ingestion, storing the logs in ElasticSearch and building a dashboard with Kibana. For the clickstream weblogs I am using data from the ECML/PKDD 2005 Discovery Challenge. You can download the complete weblogs after registering there. These weblogs are delimited by a semicolon (;) and have the fields below, in this order:
  • shop_id
  • unixtime
  • client ip
  • session
  • visited page
  • referrer
Here are some sample log lines:
15;1075658406;212.96.166.162;052ecba084545d8348806f087b6e09bb;/ls/?&id=77&view=2,6,31&pozice=20;http://www.shop5.cz/ls/?id=77
12;1075658406;195.146.109.248;05aa4f4db0162e5723331042eb9ce8a7;/ct/?c=153;http://www.shop3.cz/
12;1075658407;212.65.194.144;86140090a2e102f1644f29e5ddadad9b;/ls/?id=34;http://www.shop3.cz/ct/?c=155
14;1075658407;80.188.85.210;f07f39ec63abf67f965684f3fa5729c4;/findp/?&id=63&view=1,2,3,14,20,15&p_14=nerez;http://www.shop4.cz/ls/?&p_14=nerez&id=63&view=1%2C2%2C3%2C14%2C20%2C15&&aktul=0
17;1075658408;194.108.232.234;be0970125c4eb3ee4fc380be05b3c58f;/ls/?id=155&sort=45;http://www.shop7.cz/ls/?id=155&sort=45
12;1075658409;62.24.70.41;851f20e644eb8bf82bfdbe4379050e2e;/txt/?c=734;http://www.shop3.cz/onakupu/
To create this demo we need a Logstash configuration file (let's name it clickstream.conf) that specifies the inputs, filters and outputs. The clickstream.conf file looks like this:
input { 
  file {
# path for clickstream log
    path => "/home/rishav.rohit/Desktop/clickstream/_2004_02_01_19_click_stream.log"
# define a type for all events handled by this input
    type => "weblog"
    start_position => "beginning"
# the clickstream log is in character set ISO-8859-1
    codec => plain {charset => "ISO-8859-1"}
   }
}

filter {
  csv {
# define columns present in weblog
    columns => [shop_id, unixtime, client_ip, session, page, referrer]
    separator => ";"
  }
  grok {
# get visited page and page parameters
    match => ["page", "%{URIPATH:page_visited}(?:%{URIPARAM:page_params})?"]
     remove_field => ["page"]
  }
  date {
# as we are getting unixtime field in epoch seconds we will convert it to normal timestamp
    match => [ "unixtime", "UNIX" ]
  }
  geoip {
# this will convert ip to longitude-latitude using GeoLiteCity database from Maxmind
    source => "client_ip"
    fields => ["latitude","longitude"]
    target => "geoip"
    add_field => [ "[geoip][coordinates]", "%{[geoip][longitude]}" ]
    add_field => [ "[geoip][coordinates]", "%{[geoip][latitude]}"  ]
  }
  mutate {
# this will convert geoip.coordinates to float values
    convert => [ "[geoip][coordinates]", "float" ]
  }
}

output { 
# store output in local elasticsearch cluster
  elasticsearch {
    host => "127.0.0.1"
  }
}
To start the Logstash agent we run the command below:
java -jar logstash-1.2.2-flatjar.jar agent -f clickstream.conf
A sample record in ElasticSearch looks like this:
{

    _index: logstash-2004.02.01
    _type: logs
    _id: I1N0MboUR0O1O3RZ-qXqnw
    _version: 1
    _score: 1
    _source: {
        message: [
            14;1075658407;80.188.85.210;f07f39ec63abf67f965684f3fa5729c4;/findp/?&id=63&view=1,2,3,14,20,15&p_14=nerez;http://www.shop4.cz/ls/?&p_14=nerez&id=63&view=1%2C2%2C3%2C14%2C20%2C15&&aktul=0 
        ]
        @timestamp: 2004-02-01T18:00:07.000Z
        @version: 1
        type: weblog
        host: HMECL000315.happiestminds.com
        path: /home/rishav.rohit/Desktop/clickstream/_2004_02_01_19_click_stream.log
        shop_id: 14
        unixtime: 1075658407
        client_ip: 80.188.85.210
        session: f07f39ec63abf67f965684f3fa5729c4
        referrer: http://www.shop4.cz/ls/?&p_14=nerez&id=63&view=1%2C2%2C3%2C14%2C20%2C15&&aktul=0
        page_visited: /findp/
        page_params: ?&id=63&view=1,2,3,14,20,15&p_14=nerez
        geoip: {
            latitude: 50.08330000000001
            longitude: 14.466700000000003
            coordinates: [
                14.466700000000003
                50.08330000000001
            ]
        }
    }

}
So we have parsed a complex log message into simpler components, converted fields like unixtime to a datetime and the client IP to latitude/longitude, and extracted the page visited by the client. Now, using Kibana, we can quickly build a dashboard with panels over these fields.

Thursday, 31 October 2013

Hadoop MapReduce On MongoDB

MongoDB supports running Hadoop MapReduce programs through the mongo-hadoop connector.
In this post we shall use the Enron email dataset (more information about the dataset can be found here) to build a list of the unique sender/recipient pairs, counting how many times each pair occurs.
Download the Enron dataset from the above link and load it into MongoDB with mongorestore, into the enron.messages collection.
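For example, if the download was extracted to a local dump directory, the restore command would look something like this (adjust the path to wherever your .bson file actually lives):

mongorestore --db enron --collection messages dump/enron/messages.bson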

For this MapReduce program we shall use a custom key which is defined below.

MailPair WritableComparable Class

MailPair is just a simple "POJO" that contains Strings for the from and to values, and implements WritableComparable so that it can be serialized across Hadoop nodes and sorted.

package com.example;

import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;

import org.apache.hadoop.io.WritableComparable;

public class MailPair implements WritableComparable<MailPair> {

    String from;
    String to;

    public MailPair(){
    }

    public MailPair(String from, String to){
        this.from = from;
        this.to = to;
    }

    public void readFields(DataInput in) throws IOException{
        this.from = in.readUTF();
        this.to = in.readUTF();
    }

    public void write(DataOutput out) throws IOException{
        out.writeUTF(this.from);
        out.writeUTF(this.to);
    }

    @Override
    public boolean equals(Object o) {
        if (o instanceof MailPair) {
           MailPair mp = (MailPair) o;
           // compare the string contents, not object references
           return from.equals(mp.from) && to.equals(mp.to);
        }
        return false;
    }
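
    @Override
    public int hashCode() {
        // a custom key should also override hashCode so that equal keys are
        // sent to the same reducer by the default HashPartitioner
        return from.hashCode() * 31 + to.hashCode();
    }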

    @Override
    public int compareTo(MailPair mp) {
        int first = from.compareTo(mp.from);
        if(first != 0) return first;
        return to.compareTo(mp.to);
    }

}

Mapper Class

The mapper class will get the headers field from each document, parse the sender out of the From field and the recipients out of the To field, and construct a MailPair object for each (sender, recipient) pair, which will act as the key. Then we emit the value 1 for each key.

package com.example;

import java.io.IOException;

import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.mapreduce.Mapper;
import org.bson.BSONObject;

public class EnronMailMapper
 extends Mapper<Object, BSONObject, MailPair, IntWritable> {

    private static final Log LOG = LogFactory.getLog( EnronMailMapper.class );


 @Override
 public void map(Object key, BSONObject val, final Context context)
        throws IOException, InterruptedException{
  if(val.containsKey("headers")){
   BSONObject headers = (BSONObject)val.get("headers");
   if(headers.containsKey("From") && headers.containsKey("To")){
    String from = (String)headers.get("From");
    String to = (String)headers.get("To");
                String[] recips = to.split(",");
                for(int i = 0; i < recips.length; i++){
                    String recip = recips[i].trim();
                    if(recip.length() > 0){
                        context.write(new MailPair(from, recip), new IntWritable(1));
                    }
                }
   }
  }
 }

}

Reducer Class

The reducer class will take the collected values for each key, sum them together, and record the output.

package com.example;

import java.io.IOException;

import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.mapreduce.Reducer;
import org.bson.BSONObject;

import com.mongodb.BasicDBObjectBuilder;
import com.mongodb.hadoop.io.BSONWritable;

public class EnronMailReducer
 extends Reducer<MailPair, IntWritable, BSONWritable, IntWritable> {

    private static final Log LOG = LogFactory.getLog( EnronMailReducer.class );

    @Override
    public void reduce( final MailPair pKey,
                        final Iterable<IntWritable> pValues,
                        final Context pContext )
            throws IOException, InterruptedException{
        int sum = 0;
        for ( final IntWritable value : pValues ){
            sum += value.get();
        }
        BSONObject outDoc = new BasicDBObjectBuilder().start().add( "f" , pKey.from).add( "t" , pKey.to ).get();
        BSONWritable pkeyOut = new BSONWritable(outDoc);

        pContext.write( pkeyOut, new IntWritable(sum) );
    }
}

Driver Class


package com.example;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.util.ToolRunner;

import com.mongodb.hadoop.util.MongoTool;

public class EnronMail extends MongoTool {

    public int run( String[] args ) throws Exception{
        final Configuration conf = getConf();

        final Job job = new Job( conf, "enron-messages" );
        job.setJarByClass(EnronMail.class);
        job.setMapperClass(EnronMailMapper.class);
        job.setReducerClass(EnronMailReducer.class);

        job.setInputFormatClass(com.mongodb.hadoop.MongoInputFormat.class);
        job.setMapOutputKeyClass(MailPair.class);
        job.setMapOutputValueClass(IntWritable.class);

        job.setOutputFormatClass(com.mongodb.hadoop.MongoOutputFormat.class);
        job.setOutputKeyClass(com.mongodb.hadoop.io.BSONWritable.class);
        job.setOutputValueClass(org.apache.hadoop.io.IntWritable.class);

        return job.waitForCompletion( true ) ? 0 : 1;
    }
 
    public static void main( final String[] pArgs ) throws Exception{
        System.exit( ToolRunner.run( new EnronMail(), pArgs ) );
    }
}
Compile the classes into a jar and save it with the name enron.jar. To run this MapReduce job use the command below:

hadoop jar enron.jar com.example.EnronMail -Dmongo.input.uri=mongodb://localhost:27017/enron.messages -Dmongo.output.uri=mongodb://localhost:27017/enron.messages_out
After running the MapReduce program we can query the enron.messages_out collection to see if it is populated properly.

use enron
db.messages_out.findOne()
{
 "_id" : {
  "f" : "-nikole@excite.com",
  "t" : "bill.williams@enron.com"
 },
 "value" : 8
}

Implementing Custom Writable

Hadoop MapReduce uses implementations of Writables for interacting with user-provided Mappers and Reducers. Hadoop provides many Writable implementations, which are listed here, but sometimes we need to pass custom objects, and these custom objects should implement Hadoop's Writable interface. In this post we are going to describe a custom class, IntPair. To implement the Writable interface we need two methods:

public interface Writable {
 void readFields(DataInput in) throws IOException;
 void write(DataOutput out) throws IOException;
}
The code for IntPair is given below:

public class IntPair implements Writable{
 private IntWritable first;
 private IntWritable second;
 
 public IntPair() {
  set(new IntWritable(), new IntWritable());
 }
 
 public IntPair(Integer first, Integer second) {
  set(new IntWritable(first), new IntWritable(second));
 }
 
 public void set(IntWritable first, IntWritable second) {
  this.first = first;
  this.second = second;
 }
 
 public IntWritable getFirst() {
  return first;
 }
 
 public Integer getFirstInt() {
  return first.get();
 }
 
 public Integer getSecondInt() {
  return second.get();
 }
 public IntWritable getSecond() {
  return second;
 }
 
 public void write(DataOutput out) throws IOException {
  first.write(out);
  second.write(out);
 }
 
 public void readFields(DataInput in) throws IOException {
  first.readFields(in);
  second.readFields(in);
 }
}
Now we can use this IntPair class in Hadoop MapReduce as a value type; a small sketch of this is given below. If we want to use IntPair as a key in MapReduce then it also needs to implement WritableComparable, which we shall cover in a different post.
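For example, a mapper that emits IntPair as its map output value could look like this (the class name and the assumed tab-separated input format are illustrative):

import java.io.IOException;

import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

// Emits an IntPair as the map output value; assumes lines like: id <tab> first <tab> second
public class IntPairMapper extends Mapper<LongWritable, Text, Text, IntPair> {
 @Override
 public void map(LongWritable key, Text value, Context context)
   throws IOException, InterruptedException {
  String[] f = value.toString().split("\t");
  context.write(new Text(f[0]), new IntPair(Integer.parseInt(f[1]), Integer.parseInt(f[2])));
 }
}

In the driver we would then also call job.setMapOutputValueClass(IntPair.class).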

Wednesday, 16 October 2013

MapReduce on HBase Table

In my last post, HBase Table MapReduce Basics, I explained some basic guidelines to follow while writing a MapReduce program for HBase tables. In this post I will share a sample MapReduce program which reads data from an HBase table, does some aggregation and writes the output to another HBase table.

For our input we shall create an HBase table with the data below:

create 'test1', 'cf1'
put 'test1', '20130101#1', 'cf1:sales', '100'
put 'test1', '20130101#2', 'cf1:sales', '110'
put 'test1', '20130102#1', 'cf1:sales', '200'
put 'test1', '20130102#2', 'cf1:sales', '210'

create 'test2', 'cf1'
For the table "test1", the rowkey is composed of a date and a store number, and we have a sales value for each store.
We shall aggregate on date to get the total sales.

The mapper class is:

import java.io.IOException;

import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.mapreduce.TableMapper;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;

public class testMapper extends TableMapper<Text, IntWritable> {
 
 @Override
 public void map(ImmutableBytesWritable rowKey, Result columns, Context context)
   throws IOException, InterruptedException {

  try {
   // get rowKey and convert it to string
   String inKey = new String(rowKey.get());
   // set new key having only date
   String oKey = inKey.split("#")[0];
   // get sales column in byte format first and then convert it to string (as it is stored as string from hbase shell)
   byte[] bSales = columns.getValue(Bytes.toBytes("cf1"), Bytes.toBytes("sales"));
   String sSales = new String(bSales);
   Integer sales = new Integer(sSales);
   // emit date and sales values
   context.write(new Text(oKey), new IntWritable(sales));
  } catch (RuntimeException e){
   e.printStackTrace();
  }
 }
}
The reducer class is:

import java.io.IOException;

import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.mapreduce.TableReducer;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;

public class testReducer extends TableReducer<Text, IntWritable, ImmutableBytesWritable> {
 
 @Override
 public void reduce(Text key, Iterable<IntWritable> values, Context context) 
   throws IOException, InterruptedException {
  try {
   int sum = 0;
   // loop through the different sales values and add them to sum
   for (IntWritable sales : values) {
    sum += sales.get();
   }
   
   // create hbase put with rowkey as date
   Put insHBase = new Put(Bytes.toBytes(key.toString()));
   // insert sum value to hbase 
   insHBase.add(Bytes.toBytes("cf1"), Bytes.toBytes("sum"), Bytes.toBytes(sum));
   // write data to Hbase table
   context.write(null, insHBase);

  } catch (Exception e) {
   e.printStackTrace();
  }
 }
}
The driver class is:

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.mapreduce.TableMapReduceUtil;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;

public class testDriver {
  public static void main(String[] args) throws Exception {
 // use HBaseConfiguration so that hbase-site.xml settings are picked up
 Configuration conf = HBaseConfiguration.create();
 
 // define scan and define column families to scan
 Scan scan = new Scan();
 scan.addFamily(Bytes.toBytes("cf1"));

 Job job = new Job(conf); 
 
    job.setJarByClass(testDriver.class);
 // define input hbase table
    TableMapReduceUtil.initTableMapperJob(
     "test1",
        scan,
        testMapper.class,
        Text.class,
        IntWritable.class,
        job);
 // define output table
    TableMapReduceUtil.initTableReducerJob(
      "test2",
      testReducer.class, 
      job);

    job.waitForCompletion(true);
  }
}
We shall build the jar and run the hadoop jar command for the testDriver class; this will populate the data into the output HBase table test2.
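Assuming we named the jar hbase-mr.jar (the name is arbitrary) and the HBase libraries are available to Hadoop, the commands look roughly like this:

export HADOOP_CLASSPATH=$(hbase classpath)
hadoop jar hbase-mr.jar testDriver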

hbase(main):013:0> scan 'test2'
ROW                                COLUMN+CELL                                                                                        
 20130101                          column=cf1:sum, timestamp=1381869476363, value=\x00\x00\x00\xD2                                    
 20130102                          column=cf1:sum, timestamp=1381869476363, value=\x00\x00\x01\x9A                                    
2 row(s) in 0.0240 seconds
The sum values are displayed as bytes (HBase stores everything as bytes); we can convert them to a readable integer format in the HBase shell.

hbase(main):014:0> org.apache.hadoop.hbase.util.Bytes.toInt("\x00\x00\x00\xD2".to_java_bytes)
=> 210
hbase(main):015:0> org.apache.hadoop.hbase.util.Bytes.toInt("\x00\x00\x01\x9A".to_java_bytes)
=> 410
So we are getting the correct aggregated values for sales in the output.