Wednesday 16 October 2013

MapReduce on HBase Table

MapReduce on HBase Table

MapReduce on HBase Table

In my last post HBase Table MapReduce Basics I explained about some basic guidelines to follow while writing MapReduce program. In this post I will post a sample MapReduce program which reads data from HBase table, does some aggregation and writes the output to another HBase table.

For our input we shall create a HBase table with below data:

create 'test1', 'cf1'
put 'test1', '20130101#1', 'cf1:sales', '100'
put 'test1', '20130101#2', 'cf1:sales', '110'
put 'test1', '20130102#1', 'cf1:sales', '200'
put 'test1', '20130102#2', 'cf1:sales', '210'

create 'test2', 'cf1'
For the table "test1", the rowkey is composed of date and store number and we have sales value for each store.
We shall do an aggregation on date to get total sales.

The mapper class is:

public class testMapper extends TableMapper {
 
 @Override
 public void map(ImmutableBytesWritable rowKey, Result columns, Context context)
   throws IOException, InterruptedException {

  try {
   // get rowKey and convert it to string
   String inKey = new String(rowKey.get());
   // set new key having only date
   String oKey = inKey.split("#")[0];
   // get sales column in byte format first and then convert it to string (as it is stored as string from hbase shell)
   byte[] bSales = columns.getValue(Bytes.toBytes("cf1"), Bytes.toBytes("sales"));
   String sSales = new String(bSales);
   Integer sales = new Integer(sSales);
   // emit date and sales values
   context.write(new Text(oKey), new IntWritable(sales));
  } catch (RuntimeException e){
   e.printStackTrace();
  }
 }
}
The reducer class is:

public class testReducer extends TableReducer{
 
 @Override
 public void reduce(Text key, Iterable values, Context context) 
   throws IOException, InterruptedException {
  try {
   int sum = 0;
   // loop through different sales vales and add it to sum
   for (IntWritable sales : values) {
    Integer intSales = new Integer(sales.toString());
    sum += intSales;
   } 
   
   // create hbase put with rowkey as date
   Put insHBase = new Put(key.getBytes());
   // insert sum value to hbase 
   insHBase.add(Bytes.toBytes("cf1"), Bytes.toBytes("sum"), Bytes.toBytes(sum));
   // write data to Hbase table
   context.write(null, insHBase);

  } catch (Exception e) {
   e.printStackTrace();
  }
 }
}
The driver class is:

public class testDriver {
  public static void main(String[] args) throws Exception {
 Configuration conf = new Configuration();
 
 // define scan and define column families to scan
 Scan scan = new Scan();
 scan.addFamily(Bytes.toBytes("cf1"));

 Job job = new Job(conf); 
 
    job.setJarByClass(testDriver.class);
 // define input hbase table
    TableMapReduceUtil.initTableMapperJob(
     "test1",
        scan,
        testMapper.class,
        Text.class,
        IntWritable.class,
        job);
 // define output table
    TableMapReduceUtil.initTableReducerJob(
      "test2",
      testReducer.class, 
      job);

    job.waitForCompletion(true);
  }
}
We shall build the jar and run hadoop jar command for testDriver class. This will populate the data to output HBase table test2.

hbase(main):013:0> scan 'test2'
ROW                                COLUMN+CELL                                                                                        
 20130101                          column=cf1:sum, timestamp=1381869476363, value=\x00\x00\x0\xD2                                    
 20130102                          column=cf1:sum, timestamp=1381869476363, value=\x00\x00\x0\x9A                                    
2 row(s) in 0.0240 seconds
The sum values are displayed as bytes (HBase stores everthing as bytes), we can convert it to readable integer format in HBase shell.

hbase(main):014:0> org.apache.hadoop.hbase.util.Bytes.toInt("\x00\x00\x00\xD2".to_java_bytes)
=> 210
hbase(main):015:0> org.apache.hadoop.hbase.util.Bytes.toInt("\x00\x00\x01\x9A".to_java_bytes)
=> 410
So we are getting correct aggregated values for sales in output.

31 comments:

  1. Good Work!

    I have small doubt that can't we use directly HBASE built in commands for copying data from one table to another.

    ReplyDelete
    Replies
    1. In you reducer method parameters, you must use generic type IntWritable to the iterable object. Otherwise it leads to error.

      Delete
    2. Yes you can copy hbase table data to another table using copyTable (http://hbase.apache.org/book/ops_mgt.html#copytable)

      Delete
    3. Rajesh, in reducer sales is IntWritable only. Where are you getting error?
      I have tested these programs on hadoop--1.0.3 and hbase-0.94.4. what versions are you using?

      Delete
  2. Good work Rishav Rohit.. Do you know how to add flume twitter data into cassandra table??.. if,please help me out..

    ReplyDelete
    Replies
    1. Thanks Vinay. Haven't got Chance to work on Cassandra yet :(

      Delete
  3. Could you list which libraries included in your demo? I can't tell which libraries to include, and cant run your example.

    ReplyDelete
    Replies
    1. Include all jars from hbase lib and hadoop lib.

      Delete
    2. This comment has been removed by the author.

      Delete
    3. This comment has been removed by the author.

      Delete
    4. for (IntWritable sales : values) {
      Integer intSales = new Integer(sales.toString());
      sum += intSales;
      }
      This part has the error. cant convert object to intwritable.

      Delete
    5. I have tested all the programs on Hadoop1.0.4, HBase0.94 and java1.6.
      can you paste the complete error trace.

      Delete
    6. This comment has been removed by the author.

      Delete
  4. Nice example, thanks for putting this together. When I tried to use hadoop jar, JVM couldn't find HBaseConfiguration, I guess that is expected since hbase jars are not expected to be in the hadoop classpath? I ended up creating a small wrapper which used `hbase classpath` output and got it working.

    ReplyDelete
  5. how to read from two hbase table simultaneously using map reduce?

    ReplyDelete
    Replies
    1. reading from multiple HBase table in MR job is supported in versions 0.94.5 and 0.95.0, check this for an example https://issues.apache.org/jira/browse/HBASE-3996

      Delete
    2. I am not sure. Looking at the example I think no.

      What is your requirement? I assume you want to join the two tables on some common key, and in that case the example at the link pointed by me should be good pointer.

      Delete
  6. Dear Rishav

    I want to read two hbase table simultaniously and then in reducer i want to do UNION of output of two mapper

    I am providing sudo code which i want to do

    Class Test
    {

    Class MapA extends TableMapper
    {
    map()
    {
    Logic That read Hbase Table A
    context.write("A "+key,Value);
    }
    }

    Class MapB extends TableMapper
    {
    map()
    {
    Logic That read Hbase Table B
    context.write("B "+key,Value);
    }
    }

    Class Reduce extends Reducer
    {
    ArrayList A;
    ArrayList B;
    reduce()
    {
    if(key contain "A")
    A.add(KeyValue Pair)
    else
    B.add(KeyValue Pair)
    }

    My Logic to union all the data
    }

    main()
    {
    Job job = new Job(config,"Test");
    TableMapReduceUtil.initTableMapperJob(Table1, scan, Test.MapA.class, Text.class, Text.class, job);
    TableMapReduceUtil.initTableMapperJob(Table2, scan, Test.MapB.class, Text.class, Text.class, job);
    job.setReducerClass(Test.Reduce.class);

    }

    }

    ReplyDelete
    Replies
    1. Nil you can check this post http://rishavrohitblog.blogspot.com/2014/09/hbase-mapreduce-on-multiple-input-table.html

      Delete
  7. This comment has been removed by the author.

    ReplyDelete
  8. This comment has been removed by the author.

    ReplyDelete
  9. Hi Rishit,
    I am running hadoop 2.2 and hbase 0.98.1 on fedora 20. When I am trying the above example, eclipse shows error ..map(ImmutableBytesWritable r....) and on hovering it informs there is no such definition and asks me to remove @Override. Removing override compiles the program successfully, but I get runtime error - "Error: java.io.IOException: Type mismatch in key from map: expected org.apache.hadoop.io.Text, received org.apache.hadoop.hbase.io.ImmutableBytesWritable". I feel that it is a problem in the libraries I have imported. Can you let me know how to rectify this error and the list of packages that have to be imported?

    Regards,
    Seenu.

    ReplyDelete
  10. Do we require to create output table(test2) before running the jar?

    Thanks,
    Bhaskar

    ReplyDelete
    Replies
    1. Yes Bhaskar. I have given the create table statement also in the beginning of the post.

      Delete
  11. Great example, thanks! I was running the test but getting a weird error where keys are overlaid. For example, if input table has these rows:

    abc, 1
    x, 2

    Applying a simple map (k,v) --> (k,v) just to check it is working ok, I get

    abc, 1
    xbc, 2

    Additionally, add Bytes.toBytes(String.valueOf(sum)) to avoid writing bytes.

    Any ideas why I getting this mistake with the keys?

    Best,
    Tomas

    ReplyDelete
    Replies
    1. Use

      Put put = new Put(Bytes.toBytes(key.toString()));

      as explained in http://hbase.apache.org/0.94/book.html#mapreduce.example.readwrite

      Delete
    2. HBase stores everything (int/long, other objects) as Bytes, the only benefit of storing String is that the Hbase shell can display String properly. If you use Bytes.toBytes(String.valueOf(sum)) then in your subsequent jobs/applications you need to parse this back to proper datatype or object which is not a good approach.

      Delete
  12. Rishav Rohit , Thanks for the code man.. I tried it.. But it wont give anything out. Nothing get copied to the test2 table.. Everything is same as yours. test1 table, code everything.. But nothing happens.. Can you help me?

    ReplyDelete

  13. In Hadoop, MapReduce is a calculation that decomposes large manipulation jobs into individual tasks that can be executed in parallel cross a cluster of servers. The results of tasks can be joined together to compute final results.
    Mapreduce program example
    Hadoop fs command using java api

    ReplyDelete
  14. very informative blog and useful article thank you for sharing with us , keep posting Big Data Hadoop Online Training Hyderabad

    ReplyDelete