Monday, 8 July 2013

Bulk Loading ElasticSearch from HBase

Bulk Loading ElasticSearch from HBase

Bulk Loading ElasticSearch from HBase

In my previous post I wrote about indexing, retrieving and searching for documents in ElasticSearch, Starting with ElasticSearch. For indexing very large number of documents to ES we can use BulkProcessor API from ES, which is very fast and efficient for handling very large number of documents. The java program writen by me for reading data from Hbase and indexing it to ES is given below:

package com.rishav.es;

import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.client.HTable;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.ResultScanner;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.util.Bytes;
import org.elasticsearch.client.Client;
import org.elasticsearch.client.transport.TransportClient;
import org.elasticsearch.common.transport.InetSocketTransportAddress;
import org.elasticsearch.action.bulk.BulkResponse;
import org.elasticsearch.action.bulk.BulkRequest;
import org.elasticsearch.action.bulk.BulkProcessor; 
import org.elasticsearch.action.index.IndexRequest;
import static org.elasticsearch.common.xcontent.XContentFactory.jsonBuilder;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import com.rishav.xml.xmlDef;

public class esBulkLoad {
 public static void main(String[] args) throws IOException {

  // HBase configuration
  Configuration config = HBaseConfiguration.create();
  HTable htable = new HTable(config, "xmlTest");
  Scan scan1 = new Scan();
  scan1.setCaching(500);
  ResultScanner scanner1 = htable.getScanner(scan1); 
  
  //ElasticSearch configuration
  Client client = new TransportClient().addTransportAddress(
    new InetSocketTransportAddress("localhost",9300));
  
  final Logger logger = LoggerFactory.getLogger(esBulkLoad.class);
  
  // define ES bulkprocessor
  BulkProcessor bulkProcessor = BulkProcessor.builder(client, new BulkProcessor.Listener() {
      @Override
      public void beforeBulk(long executionId, BulkRequest request) {
          logger.info("Going to execute new bulk composed of {} actions", request.numberOfActions());
      }

      @Override
      public void afterBulk(long executionId, BulkRequest request, BulkResponse response) {
          logger.info("Executed bulk composed of {} actions", request.numberOfActions());
      }

      @Override
      public void afterBulk(long executionId, BulkRequest request, Throwable failure) {
          logger.warn("Error executing bulk", failure);
      }
      }).setBulkActions(1000).setConcurrentRequests(1).build();
  
  // read Hbase records
  for (Result res : scanner1) {
   byte[] b;
   
   int col = 0;
   for (String xmlTag : xmlDef.xmlDef[3]) {
    b = res.getValue(Bytes.toBytes(xmlDef.xmlDef[2][col]), Bytes.toBytes(xmlDef.xmlDef[3][col]));
    xmlDef.xmlDef[5][col] = Bytes.toString(b);
    col++;
   }
   
   // build ES IndexRequest and add it to bulkProcessor
   IndexRequest iRequest = new IndexRequest("xml", "response", xmlDef.xmlDef[5][0]);
   iRequest.source(jsonBuilder().startObject()
        .field(xmlDef.xmlDef[3][0], xmlDef.xmlDef[5][0])
                          .field(xmlDef.xmlDef[3][1], xmlDef.xmlDef[5][1])
                          .field(xmlDef.xmlDef[3][2], xmlDef.xmlDef[5][2])
                          .field(xmlDef.xmlDef[3][3], xmlDef.xmlDef[5][3])
                          .field(xmlDef.xmlDef[3][4], xmlDef.xmlDef[5][4])
                      .endObject());
   bulkProcessor.add(iRequest);
  }
  scanner1.close();
  htable.close();
  
  // shutdown Elasticsearch node
  bulkProcessor.close();
 }
}

1 comment:

  1. Exception in thread "main" java.lang.NoClassDefFoundError: org/apache/hcatalog/mapreduce/InputJobInfo

    ReplyDelete