Thursday 26 December 2013

In-Mapper Combiner

In-Mapper Combiner

Recently I read a book on Map/Reduce algorithms by Lin and Dyer. This books gives a deep insight in designing efficient M/R algoriths. Today in this post I will post about in-mapper combining alogrithm and a sample M/R program using this algorithm.

Advantages of in-mapper combiner over traditional combiner:

When a mapper with a traditional combiner (the mini-reducer) emits the key-value pair, they are collected in the memory buffer and then the combiner aggregates a batch of these key-value pairs before sending them to the reducer. The drawbacks of this approach are:
  1. The execution of combiner is not guaranteed; so MapReduce jobs cannot depend on the combiner execution.
  2. Hadoop may store the key-value pairs in local filesystem, and run the combiner later which will cause expensive disk IO.
  3. A combiner only combines data in the same buffer. Thus, we may still generate a lot of network traffic during the shuffle phase even if most of the keys from a single mapper are the same. To see this, consider the word count example, assuming that buffer size is 3, and we have <key, value> = <Stanford, 3>, <Berkeley, 1>, <Stanford, 7>, <Berkeley, 7>, and <Stanford, 2> emitted from one mapper. The first three items will be in one buffer, and last two will be in the the other buffer; as a result, the combiner will emit <Stanford, 10>, <Berkeley, 1>, <Berkeley, 7>, <Stanford, 2>. If we use in-mapper combiner, we will get <Stanford, 12>, <Berkeley, 8>.



Consider the case of calculating average marks for student. Let us consider we have below dataset

s_id    c_id    marks
8001    101    78
8001    102    88
8002    101    56
8002    102    77

The pseudo code for a basic M/R algorithm which computes average marks is as given:
class Mapper
    method Map(integer s_id, integer m)
        Emit(integer s_id, integer m)

class Reducer
    method Reduce(integer s_id, integer [m1 , m2 , . . .])
        sum ← 0
        cnt ← 0
        for all integer m ∈ integer [m1 , m2 , . . .] do
            sum ← sum + m
            cnt ← cnt + 1
        avg_m ← sum/cnt
        Emit(integer s_id, float avg_m )

If we have a large number of input records then the same number of records emitted from map task will be shuffled and sorted before being passed on to  reducer. This large amount of data transfer could be deterrent in the speed of execution of overall M/R job.

We can make this algorithm faster by decreasing the number of records emitted by the mapper. To achieve this we can use an associative array to store partial sums of marks, and another associative array to store the count of marks and finally emit these values in close method. The pseduo code for in-mapper combiner is shown below:

class Mapper
    method Initialize
        S ← new AssociativeArray
        C ← new AssociativeArray
    method Map(integer s_id, integer m)
        S{s_id} ← S{s_id} + m
        C{s_id} ← C{s_id} + 1
    method Close
        for all integer s_id ∈ S do
            Emit(integer s_id, pair (S{s_id}, C{s_id}))

class Reducer
    method Reduce(integer s_id, pairs [(s1 , c1 ), (s2 , c2 ) . . .])
        sum ← 0
        cnt ← 0
    for all pair (s, c) ∈ pairs [(s1 , c1 ), (s2 , c2 ) . . .] do
            sum ← sum + s
            cnt ← cnt + c
    avg_m ← sum/cnt
    Emit(integer s_id, float avg_m )

Using this algorithm we can improve the performance of M/R job by reducing the number of intermediary key-value pairs emitted from mappers to reducers.

In my next post I shall post M/R program using in-mapper combiner and also compare the performance of this with M/R program without using any optimization.

No comments:

Post a Comment