Starting with version 0.94.5, HBase supports reading multiple tables as input to MapReduce jobs via the MultiTableInputFormat class.
In this post I give an example of a MapReduce job which reads from two HBase tables, performs an aggregation on one table, merges it (an SQL UNION ALL operation) with the contents of the second table, and stores the result in an output table.
The first table, 'storeSales', holds store-wise sales for each date. The create and load statements are -
create 'storeSales', 'cf1'
put 'storeSales', '20130101#1', 'cf1:sSales', '100'
put 'storeSales', '20130101#2', 'cf1:sSales', '110'
put 'storeSales', '20130102#1', 'cf1:sSales', '200'
put 'storeSales', '20130102#2', 'cf1:sSales', '210'
The second table, 'onlineSales', holds online sales for each date. The create and load statements are -
create 'onlineSales', 'cf2'
put 'onlineSales', '20130101', 'cf2:oSales', '400'
put 'onlineSales', '20130102', 'cf2:oSales', '130'
Using a MapReduce job I am going to merge store sales, aggregated at the date level, with online sales.
Let's create an output table for the results -
create 'totalSales', 'cf1'
The mapper class for this job is -
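A minimal sketch of such a mapper, assuming the tables and columns created above (the class name and package layout are illustrative, not the original source):

```java
import java.io.IOException;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.mapreduce.TableMapper;
import org.apache.hadoop.hbase.mapreduce.TableSplit;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;

public class UnionMapper extends TableMapper<Text, IntWritable> {

  @Override
  protected void map(ImmutableBytesWritable row, Result value, Context context)
      throws IOException, InterruptedException {
    // Find out which table the current split belongs to.
    TableSplit split = (TableSplit) context.getInputSplit();
    String tableName = Bytes.toString(split.getTableName());
    String rowKey = Bytes.toString(row.get());

    if ("storeSales".equals(tableName)) {
      // storeSales rowkey is date#storeId; emit "s#date" so the
      // reducer can sum all stores for the same date.
      String date = rowKey.split("#")[0];
      int sales = Integer.parseInt(
          Bytes.toString(value.getValue(Bytes.toBytes("cf1"), Bytes.toBytes("sSales"))));
      context.write(new Text("s#" + date), new IntWritable(sales));
    } else {
      // onlineSales rowkey is just the date; emit "o#date".
      int sales = Integer.parseInt(
          Bytes.toString(value.getValue(Bytes.toBytes("cf2"), Bytes.toBytes("oSales"))));
      context.write(new Text("o#" + rowKey), new IntWritable(sales));
    }
  }
}
```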
Note that in the mapper I get the table name of the current split and choose between two context.write calls based on it. If your source tables have rowkeys with distinct prefixes, you can base the context.write logic on those prefixes instead.
The reducer class for this job is -
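A sketch of a matching reducer, again with illustrative names; it writes the summed sales as a 4-byte integer into the 'totalSales' table:

```java
import java.io.IOException;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.mapreduce.TableReducer;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;

public class UnionReducer extends TableReducer<Text, IntWritable, ImmutableBytesWritable> {

  @Override
  protected void reduce(Text key, Iterable<IntWritable> values, Context context)
      throws IOException, InterruptedException {
    // Sum all sales for this key (e.g. every store's sales for one date).
    int total = 0;
    for (IntWritable v : values) {
      total += v.get();
    }
    // Row key is the intermediate key (s#date or o#date);
    // the total is stored as a 4-byte int in cf1:tSales.
    Put put = new Put(Bytes.toBytes(key.toString()));
    put.add(Bytes.toBytes("cf1"), Bytes.toBytes("tSales"), Bytes.toBytes(total));
    context.write(null, put);
  }
}
```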
The reducer aggregates the values for each intermediate key.
Finally the driver class for this job is
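A sketch of the driver, assuming the List-of-Scans overload of initTableMapperJob available from HBase 0.94.5 (class names match the sketches above and are illustrative):

```java
import java.util.ArrayList;
import java.util.List;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.mapreduce.TableMapReduceUtil;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;

public class UnionJob {

  public static void main(String[] args) throws Exception {
    Configuration conf = HBaseConfiguration.create();
    Job job = new Job(conf, "hbase-union");
    job.setJarByClass(UnionJob.class);

    // One Scan per input table; the table name is attached to the Scan
    // itself, since this initTableMapperJob overload takes no table name.
    List<Scan> scans = new ArrayList<Scan>();

    Scan storeScan = new Scan();
    storeScan.setAttribute(Scan.SCAN_ATTRIBUTES_TABLE_NAME, Bytes.toBytes("storeSales"));
    scans.add(storeScan);

    Scan onlineScan = new Scan();
    onlineScan.setAttribute(Scan.SCAN_ATTRIBUTES_TABLE_NAME, Bytes.toBytes("onlineSales"));
    scans.add(onlineScan);

    TableMapReduceUtil.initTableMapperJob(scans, UnionMapper.class,
        Text.class, IntWritable.class, job);
    TableMapReduceUtil.initTableReducerJob("totalSales", UnionReducer.class, job);

    System.exit(job.waitForCompletion(true) ? 0 : 1);
  }
}
```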
The driver creates one HBase Scan per input table and passes both Scans as a list to the TableMapReduceUtil.initTableMapperJob method.
Package the classes into a jar file (hbase-union.jar) and execute the commands below to invoke the MapReduce job -
export HADOOP_CLASSPATH=`hbase classpath`
hadoop jar hbase-union.jar com.rishav.hbase.union.UnionJob
Once the job is complete, use the HBase shell to verify the output -
hbase(main):034:0> scan 'totalSales'
ROW COLUMN+CELL
o#20130101 column=cf1:tSales, timestamp=1410848221034, value=\x00\x00\x01\x90
o#20130102 column=cf1:tSales, timestamp=1410848221034, value=\x00\x00\x00\x82
s#20130101 column=cf1:tSales, timestamp=1410848221034, value=\x00\x00\x00\xD2
s#20130102 column=cf1:tSales, timestamp=1410848221034, value=\x00\x00\x01\x9A
4 row(s) in 0.0410 seconds
hbase(main):035:0> org.apache.hadoop.hbase.util.Bytes.toInt("\x00\x00\x01\x90".to_java_bytes)
=> 400
hbase(main):036:0> org.apache.hadoop.hbase.util.Bytes.toInt("\x00\x00\x00\x82".to_java_bytes)
=> 130
hbase(main):037:0> org.apache.hadoop.hbase.util.Bytes.toInt("\x00\x00\x00\xD2".to_java_bytes)
=> 210
hbase(main):038:0> org.apache.hadoop.hbase.util.Bytes.toInt("\x00\x00\x01\x9A".to_java_bytes)
=> 410
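The stored cells are 4-byte big-endian integers, which is why the shell shows raw hex and Bytes.toInt is needed to read them. The same decoding can be checked in plain Java without HBase:

```java
import java.nio.ByteBuffer;

public class DecodeSales {
    public static void main(String[] args) {
        // Bytes.toInt reads 4 bytes as a big-endian int;
        // java.nio.ByteBuffer does exactly the same.
        byte[][] cells = {
            {0x00, 0x00, 0x01, (byte) 0x90},  // o#20130101
            {0x00, 0x00, 0x00, (byte) 0x82},  // o#20130102
            {0x00, 0x00, 0x00, (byte) 0xD2},  // s#20130101
            {0x00, 0x00, 0x01, (byte) 0x9A},  // s#20130102
        };
        for (byte[] cell : cells) {
            System.out.println(ByteBuffer.wrap(cell).getInt());
        }
        // prints 400, 130, 210, 410
    }
}
```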
MultiTableInputFormat can be used for HBase table joins too; I shall try that some time.
Excellent post Rishav. This helped me. Thanks. Eagerly waiting for the joins in HBase post.
Thanks Hari, will post about joins in HBase soon.
Hi Rishav,
Thank you so much for posting the HBase code. I tried to run this code, but I got stuck with one error in the driver program.
In the code below, the initTableMapperJob method should take a table name, scan object, mapper class, key, value and job as parameters; since there is no table name, it is throwing an error. And there are two tables here. Can you please advise how to fix this error?
TableMapReduceUtil.initTableMapperJob
(scans,
JoinMapper.class,
Text.class,
IntWritable.class,
job);
Hi Giridhar,
initTableMapperJob is an overloaded method and I have used this overload: https://hbase.apache.org/apidocs/org/apache/hadoop/hbase/mapreduce/TableMapReduceUtil.html#initTableMapperJob%28java.util.List,%20java.lang.Class,%20java.lang.Class,%20java.lang.Class,%20org.apache.hadoop.mapreduce.Job%29
Also check your HBase version, this is supported starting with HBase-0.94.5
Hi Rishav,
As per your suggestion, I changed the HBase jar version from 0.94.2 to 0.94.5 and it is working fine. Thank you so much.
Great, you are welcome.
Hi Rishav,
Have you posted about joins?
Hi Rishav,
Can you please post the pom.xml or a link to the jar files you used in this tutorial?
Thanks