MapReduce on HBase Table
In my last post, HBase Table MapReduce Basics, I explained some basic guidelines to follow while writing a MapReduce program against HBase. In this post I share a sample MapReduce program which reads data from an HBase table, does some aggregation, and writes the output to another HBase table.
For our input we shall create an HBase table with the data below:
create 'test1', 'cf1'
put 'test1', '20130101#1', 'cf1:sales', '100'
put 'test1', '20130101#2', 'cf1:sales', '110'
put 'test1', '20130102#1', 'cf1:sales', '200'
put 'test1', '20130102#2', 'cf1:sales', '210'
create 'test2', 'cf1'
For the table "test1", the rowkey is composed of a date and a store number, and we have a sales value for each store.
We shall aggregate on date to get the total sales per day.
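Stripped of the Hadoop machinery, the aggregation we are about to implement amounts to this plain-Java sketch (the table data is taken from the shell commands above; the class name is mine, for illustration only):

```java
import java.util.LinkedHashMap;
import java.util.Map;

public class AggregationSketch {
    public static void main(String[] args) {
        // rows mirror the test1 table: rowkey "date#store" -> sales
        String[][] rows = {
            {"20130101#1", "100"}, {"20130101#2", "110"},
            {"20130102#1", "200"}, {"20130102#2", "210"}
        };
        Map<String, Integer> totals = new LinkedHashMap<>();
        for (String[] row : rows) {
            // "map" step: the output key is the date part of the rowkey
            String date = row[0].split("#")[0];
            // "reduce" step: sum the sales values per date
            totals.merge(date, Integer.parseInt(row[1]), Integer::sum);
        }
        System.out.println(totals); // {20130101=210, 20130102=410}
    }
}
```

The mapper and reducer below split this same logic across the cluster.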
The mapper class is:
import java.io.IOException;

import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.mapreduce.TableMapper;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;

public class testMapper extends TableMapper<Text, IntWritable> {

    @Override
    public void map(ImmutableBytesWritable rowKey, Result columns, Context context)
            throws IOException, InterruptedException {
        try {
            // get the rowkey and convert it to a string
            String inKey = Bytes.toString(rowKey.get());
            // the output key is only the date part of the rowkey
            String oKey = inKey.split("#")[0];
            // get the sales column as bytes and convert it to a string
            // (it was stored as a string from the hbase shell)
            byte[] bSales = columns.getValue(Bytes.toBytes("cf1"), Bytes.toBytes("sales"));
            int sales = Integer.parseInt(Bytes.toString(bSales));
            // emit the date and the sales value
            context.write(new Text(oKey), new IntWritable(sales));
        } catch (RuntimeException e) {
            e.printStackTrace();
        }
    }
}
The reducer class is:
import java.io.IOException;

import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.mapreduce.TableReducer;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;

public class testReducer extends TableReducer<Text, IntWritable, ImmutableBytesWritable> {

    @Override
    public void reduce(Text key, Iterable<IntWritable> values, Context context)
            throws IOException, InterruptedException {
        try {
            int sum = 0;
            // loop through the sales values and add them to the sum
            for (IntWritable sales : values) {
                sum += sales.get();
            }
            // create an hbase put with the date as rowkey; copy the key's
            // content rather than using key.getBytes(), which returns the
            // backing array and may be longer than the current text
            Put insHBase = new Put(Bytes.toBytes(key.toString()));
            // add the sum value to the put
            insHBase.add(Bytes.toBytes("cf1"), Bytes.toBytes("sum"), Bytes.toBytes(sum));
            // write the put to the output HBase table
            context.write(null, insHBase);
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
}
The driver class is:
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.mapreduce.TableMapReduceUtil;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;

public class testDriver {
    public static void main(String[] args) throws Exception {
        // pick up hbase-site.xml in addition to the hadoop configuration
        Configuration conf = HBaseConfiguration.create();
        // define the scan and the column families to scan
        Scan scan = new Scan();
        scan.addFamily(Bytes.toBytes("cf1"));
        Job job = new Job(conf);
        job.setJarByClass(testDriver.class);
        // define the input hbase table
        TableMapReduceUtil.initTableMapperJob(
                "test1",           // input table
                scan,
                testMapper.class,  // mapper class
                Text.class,        // mapper output key
                IntWritable.class, // mapper output value
                job);
        // define the output table
        TableMapReduceUtil.initTableReducerJob(
                "test2",           // output table
                testReducer.class, // reducer class
                job);
        System.exit(job.waitForCompletion(true) ? 0 : 1);
    }
}
We shall build the jar and run the hadoop jar command for the testDriver class. This will populate the output HBase table test2.
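A typical way to run the job looks like this (the jar name is hypothetical; prepending `hbase classpath` to HADOOP_CLASSPATH lets Hadoop find the HBase jars at runtime):

```shell
# make the HBase jars visible to the hadoop launcher
export HADOOP_CLASSPATH=$(hbase classpath)
# run the driver class packaged in our jar (jar name is illustrative)
hadoop jar hbase-mr-example.jar testDriver
```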
hbase(main):013:0> scan 'test2'
ROW COLUMN+CELL
20130101 column=cf1:sum, timestamp=1381869476363, value=\x00\x00\x00\xD2
20130102 column=cf1:sum, timestamp=1381869476363, value=\x00\x00\x01\x9A
2 row(s) in 0.0240 seconds
The sum values are displayed as bytes (HBase stores everything as bytes); we can convert them to a readable integer format in the HBase shell.
hbase(main):014:0> org.apache.hadoop.hbase.util.Bytes.toInt("\x00\x00\x00\xD2".to_java_bytes)
=> 210
hbase(main):015:0> org.apache.hadoop.hbase.util.Bytes.toInt("\x00\x00\x01\x9A".to_java_bytes)
=> 410
So we are getting the correct aggregated sales values in the output.
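To double-check those two cell values outside the shell: Bytes.toBytes(int) writes a 4-byte big-endian encoding, which we can reproduce with the JDK's ByteBuffer (used here only so the sketch runs without the HBase jar on the classpath):

```java
import java.nio.ByteBuffer;

public class BytesCheck {
    public static void main(String[] args) {
        // encode 410 the way Bytes.toBytes(int) does: 4 big-endian bytes
        byte[] encoded = ByteBuffer.allocate(4).putInt(410).array();
        System.out.println(String.format("%02X %02X %02X %02X",
                encoded[0] & 0xFF, encoded[1] & 0xFF,
                encoded[2] & 0xFF, encoded[3] & 0xFF)); // 00 00 01 9A
        // and decode it back, as Bytes.toInt() does
        System.out.println(ByteBuffer.wrap(encoded).getInt()); // 410
    }
}
```

The printed byte pattern matches the \x00\x00\x01\x9A cell stored for 20130102.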
Good Work!
I have a small doubt: can't we directly use HBase's built-in commands for copying data from one table to another?
In your reducer method parameters, you must use the generic type IntWritable for the Iterable object. Otherwise it leads to an error.
Yes, you can copy HBase table data to another table using CopyTable (http://hbase.apache.org/book/ops_mgt.html#copytable)
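For reference, a typical CopyTable invocation for the tables in this post would look something like this (CopyTable itself runs as a MapReduce job; the destination table must already exist):

```shell
# copy all rows of 'test1' into the pre-created table 'test2'
hbase org.apache.hadoop.hbase.mapreduce.CopyTable --new.name=test2 test1
```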
Rajesh, in the reducer, sales is an IntWritable only. Where are you getting the error?
I have tested these programs on hadoop-1.0.3 and hbase-0.94.4. What versions are you using?
Good work Rishav Rohit. Do you know how to add Flume Twitter data into a Cassandra table? If so, please help me out.
Thanks Vinay. Haven't got a chance to work on Cassandra yet :(
DeleteCould you list which libraries included in your demo? I can't tell which libraries to include, and cant run your example.
Include all jars from the HBase lib and Hadoop lib directories.
Deletefor (IntWritable sales : values) {
DeleteInteger intSales = new Integer(sales.toString());
sum += intSales;
}
This part has the error. cant convert object to intwritable.
I have tested all the programs on Hadoop 1.0.4, HBase 0.94 and Java 1.6.
Can you paste the complete error trace?
Nice example, thanks for putting this together. When I tried to use hadoop jar, the JVM couldn't find HBaseConfiguration; I guess that is expected since the hbase jars are not expected to be on the hadoop classpath? I ended up creating a small wrapper which used the `hbase classpath` output and got it working.
How do we read from two HBase tables simultaneously using MapReduce?
Reading from multiple HBase tables in an MR job is supported in versions 0.94.5 and 0.95.0; check this for an example: https://issues.apache.org/jira/browse/HBASE-3996
I am not sure. Looking at the example, I think not.
What is your requirement? I assume you want to join the two tables on some common key, and in that case the example at the link I pointed to should be a good pointer.
Dear Rishav
I want to read two HBase tables simultaneously and then in the reducer do a UNION of the output of the two mappers.
Here is pseudocode for what I want to do:
Class Test
{
    Class MapA extends TableMapper
    {
        map()
        {
            Logic that reads HBase Table A
            context.write("A "+key, Value);
        }
    }
    Class MapB extends TableMapper
    {
        map()
        {
            Logic that reads HBase Table B
            context.write("B "+key, Value);
        }
    }
    Class Reduce extends Reducer
    {
        ArrayList A;
        ArrayList B;
        reduce()
        {
            if (key contains "A")
                A.add(KeyValue pair)
            else
                B.add(KeyValue pair)
        }
        My logic to union all the data
    }
    main()
    {
        Job job = new Job(config, "Test");
        TableMapReduceUtil.initTableMapperJob(Table1, scan, Test.MapA.class, Text.class, Text.class, job);
        TableMapReduceUtil.initTableMapperJob(Table2, scan, Test.MapB.class, Text.class, Text.class, job);
        job.setReducerClass(Test.Reduce.class);
    }
}
Nil, you can check this post: http://rishavrohitblog.blogspot.com/2014/09/hbase-mapreduce-on-multiple-input-table.html
ReplyDeleteHi Rishit,
ReplyDeleteI am running hadoop 2.2 and hbase 0.98.1 on fedora 20. When I am trying the above example, eclipse shows error ..map(ImmutableBytesWritable r....) and on hovering it informs there is no such definition and asks me to remove @Override. Removing override compiles the program successfully, but I get runtime error - "Error: java.io.IOException: Type mismatch in key from map: expected org.apache.hadoop.io.Text, received org.apache.hadoop.hbase.io.ImmutableBytesWritable". I feel that it is a problem in the libraries I have imported. Can you let me know how to rectify this error and the list of packages that have to be imported?
Regards,
Seenu.
Do we need to create the output table (test2) before running the jar?
Thanks,
Bhaskar
Yes Bhaskar. I have given the create table statement at the beginning of the post.
DeleteGreat example, thanks! I was running the test but getting a weird error where keys are overlaid. For example, if input table has these rows:
ReplyDeleteabc, 1
x, 2
Applying a simple map (k,v) --> (k,v) just to check it is working ok, I get
abc, 1
xbc, 2
Additionally, add Bytes.toBytes(String.valueOf(sum)) to avoid writing bytes.
Any ideas why I getting this mistake with the keys?
Best,
Tomas
Use
Put put = new Put(Bytes.toBytes(key.toString()));
as explained in http://hbase.apache.org/0.94/book.html#mapreduce.example.readwrite
HBase stores everything (int/long, other objects) as bytes; the only benefit of storing a String is that the HBase shell can display it properly. If you use Bytes.toBytes(String.valueOf(sum)), then your subsequent jobs/applications need to parse the value back to the proper datatype or object, which is not a good approach.
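The trade-off can be sketched with JDK calls standing in for HBase's Bytes utility (an illustrative sketch, not the HBase API itself):

```java
public class StorageTradeoff {
    public static void main(String[] args) throws Exception {
        int sum = 410;
        // stored as a string: human-readable in the shell, but downstream
        // jobs must parse it back to an int
        byte[] asString = String.valueOf(sum).getBytes("UTF-8");
        // stored as a binary int (what Bytes.toBytes(int) produces):
        // fixed-width, typed, but shown as raw bytes in the shell
        byte[] asInt = java.nio.ByteBuffer.allocate(4).putInt(sum).array();
        System.out.println(new String(asString, "UTF-8")); // 410
        System.out.println(asInt.length);                  // 4
        // the string form needs an explicit parse in any downstream job
        System.out.println(Integer.parseInt(new String(asString, "UTF-8")) == sum);
    }
}
```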
Good example
ReplyDeleteRishav Rohit , Thanks for the code man.. I tried it.. But it wont give anything out. Nothing get copied to the test2 table.. Everything is same as yours. test1 table, code everything.. But nothing happens.. Can you help me?
ReplyDelete