Monday, 8 July 2013

Bulk Loading ElasticSearch from HBase

In my previous post, Starting with ElasticSearch, I wrote about indexing, retrieving and searching for documents in ElasticSearch. For indexing a very large number of documents to ES we can use the BulkProcessor API from ES, which is fast and efficient at handling very large numbers of documents. The Java program I wrote for reading data from HBase and indexing it to ES is given below:

package com.rishav.es;

import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.client.HTable;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.ResultScanner;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.util.Bytes;
import org.elasticsearch.client.Client;
import org.elasticsearch.client.transport.TransportClient;
import org.elasticsearch.common.transport.InetSocketTransportAddress;
import org.elasticsearch.action.bulk.BulkResponse;
import org.elasticsearch.action.bulk.BulkRequest;
import org.elasticsearch.action.bulk.BulkProcessor; 
import org.elasticsearch.action.index.IndexRequest;
import static org.elasticsearch.common.xcontent.XContentFactory.jsonBuilder;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import com.rishav.xml.xmlDef;

public class esBulkLoad {
 public static void main(String[] args) throws IOException {

  // HBase configuration
  Configuration config = HBaseConfiguration.create();
  HTable htable = new HTable(config, "xmlTest");
  Scan scan1 = new Scan();
  scan1.setCaching(500);
  ResultScanner scanner1 = htable.getScanner(scan1); 
  
  //ElasticSearch configuration
  Client client = new TransportClient().addTransportAddress(
    new InetSocketTransportAddress("localhost",9300));
  
  final Logger logger = LoggerFactory.getLogger(esBulkLoad.class);
  
  // define ES bulkprocessor
  BulkProcessor bulkProcessor = BulkProcessor.builder(client, new BulkProcessor.Listener() {
      @Override
      public void beforeBulk(long executionId, BulkRequest request) {
          logger.info("Going to execute new bulk composed of {} actions", request.numberOfActions());
      }

      @Override
      public void afterBulk(long executionId, BulkRequest request, BulkResponse response) {
          logger.info("Executed bulk composed of {} actions", request.numberOfActions());
      }

      @Override
      public void afterBulk(long executionId, BulkRequest request, Throwable failure) {
          logger.warn("Error executing bulk", failure);
      }
      }).setBulkActions(1000).setConcurrentRequests(1).build();
  
  // read Hbase records
  for (Result res : scanner1) {
   byte[] b;
   
   int col = 0;
   for (String xmlTag : xmlDef.xmlDef[3]) {
    b = res.getValue(Bytes.toBytes(xmlDef.xmlDef[2][col]), Bytes.toBytes(xmlDef.xmlDef[3][col]));
    xmlDef.xmlDef[5][col] = Bytes.toString(b);
    col++;
   }
   
   // build ES IndexRequest and add it to bulkProcessor
   IndexRequest iRequest = new IndexRequest("xml", "response", xmlDef.xmlDef[5][0]);
   iRequest.source(jsonBuilder().startObject()
                     .field(xmlDef.xmlDef[3][0], xmlDef.xmlDef[5][0])
                     .field(xmlDef.xmlDef[3][1], xmlDef.xmlDef[5][1])
                     .field(xmlDef.xmlDef[3][2], xmlDef.xmlDef[5][2])
                     .field(xmlDef.xmlDef[3][3], xmlDef.xmlDef[5][3])
                     .field(xmlDef.xmlDef[3][4], xmlDef.xmlDef[5][4])
                   .endObject());
   bulkProcessor.add(iRequest);
  }
  scanner1.close();
  htable.close();
  
  // flush any pending bulk requests and release ES resources
  bulkProcessor.close();
  client.close();
 }
}
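
Once the bulk load finishes, a quick sanity check on the document count is useful. Below is a minimal sketch, not part of the original job: it assumes the same localhost:9300 transport address and the xml index used above, and the esCountCheck class name is only for illustration. The count should match the number of HBase rows scanned.

package com.rishav.es;

import org.elasticsearch.action.count.CountResponse;
import org.elasticsearch.client.Client;
import org.elasticsearch.client.transport.TransportClient;
import org.elasticsearch.common.transport.InetSocketTransportAddress;

public class esCountCheck {
 public static void main(String[] args) {
  // connect to the same ES node used for bulk loading (assumed localhost:9300)
  Client client = new TransportClient().addTransportAddress(
    new InetSocketTransportAddress("localhost", 9300));

  // count documents in the "xml" index; this should match the number of HBase rows scanned
  CountResponse response = client.prepareCount("xml").execute().actionGet();
  System.out.println("documents indexed: " + response.getCount());

  client.close();
 }
}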

Wednesday, 3 July 2013

Starting with ElasticSearch

ElasticSearch is a distributed, RESTful search server based on Apache Lucene. Its important features are listed below:
  • Real time data
  • RESTful API
  • Distributed
  • Multi-tenancy
  • Document oriented
  • Schema free
  • Per-operation persistence
  • Built on top of Apache Lucene
  • High availability
  • Full text search
  • Conflict management
More detailed information about ElasticSearch can be found at ElasticSearch.org. A small example program to add, retrieve and search for documents in ES is given below:

package com.rishav.es;

import java.io.IOException;

import org.elasticsearch.client.Client;
import org.elasticsearch.client.transport.TransportClient;
import org.elasticsearch.common.transport.InetSocketTransportAddress;
import org.elasticsearch.action.get.GetResponse;
import org.elasticsearch.action.index.IndexResponse;
import org.elasticsearch.action.search.SearchResponse;
import org.elasticsearch.action.search.SearchType;
import org.elasticsearch.index.query.FilterBuilders;
import org.elasticsearch.index.query.QueryBuilders;

import static org.elasticsearch.common.xcontent.XContentFactory.*;

public class esConnect {

 // on startup

 public static void main (String args[]) throws IOException{

  // create a client for ElasticSearch running on localhost and listening on port 9300
  Client client = new TransportClient().addTransportAddress(
      new InetSocketTransportAddress("localhost",9300));
  
  // add some record
  IndexResponse iresponse = client.prepareIndex("testindex", "tweets", "1")
     .setSource(jsonBuilder()
                 .startObject()
                     .field("user", "rishav")
                     .field("postDate", "2013-01-30")
                     .field("message", "trying out Elastic Search")
                 .endObject()
               )
     .execute()
     .actionGet();
  
  
  // retrieve record based on id
  GetResponse gresponse = client.prepareGet("testindex", "tweets", "1")
        .execute().actionGet();
  System.out.println(gresponse.getSourceAsString());
  
  // search for a record based on below filters
  SearchResponse sresponse = client.prepareSearch("testindex")
         .setSearchType(SearchType.DFS_QUERY_THEN_FETCH)
         .setQuery(QueryBuilders.termQuery("user", "rishav"))        // query
         .setFilter(FilterBuilders.termFilter("message", "out"))     // filter
         .execute()
         .actionGet();
  System.out.println(sresponse.getHits());
  
  // shutdown client
  client.close();
 }
}
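
If you want to re-run the example from a clean state, the test index can be dropped first. This is a minimal sketch under the same assumptions as above (localhost:9300 and the testindex index); the esCleanup class name is only for illustration:

package com.rishav.es;

import org.elasticsearch.client.Client;
import org.elasticsearch.client.transport.TransportClient;
import org.elasticsearch.common.transport.InetSocketTransportAddress;

public class esCleanup {
 public static void main(String[] args) {
  Client client = new TransportClient().addTransportAddress(
      new InetSocketTransportAddress("localhost", 9300));

  // drop the test index so the example above can be re-run from scratch
  client.admin().indices().prepareDelete("testindex")
        .execute().actionGet();

  client.close();
 }
}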

Monday, 1 July 2013

XML parsing with MapReduce

Recently I worked with XML data stored in HDFS and wrote a MapReduce job to load that data into HBase. To work with XML input we can use the XmlInputFormat class from Mahout (there is no need to have Mahout installed; we just need the class from the mahout-integration jar). The XML file had the following structure:

 <Response>
  <RowID>....</RowID>
  <ResponseID>....</ResponseID>
  <IPAddress>....</IPAddress>
  <Status>....</Status>
  <StartDate>....</StartDate>
  <EndDate>....</EndDate>
 </Response>

To hold this XML record we created the xmlDef class:
package com.rishav.xml;

public class xmlDef {
 public static String xmlDef[][] = new String[][]{
      {"xmlTest", "xmlTest", "xmlTest", "xmlTest", "xmlTest", "xmlTest"},     //HBase table name
      {"Y", "N", "N","N","N","N"},                                            //is column a key column?
      {"cf1", "cf1","cf2","cf2","cf2","cf2"},                                 //column family
      {"RowID", "ResponseID", "IPAddress", "Status", "StartDate", "EndDate"}, //column name in HBase
      {"RowID", "ResponseID", "IPAddress", "Status", "StartDate", "EndDate"}, //xml tag
      {"", "", "", "", "", ""}                                                // place holder for xml value
      };
}

The mapper class is configured to read a complete XML record enclosed in start and end tags, which are defined in the driver class. Each map call receives one XML record as input, so we can parse it in the normal manner; in this case we have used XMLStreamReader. The code for the mapper class is given below:

package com.rishav.hbase;

import com.rishav.xml.xmlDef;

import static javax.xml.stream.XMLStreamConstants.CHARACTERS;
import static javax.xml.stream.XMLStreamConstants.START_ELEMENT;
import javax.xml.stream.XMLInputFactory;
import javax.xml.stream.XMLStreamReader;

import java.io.ByteArrayInputStream;
import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.client.HTable;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

public class commonTableMapper 
 extends Mapper<LongWritable, Text, Text, IntWritable> {
  private HTable htable;
  
 // create HBase connection
 protected void setup(Context context) 
    throws IOException, InterruptedException {
    Configuration conf = HBaseConfiguration.create();
    htable = new HTable(conf, xmlDef.xmlDef[0][0]);
    // buffer puts on the client side; they are flushed in cleanup()
    htable.setAutoFlush(false);
    htable.setWriteBufferSize(1024 * 1024 * 12);
    }
   
 @Override
 public void map(LongWritable key, Text value, Context context) 
  throws IOException, InterruptedException {
   String currLine = value.toString();
   try {
    XMLStreamReader reader = XMLInputFactory.newInstance()
    .createXMLStreamReader(
      new ByteArrayInputStream(currLine.getBytes()));
 
    String currentElement = "";
    int col = 0;
    
    // initialize all xml value to blank string
    for (String xmlTag : xmlDef.xmlDef[3]) {
     xmlDef.xmlDef[5][col] = "";
     col++;
    }
    
    
    // read xml tags and store values in xmlDef
    while (reader.hasNext()) {
     int code = reader.next();
     switch (code) {
     case START_ELEMENT:
      currentElement = reader.getLocalName();
      break;
     case CHARACTERS:
      col = 0;
      for (String xmlTag : xmlDef.xmlDef[3]) {
       if (currentElement.equalsIgnoreCase(xmlTag)) {
        xmlDef.xmlDef[5][col] += reader.getText().trim(); 
       }
       col++;
      }
     }
    }
    
    
    // write the parsed record to the mapper output file (optional, can be removed)
    context.write(new Text(xmlDef.xmlDef[5][0] + "#" + xmlDef.xmlDef[5][1] + "#" + xmlDef.xmlDef[5][2]
        + "#" + xmlDef.xmlDef[5][3] + "#" + xmlDef.xmlDef[5][4]), new IntWritable(1));
    
    // put record in HBase
    Put insHBase = new Put(Bytes.toBytes(xmlDef.xmlDef[5][0]));
    col = 0;
    for (String xmlTag : xmlDef.xmlDef[3]) {
     insHBase.add(Bytes.toBytes(xmlDef.xmlDef[2][col]), Bytes.toBytes(xmlDef.xmlDef[3][col]), Bytes.toBytes(xmlDef.xmlDef[5][col]));
     col++;
    }
    htable.put(insHBase);
  
  } catch (Exception e) {
    e.printStackTrace();
  }
 }
  @Override
  public void cleanup(Context context) 
   throws IOException, InterruptedException {
   htable.flushCommits();
   htable.close();
   
 }
}

In the driver we set the boundaries of an XML record using the xmlinput.start and xmlinput.end properties. The driver class is:

package com.rishav.hbase;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

import org.apache.mahout.text.wikipedia.XmlInputFormat;

public final class commonTableDriver {
 public static void main(String[] args) throws Exception {
  commonRunJob(args[0], args[1]);
 }

 public static void commonRunJob(String input,String output) throws Exception  {
  Configuration conf = new Configuration();
  conf.set("xmlinput.start", "<response>");
  conf.set("xmlinput.end", "</response>");
  
  Job job = new Job(conf);
  job.setJarByClass(commonTableMapper.class);
  
  job.setInputFormatClass(XmlInputFormat.class);
  
  job.setMapperClass(commonTableMapper.class);
  job.setMapOutputKeyClass(Text.class);
  job.setMapOutputValueClass(IntWritable.class);
  
  job.setNumReduceTasks(0);
  
  FileInputFormat.setInputPaths(job, new Path(input));
  Path outPath = new Path(output);
  FileOutputFormat.setOutputPath(job, outPath);
  
  outPath.getFileSystem(conf).delete(outPath, true);
  job.waitForCompletion(true);
  
 }
}

We exported the project from Eclipse as a jar package; let's call it hbasetest.jar. To run this jar on Hadoop, use the command below:

hadoop jar hbasetest.jar com.rishav.hbase.commonTableDriver input/sample.xml out1

Note: Before running this MapReduce job, create the xmlTest table in HBase with column families cf1 and cf2, as shown below.
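
For reference, here is a minimal sketch for creating that table from Java; it assumes the same HBase client API used elsewhere in this post, and the createXmlTestTable class name is only for illustration (the table can just as easily be created from the HBase shell):

package com.rishav.hbase;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.HColumnDescriptor;
import org.apache.hadoop.hbase.HTableDescriptor;
import org.apache.hadoop.hbase.client.HBaseAdmin;

public class createXmlTestTable {
 public static void main(String[] args) throws Exception {
  Configuration conf = HBaseConfiguration.create();
  HBaseAdmin admin = new HBaseAdmin(conf);

  // create xmlTest with the two column families referenced in xmlDef
  if (!admin.tableExists("xmlTest")) {
   HTableDescriptor desc = new HTableDescriptor("xmlTest");
   desc.addFamily(new HColumnDescriptor("cf1"));
   desc.addFamily(new HColumnDescriptor("cf2"));
   admin.createTable(desc);
  }
  admin.close();
 }
}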

After executing this job we can check the data in the HBase table xmlTest, for example with a scan like the one below.
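
For a quick look at the loaded rows, here is a minimal scan sketch; it reuses the xmlDef definition from above, and the scanXmlTest class name is only for illustration (the HBase shell's scan command works just as well):

package com.rishav.hbase;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.client.HTable;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.ResultScanner;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.util.Bytes;

import com.rishav.xml.xmlDef;

public class scanXmlTest {
 public static void main(String[] args) throws Exception {
  Configuration conf = HBaseConfiguration.create();
  HTable htable = new HTable(conf, "xmlTest");

  // scan every row and print each column defined in xmlDef
  ResultScanner scanner = htable.getScanner(new Scan());
  for (Result res : scanner) {
   StringBuilder row = new StringBuilder(Bytes.toString(res.getRow()));
   for (int col = 0; col < xmlDef.xmlDef[3].length; col++) {
    byte[] b = res.getValue(Bytes.toBytes(xmlDef.xmlDef[2][col]), Bytes.toBytes(xmlDef.xmlDef[3][col]));
    row.append(" | ").append(b == null ? "" : Bytes.toString(b));
   }
   System.out.println(row);
  }
  scanner.close();
  htable.close();
 }
}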