Wednesday, 16 October 2013

HBase Table MapReduce Basics

HBase Table MapReduce Basics

HBase Table MapReduce Basics

In this post I am going to explain how we can use HBase tables as source and target for MapReduce program.

For writing MapReduce on HBase tables we should follow below guidelines:

1. Mapper Class

  • Mapper class should extend TableMapper.
  • Input key to mapper is ImmutableBytesWritable object which has rowkey of HBase table.
  • Input value is Result object (org.apache.hadoop.hbase.client.Result) which contains the requested column families (define the required columns/column families in Scan) from HBase table.

2. Reducer Class

  • Reducer class should extend TableReducer.
  • Output key is NULL.
  • Output value is Put (org.apache.hadoop.hbase.client.Put) object.

3. MapReduce Driver

  • Configure a Scan (org.apache.hadoop.hbase.client.Scan) object. For this scan object we can define many parameters like:
    • Start row.
    • Stop row.
    • Row filter.
    • Column Familiy(s) to retrieve.
    • Column(s) to retrieve.
  • Define input table using TableMapReduceUtil.initTableMapperJob. In this method we can define input table, Mapper, MapOutputKey, MapOutputValue, etc.
  • Define output table using TableMapReduceUtil.initTableReducerJob. In this method we can define output table, Reducer and Partitioner.
In my next post I shall give an example MapReduce program using HBase tables as input and output.

Monday, 14 October 2013

Introduction To Hive's Partitioning

Introduction To Hive's Partitioning

Introduction To Hive's Partitioning

A simple query in Hive reads the entire dataset even if we have where clause filter. This becomes a bottleneck for running MapReduce jobs over a large table. We can over come this issue by implementing partitions in Hive. Hive makes it very easy to implement partition by using automatic partition scheme when the table is created. In Hive’s implementation of partitioning, data within a table is split across multiple partitions. Each partition corresponds to a particular value(s) of partition column(s) and is stored as a sub-directory within the table’s directory on HDFS. When the table is queried, where applicable, only the required partitions of the table are queried, thereby reducing the I/O and time required by the query. Today we are going to see how we can load a csv file to a partitioned table. For this we are going to use Airline OnTime dataset. Loading csv data to a partitioned table involves below mentioned two steps:
  1. Load csv file to a non-partitioned table.
  2. Load non-partitioned table data to partitioned table.
We shall partition Airline OnTime data based on two columns - year and month. 1. Load csv file to a non-partitioned table. We shall create a staging table to hold data from csv file. The hive commands to create schema and table are given below:
create schema stg_airline;
use stg_airline;

create table stg_airline.onTimePerf
(Year INT ,
Month INT ,
DayofMonth INT ,
DayOfWeek INT ,
DepTime INT ,
CRSDepTime INT ,
ArrTime INT ,
CRSArrTime INT ,
UniqueCarrier STRING ,
FlightNum INT ,
TailNum STRING ,
ActualElapsedTime INT ,
CRSElapsedTime INT ,
AirTime STRING ,
ArrDelay INT ,
DepDelay INT ,
Origin STRING ,
Dest STRING ,
Distance INT ,
TaxiIn STRING ,
TaxiOut STRING ,
Cancelled INT ,
CancellationCode STRING ,
Diverted INT ,
CarrierDelay STRING ,
WeatherDelay STRING ,
NASDelay STRING ,
SecurityDelay STRING ,
LateAircraftDelay STRING)
ROW FORMAT DELIMITED FIELDS TERMINATED BY ',' 
STORED AS TEXTFILE;
After creating the table load the csv data (note - delete header from csv) to table using below hive command:
LOAD DATA LOCAL INPATH "1987.csv" OVERWRITE INTO TABLE stg_airline.onTimePerf;
2. Load non-partitioned table data to partitioned table. We shall now create a table partitioned by year and month columns, the commands for this are given below:
create schema airline;
use airline;

create table airline.onTimePerf
(DayofMonth INT ,
DayOfWeek INT ,
DepTime INT ,
CRSDepTime INT ,
ArrTime INT ,
CRSArrTime INT ,
UniqueCarrier STRING ,
FlightNum INT ,
TailNum STRING ,
ActualElapsedTime INT ,
CRSElapsedTime INT ,
AirTime STRING ,
ArrDelay INT ,
DepDelay INT ,
Origin STRING ,
Dest STRING ,
Distance INT ,
TaxiIn STRING ,
TaxiOut STRING ,
Cancelled INT ,
CancellationCode STRING ,
Diverted INT ,
CarrierDelay STRING ,
WeatherDelay STRING ,
NASDelay STRING ,
SecurityDelay STRING ,
LateAircraftDelay STRING)
PARTITIONED BY (Year INT, Month INT )
ROW FORMAT DELIMITED FIELDS TERMINATED BY ',' 
STORED AS TEXTFILE;
To load partitioned table we use below command:
SET hive.exec.dynamic.partition = true;
SET hive.exec.dynamic.partition.mode = nonstrict;
INSERT OVERWRITE TABLE airline.onTimePerf PARTITION(Year, Month) SELECT DayofMonth, DayOfWeek, DepTime, CRSDepTime, ArrTime, CRSArrTime, UniqueCarrier, FlightNum, TailNum, ActualElapsedTime, CRSElapsedTime, AirTime, ArrDelay, DepDelay, Origin, Dest, Distance, TaxiIn, TaxiOut, Cancelled, CancellationCode, Diverted, CarrierDelay, WeatherDelay, NASDelay, SecurityDelay, LateAircraftDelay, Year, Month FROM stg_airline.onTimePerf;
While writing insert statement for a partitioned table make sure that you specify the partition columns at the last in select clause. The 2 SET commands instruct hive to change our query to dynamically load partitions. If you don't execute the above 2 SET commands you will get below error:
FAILED: SemanticException [Error 10096]: Dynamic partition strict mode requires at least one static partition column. To turn this off set hive.exec.dynamic.partition.mode=nonstrict
In my next blog I shall describe about using HCatalog in MapReduce program.

Sunday, 13 October 2013

Connecting Hive to MongoDB Using MongoStorageHandler

Connecting Hive to MongoDB Using MongoStorageHandler

Connecting Hive to MongoDB Using MongoStorageHandler

Mongo-hadoop connector supports the creation of MongoDB-based Hive tables and BSON-based Hive tables. Both MongoDB-based Hive tables and BSON-based Hive tables can be:
  • Queried just like HDFS-based Hive tables.
  • Combined with HDFS-based Hive tables in joins and sub-queries.
Note - The current mongo-hadoop jars present in mongo-hadoop github doesn't contain MongoStorageHandler class so you need to build the jars yourself. In this post we shall use MongoStorageHandler to create a MongoDB-based hive table. Lets create a test.users collection in MongoDB with following data:
use test
db.users.insert({ "_id": 1, "name": "Tom", "age": 28 })
db.users.insert({ "_id": 2, "name": "Alice", "age": 18 })
db.users.insert({ "_id": 3, "name": "Bob", "age": 29 })
Now we shall create test.user table in Hive which is just a metadata created in Hive and the actual data will be stored in MongoDB. The commands for this are given below:
create schema test;
use test;

CREATE TABLE users (id int, name string, age int)
STORED BY "com.mongodb.hadoop.hive.MongoStorageHandler"
WITH SERDEPROPERTIES('mongo.columns.mapping'='{"id":"_id","name":"name","age":"age"}')
TBLPROPERTIES ( "mongo.uri" = "mongodb://localhost:27017/test.users");
Note - Specifying BSONSerDe Mappings is optional. Without it, all the field names in the Hive table are expected to be the same as in the underlying MongoDB collection. If the exact field names in Hive cannot be found in a MongoDB document, their values in the Hive table will be null. Now we can query the MongoDB collection in Hive also using HQL. We shall now try INSERT INTO TABLE command in HQL to see if we can load MongoDB collection from Hive. Lets create a temporary table in Hive, say users2 with same schema as users table and insert some data to it using a flat file.
cat users.txt
101,Scott,,10
104,Jesse,52
110,Mike,32

CREATE TABLE users2 (id int , name string , age int)
ROW FORMAT DELIMITED FIELDS TERMINATED BY ',' 
STORED AS TEXTFILE;

LOAD DATA LOCAL INPATH "users.txt" OVERWRITE INTO TABLE test.users2;
Now we shall use INSERT INTO TABLE command to insert data from Hive table users2 to users table.
INSERT INTO TABLE test.mongo_users SELECT id,name,age from test.users2;
There are some limitations of using MongoStorageHandler which are given below:
  • Using a StorageHandler will mark the table as non-native. Data cannot be loaded into the table directory from a "LOAD DATA" command. Instead, the data can only come from the result of a query.
  • Files are only stored as a single output file, not one per reduce call. This will cause much lock leasing problems.
  • INSERT INTO a StorageHandler table behaves as INSERT OVERWRITE.
In some other post I shall write about using BSON files as storage format for Hive tables.

Saturday, 12 October 2013

MapReduce On Hive Tables Using HCatalog

MapReduce On Hive Tables Using HCatalog

MapReduce On Hive Tables Using HCatalog

In my last post Introduction To Hive's Partitioning I described how we can load csv data to a partitioned hive table. Today we shall see how we can use HCatalog to run MapReduce on Hive table and store the output in another Hive table.

HCatalog makes Hive metadata available to users of other Hadoop tools like Pig, MapReduce and Hive. It provides connectors for MapReduce and Pig so that users of those tools can read data from and write data to Hive’s warehouse.
HCatalog’s table abstraction presents users with a relational view of data in the (HDFS) and ensures that users need not worry about where or in what format their data is stored, so users don't need to know if data is stored in RCFile format, text files, or sequence files.
It also provides a notification service so that workflow tools, such as Oozie, can be notified when new data becomes available in the warehouse.

HCatalog provides HCatInputFormat/HCatOutputFormat to enable MapReduce users to read/write data in Hive’s data warehouse. It allows users to read only the partitions of tables and columns that they need. And it provides the records in a convenient list format so that users do not need to parse them.

We shall see how we can use HCatalog to do a count of records for each year-month combination.
In mapper class we get table schema and use this schema information to get the required columns and their values.
Note - In context.write I am using a custom writable IntPair which I will describe in a separate post.

public class onTimeMapper extends Mapper {
    @Override
    protected void map(WritableComparable key, HCatRecord value,
     org.apache.hadoop.mapreduce.Mapper.Context context)
     throws IOException, InterruptedException {

     // Get table schema
     HCatSchema schema = HCatBaseInputFormat.getTableSchema(context);

     Integer year = new Integer(value.getString("year", schema));
     Integer month = new Integer(value.getString("month", schema));
     Integer DayofMonth = value.getInteger("dayofmonth", schema);

     context.write(new IntPair(year, month), new IntWritable(DayofMonth));
    }
}
In reducer class we create a record schema for holding the columns and their values, which is written to target Hive table.

public class onTimeReducer extends Reducer {
 public void reduce (IntPair key, Iterable value, Context context) 
  throws IOException, InterruptedException{
  
  int count = 0; // records counter for particular year-month
  for (IntWritable s:value) {
   count++;
  }
  
  // define output record schema
  List columns = new ArrayList(3);
  columns.add(new HCatFieldSchema("year", HCatFieldSchema.Type.INT, ""));
  columns.add(new HCatFieldSchema("month", HCatFieldSchema.Type.INT, ""));
  columns.add(new HCatFieldSchema("flightCount", HCatFieldSchema.Type.INT,""));
  HCatSchema schema = new HCatSchema(columns);
  HCatRecord record = new DefaultHCatRecord(3);
  
  record.setInteger("year", schema, key.getFirstInt()); 
  record.set("month", schema, key.getSecondInt());
  record.set("flightCount", schema, count);
  context.write(null, record);
 }
}
Finally we write driver class with input/output schema and table details:

public class onTimeDriver extends Configured implements Tool{
    private static final Log log = LogFactory.getLog( onTimeDriver.class );

    public int run( String[] args ) throws Exception{
     Configuration conf = new Configuration();
     Job job = new Job(conf, "OnTimeCount");
     job.setJarByClass(onTimeDriver.class);
     job.setMapperClass(onTimeMapper.class);
     job.setReducerClass(onTimeReducer.class);

     HCatInputFormat.setInput(job, "airline", "ontimeperf");
     job.setInputFormatClass(HCatInputFormat.class);
     job.setMapOutputKeyClass(IntPair.class);
     job.setMapOutputValueClass(IntWritable.class);
     
     job.setOutputKeyClass(Text.class);
     job.setOutputValueClass(DefaultHCatRecord.class);
     job.setOutputFormatClass(HCatOutputFormat.class);
     HCatOutputFormat.setOutput(job, OutputJobInfo.create("airline", "flight_count", null));
     HCatSchema s = HCatOutputFormat.getTableSchema(job);
     HCatOutputFormat.setSchema(job, s);
     
     return (job.waitForCompletion(true)? 0:1);
    }
    
    public static void main(String[] args) throws Exception{
  int exitCode = ToolRunner.run(new onTimeDriver(), args);
  System.exit(exitCode);
 }
}
Before running this code we need to create the output table in Hive using below commands:
create table airline.flight_count
(Year INT ,
Month INT ,
flightCount INT)
ROW FORMAT DELIMITED FIELDS TERMINATED BY ',' 
STORED AS TEXTFILE;
After compiling and making jar file we can run this code using hadoop jar command without any argument. You can check output using HQL by querying airline.flight_count table.

Note - I faced some hive errors while running the jar from any folder except $HIVE_HOME with apache setup, so if you face any issue it better to give a try by placing and running the jar from $HIVE_HOME folder.
Just for verifying that our MapReduce program is independent of output table schema we shall alter airline.flight_count to have a dummy column:
ALTER TABLE airline.flight_count1 ADD COLUMNS (dummy STRING);
After truncating this table we will start up our MapReduce program again and we can see that our program runs in same manner.

Monday, 8 July 2013

Bulk Loading ElasticSearch from HBase

Bulk Loading ElasticSearch from HBase

Bulk Loading ElasticSearch from HBase

In my previous post I wrote about indexing, retrieving and searching for documents in ElasticSearch, Starting with ElasticSearch. For indexing very large number of documents to ES we can use BulkProcessor API from ES, which is very fast and efficient for handling very large number of documents. The java program writen by me for reading data from Hbase and indexing it to ES is given below:

package com.rishav.es;

import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.client.HTable;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.ResultScanner;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.util.Bytes;
import org.elasticsearch.client.Client;
import org.elasticsearch.client.transport.TransportClient;
import org.elasticsearch.common.transport.InetSocketTransportAddress;
import org.elasticsearch.action.bulk.BulkResponse;
import org.elasticsearch.action.bulk.BulkRequest;
import org.elasticsearch.action.bulk.BulkProcessor; 
import org.elasticsearch.action.index.IndexRequest;
import static org.elasticsearch.common.xcontent.XContentFactory.jsonBuilder;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import com.rishav.xml.xmlDef;

public class esBulkLoad {
 public static void main(String[] args) throws IOException {

  // HBase configuration
  Configuration config = HBaseConfiguration.create();
  HTable htable = new HTable(config, "xmlTest");
  Scan scan1 = new Scan();
  scan1.setCaching(500);
  ResultScanner scanner1 = htable.getScanner(scan1); 
  
  //ElasticSearch configuration
  Client client = new TransportClient().addTransportAddress(
    new InetSocketTransportAddress("localhost",9300));
  
  final Logger logger = LoggerFactory.getLogger(esBulkLoad.class);
  
  // define ES bulkprocessor
  BulkProcessor bulkProcessor = BulkProcessor.builder(client, new BulkProcessor.Listener() {
      @Override
      public void beforeBulk(long executionId, BulkRequest request) {
          logger.info("Going to execute new bulk composed of {} actions", request.numberOfActions());
      }

      @Override
      public void afterBulk(long executionId, BulkRequest request, BulkResponse response) {
          logger.info("Executed bulk composed of {} actions", request.numberOfActions());
      }

      @Override
      public void afterBulk(long executionId, BulkRequest request, Throwable failure) {
          logger.warn("Error executing bulk", failure);
      }
      }).setBulkActions(1000).setConcurrentRequests(1).build();
  
  // read Hbase records
  for (Result res : scanner1) {
   byte[] b;
   
   int col = 0;
   for (String xmlTag : xmlDef.xmlDef[3]) {
    b = res.getValue(Bytes.toBytes(xmlDef.xmlDef[2][col]), Bytes.toBytes(xmlDef.xmlDef[3][col]));
    xmlDef.xmlDef[5][col] = Bytes.toString(b);
    col++;
   }
   
   // build ES IndexRequest and add it to bulkProcessor
   IndexRequest iRequest = new IndexRequest("xml", "response", xmlDef.xmlDef[5][0]);
   iRequest.source(jsonBuilder().startObject()
        .field(xmlDef.xmlDef[3][0], xmlDef.xmlDef[5][0])
                          .field(xmlDef.xmlDef[3][1], xmlDef.xmlDef[5][1])
                          .field(xmlDef.xmlDef[3][2], xmlDef.xmlDef[5][2])
                          .field(xmlDef.xmlDef[3][3], xmlDef.xmlDef[5][3])
                          .field(xmlDef.xmlDef[3][4], xmlDef.xmlDef[5][4])
                      .endObject());
   bulkProcessor.add(iRequest);
  }
  scanner1.close();
  htable.close();
  
  // shutdown Elasticsearch node
  bulkProcessor.close();
 }
}

Wednesday, 3 July 2013

Starting with ElasticSearch

Starting with ElasticSearch

Starting with ElasticSearch

ElasticSearch is a distributed, RESTful search server based on Apache Lucene. The important features of it are listed below:
  • Real time data
  • RESTful API
  • Distributed
  • Multi-tenancy
  • Document oriented
  • Schema free
  • Per-operation persistence
  • Built on top of Apache Lucene
  • High availability
  • Full text search
  • Conflict management
More detailed information about ElasticSearch can be found at ElasticSearch.org A small example program to add, retrieve and search for documents in ES is given below:

package com.rishav.es;

import static org.elasticsearch.node.NodeBuilder.*;

import java.io.IOException;

import org.apache.lucene.search.TermQuery;
import org.elasticsearch.node.Node;
import org.elasticsearch.client.Client;
import org.elasticsearch.action.get.GetResponse;
import  org.elasticsearch.action.index.IndexResponse;
import org.elasticsearch.action.search.SearchResponse;
import org.elasticsearch.action.search.SearchType;

import static org.elasticsearch.common.xcontent.XContentFactory.*;
import org.elasticsearch.common.xcontent.XContentBuilder;
import org.elasticsearch.common.settings.ImmutableSettings;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.transport.InetSocketTransportAddress;
import org.elasticsearch.client.transport.TransportClient;
import org.elasticsearch.action.search.SearchResponse;
import org.elasticsearch.action.search.SearchType;
import org.elasticsearch.index.query.FilterBuilder;
import org.elasticsearch.index.query.FilterBuilders;
import org.elasticsearch.index.query.QueryBuilder;
import org.elasticsearch.index.query.QueryBuilders;
import org.elasticsearch.index.query.FilterBuilders.*;
import org.elasticsearch.index.query.QueryBuilders.*;

public class esConnect {

 // on startup

 public static void main (String args[]) throws IOException{

  // create a client for ElasticSearch running on localhost listening to port 9300
  Node node = nodeBuilder().node();
  Client client = new TransportClient().addTransportAddress(
      new InetSocketTransportAddress("localhost",9300));
  
  // add some record
  IndexResponse iresponse = client.prepareIndex("testindex", "tweets", "1")
     .setSource(jsonBuilder()
                 .startObject()
                     .field("user", "rishav")
                     .field("postDate", "2013-01-30")
                     .field("message", "trying out Elastic Search")
                 .endObject()
               )
     .execute()
     .actionGet();
  
  
  // retrieve record based on id
  GetResponse gresponse = client.prepareGet("testindex", "tweets", "1")
        .execute().actionGet();
  System.out.println(gresponse.getSourceAsString());
  
  // search for a record based on below filters
  SearchResponse sresponse = client.prepareSearch("testindex").
         setSearchType(SearchType.DFS_QUERY_THEN_FETCH).
         setQuery(QueryBuilders.termQuery("user", "rishav")).  // query
         setFilter(FilterBuilders.termFilter("message", "out")).
         execute().
         actionGet();
  System.out.println(sresponse.getHits());
  
  // shutdown client
  client.close();
 }
}

Monday, 1 July 2013

XML parsing with Mapreduce

XML parsing with Mapreduce

XML parsing with Mapreduce


Recently I worked with XML data stored in HDFS and wrote a map reduce code to write data to HBase. To work with xml type input data we can use XmlInputFormat class from mahout (No need to have mahout installed, we just need the class from mahout-integration jar). The xml file was having below structure:

 <Response>
  <RowID>....</RowID>
  <ResponseID>....</ResponseID>
  <IPAddress>....</IPAddress>
  <Status>....</Status>
  <StartDate>....</StartDate>
  <EndDate>....</EndDate>
 </Response>

To hold this xml record we created xmlDef class:
package com.rishav.xml;

public class xmlDef {
 public static String xmlDef[][] = new String[][]{
      {"xmlTest", "xmlTest", "xmlTest", "xmlTest", "xmlTest", "xmlTest"},     //HBase table name
      {"Y", "N", "N","N","N","N"},                                            //is column a key column?
      {"cf1", "cf1","cf2","cf2","cf2","cf2"},                                 //column family
      {"RowID", "ResponseID", "IPAddress", "Status", "StartDate", "EndDate"}, //column name in HBase
      {"RowID", "ResponseID", "IPAddress", "Status", "StartDate", "EndDate"}, //xml tag
      {"", "", "", "", "", ""}                                                // place holder for xml value
      };
}

The mapper class is configured to read complete xml record enclosed in tags and these tags are defined in driver class. Each map reads one xml record at a time as inpur and we can parse this in a normal manner. In this case we have used XMLStreamReader. The code for mapper class is given below:

package com.rishav.hbase;

import com.rishav.xml.xmlDef;

import static javax.xml.stream.XMLStreamConstants.CHARACTERS;
import static javax.xml.stream.XMLStreamConstants.START_ELEMENT;
import javax.xml.stream.XMLInputFactory;
import javax.xml.stream.XMLStreamReader;

import java.io.ByteArrayInputStream;
import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.client.HTable;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Mapper.Context;
import org.apache.hadoop.mapreduce.lib.input.FileSplit;

public class commonTableMapper 
 extends org.apache.hadoop.mapreduce.Mapper {
  private HTable htable;
  
 // create HBase connection
 protected void setup(Context context) 
    throws IOException, InterruptedException {
    Configuration conf = HBaseConfiguration.create();
    htable = new HTable(conf, xmlDef.xmlDef[0][0]);
    htable.setAutoFlush(true);
    htable.setWriteBufferSize(1024 * 1024 * 12);
    }
   
 @Override
 public void map(LongWritable key, Text value, Mapper.Context context) 
  throws IOException, InterruptedException {
   String currLine = value.toString();
   try {
    XMLStreamReader reader = XMLInputFactory.newInstance()
    .createXMLStreamReader(
      new ByteArrayInputStream(currLine.getBytes()));
 
    String currentElement = "";
    int col = 0;
    
    // initialize all xml value to blank string
    for (String xmlTag : xmlDef.xmlDef[3]) {
     xmlDef.xmlDef[5][col] = "";
     col++;
    }
    
    
    // read xml tags and store values in xmlDef
    while (reader.hasNext()) {
     int code = reader.next();
     switch (code) {
     case START_ELEMENT:
      currentElement = reader.getLocalName();
      break;
     case CHARACTERS:
      col = 0;
      for (String xmlTag : xmlDef.xmlDef[3]) {
       if (currentElement.equalsIgnoreCase(xmlTag)) {
        xmlDef.xmlDef[5][col] += reader.getText().trim(); 
       }
       col++;
      }
     }
    }
    
    
    // writing values to mapper output file
    // can remove this context.write
    context.write(xmlDef.xmlDef[5][0]+"#"+xmlDef.xmlDef[5][1]+"#"+xmlDef.xmlDef[5][2]+"#"+xmlDef.xmlDef[5][3]+"#"+xmlDef.xmlDef[5][4],1);
    
    // put record in HBase
    Put insHBase = new Put(Bytes.toBytes(xmlDef.xmlDef[5][0]));
    col = 0;
    for (String xmlTag : xmlDef.xmlDef[3]) {
     insHBase.add(Bytes.toBytes(xmlDef.xmlDef[2][col]), Bytes.toBytes(xmlDef.xmlDef[3][col]), Bytes.toBytes(xmlDef.xmlDef[5][col]));
     col++;
    }
    htable.put(insHBase);
  
  } catch (Exception e) {
    e.printStackTrace();
  }
 }
  @Override
  public void cleanup(Context context) 
   throws IOException, InterruptedException {
   htable.flushCommits();
   htable.close();
   
 }
}

In the driver we set the boundries for xml record using xmlinput.start and xmlinput.end . The driver class is:

package com.rishav.hbase;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.client.HTable;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.mapreduce.HFileOutputFormat;
import org.apache.hadoop.hbase.mapreduce.PutSortReducer;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

import org.apache.mahout.text.wikipedia.XmlInputFormat;

public final class commonTableDriver {
 public static void main(String[] args) throws Exception {
  commonRunJob(args[0], args[1]);
 }

 public static void commonRunJob(String input,String output) throws Exception  {
  Configuration conf = new Configuration();
  conf.set("xmlinput.start", "<response>");
  conf.set("xmlinput.end", "</response>");
  
  Job job = new Job(conf);
  job.setJarByClass(commonTableMapper.class);
  
  job.setInputFormatClass(XmlInputFormat.class);
  
  job.setMapperClass(commonTableMapper.class);
  job.setMapOutputKeyClass(Text.class);
  job.setMapOutputValueClass(Text.class);
  
  job.setNumReduceTasks(0);
  
  FileInputFormat.setInputPaths(job, new Path(input));
  Path outPath = new Path(output);
  FileOutputFormat.setOutputPath(job, outPath);
  
  outPath.getFileSystem(conf).delete(outPath, true);
  job.waitForCompletion(true);
  
 }
}

We have exported the project from eclipse in jar package. Lets call this hbasetest.jar To run this jar in hadoop use below command:

hadoop jar hbasetest.jar com.rishav.hbase.commonTableDriver input/sample.xml out1

Note: Before running this map reduce job create xmlTest table in HBase with column families cf1 and cf2.

After executing this job we can check data in HBase table xmlTest.