Tuesday, 16 September 2014

HBase: MapReduce on Multiple Input Tables

Starting with version 0.94.5, HBase supports reading multiple tables as input to a MapReduce job through the MultiTableInputFormat class.
This post walks through an example MapReduce job that reads from two HBase tables, aggregates one of them, merges the result with the contents of the second (the equivalent of a SQL UNION ALL), and stores the output in a third table.

The first table, 'storeSales', holds store-wise sales for each date; in the sample data its row keys have the form date#storeId. The HBase shell statements to create and populate it are:

create 'storeSales', 'cf1'
put 'storeSales', '20130101#1', 'cf1:sSales', '100'
put 'storeSales', '20130101#2', 'cf1:sSales', '110'
put 'storeSales', '20130102#1', 'cf1:sSales', '200'
put 'storeSales', '20130102#2', 'cf1:sSales', '210'

The second table, 'onlineSales', holds online sales for each date. The statements to create and populate it are:
create 'onlineSales', 'cf2'
put 'onlineSales', '20130101', 'cf2:oSales', '400'
put 'onlineSales', '20130102', 'cf2:oSales', '130'

The MapReduce job aggregates store sales to date level and merges them with online sales.
Let's create an output table for the result:
create 'totalSales', 'cf1'

The mapper class for this job is:
package com.rishav.hbase.union;

import java.io.IOException;
import java.util.Arrays;

import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.mapreduce.TableMapper;
import org.apache.hadoop.hbase.mapreduce.TableSplit;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;

public class UnionMapper extends TableMapper<Text, IntWritable> {

    private static final byte[] STORE_SALES_TABLE = Bytes.toBytes("storeSales");
    private static final byte[] ONLINE_SALES_TABLE = Bytes.toBytes("onlineSales");

    @Override
    public void map(ImmutableBytesWritable rowKey, Result columns, Context context)
            throws IOException, InterruptedException {
        // The input split tells us which table the current row came from.
        TableSplit currentSplit = (TableSplit) context.getInputSplit();
        byte[] tableName = currentSplit.getTableName();

        if (Arrays.equals(tableName, STORE_SALES_TABLE)) {
            // Row key is date#storeId; keep only the date part.
            String date = Bytes.toString(rowKey.get()).split("#")[0];
            byte[] sales = columns.getValue(Bytes.toBytes("cf1"), Bytes.toBytes("sSales"));
            int sSales = Integer.parseInt(Bytes.toString(sales));
            // Prefix "s#" marks the record as store sales.
            context.write(new Text("s#" + date), new IntWritable(sSales));
        } else if (Arrays.equals(tableName, ONLINE_SALES_TABLE)) {
            // Row key is the date itself.
            String date = Bytes.toString(rowKey.get());
            byte[] sales = columns.getValue(Bytes.toBytes("cf2"), Bytes.toBytes("oSales"));
            int oSales = Integer.parseInt(Bytes.toString(sales));
            // Prefix "o#" marks the record as online sales.
            context.write(new Text("o#" + date), new IntWritable(oSales));
        }
    }
}

Note that the mapper gets the table name of the current split and uses it to decide what to write to the context. If your source tables have row keys with different prefixes, you can branch on the prefix instead.
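The per-table branching can be tried out without a cluster. Below is a minimal sketch in plain Java with no HBase dependency. The class KeyTagger and its method tagKey are hypothetical names introduced for illustration, and the table name is passed in as a string instead of being read from a TableSplit. It only shows how a row from either table ends up under a prefixed intermediate key.

```java
import java.util.Optional;

/** Hypothetical helper mirroring the mapper's per-table key logic (not part of HBase). */
final class KeyTagger {

    private KeyTagger() {
    }

    /**
     * Builds the intermediate key for a row.
     * Assumes storeSales row keys look like "date#storeId" and
     * onlineSales row keys are just the date, as in the sample data.
     */
    static Optional<String> tagKey(String tableName, String rowKey) {
        switch (tableName) {
            case "storeSales":
                // Drop the store id so that all stores of a day share one key.
                return Optional.of("s#" + rowKey.split("#")[0]);
            case "onlineSales":
                return Optional.of("o#" + rowKey);
            default:
                // Rows from any other table are skipped.
                return Optional.empty();
        }
    }

    public static void main(String[] args) {
        System.out.println(tagKey("storeSales", "20130101#1").orElse("(skipped)"));
        System.out.println(tagKey("onlineSales", "20130101").orElse("(skipped)"));
    }
}
```

Both sample rows for store 1 and store 2 on a given date map to the same "s#" key, which is what lets the reducer sum them.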

The reducer class for this job is:
package com.rishav.hbase.union;

import java.io.IOException;

import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.mapreduce.TableReducer;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;

public class UnionReducer extends TableReducer<Text, IntWritable, ImmutableBytesWritable> {

    @Override
    public void reduce(Text key, Iterable<IntWritable> values, Context context)
            throws IOException, InterruptedException {
        if (key.toString().startsWith("s")) {
            // Store sales: sum the sales of all stores for the date.
            int dayStoreSales = 0;
            for (IntWritable storeSale : values) {
                dayStoreSales += storeSale.get();
            }
            Put put = new Put(Bytes.toBytes(key.toString()));
            put.add(Bytes.toBytes("cf1"), Bytes.toBytes("tSales"), Bytes.toBytes(dayStoreSales));
            context.write(null, put);
        } else {
            // Online sales: the sample data has one row per date, so the sum is that value.
            int dayOnlineSales = 0;
            for (IntWritable onlineSale : values) {
                dayOnlineSales += onlineSale.get();
            }
            Put put = new Put(Bytes.toBytes(key.toString()));
            put.add(Bytes.toBytes("cf1"), Bytes.toBytes("tSales"), Bytes.toBytes(dayOnlineSales));
            context.write(null, put);
        }
    }
}

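The reducer aggregates based on the prefix of the intermediate key: 's#' keys carry store sales, which are summed per date, and 'o#' keys carry online sales, which already have a single value per date.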
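To see what the shuffle and reduce steps produce for the sample data, here is a small plain-Java simulation. It is only a sketch: UnionSimulation and sumByKey are hypothetical names, no Hadoop classes are involved, and a TreeMap stands in for the sorted grouping that a single reducer would see.

```java
import java.util.Map;
import java.util.TreeMap;

/** Hypothetical stand-in for the shuffle and reduce phases (no Hadoop involved). */
final class UnionSimulation {

    private UnionSimulation() {
    }

    /**
     * Sums values per intermediate key. Each input row is a {key, amount} pair,
     * i.e. what the mapper would emit. The TreeMap keeps the keys sorted.
     */
    static Map<String, Integer> sumByKey(String[][] mapperOutput) {
        Map<String, Integer> totals = new TreeMap<>();
        for (String[] pair : mapperOutput) {
            totals.merge(pair[0], Integer.parseInt(pair[1]), Integer::sum);
        }
        return totals;
    }

    public static void main(String[] args) {
        // Mapper output for the sample rows of storeSales and onlineSales.
        String[][] mapperOutput = {
            {"s#20130101", "100"}, {"s#20130101", "110"},
            {"s#20130102", "200"}, {"s#20130102", "210"},
            {"o#20130101", "400"}, {"o#20130102", "130"},
        };
        sumByKey(mapperOutput)
                .forEach((key, total) -> System.out.println(key + " = " + total));
    }
}
```

For the sample rows this gives 400 and 130 for the two 'o#' keys and 210 and 410 for the two 's#' keys.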

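Finally, the driver class for this job is:
package com.rishav.hbase.union;

import java.util.ArrayList;
import java.util.List;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.mapreduce.TableMapReduceUtil;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.util.Tool;

public class UnionJob extends Configured implements Tool {

    public int run(String[] arg0) throws Exception {
        // One Scan per input table; the table name travels as a scan attribute.
        List<Scan> scans = new ArrayList<Scan>();
        Scan scan1 = new Scan();
        scan1.setAttribute("scan.attributes.table.name", Bytes.toBytes("storeSales"));
        scans.add(scan1);
        Scan scan2 = new Scan();
        scan2.setAttribute("scan.attributes.table.name", Bytes.toBytes("onlineSales"));
        scans.add(scan2);

        Configuration conf = new Configuration();
        Job job = new Job(conf);
        job.setJarByClass(UnionJob.class);

        // The List<Scan> overload of initTableMapperJob sets up the multi-table input.
        TableMapReduceUtil.initTableMapperJob(scans, UnionMapper.class,
                Text.class, IntWritable.class, job);
        // Results are written to the 'totalSales' table created earlier.
        TableMapReduceUtil.initTableReducerJob("totalSales", UnionReducer.class, job);

        return job.waitForCompletion(true) ? 0 : 1;
    }

    public static void main(String[] args) throws Exception {
        UnionJob runJob = new UnionJob();
        runJob.run(args);
    }
}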

The driver creates one HBase Scan per input table and passes both scans as a list to the TableMapReduceUtil.initTableMapperJob method.

Package the classes into a jar file (hbase-union.jar) and run the commands below to invoke the MapReduce job:
export HADOOP_CLASSPATH=`hbase classpath`
hadoop jar hbase-union.jar com.rishav.hbase.union.UnionJob

Once the job is complete, use the HBase shell to verify the output:
hbase(main):034:0> scan 'totalSales'
ROW                                        COLUMN+CELL                                                                                                               
 o#20130101                                column=cf1:tSales, timestamp=1410848221034, value=\x00\x00\x01\x90                                                        
 o#20130102                                column=cf1:tSales, timestamp=1410848221034, value=\x00\x00\x00\x82                                                        
 s#20130101                                column=cf1:tSales, timestamp=1410848221034, value=\x00\x00\x00\xD2                                                        
 s#20130102                                column=cf1:tSales, timestamp=1410848221034, value=\x00\x00\x01\x9A                                                        
4 row(s) in 0.0410 seconds
hbase(main):035:0> org.apache.hadoop.hbase.util.Bytes.toInt("\x00\x00\x01\x90".to_java_bytes)
=> 400
hbase(main):036:0> org.apache.hadoop.hbase.util.Bytes.toInt("\x00\x00\x00\x82".to_java_bytes)
=> 130
hbase(main):037:0> org.apache.hadoop.hbase.util.Bytes.toInt("\x00\x00\x00\xD2".to_java_bytes)
=> 210
hbase(main):038:0> org.apache.hadoop.hbase.util.Bytes.toInt("\x00\x00\x01\x9A".to_java_bytes)
=> 410
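
The values look binary in the scan output because the reducer stores them with Bytes.toBytes(int), which writes the int as 4 big-endian bytes. The same decoding can be checked in plain Java with java.nio.ByteBuffer. This is a small sketch for illustration; CellValueDecoder is a hypothetical class name, not an HBase utility.

```java
import java.nio.ByteBuffer;

/** Hypothetical helper showing how the 4-byte cell values map to ints. */
final class CellValueDecoder {

    private CellValueDecoder() {
    }

    /** Encodes an int as 4 big-endian bytes, the layout seen in the scan output. */
    static byte[] encode(int value) {
        return ByteBuffer.allocate(Integer.BYTES).putInt(value).array();
    }

    /** Decodes 4 big-endian bytes back to an int. */
    static int decode(byte[] cell) {
        return ByteBuffer.wrap(cell).getInt();
    }

    public static void main(String[] args) {
        // Cell value of row o#20130101: \x00\x00\x01\x90
        byte[] online = {0x00, 0x00, 0x01, (byte) 0x90};
        System.out.println(decode(online)); // 0x0190 = 400

        // Cell value of row s#20130101: \x00\x00\x00\xD2
        byte[] store = {0x00, 0x00, 0x00, (byte) 0xD2};
        System.out.println(decode(store)); // 0xD2 = 210
    }
}
```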

MultiTableInputFormat can also be used for HBase table joins; I shall try that some time.
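
As a rough idea of how such a join might look, one common approach is a reduce-side join: the mapper emits the bare date as the key and tags each value with its source table, so that rows from both tables for the same date meet in one reduce call. The plain-Java sketch below only illustrates that grouping idea on the sample data. It is an assumption about one possible design, not code from this post, and SalesJoinSketch, DailySales and joinByDate are hypothetical names.

```java
import java.util.Map;
import java.util.TreeMap;

/** Hypothetical reduce-side join sketch in plain Java (no HBase or Hadoop). */
final class SalesJoinSketch {

    private SalesJoinSketch() {
    }

    /** One joined row: store and online sales for a single date. */
    static final class DailySales {
        int store;
        int online;

        int total() {
            return store + online;
        }
    }

    /**
     * Joins tagged records on date. Each record is {source, date, amount},
     * where source is "s" (storeSales) or "o" (onlineSales).
     */
    static Map<String, DailySales> joinByDate(String[][] records) {
        Map<String, DailySales> joined = new TreeMap<>();
        for (String[] record : records) {
            DailySales day = joined.computeIfAbsent(record[1], date -> new DailySales());
            int amount = Integer.parseInt(record[2]);
            if ("s".equals(record[0])) {
                day.store += amount; // several stores per date, so accumulate
            } else {
                day.online += amount;
            }
        }
        return joined;
    }

    public static void main(String[] args) {
        String[][] records = {
            {"s", "20130101", "100"}, {"s", "20130101", "110"},
            {"s", "20130102", "200"}, {"s", "20130102", "210"},
            {"o", "20130101", "400"}, {"o", "20130102", "130"},
        };
        joinByDate(records).forEach((date, day) -> System.out.println(
                date + ": store=" + day.store + " online=" + day.online + " total=" + day.total()));
    }
}
```

Unlike the union job, which keeps 's#' and 'o#' rows separate, this produces one row per date holding both figures.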


  1. Excellent post Rishav. This helped me. Thanks. Eagerly waiting for joins in Hbase post.

    1. Thanks Hari, will post about joins in HBase soon.

  2. Hi Rishav,
    Thank you so much for posting the HBase code on joins. I tried to run this code, but I got stuck on an error in the driver program.

    The initTableMapperJob method should take the table name, Scan object, mapper class, key, value and job as parameters; as there is no table name, it throws an error. And there are two tables here. Can you please advise how to fix this error?

    1. Hi Giridhar,
      The initTableMapperJob method is overloaded, and I have used this variant: https://hbase.apache.org/apidocs/org/apache/hadoop/hbase/mapreduce/TableMapReduceUtil.html#initTableMapperJob%28java.util.List,%20java.lang.Class,%20java.lang.Class,%20java.lang.Class,%20org.apache.hadoop.mapreduce.Job%29

      Also check your HBase version; this is supported starting with HBase 0.94.5.

  3. Hi Rishav,

    As per your suggestion, I changed the HBase jar version from 0.94.2 to 0.94.5 and it is working fine. Thank you so much.

  4. Hi Rishav,

    Have you posted about joins?

  5. hi Rishav
    can you please post the pom.xml or the link to the jar files you used in this tuto

