In-Mapper Combiner
Recently I read a book on Map/Reduce algorithms by Lin and Dyer. The book gives deep insight into designing efficient M/R algorithms. In this post I will describe the in-mapper combining algorithm and sketch a sample M/R program that uses it.
Advantages of the in-mapper combiner over the traditional combiner:
When a mapper with a traditional combiner (the mini-reducer) emits key-value pairs, they are collected in a memory buffer, and the combiner aggregates a batch of these key-value pairs before sending them to the reducer. The drawbacks of this approach are:
- The execution of the combiner is not guaranteed, so a MapReduce job cannot depend on it running.
- Hadoop may spill the key-value pairs to the local filesystem and run the combiner later, which causes expensive disk I/O.
- A combiner only combines data within the same buffer, so we may still generate a lot of network traffic during the shuffle phase even if most of the keys emitted by a single mapper are the same. To see this, consider the word count example and assume a buffer size of 3, with one mapper emitting <Stanford, 3>, <Berkeley, 1>, <Stanford, 7>, <Berkeley, 7>, and <Stanford, 2>. The first three pairs end up in one buffer and the last two in the other; as a result, the combiner emits <Stanford, 10>, <Berkeley, 1>, <Berkeley, 7>, <Stanford, 2>. With an in-mapper combiner we instead get <Stanford, 12>, <Berkeley, 8>. A word-count sketch of this idea follows this list.
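To make this concrete, here is a rough sketch of in-mapper combining for word count using the Hadoop Java API. The class name and the whitespace tokenization are my own choices, not something prescribed by the book: the mapper keeps a HashMap of counts and emits each word only once, in cleanup().

import java.io.IOException;
import java.util.HashMap;
import java.util.Map;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

// Word-count mapper that aggregates counts across its whole input split
// instead of emitting one (word, 1) pair per token.
public class InMapperWordCountMapper
        extends Mapper<LongWritable, Text, Text, IntWritable> {

    private Map<String, Integer> counts;

    @Override
    protected void setup(Context context) {
        // One map per mapper task, shared across all map() calls.
        counts = new HashMap<String, Integer>();
    }

    @Override
    protected void map(LongWritable key, Text value, Context context) {
        for (String token : value.toString().split("\\s+")) {
            if (!token.isEmpty()) {
                counts.merge(token, 1, Integer::sum);
            }
        }
    }

    @Override
    protected void cleanup(Context context)
            throws IOException, InterruptedException {
        // Emit each word exactly once per mapper, with its aggregated count.
        for (Map.Entry<String, Integer> entry : counts.entrySet()) {
            context.write(new Text(entry.getKey()),
                          new IntWritable(entry.getValue()));
        }
    }
}

With this mapper, each distinct word crosses the network at most once per map task, regardless of how the framework buffers the output.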
Now consider calculating the average marks per student. Suppose we have the dataset below:
s_id c_id marks
8001 101 78
8001 102 88
8002 101 56
8002 102 77
The pseudocode for a basic M/R algorithm that computes the average marks is given below:
class Mapper
    method Map(integer s_id, integer m)
        Emit(integer s_id, integer m)

class Reducer
    method Reduce(integer s_id, integers [m1, m2, ...])
        sum ← 0
        cnt ← 0
        for all integer m ∈ integers [m1, m2, ...] do
            sum ← sum + m
            cnt ← cnt + 1
        avg_m ← sum / cnt
        Emit(integer s_id, float avg_m)
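For reference, a minimal Hadoop implementation of this basic version might look like the sketch below. The class names are my own, and I assume whitespace-delimited input lines in the s_id c_id marks layout shown above, with no header row.

import java.io.IOException;

import org.apache.hadoop.io.FloatWritable;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;

public class AverageMarks {

    // Mapper: parse an "s_id c_id marks" line and emit (s_id, marks).
    public static class AvgMapper
            extends Mapper<LongWritable, Text, IntWritable, IntWritable> {

        @Override
        protected void map(LongWritable key, Text value, Context context)
                throws IOException, InterruptedException {
            String[] fields = value.toString().trim().split("\\s+");
            int sId = Integer.parseInt(fields[0]);
            int marks = Integer.parseInt(fields[2]);
            context.write(new IntWritable(sId), new IntWritable(marks));
        }
    }

    // Reducer: average all marks received for one student id.
    public static class AvgReducer
            extends Reducer<IntWritable, IntWritable, IntWritable, FloatWritable> {

        @Override
        protected void reduce(IntWritable sId, Iterable<IntWritable> marks,
                              Context context)
                throws IOException, InterruptedException {
            int sum = 0;
            int cnt = 0;
            for (IntWritable m : marks) {
                sum += m.get();
                cnt += 1;
            }
            context.write(sId, new FloatWritable((float) sum / cnt));
        }
    }
}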
If we have a large number of input records, then the same number of records emitted by the map tasks will be shuffled and sorted before being passed to the reducer. This large amount of data transfer can slow down the overall M/R job.
We can make this algorithm faster by decreasing the number of records emitted by the mapper. To achieve this, we use one associative array to store partial sums of marks and another to store counts of marks, and finally emit these values in the Close method. The pseudocode for the in-mapper combiner is shown below:
class Mapper
    method Initialize
        S ← new AssociativeArray
        C ← new AssociativeArray
    method Map(integer s_id, integer m)
        S{s_id} ← S{s_id} + m
        C{s_id} ← C{s_id} + 1
    method Close
        for all integer s_id ∈ S do
            Emit(integer s_id, pair (S{s_id}, C{s_id}))

class Reducer
    method Reduce(integer s_id, pairs [(s1, c1), (s2, c2), ...])
        sum ← 0
        cnt ← 0
        for all pair (s, c) ∈ pairs [(s1, c1), (s2, c2), ...] do
            sum ← sum + s
            cnt ← cnt + c
        avg_m ← sum / cnt
        Emit(integer s_id, float avg_m)
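A possible Hadoop implementation of this in-mapper combiner version is sketched below. The class names are mine, and for simplicity I encode the (sum, count) pair as a comma-separated Text value; a custom Writable would be the cleaner choice in real code.

import java.io.IOException;
import java.util.HashMap;
import java.util.Map;

import org.apache.hadoop.io.FloatWritable;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;

public class InMapperAverageMarks {

    // Mapper with in-mapper combining: partial sums and counts are kept per
    // student id and emitted once, in cleanup(), as a "sum,count" Text value.
    public static class AvgMapper
            extends Mapper<LongWritable, Text, IntWritable, Text> {

        private Map<Integer, Integer> sums;
        private Map<Integer, Integer> counts;

        @Override
        protected void setup(Context context) {
            sums = new HashMap<Integer, Integer>();
            counts = new HashMap<Integer, Integer>();
        }

        @Override
        protected void map(LongWritable key, Text value, Context context) {
            String[] fields = value.toString().trim().split("\\s+");
            int sId = Integer.parseInt(fields[0]);
            int marks = Integer.parseInt(fields[2]);
            sums.merge(sId, marks, Integer::sum);
            counts.merge(sId, 1, Integer::sum);
        }

        @Override
        protected void cleanup(Context context)
                throws IOException, InterruptedException {
            // Emit one (sum, count) pair per student seen by this mapper.
            for (Map.Entry<Integer, Integer> e : sums.entrySet()) {
                int sId = e.getKey();
                context.write(new IntWritable(sId),
                              new Text(e.getValue() + "," + counts.get(sId)));
            }
        }
    }

    // Reducer: add up the partial sums and counts, then compute the average.
    public static class AvgReducer
            extends Reducer<IntWritable, Text, IntWritable, FloatWritable> {

        @Override
        protected void reduce(IntWritable sId, Iterable<Text> pairs,
                              Context context)
                throws IOException, InterruptedException {
            int sum = 0;
            int cnt = 0;
            for (Text pair : pairs) {
                String[] parts = pair.toString().split(",");
                sum += Integer.parseInt(parts[0]);
                cnt += Integer.parseInt(parts[1]);
            }
            context.write(sId, new FloatWritable((float) sum / cnt));
        }
    }
}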
Using this algorithm we can improve the performance of the M/R job by reducing the number of intermediate key-value pairs shuffled from mappers to reducers.
In my next post I will share an M/R program using the in-mapper combiner and compare its performance with that of an M/R program that does not use any optimization.