Tuesday, 16 September 2014

HBase: MapReduce on Multiple Input Tables

Starting with version 0.94.5, HBase supports reading multiple tables as input to a MapReduce job through the MultiTableInputFormat class.
In this post I give an example of a MapReduce job that reads from two HBase tables, aggregates one of them, merges the result (like a SQL UNION ALL) with the contents of the second table, and stores the outcome in an output table.

The first table, 'storeSales', holds store-wise sales for each date, with row keys of the form date#store. The create statements are -

create 'storeSales', 'cf1'
put 'storeSales', '20130101#1', 'cf1:sSales', '100'
put 'storeSales', '20130101#2', 'cf1:sSales', '110'
put 'storeSales', '20130102#1', 'cf1:sSales', '200'
put 'storeSales', '20130102#2', 'cf1:sSales', '210'


The second table, 'onlineSales', holds online sales for each date. The create statements are -
create 'onlineSales', 'cf2'
put 'onlineSales', '20130101', 'cf2:oSales', '400'
put 'onlineSales', '20130102', 'cf2:oSales', '130'

Using a MapReduce job, I am going to merge store sales, aggregated at the date level, with online sales.
Let's create an output table for the result -
create 'totalSales', 'cf1'

The mapper class for this job is -

Note that in the mapper I get the table name of the current split and call context.write differently depending on that name. If your source tables have row keys with different prefixes, you can branch on the prefix instead.
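
As a minimal sketch of that branching, here is a plain-Java model with no HBase types. The class and method names are hypothetical, the table name is passed in as a string where a real mapper would read it from the current split, and the 's#' and 'o#' key prefixes are taken from the scan output shown later in this post.

```java
import java.util.AbstractMap.SimpleEntry;
import java.util.Map;

// Plain-Java model of the mapper's branching; no HBase types are used.
class UnionMapLogic {

    // Hypothetical helper: turns one input cell into an intermediate (key, value) pair.
    static Map.Entry<String, Integer> map(String tableName, String rowKey, String cellValue) {
        // The shell 'put' commands store sales as text, so parse them to int.
        int sales = Integer.parseInt(cellValue);
        if ("storeSales".equals(tableName)) {
            // Row key is date#store; keep only the date so all stores share one key per day.
            String date = rowKey.split("#")[0];
            return new SimpleEntry<>("s#" + date, sales);
        }
        if ("onlineSales".equals(tableName)) {
            // Row key is already the date.
            return new SimpleEntry<>("o#" + rowKey, sales);
        }
        throw new IllegalArgumentException("Unexpected table: " + tableName);
    }

    public static void main(String[] args) {
        System.out.println(map("storeSales", "20130101#2", "110")); // prints s#20130101=110
        System.out.println(map("onlineSales", "20130102", "130"));  // prints o#20130102=130
    }
}
```

Because both store rows for a date map to the same 's#' key, they arrive together at the reducer, while each online row keeps its own 'o#' key.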

The reducer class for this job is -

In the reducer, I aggregate the values based on the intermediate key.
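
The aggregation can be modelled in plain Java as a sum per intermediate key. This is only a sketch under assumptions: the class and method names are hypothetical, the input pairs are what a mapper would plausibly emit for the sample rows above, and a real reducer would write each total to 'totalSales' as a Put instead of returning a map.

```java
import java.util.AbstractMap.SimpleEntry;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Plain-Java model of the reduce step; no HBase types are used.
class UnionReduceLogic {

    // Sums all values that share a key; a TreeMap keeps keys sorted like HBase rows.
    static Map<String, Integer> sumByKey(List<Map.Entry<String, Integer>> mapped) {
        Map<String, Integer> totals = new TreeMap<>();
        for (Map.Entry<String, Integer> kv : mapped) {
            totals.merge(kv.getKey(), kv.getValue(), Integer::sum);
        }
        return totals;
    }

    // Assumed mapper output for the sample data in this post.
    static List<Map.Entry<String, Integer>> sample() {
        List<Map.Entry<String, Integer>> mapped = new ArrayList<>();
        mapped.add(new SimpleEntry<>("s#20130101", 100));
        mapped.add(new SimpleEntry<>("s#20130101", 110));
        mapped.add(new SimpleEntry<>("s#20130102", 200));
        mapped.add(new SimpleEntry<>("s#20130102", 210));
        mapped.add(new SimpleEntry<>("o#20130101", 400));
        mapped.add(new SimpleEntry<>("o#20130102", 130));
        return mapped;
    }

    public static void main(String[] args) {
        // prints {o#20130101=400, o#20130102=130, s#20130101=210, s#20130102=410}
        System.out.println(sumByKey(sample()));
    }
}
```

Online keys carry a single value each, so summing leaves them unchanged; only the store keys are actually aggregated.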

Finally, the driver class for this job is -

The driver creates two HBase Scan objects, one per input table, and passes them as a list to the TableMapReduceUtil.initTableMapperJob method.
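
A hedged sketch of how that wiring could look, assuming the HBase 0.94.5+ MapReduce API and the HBase and Hadoop jars on the classpath. It uses the initTableMapperJob overload that takes a List of Scan objects, with each Scan carrying its table name in the Scan.SCAN_ATTRIBUTES_TABLE_NAME attribute. The class UnionJobSetup, its helper methods, and the choice to pass the mapper and reducer classes in as parameters are assumptions made for illustration; the Text and IntWritable intermediate types follow the call quoted in the comments below.

```java
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;

import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.mapreduce.TableMapReduceUtil;
import org.apache.hadoop.hbase.mapreduce.TableMapper;
import org.apache.hadoop.hbase.mapreduce.TableReducer;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;

// Sketch only (hypothetical class): compiles against HBase 0.94.5+ and Hadoop.
class UnionJobSetup {

    // One Scan per input table; the table name travels as a Scan attribute.
    static Scan scanFor(String tableName) {
        Scan scan = new Scan();
        scan.setAttribute(Scan.SCAN_ATTRIBUTES_TABLE_NAME, Bytes.toBytes(tableName));
        return scan;
    }

    // mapperClass and reducerClass stand for your own TableMapper and
    // TableReducer implementations (assumed here, not shown).
    static void configure(Job job,
                          Class<? extends TableMapper> mapperClass,
                          Class<? extends TableReducer> reducerClass) throws IOException {
        List<Scan> scans = new ArrayList<Scan>();
        scans.add(scanFor("storeSales"));
        scans.add(scanFor("onlineSales"));

        // List-based overload: no single table name argument is needed.
        TableMapReduceUtil.initTableMapperJob(
                scans, mapperClass, Text.class, IntWritable.class, job);
        // Reducer output goes to the 'totalSales' table.
        TableMapReduceUtil.initTableReducerJob("totalSales", reducerClass, job);
    }
}
```

A driver's main method would create the Job, call configure with its mapper and reducer classes, and then call job.waitForCompletion(true).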

Package the classes into a jar file (hbase-union.jar) and run the commands below to invoke the MapReduce job -
export HADOOP_CLASSPATH=`hbase classpath`
hadoop jar hbase-union.jar com.rishav.hbase.union.UnionJob

Once the job is complete, use the HBase shell to verify the output -
hbase(main):034:0> scan 'totalSales'
ROW                                        COLUMN+CELL                                                                                                               
 o#20130101                                column=cf1:tSales, timestamp=1410848221034, value=\x00\x00\x01\x90                                                        
 o#20130102                                column=cf1:tSales, timestamp=1410848221034, value=\x00\x00\x00\x82                                                        
 s#20130101                                column=cf1:tSales, timestamp=1410848221034, value=\x00\x00\x00\xD2                                                        
 s#20130102                                column=cf1:tSales, timestamp=1410848221034, value=\x00\x00\x01\x9A                                                        
4 row(s) in 0.0410 seconds
hbase(main):035:0> org.apache.hadoop.hbase.util.Bytes.toInt("\x00\x00\x01\x90".to_java_bytes)
=> 400
hbase(main):036:0> org.apache.hadoop.hbase.util.Bytes.toInt("\x00\x00\x00\x82".to_java_bytes)
=> 130
hbase(main):037:0> org.apache.hadoop.hbase.util.Bytes.toInt("\x00\x00\x00\xD2".to_java_bytes)
=> 210
hbase(main):038:0> org.apache.hadoop.hbase.util.Bytes.toInt("\x00\x00\x01\x9A".to_java_bytes)
=> 410
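
The same check can be done outside the shell. The sketch below assumes, as the scan output suggests, that the totals are stored as 4-byte big-endian integers, and it uses java.nio.ByteBuffer from the standard library in place of HBase's Bytes utility. The class and method names are hypothetical.

```java
import java.nio.ByteBuffer;

// Encodes and decodes 4-byte big-endian ints, the layout seen in the scan output.
class SalesBytes {

    // ByteBuffer is big-endian by default, so 400 becomes 00 00 01 90.
    static byte[] encode(int value) {
        return ByteBuffer.allocate(4).putInt(value).array();
    }

    // Reads the first four bytes back as an int.
    static int decode(byte[] bytes) {
        return ByteBuffer.wrap(bytes).getInt();
    }

    public static void main(String[] args) {
        byte[] cell = {0x00, 0x00, 0x01, (byte) 0x90}; // value of row o#20130101
        System.out.println(decode(cell));              // prints 400
        System.out.println(decode(encode(210)));       // prints 210
    }
}
```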

MultiTableInputFormat can also be used for HBase table joins; I shall try that some time.

8 comments:

  1. Excellent post, Rishav. This helped me. Thanks. Eagerly waiting for the post on joins in HBase.

    1. Thanks Hari, will post about joins in HBase soon.

  2. Hi Rishav,
    Thank you so much for posting the HBase code on joins. I tried to run this code, but I got stuck on an error in the driver program.

    The initTableMapperJob method below should take a table name, Scan object, mapper class, key, value, and job as parameters; since there is no table name, it throws an error. And there are two tables here. Can you please advise how to fix this error?
    TableMapReduceUtil.initTableMapperJob
    (scans,
    JoinMapper.class,
    Text.class,
    IntWritable.class,
    job);

    1. Hi Giridhar,
      The initTableMapperJob method is overloaded, and I have used this variant: https://hbase.apache.org/apidocs/org/apache/hadoop/hbase/mapreduce/TableMapReduceUtil.html#initTableMapperJob%28java.util.List,%20java.lang.Class,%20java.lang.Class,%20java.lang.Class,%20org.apache.hadoop.mapreduce.Job%29

      Also check your HBase version; this is supported starting with HBase 0.94.5.

  3. Hi Rishav,

    As per your suggestion, I changed the HBase jar version from 0.94.2 to 0.94.5 and it is working fine. Thank you so much.

  4. Hi Rishav,

    Have you posted about joins?

  5. Hi Rishav,
    Can you please post the pom.xml or a link to the jar files you used in this tutorial?
    Thanks.
