Bulk Loading ElasticSearch from HBase
In my previous post, Starting with ElasticSearch, I wrote about indexing, retrieving and searching for documents in ElasticSearch. For indexing a very large number of documents into ES we can use the BulkProcessor API, which batches index requests together and is fast and efficient at handling large volumes of documents.
The Java program I wrote for reading data from HBase and indexing it into ES is given below. It reuses the xmlDef helper class (imported from com.rishav.xml), which holds the HBase column families, column qualifiers and a value buffer; a rough sketch of the shape this program expects is included after the listing.
package com.rishav.es;

import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.client.HTable;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.ResultScanner;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.util.Bytes;
import org.elasticsearch.client.Client;
import org.elasticsearch.client.transport.TransportClient;
import org.elasticsearch.common.transport.InetSocketTransportAddress;
import org.elasticsearch.action.bulk.BulkResponse;
import org.elasticsearch.action.bulk.BulkRequest;
import org.elasticsearch.action.bulk.BulkProcessor;
import org.elasticsearch.action.index.IndexRequest;
import static org.elasticsearch.common.xcontent.XContentFactory.jsonBuilder;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import com.rishav.xml.xmlDef;

public class esBulkLoad {
    public static void main(String[] args) throws IOException {

        // HBase configuration
        Configuration config = HBaseConfiguration.create();
        HTable htable = new HTable(config, "xmlTest");
        Scan scan1 = new Scan();
        scan1.setCaching(500);
        ResultScanner scanner1 = htable.getScanner(scan1);

        // ElasticSearch configuration
        Client client = new TransportClient().addTransportAddress(
                new InetSocketTransportAddress("localhost", 9300));

        final Logger logger = LoggerFactory.getLogger(esBulkLoad.class);

        // define the ES BulkProcessor
        BulkProcessor bulkProcessor = BulkProcessor.builder(client, new BulkProcessor.Listener() {
            @Override
            public void beforeBulk(long executionId, BulkRequest request) {
                logger.info("Going to execute new bulk composed of {} actions", request.numberOfActions());
            }

            @Override
            public void afterBulk(long executionId, BulkRequest request, BulkResponse response) {
                logger.info("Executed bulk composed of {} actions", request.numberOfActions());
            }

            @Override
            public void afterBulk(long executionId, BulkRequest request, Throwable failure) {
                logger.warn("Error executing bulk", failure);
            }
        }).setBulkActions(1000).setConcurrentRequests(1).build();

        // read HBase records
        for (Result res : scanner1) {
            byte[] b;
            int col = 0;
            for (String xmlTag : xmlDef.xmlDef[3]) {
                b = res.getValue(Bytes.toBytes(xmlDef.xmlDef[2][col]), Bytes.toBytes(xmlDef.xmlDef[3][col]));
                xmlDef.xmlDef[5][col] = Bytes.toString(b);
                col++;
            }

            // build an ES IndexRequest and add it to the bulkProcessor
            IndexRequest iRequest = new IndexRequest("xml", "response", xmlDef.xmlDef[5][0]);
            iRequest.source(jsonBuilder().startObject()
                    .field(xmlDef.xmlDef[3][0], xmlDef.xmlDef[5][0])
                    .field(xmlDef.xmlDef[3][1], xmlDef.xmlDef[5][1])
                    .field(xmlDef.xmlDef[3][2], xmlDef.xmlDef[5][2])
                    .field(xmlDef.xmlDef[3][3], xmlDef.xmlDef[5][3])
                    .field(xmlDef.xmlDef[3][4], xmlDef.xmlDef[5][4])
                    .endObject());
            bulkProcessor.add(iRequest);
        }
        scanner1.close();
        htable.close();

        // flush any pending actions, then close the bulk processor and the ES client
        bulkProcessor.close();
        client.close();
    }
}
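The xmlDef class itself is not shown above; only its static String[][] xmlDef array is used. From the way the program reads it, row 2 holds the HBase column families, row 3 the column qualifiers (which double as the ES field names), and row 5 acts as a buffer for the current row's values. A minimal, hypothetical sketch that would satisfy this program is given below; the family and qualifier names here are only placeholders.

package com.rishav.xml;

// Hypothetical sketch of the helper class used by esBulkLoad. Only rows 2
// (column families), 3 (column qualifiers / ES field names) and 5 (a value
// buffer filled for each HBase row) are read by the bulk loader, so the
// other rows are left empty here.
public class xmlDef {
    public static String xmlDef[][] = new String[][] {
            {},                                              // row 0: unused by esBulkLoad
            {},                                              // row 1: unused by esBulkLoad
            {"cf", "cf", "cf", "cf", "cf"},                  // row 2: HBase column families (placeholder names)
            {"id", "fname", "lname", "city", "state"},       // row 3: column qualifiers / ES field names (placeholders)
            {},                                              // row 4: unused by esBulkLoad
            new String[5]                                    // row 5: per-row value buffer, written by the loader
    };
}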
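Once the load finishes, a quick sanity check is to run a match-all search against the same index and type (xml/response) that the IndexRequests were written to. The snippet below is only a sketch against the same older TransportClient API used above (the class name esSearchCheck is just for illustration); hitting http://localhost:9200/xml/response/_count in a browser should report the same count.

package com.rishav.es;

import org.elasticsearch.action.search.SearchResponse;
import org.elasticsearch.client.Client;
import org.elasticsearch.client.transport.TransportClient;
import org.elasticsearch.common.transport.InetSocketTransportAddress;
import org.elasticsearch.index.query.QueryBuilders;

public class esSearchCheck {
    public static void main(String[] args) {
        // connect to the same local node used by esBulkLoad
        Client client = new TransportClient().addTransportAddress(
                new InetSocketTransportAddress("localhost", 9300));

        // match-all search against the index/type written by the bulk load
        SearchResponse response = client.prepareSearch("xml")
                .setTypes("response")
                .setQuery(QueryBuilders.matchAllQuery())
                .setSize(1)
                .execute().actionGet();

        System.out.println("documents indexed: " + response.getHits().getTotalHits());
        client.close();
    }
}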