Thursday, 26 December 2013

In-mapper Combiner Program to Calculate Average

In my previous post I described how an in-mapper combiner can make an M/R program more efficient. That post also showed the M/R algorithms for calculating an average both with and without the in-mapper combiner optimization.

In this post I am posting the code for both algorithms.

The M/R program to calculate average without in-mapper combiner is given below:

The M/R program to calculate average with in-mapper combiner is given below:

The programs took 56 sec and 42 sec respectively to run on my laptop for 10 million records. So the in-mapper combiner program cut the execution time by 25%, which is a speedup of about 1.33x.
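The two timings can be compared either as a reduction in run time or as a speedup factor. The short sketch below computes both from the numbers reported above; the variable names are only illustrative.

```python
# Timings reported in this post, in seconds
t_without = 56.0   # M/R job without in-mapper combiner
t_with = 42.0      # M/R job with in-mapper combiner

# Fraction of the original run time that was saved
time_saved_pct = (t_without - t_with) / t_without * 100

# How many times faster the optimized job is
speedup = t_without / t_with

print(f"run time reduced by {time_saved_pct:.0f}%")
print(f"speedup factor: {speedup:.2f}x")
```

These figures come from a single run on one laptop, so they should be read as indicative rather than as a benchmark.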

In-Mapper Combiner

Recently I read a book on Map/Reduce algorithms by Lin and Dyer. This book gives deep insight into designing efficient M/R algorithms. In this post I will describe the in-mapper combining algorithm and a sample M/R program using this algorithm.

Advantages of in-mapper combiner over traditional combiner:

When a mapper with a traditional combiner (the mini-reducer) emits key-value pairs, they are collected in a memory buffer, and the combiner aggregates a batch of these pairs before they are sent to the reducer. The drawbacks of this approach are:
  1. Execution of the combiner is not guaranteed, so MapReduce jobs cannot depend on it.
  2. Hadoop may store the key-value pairs on the local filesystem and run the combiner later, which causes expensive disk I/O.
  3. A combiner only combines data within the same buffer. Thus, we may still generate a lot of network traffic during the shuffle phase even if most of the keys from a single mapper are the same. To see this, consider the word count example, assuming that the buffer size is 3 and that one mapper emits <key, value> = <Stanford, 3>, <Berkeley, 1>, <Stanford, 7>, <Berkeley, 7>, and <Stanford, 2>. The first three items will be in one buffer and the last two in another; as a result, the combiner will emit <Stanford, 10>, <Berkeley, 1>, <Berkeley, 7>, <Stanford, 2>. If we use an in-mapper combiner, we will get <Stanford, 12>, <Berkeley, 8>.
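The word count example in point 3 can be reproduced with a small simulation. This is only a simplified model, not Hadoop code: `traditional_combiner` and `in_mapper_combiner` are hypothetical helper names, and the "buffer" is modelled as fixed-size batches of the emitted pairs.

```python
from collections import defaultdict

def combine(pairs):
    """Sum the values per key, keeping keys in first-seen order."""
    totals = defaultdict(int)
    for key, value in pairs:
        totals[key] += value
    return list(totals.items())

def traditional_combiner(pairs, buffer_size):
    """Simplified model: each buffer-sized batch is combined on its own."""
    out = []
    for start in range(0, len(pairs), buffer_size):
        out.extend(combine(pairs[start:start + buffer_size]))
    return out

def in_mapper_combiner(pairs):
    """Aggregate over everything the mapper sees and emit once at the end."""
    return combine(pairs)

emitted = [("Stanford", 3), ("Berkeley", 1), ("Stanford", 7),
           ("Berkeley", 7), ("Stanford", 2)]

per_buffer = traditional_combiner(emitted, buffer_size=3)
per_mapper = in_mapper_combiner(emitted)

print(per_buffer)  # four pairs still go to the shuffle
print(per_mapper)  # only two pairs go to the shuffle
```

The per-buffer result still contains both keys twice, while the in-mapper result contains each key once, which is the reduction in shuffle traffic described above.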

Consider the case of calculating the average marks for each student. Suppose we have the dataset below:

s_id    c_id    marks
8001    101    78
8001    102    88
8002    101    56
8002    102    77

The pseudocode for a basic M/R algorithm that computes average marks is given below:
class Mapper
    method Map(integer s_id, integer m)
        Emit(integer s_id, integer m)

class Reducer
    method Reduce(integer s_id, integer [m1 , m2 , . . .])
        sum ← 0
        cnt ← 0
        for all integer m ∈ integer [m1 , m2 , . . .] do
            sum ← sum + m
            cnt ← cnt + 1
        avg_m ← sum/cnt
        Emit(integer s_id, float avg_m )
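To make the data flow concrete, here is a minimal single-process sketch of the pseudocode above, run on the sample dataset. It is not Hadoop code: `map_record`, `reduce_marks` and `run_job` are hypothetical names, and a dictionary of lists stands in for the shuffle and sort phase.

```python
from collections import defaultdict

# (s_id, c_id, marks) rows from the sample dataset
records = [(8001, 101, 78), (8001, 102, 88), (8002, 101, 56), (8002, 102, 77)]

def map_record(s_id, c_id, marks):
    """Mapper: emit one (s_id, marks) pair per input record."""
    yield s_id, marks

def reduce_marks(s_id, marks_list):
    """Reducer: average all marks received for one student."""
    total = 0
    count = 0
    for m in marks_list:
        total += m
        count += 1
    return s_id, total / count

def run_job(rows):
    groups = defaultdict(list)  # stands in for shuffle and sort
    emitted = 0
    for row in rows:
        for key, value in map_record(*row):
            groups[key].append(value)
            emitted += 1
    averages = dict(reduce_marks(k, v) for k, v in sorted(groups.items()))
    return averages, emitted

averages, emitted = run_job(records)
print(averages)
print("pairs shuffled:", emitted)
```

Note that the number of shuffled pairs equals the number of input records, since the mapper does no aggregation.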

If we have a large number of input records, the map tasks emit the same number of records, and all of them must be shuffled and sorted before being passed on to the reducers. This large amount of data transfer can slow down the overall M/R job.

We can make this algorithm faster by decreasing the number of records emitted by the mapper. To achieve this, we can use one associative array to store partial sums of marks and another to store the count of marks, and emit these values only once, in the mapper's close method. The pseudocode for the in-mapper combiner is shown below:

class Mapper
    method Initialize
        S ← new AssociativeArray
        C ← new AssociativeArray
    method Map(integer s_id, integer m)
        S{s_id} ← S{s_id} + m
        C{s_id} ← C{s_id} + 1
    method Close
        for all integer s_id ∈ S do
            Emit(integer s_id, pair (S{s_id}, C{s_id}))

class Reducer
    method Reduce(integer s_id, pairs [(s1 , c1 ), (s2 , c2 ) . . .])
        sum ← 0
        cnt ← 0
        for all pair (s, c) ∈ pairs [(s1 , c1 ), (s2 , c2 ) . . .] do
            sum ← sum + s
            cnt ← cnt + c
        avg_m ← sum/cnt
        Emit(integer s_id, float avg_m )
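The in-mapper combining pseudocode can be sketched in the same single-process style. Again this is an illustration under assumptions, not Hadoop code: the class `AverageMapper`, the function `reduce_pairs`, and the way the four sample records are divided into two input splits are all made up for the example.

```python
from collections import defaultdict

class AverageMapper:
    """Keeps partial sums and counts in memory and emits them on close."""

    def __init__(self):
        self.sums = defaultdict(int)    # S in the pseudocode
        self.counts = defaultdict(int)  # C in the pseudocode

    def map(self, s_id, marks):
        self.sums[s_id] += marks
        self.counts[s_id] += 1

    def close(self):
        return [(s_id, (self.sums[s_id], self.counts[s_id]))
                for s_id in self.sums]

def reduce_pairs(s_id, pairs):
    """Reducer: add up partial sums and counts, then divide once."""
    total = sum(s for s, _ in pairs)
    count = sum(c for _, c in pairs)
    return s_id, total / count

# Hypothetical division of the sample dataset into two input splits
splits = [
    [(8001, 78), (8001, 88), (8002, 56)],
    [(8002, 77)],
]

shuffled = defaultdict(list)
emitted = 0
for split in splits:
    mapper = AverageMapper()          # one mapper instance per split
    for s_id, marks in split:
        mapper.map(s_id, marks)
    for s_id, pair in mapper.close():
        shuffled[s_id].append(pair)
        emitted += 1

averages = dict(reduce_pairs(k, v) for k, v in sorted(shuffled.items()))
print(averages)
print("pairs shuffled:", emitted)
```

Emitting (sum, count) pairs rather than per-mapper averages matters here: the reducer can combine partial sums and counts exactly, whereas averaging per-mapper averages would weight the splits incorrectly. The trade-off is that the mapper must hold one entry per distinct key in memory until close is called.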

Using this algorithm we can improve the performance of the M/R job by reducing the number of intermediate key-value pairs sent from mappers to reducers.

In my next post I shall post an M/R program that uses the in-mapper combiner and compare its performance with an M/R program that does not use any optimization.

Friday, 20 December 2013

Hive Data Types

Hive supports different data types for table columns. The data types supported by Hive can be broadly classified into primitive and complex data types.

The primitive data types supported by Hive are listed below:

1. Numeric Types

  • TINYINT (1-byte signed integer, from -128 to 127)
  • SMALLINT (2-byte signed integer, from -32,768 to 32,767)
  • INT (4-byte signed integer, from -2,147,483,648 to 2,147,483,647)
  • BIGINT (8-byte signed integer, from -9,223,372,036,854,775,808 to 9,223,372,036,854,775,807)
  • FLOAT (4-byte single precision floating point number)
  • DOUBLE (8-byte double precision floating point number)
  • DECIMAL (Hive 0.13.0 introduced user definable precision and scale)
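The integer ranges listed above follow directly from the byte widths, since an n-byte two's-complement integer spans -2^(8n-1) to 2^(8n-1) - 1. A small sketch to check the figures (the helper name `signed_range` is just for illustration):

```python
def signed_range(num_bytes):
    """Return (min, max) of a two's-complement integer of the given width."""
    bits = 8 * num_bytes
    return -(2 ** (bits - 1)), 2 ** (bits - 1) - 1

hive_int_types = [("TINYINT", 1), ("SMALLINT", 2), ("INT", 4), ("BIGINT", 8)]

for name, size in hive_int_types:
    low, high = signed_range(size)
    print(f"{name}: {low:,} to {high:,}")
```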

2. Date/Time Types

  • DATE

3. String Types

  • CHAR

4. Misc Types

Apart from these primitive data types Hive offers some complex data types which are listed below:

5. Complex Types

  • arrays: ARRAY<data_type>
  • maps: MAP<primitive_type, data_type>
  • structs: STRUCT<col_name : data_type [COMMENT col_comment], ...>
  • union: UNIONTYPE<data_type, data_type, ...>

Wednesday, 18 December 2013

Logstash, ElasticSearch and Kibana Integration for Clickstream Weblog Ingestion

In this post I am going to showcase how we can develop a quick and easy demo application for clickstream weblog ingestion, search and visualization. We will use Logstash for log ingestion, store the events in ElasticSearch, and build a dashboard using Kibana. For the clickstream weblog I am using log data from the ECML/PKDD 2005 Discovery Challenge. You can download the complete weblogs after registering there. These weblogs are delimited by semicolons (;) and have the following fields, in order:
  • shop_id
  • unixtime
  • client ip
  • session
  • visited page
  • referrer
Here are some sample log lines:
To create this demo we need a Logstash configuration file (let's name it clickstream.conf), which specifies the inputs, filters and outputs. The clickstream.conf file looks like this:
input {
  file {
    # path for clickstream log
    path => "/home/rishav.rohit/Desktop/clickstream/_2004_02_01_19_click_stream.log"
    # define a type for all events handled by this input
    type => "weblog"
    start_position => "beginning"
    # the clickstream log is in character set ISO-8859-1
    codec => plain { charset => "ISO-8859-1" }
  }
}

filter {
  csv {
    # define columns present in weblog
    columns => [shop_id, unixtime, client_ip, session, page, referrer]
    separator => ";"
  }
  grok {
    # get visited page and page parameters
    match => ["page", "%{URIPATH:page_visited}(?:%{URIPARAM:page_params})?"]
    remove_field => ["page"]
  }
  date {
    # the unixtime field is in epoch seconds, so convert it to a normal timestamp
    match => [ "unixtime", "UNIX" ]
  }
  geoip {
    # convert the IP to longitude-latitude using the GeoLiteCity database from MaxMind
    source => "client_ip"
    fields => ["latitude","longitude"]
    target => "geoip"
    add_field => [ "[geoip][coordinates]", "%{[geoip][longitude]}" ]
    add_field => [ "[geoip][coordinates]", "%{[geoip][latitude]}"  ]
  }
  mutate {
    # convert geoip.coordinates to float values
    convert => [ "[geoip][coordinates]", "float" ]
  }
}

output {
  # store output in local elasticsearch cluster
  elasticsearch {
    host => ""
  }
}
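To illustrate what the csv, grok and date filters do to a single event, here is a rough Python sketch of the same three steps. It is not how Logstash works internally, and the geoip step is left out because it needs the MaxMind database. The function name `parse_line` is hypothetical, and in the sample line the client IP (192.0.2.1) and the referrer are placeholder values invented for the example; the other fields are taken from the sample record shown in this post.

```python
import csv
from datetime import datetime, timezone

COLUMNS = ["shop_id", "unixtime", "client_ip", "session", "page", "referrer"]

def parse_line(line):
    # csv step: split the semicolon-delimited line into named fields
    values = next(csv.reader([line], delimiter=";"))
    event = dict(zip(COLUMNS, values))

    # grok step (approximation): separate the path from the query parameters
    # and drop the original "page" field
    path, sep, query = event.pop("page").partition("?")
    event["page_visited"] = path
    if sep:
        event["page_params"] = sep + query

    # date step: convert epoch seconds to a UTC timestamp
    ts = datetime.fromtimestamp(int(event["unixtime"]), tz=timezone.utc)
    event["@timestamp"] = ts.strftime("%Y-%m-%dT%H:%M:%S.000Z")
    return event

# Placeholder IP and referrer; remaining values come from the sample record
sample = ("14;1075658407;192.0.2.1;f07f39ec63abf67f965684f3fa5729c4;"
          "/findp/?&id=63&view=1,2,3,14,20,15&p_14=nerez;http://example.com/")

event = parse_line(sample)
for field, value in event.items():
    print(f"{field}: {value}")
```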
To start the Logstash agent, we run the command below:
java -jar logstash-1.2.2-flatjar.jar agent -f clickstream.conf
A sample record in ElasticSearch looks like this:

    _index: logstash-2004.02.01
    _type: logs
    _id: I1N0MboUR0O1O3RZ-qXqnw
    _version: 1
    _score: 1
    _source: {
        message: [
        @timestamp: 2004-02-01T18:00:07.000Z
        @version: 1
        type: weblog
        path: /home/rishav.rohit/Desktop/clickstream/_2004_02_01_19_click_stream.log
        shop_id: 14
        unixtime: 1075658407
        session: f07f39ec63abf67f965684f3fa5729c4
        page_visited: /findp/
        page_params: ?&id=63&view=1,2,3,14,20,15&p_14=nerez
        geoip: {
            latitude: 50.08330000000001
            longitude: 14.466700000000003
            coordinates: [

So we have parsed a complex log message into simpler components, converted fields such as unixtime to a datetime and the client IP to latitude-longitude, and extracted the page visited by the client. Now, using Kibana, we can quickly build a dashboard with these panels