MapReduce on HBase Table
In my last post,
HBase Table MapReduce Basics, I explained some basic guidelines to follow while writing a MapReduce program against HBase. In this post I will walk through a sample MapReduce program that reads data from an HBase table, does some aggregation, and writes the output to another HBase table.
For our input we shall create an HBase table with the following data:
create 'test1', 'cf1'
put 'test1', '20130101#1', 'cf1:sales', '100'
put 'test1', '20130101#2', 'cf1:sales', '110'
put 'test1', '20130102#1', 'cf1:sales', '200'
put 'test1', '20130102#2', 'cf1:sales', '210'
create 'test2', 'cf1'
For the table "test1", the rowkey is composed of a date and a store number, and each row holds the sales value for that store.
We shall aggregate on the date to get the total sales per day.
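As a quick sanity check on the expected output: 20130101 has two rows (100 + 110), so its total should be 210, and 20130102 (200 + 210) should total 410.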
The mapper class is:
import java.io.IOException;

import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.mapreduce.TableMapper;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;

public class testMapper extends TableMapper<Text, IntWritable> {

  @Override
  public void map(ImmutableBytesWritable rowKey, Result columns, Context context)
      throws IOException, InterruptedException {
    try {
      // get the rowkey and convert it to a string
      String inKey = Bytes.toString(rowKey.get());
      // the new key is the date part of the rowkey (everything before '#')
      String oKey = inKey.split("#")[0];
      // read the sales column as bytes, then convert to a string
      // (it was stored as a string from the hbase shell)
      byte[] bSales = columns.getValue(Bytes.toBytes("cf1"), Bytes.toBytes("sales"));
      String sSales = Bytes.toString(bSales);
      int sales = Integer.parseInt(sSales);
      // emit the date and the sales value
      context.write(new Text(oKey), new IntWritable(sales));
    } catch (RuntimeException e) {
      e.printStackTrace();
    }
  }
}
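For the four sample rows, the mapper therefore emits the following (key, value) pairs, which the framework groups by date before the reduce step:

("20130101", 100)  from rowkey 20130101#1
("20130101", 110)  from rowkey 20130101#2
("20130102", 200)  from rowkey 20130102#1
("20130102", 210)  from rowkey 20130102#2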
The reducer class is:
import java.io.IOException;

import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.mapreduce.TableReducer;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;

public class testReducer extends TableReducer<Text, IntWritable, ImmutableBytesWritable> {

  @Override
  public void reduce(Text key, Iterable<IntWritable> values, Context context)
      throws IOException, InterruptedException {
    try {
      int sum = 0;
      // loop through the sales values for this date and add them to the sum
      for (IntWritable sales : values) {
        sum += sales.get();
      }
      // create an hbase put with the date as the rowkey
      Put insHBase = new Put(Bytes.toBytes(key.toString()));
      // store the sum in column cf1:sum
      insHBase.add(Bytes.toBytes("cf1"), Bytes.toBytes("sum"), Bytes.toBytes(sum));
      // write the put to the output hbase table
      context.write(null, insHBase);
    } catch (Exception e) {
      e.printStackTrace();
    }
  }
}
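As a side note on the output encoding, Bytes.toBytes(int) serializes the integer as four big-endian bytes, and Bytes.toInt reverses it; this is why the sums appear as raw bytes when we scan the output table later. A minimal sketch:

byte[] b = Bytes.toBytes(210);  // {0x00, 0x00, 0x00, 0xD2}
int back = Bytes.toInt(b);      // back == 210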
The driver class is:
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.mapreduce.TableMapReduceUtil;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;

public class testDriver {

  public static void main(String[] args) throws Exception {
    // HBaseConfiguration.create() picks up hbase-site.xml from the classpath
    Configuration conf = HBaseConfiguration.create();

    // define the scan and the column families to scan
    Scan scan = new Scan();
    scan.addFamily(Bytes.toBytes("cf1"));

    Job job = new Job(conf);
    job.setJarByClass(testDriver.class);

    // define the input hbase table, the mapper, and its output key/value types
    TableMapReduceUtil.initTableMapperJob(
        "test1",           // input table
        scan,
        testMapper.class,
        Text.class,        // mapper output key
        IntWritable.class, // mapper output value
        job);
    // define the output table and the reducer
    TableMapReduceUtil.initTableReducerJob(
        "test2",
        testReducer.class,
        job);

    System.exit(job.waitForCompletion(true) ? 0 : 1);
  }
}
We shall build the jar and run it with the hadoop jar command, pointing at the testDriver class. This populates the output HBase table test2.
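A minimal invocation might look like the following (the jar name here is hypothetical; adjust it to your build output). Setting HADOOP_CLASSPATH from hbase classpath makes the HBase client classes available to the job:

export HADOOP_CLASSPATH=$(hbase classpath)
hadoop jar hbase-mr-example.jar testDriver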
hbase(main):013:0> scan 'test2'
ROW COLUMN+CELL
20130101 column=cf1:sum, timestamp=1381869476363, value=\x00\x00\x00\xD2
20130102 column=cf1:sum, timestamp=1381869476363, value=\x00\x00\x01\x9A
2 row(s) in 0.0240 seconds
The sum values are displayed as bytes (HBase stores everything as bytes); we can convert them to a readable integer format in the HBase shell.
hbase(main):014:0> org.apache.hadoop.hbase.util.Bytes.toInt("\x00\x00\x00\xD2".to_java_bytes)
=> 210
hbase(main):015:0> org.apache.hadoop.hbase.util.Bytes.toInt("\x00\x00\x01\x9A".to_java_bytes)
=> 410
So we are getting the correct aggregated sales values in the output table.