Showing posts with label ElasticSearch. Show all posts
Showing posts with label ElasticSearch. Show all posts

Wednesday, 18 December 2013

Logstash, ElasticSearch and Kibana Integration for Clickstream Weblog Ingestion

Logstash, ElasticSearch and Kibana Integration for Clickstream Weblog Ingestion

Logstash, ElasticSearch and Kibana Integration for Clickstream Weblog Ingestion

In this blog I am going to show case how we can develop a quick and easy demo application for clickstream weblog ingestion, search and visualization. We will achieve this using Logstash for log ingestion, store it in ElasticSearch and make a pretty dashboard using Kibana. For clickstream weblog I am using logs data from ECML/PKDD 2005 Discovery Challenge . You can download complete weblogs after registering there. These weblog are delimited by semi-colon (;) and have below mentioned fields in order:
  • shop_id
  • unixtime
  • client ip
  • session
  • visted page
  • referrer
Here are some sample log lines:
15;1075658406;212.96.166.162;052ecba084545d8348806f087b6e09bb;/ls/?&id=77&view=2,6,31&pozice=20;http://www.shop5.cz/ls/?id=77
12;1075658406;195.146.109.248;05aa4f4db0162e5723331042eb9ce8a7;/ct/?c=153;http://www.shop3.cz/
12;1075658407;212.65.194.144;86140090a2e102f1644f29e5ddadad9b;/ls/?id=34;http://www.shop3.cz/ct/?c=155
14;1075658407;80.188.85.210;f07f39ec63abf67f965684f3fa5729c4;/findp/?&id=63&view=1,2,3,14,20,15&p_14=nerez;http://www.shop4.cz/ls/?&p_14=nerez&id=63&view=1%2C2%2C3%2C14%2C20%2C15&&aktul=0
17;1075658408;194.108.232.234;be0970125c4eb3ee4fc380be05b3c58f;/ls/?id=155&sort=45;http://www.shop7.cz/ls/?id=155&sort=45
12;1075658409;62.24.70.41;851f20e644eb8bf82bfdbe4379050e2e;/txt/?c=734;http://www.shop3.cz/onakupu/
For creating this demo we need to create a logstash configuration file (lets name this file clickstream.conf) which consists of specifying inputs, filters and outputs. The clickstream.conf file looks like:
input { 
  file {
# path for clickstream log
    path => "/home/rishav.rohit/Desktop/clickstream/_2004_02_01_19_click_stream.log"
# define a type for all events handeled by this input
    type => "weblog"
    start_position => "beginning"
# the clickstream log is in character set ISO-8859-1
    codec => plain {charset => "ISO-8859-1"}
   }
}

filter {
  csv {
# define columns present in weblog
    columns => [shop_id, unixtime, client_ip, session, page, referrer]
    separator => ";"
  }
  grok {
# get visited page and page parameters
    match => ["page", "%{URIPATH:page_visited}(?:%{URIPARAM:page_params})?"]
     remove_field => ["page"]
  }
  date {
# as we are getting unixtime field in epoch seconds we will convert it to normal timestamp
    match => [ "unixtime", "UNIX" ]
  }
  geoip {
# this will convert ip to longitude-latitude using GeoLiteCity database from Maxmind
    source => "client_ip"
    fields => ["latitude","longitude"]
    target => "geoip"
    add_field => [ "[geoip][coordinates]", "%{[geoip][longitude]}" ]
    add_field => [ "[geoip][coordinates]", "%{[geoip][latitude]}"  ]
  }
  mutate {
# this will convert geoip.coordinates to float values
    convert => [ "[geoip][coordinates]", "float" ]
  }
}

output { 
# store output in local elasticsearch cluster
  elasticsearch {
    host => "127.0.0.1"
  }
}
To start logstash agent we run below command:
java -jar logstash-1.2.2-flatjar.jar agent -f clickstream.conf
A sample record in ElasticSearch looks like this:
{

    _index: logstash-2004.02.01
    _type: logs
    _id: I1N0MboUR0O1O3RZ-qXqnw
    _version: 1
    _score: 1
    _source: {
        message: [
            14;1075658407;80.188.85.210;f07f39ec63abf67f965684f3fa5729c4;/findp/?&id=63&view=1,2,3,14,20,15&p_14=nerez;http://www.shop4.cz/ls/?&p_14=nerez&id=63&view=1%2C2%2C3%2C14%2C20%2C15&&aktul=0 
        ]
        @timestamp: 2004-02-01T18:00:07.000Z
        @version: 1
        type: weblog
        host: HMECL000315.happiestminds.com
        path: /home/rishav.rohit/Desktop/clickstream/_2004_02_01_19_click_stream.log
        shop_id: 14
        unixtime: 1075658407
        client_ip: 80.188.85.210
        session: f07f39ec63abf67f965684f3fa5729c4
        referrer: http://www.shop4.cz/ls/?&p_14=nerez&id=63&view=1%2C2%2C3%2C14%2C20%2C15&&aktul=0
        page_visited: /findp/
        page_params: ?&id=63&view=1,2,3,14,20,15&p_14=nerez
        geoip: {
            latitude: 50.08330000000001
            longitude: 14.466700000000003
            coordinates: [
                14.466700000000003
                50.08330000000001
            ]
        }
    }

}
So we have parsed complex log message into simpler components and converted fields like unixtime to datetime, IP to latitude-longitude and got page visited by the client. Now using Kibana we can quickly make dashboard with these panels

Monday, 8 July 2013

Bulk Loading ElasticSearch from HBase

Bulk Loading ElasticSearch from HBase

Bulk Loading ElasticSearch from HBase

In my previous post I wrote about indexing, retrieving and searching for documents in ElasticSearch, Starting with ElasticSearch. For indexing very large number of documents to ES we can use BulkProcessor API from ES, which is very fast and efficient for handling very large number of documents. The java program writen by me for reading data from Hbase and indexing it to ES is given below:

package com.rishav.es;

import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.client.HTable;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.ResultScanner;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.util.Bytes;
import org.elasticsearch.client.Client;
import org.elasticsearch.client.transport.TransportClient;
import org.elasticsearch.common.transport.InetSocketTransportAddress;
import org.elasticsearch.action.bulk.BulkResponse;
import org.elasticsearch.action.bulk.BulkRequest;
import org.elasticsearch.action.bulk.BulkProcessor; 
import org.elasticsearch.action.index.IndexRequest;
import static org.elasticsearch.common.xcontent.XContentFactory.jsonBuilder;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import com.rishav.xml.xmlDef;

public class esBulkLoad {
 public static void main(String[] args) throws IOException {

  // HBase configuration
  Configuration config = HBaseConfiguration.create();
  HTable htable = new HTable(config, "xmlTest");
  Scan scan1 = new Scan();
  scan1.setCaching(500);
  ResultScanner scanner1 = htable.getScanner(scan1); 
  
  //ElasticSearch configuration
  Client client = new TransportClient().addTransportAddress(
    new InetSocketTransportAddress("localhost",9300));
  
  final Logger logger = LoggerFactory.getLogger(esBulkLoad.class);
  
  // define ES bulkprocessor
  BulkProcessor bulkProcessor = BulkProcessor.builder(client, new BulkProcessor.Listener() {
      @Override
      public void beforeBulk(long executionId, BulkRequest request) {
          logger.info("Going to execute new bulk composed of {} actions", request.numberOfActions());
      }

      @Override
      public void afterBulk(long executionId, BulkRequest request, BulkResponse response) {
          logger.info("Executed bulk composed of {} actions", request.numberOfActions());
      }

      @Override
      public void afterBulk(long executionId, BulkRequest request, Throwable failure) {
          logger.warn("Error executing bulk", failure);
      }
      }).setBulkActions(1000).setConcurrentRequests(1).build();
  
  // read Hbase records
  for (Result res : scanner1) {
   byte[] b;
   
   int col = 0;
   for (String xmlTag : xmlDef.xmlDef[3]) {
    b = res.getValue(Bytes.toBytes(xmlDef.xmlDef[2][col]), Bytes.toBytes(xmlDef.xmlDef[3][col]));
    xmlDef.xmlDef[5][col] = Bytes.toString(b);
    col++;
   }
   
   // build ES IndexRequest and add it to bulkProcessor
   IndexRequest iRequest = new IndexRequest("xml", "response", xmlDef.xmlDef[5][0]);
   iRequest.source(jsonBuilder().startObject()
        .field(xmlDef.xmlDef[3][0], xmlDef.xmlDef[5][0])
                          .field(xmlDef.xmlDef[3][1], xmlDef.xmlDef[5][1])
                          .field(xmlDef.xmlDef[3][2], xmlDef.xmlDef[5][2])
                          .field(xmlDef.xmlDef[3][3], xmlDef.xmlDef[5][3])
                          .field(xmlDef.xmlDef[3][4], xmlDef.xmlDef[5][4])
                      .endObject());
   bulkProcessor.add(iRequest);
  }
  scanner1.close();
  htable.close();
  
  // shutdown Elasticsearch node
  bulkProcessor.close();
 }
}

Wednesday, 3 July 2013

Starting with ElasticSearch

Starting with ElasticSearch

Starting with ElasticSearch

ElasticSearch is a distributed, RESTful search server based on Apache Lucene. The important features of it are listed below:
  • Real time data
  • RESTful API
  • Distributed
  • Multi-tenancy
  • Document oriented
  • Schema free
  • Per-operation persistence
  • Built on top of Apache Lucene
  • High availability
  • Full text search
  • Conflict management
More detailed information about ElasticSearch can be found at ElasticSearch.org A small example program to add, retrieve and search for documents in ES is given below:

package com.rishav.es;

import static org.elasticsearch.node.NodeBuilder.*;

import java.io.IOException;

import org.apache.lucene.search.TermQuery;
import org.elasticsearch.node.Node;
import org.elasticsearch.client.Client;
import org.elasticsearch.action.get.GetResponse;
import  org.elasticsearch.action.index.IndexResponse;
import org.elasticsearch.action.search.SearchResponse;
import org.elasticsearch.action.search.SearchType;

import static org.elasticsearch.common.xcontent.XContentFactory.*;
import org.elasticsearch.common.xcontent.XContentBuilder;
import org.elasticsearch.common.settings.ImmutableSettings;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.transport.InetSocketTransportAddress;
import org.elasticsearch.client.transport.TransportClient;
import org.elasticsearch.action.search.SearchResponse;
import org.elasticsearch.action.search.SearchType;
import org.elasticsearch.index.query.FilterBuilder;
import org.elasticsearch.index.query.FilterBuilders;
import org.elasticsearch.index.query.QueryBuilder;
import org.elasticsearch.index.query.QueryBuilders;
import org.elasticsearch.index.query.FilterBuilders.*;
import org.elasticsearch.index.query.QueryBuilders.*;

public class esConnect {

 // on startup

 public static void main (String args[]) throws IOException{

  // create a client for ElasticSearch running on localhost listening to port 9300
  Node node = nodeBuilder().node();
  Client client = new TransportClient().addTransportAddress(
      new InetSocketTransportAddress("localhost",9300));
  
  // add some record
  IndexResponse iresponse = client.prepareIndex("testindex", "tweets", "1")
     .setSource(jsonBuilder()
                 .startObject()
                     .field("user", "rishav")
                     .field("postDate", "2013-01-30")
                     .field("message", "trying out Elastic Search")
                 .endObject()
               )
     .execute()
     .actionGet();
  
  
  // retrieve record based on id
  GetResponse gresponse = client.prepareGet("testindex", "tweets", "1")
        .execute().actionGet();
  System.out.println(gresponse.getSourceAsString());
  
  // search for a record based on below filters
  SearchResponse sresponse = client.prepareSearch("testindex").
         setSearchType(SearchType.DFS_QUERY_THEN_FETCH).
         setQuery(QueryBuilders.termQuery("user", "rishav")).  // query
         setFilter(FilterBuilders.termFilter("message", "out")).
         execute().
         actionGet();
  System.out.println(sresponse.getHits());
  
  // shutdown client
  client.close();
 }
}