Saturday, 12 October 2013

MapReduce On Hive Tables Using HCatalog

In my last post, Introduction To Hive's Partitioning, I described how to load CSV data into a partitioned Hive table. Today we shall see how to use HCatalog to run MapReduce on a Hive table and store the output in another Hive table.

HCatalog makes Hive metadata available to users of other Hadoop tools like Pig and MapReduce. It provides connectors for MapReduce and Pig so that users of those tools can read data from and write data to Hive's warehouse.
HCatalog's table abstraction presents users with a relational view of data in HDFS, so they need not worry about where or in what format their data is stored, whether that is RCFile, text files, or sequence files.
It also provides a notification service so that workflow tools, such as Oozie, can be notified when new data becomes available in the warehouse.

HCatalog provides HCatInputFormat/HCatOutputFormat to enable MapReduce users to read/write data in Hive's data warehouse. It allows users to read only the partitions and columns they need, and it provides the records in a convenient list-like format so that users do not need to parse them.
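For example, depending on the HCatalog version, a partition filter can be pushed down when setting up the input so that only the required partitions are scanned. With the older org.apache.hcatalog API this looks roughly like the sketch below (the filter string is illustrative; newer versions expose similar functionality):

// hedged sketch: read only the year=1987 partitions of airline.ontimeperf
// (older org.apache.hcatalog.mapreduce API)
HCatInputFormat.setInput(job,
    InputJobInfo.create("airline", "ontimeperf", "year='1987'"));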

We shall see how to use HCatalog to count the records for each year-month combination.
In the mapper class we get the table schema and use it to extract the required columns and their values.
Note - In context.write I am using a custom writable, IntPair, which I will describe in a separate post (a minimal illustrative sketch is included after the mapper code below).

public class onTimeMapper extends Mapper<WritableComparable, HCatRecord, IntPair, IntWritable> {
    @Override
    protected void map(WritableComparable key, HCatRecord value, Context context)
            throws IOException, InterruptedException {

        // get the schema of the input Hive table
        HCatSchema schema = HCatBaseInputFormat.getTableSchema(context);

        // read the required columns by name
        int year = Integer.parseInt(value.getString("year", schema));
        int month = Integer.parseInt(value.getString("month", schema));
        int dayOfMonth = value.getInteger("dayofmonth", schema);

        // emit (year, month) as the key and day-of-month as the value
        context.write(new IntPair(year, month), new IntWritable(dayOfMonth));
    }
}
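The IntPair used above is a custom WritableComparable holding the (year, month) pair. My own version will be covered in a separate post; purely for completeness, a minimal illustrative sketch (not necessarily identical to that version) could look like this:

import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;

import org.apache.hadoop.io.WritableComparable;

// minimal illustrative IntPair: a WritableComparable holding two ints (e.g. year and month)
public class IntPair implements WritableComparable<IntPair> {
    private int first;
    private int second;

    public IntPair() {}                        // no-arg constructor required by Hadoop

    public IntPair(int first, int second) {
        this.first = first;
        this.second = second;
    }

    public int getFirstInt()  { return first; }
    public int getSecondInt() { return second; }

    @Override
    public void write(DataOutput out) throws IOException {
        out.writeInt(first);
        out.writeInt(second);
    }

    @Override
    public void readFields(DataInput in) throws IOException {
        first = in.readInt();
        second = in.readInt();
    }

    @Override
    public int compareTo(IntPair o) {
        if (first != o.first) {
            return first < o.first ? -1 : 1;
        }
        if (second != o.second) {
            return second < o.second ? -1 : 1;
        }
        return 0;
    }

    @Override
    public int hashCode() { return 31 * first + second; }

    @Override
    public boolean equals(Object o) {
        if (!(o instanceof IntPair)) return false;
        IntPair p = (IntPair) o;
        return first == p.first && second == p.second;
    }
}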
In the reducer class we create a record schema for the output columns and their values, and write the record to the target Hive table.

public class onTimeReducer extends Reducer<IntPair, IntWritable, WritableComparable, HCatRecord> {
    @Override
    public void reduce(IntPair key, Iterable<IntWritable> values, Context context)
            throws IOException, InterruptedException {

        // count the records for this particular year-month key
        int count = 0;
        for (IntWritable value : values) {
            count++;
        }

        // define the output record schema
        List<HCatFieldSchema> columns = new ArrayList<HCatFieldSchema>(3);
        columns.add(new HCatFieldSchema("year", HCatFieldSchema.Type.INT, ""));
        columns.add(new HCatFieldSchema("month", HCatFieldSchema.Type.INT, ""));
        columns.add(new HCatFieldSchema("flightCount", HCatFieldSchema.Type.INT, ""));
        HCatSchema schema = new HCatSchema(columns);

        // build the output record and write it to the target Hive table
        HCatRecord record = new DefaultHCatRecord(3);
        record.setInteger("year", schema, key.getFirstInt());
        record.setInteger("month", schema, key.getSecondInt());
        record.setInteger("flightCount", schema, count);
        context.write(null, record);
    }
}
Finally, we write the driver class with the input/output formats, schema and table details:

public class onTimeDriver extends Configured implements Tool {
    private static final Log log = LogFactory.getLog(onTimeDriver.class);

    public int run(String[] args) throws Exception {
        // use the Configuration supplied by ToolRunner so that -D options are honoured
        Configuration conf = getConf();
        Job job = new Job(conf, "OnTimeCount");
        job.setJarByClass(onTimeDriver.class);
        job.setMapperClass(onTimeMapper.class);
        job.setReducerClass(onTimeReducer.class);

        // read from the Hive table airline.ontimeperf through HCatalog
        HCatInputFormat.setInput(job, "airline", "ontimeperf");
        job.setInputFormatClass(HCatInputFormat.class);
        job.setMapOutputKeyClass(IntPair.class);
        job.setMapOutputValueClass(IntWritable.class);

        // write to the Hive table airline.flight_count through HCatalog,
        // using the target table's own schema for the output records
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(DefaultHCatRecord.class);
        job.setOutputFormatClass(HCatOutputFormat.class);
        HCatOutputFormat.setOutput(job, OutputJobInfo.create("airline", "flight_count", null));
        HCatSchema s = HCatOutputFormat.getTableSchema(job);
        HCatOutputFormat.setSchema(job, s);

        return (job.waitForCompletion(true) ? 0 : 1);
    }

    public static void main(String[] args) throws Exception {
        int exitCode = ToolRunner.run(new onTimeDriver(), args);
        System.exit(exitCode);
    }
}
Before running this code we need to create the output table in Hive using the commands below:
create table airline.flight_count
(Year INT ,
Month INT ,
flightCount INT)
ROW FORMAT DELIMITED FIELDS TERMINATED BY ',' 
STORED AS TEXTFILE;
After compiling the classes and packaging them into a jar file, we can run this code using the hadoop jar command without any arguments. You can check the output with HQL by querying the airline.flight_count table.

Note - With the Apache setup I faced some Hive errors while running the jar from any folder except $HIVE_HOME, so if you hit a similar issue, try placing the jar in and running it from the $HIVE_HOME folder.
Just to verify that our MapReduce program is independent of the output table schema, we shall alter airline.flight_count to add a dummy column:
ALTER TABLE airline.flight_count ADD COLUMNS (dummy STRING);
After truncating this table we can run our MapReduce program again and see that it still runs in the same manner.

Monday, 8 July 2013

Bulk Loading ElasticSearch from HBase

In my previous post, Starting with ElasticSearch, I wrote about indexing, retrieving and searching for documents in ElasticSearch. For indexing a very large number of documents to ES we can use the BulkProcessor API, which batches requests and is fast and efficient for large loads. The Java program I wrote for reading data from HBase and indexing it to ES is given below:

package com.rishav.es;

import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.client.HTable;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.ResultScanner;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.util.Bytes;
import org.elasticsearch.client.Client;
import org.elasticsearch.client.transport.TransportClient;
import org.elasticsearch.common.transport.InetSocketTransportAddress;
import org.elasticsearch.action.bulk.BulkResponse;
import org.elasticsearch.action.bulk.BulkRequest;
import org.elasticsearch.action.bulk.BulkProcessor; 
import org.elasticsearch.action.index.IndexRequest;
import static org.elasticsearch.common.xcontent.XContentFactory.jsonBuilder;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import com.rishav.xml.xmlDef;

public class esBulkLoad {
 public static void main(String[] args) throws IOException {

  // HBase configuration
  Configuration config = HBaseConfiguration.create();
  HTable htable = new HTable(config, "xmlTest");
  Scan scan1 = new Scan();
  scan1.setCaching(500);
  ResultScanner scanner1 = htable.getScanner(scan1); 
  
  //ElasticSearch configuration
  Client client = new TransportClient().addTransportAddress(
    new InetSocketTransportAddress("localhost",9300));
  
  final Logger logger = LoggerFactory.getLogger(esBulkLoad.class);
  
  // define ES bulkprocessor
  BulkProcessor bulkProcessor = BulkProcessor.builder(client, new BulkProcessor.Listener() {
      @Override
      public void beforeBulk(long executionId, BulkRequest request) {
          logger.info("Going to execute new bulk composed of {} actions", request.numberOfActions());
      }

      @Override
      public void afterBulk(long executionId, BulkRequest request, BulkResponse response) {
          logger.info("Executed bulk composed of {} actions", request.numberOfActions());
      }

      @Override
      public void afterBulk(long executionId, BulkRequest request, Throwable failure) {
          logger.warn("Error executing bulk", failure);
      }
      }).setBulkActions(1000).setConcurrentRequests(1).build();
  
  // read Hbase records
  for (Result res : scanner1) {
   byte[] b;
   
   int col = 0;
   for (String xmlTag : xmlDef.xmlDef[3]) {
    b = res.getValue(Bytes.toBytes(xmlDef.xmlDef[2][col]), Bytes.toBytes(xmlDef.xmlDef[3][col]));
    xmlDef.xmlDef[5][col] = Bytes.toString(b);
    col++;
   }
   
   // build ES IndexRequest and add it to bulkProcessor
   IndexRequest iRequest = new IndexRequest("xml", "response", xmlDef.xmlDef[5][0]);
   iRequest.source(jsonBuilder().startObject()
        .field(xmlDef.xmlDef[3][0], xmlDef.xmlDef[5][0])
                          .field(xmlDef.xmlDef[3][1], xmlDef.xmlDef[5][1])
                          .field(xmlDef.xmlDef[3][2], xmlDef.xmlDef[5][2])
                          .field(xmlDef.xmlDef[3][3], xmlDef.xmlDef[5][3])
                          .field(xmlDef.xmlDef[3][4], xmlDef.xmlDef[5][4])
                      .endObject());
   bulkProcessor.add(iRequest);
  }
  scanner1.close();
  htable.close();
  
  // flush any pending bulk requests and release the bulk processor and client
  bulkProcessor.close();
  client.close();
 }
}
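Once the bulk load finishes, a quick way to sanity-check it is to ask the index for its total document count. Below is a minimal sketch (the class name is mine; it reuses the same transport client setup and the "xml" index used above):

import org.elasticsearch.action.search.SearchResponse;
import org.elasticsearch.client.Client;
import org.elasticsearch.client.transport.TransportClient;
import org.elasticsearch.common.transport.InetSocketTransportAddress;
import org.elasticsearch.index.query.QueryBuilders;

public class esCountCheck {
 public static void main(String[] args) {
  Client client = new TransportClient().addTransportAddress(
    new InetSocketTransportAddress("localhost", 9300));

  // match_all search with size 0: we only want the total hit count for the "xml" index
  SearchResponse response = client.prepareSearch("xml")
    .setQuery(QueryBuilders.matchAllQuery())
    .setSize(0)
    .execute()
    .actionGet();
  System.out.println("documents indexed: " + response.getHits().getTotalHits());

  client.close();
 }
}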

Wednesday, 3 July 2013

Starting with ElasticSearch

ElasticSearch is a distributed, RESTful search server based on Apache Lucene. The important features of it are listed below:
  • Real time data
  • RESTful API
  • Distributed
  • Multi-tenancy
  • Document oriented
  • Schema free
  • Per-operation persistence
  • Built on top of Apache Lucene
  • High availability
  • Full text search
  • Conflict management
More detailed information about ElasticSearch can be found at ElasticSearch.org. A small example program to add, retrieve and search for documents in ES is given below:

package com.rishav.es;

import java.io.IOException;

import org.elasticsearch.action.get.GetResponse;
import org.elasticsearch.action.index.IndexResponse;
import org.elasticsearch.action.search.SearchResponse;
import org.elasticsearch.action.search.SearchType;
import org.elasticsearch.client.Client;
import org.elasticsearch.client.transport.TransportClient;
import org.elasticsearch.common.transport.InetSocketTransportAddress;
import org.elasticsearch.index.query.FilterBuilders;
import org.elasticsearch.index.query.QueryBuilders;

import static org.elasticsearch.common.xcontent.XContentFactory.*;

public class esConnect {

 // on startup

 public static void main (String args[]) throws IOException{

  // create a client for ElasticSearch running on localhost listening to port 9300
  Client client = new TransportClient().addTransportAddress(
      new InetSocketTransportAddress("localhost",9300));
  
  // add some record
  IndexResponse iresponse = client.prepareIndex("testindex", "tweets", "1")
     .setSource(jsonBuilder()
                 .startObject()
                     .field("user", "rishav")
                     .field("postDate", "2013-01-30")
                     .field("message", "trying out Elastic Search")
                 .endObject()
               )
     .execute()
     .actionGet();
  
  
  // retrieve record based on id
  GetResponse gresponse = client.prepareGet("testindex", "tweets", "1")
        .execute().actionGet();
  System.out.println(gresponse.getSourceAsString());
  
  // search for a record based on below filters
  SearchResponse sresponse = client.prepareSearch("testindex").
         setSearchType(SearchType.DFS_QUERY_THEN_FETCH).
         setQuery(QueryBuilders.termQuery("user", "rishav")).  // query
         setFilter(FilterBuilders.termFilter("message", "out")).
         execute().
         actionGet();
  System.out.println(sresponse.getHits());
  
  // shutdown client
  client.close();
 }
}
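To round out the basic operations, a document can also be deleted by id. A minimal sketch, to be placed just before client.close() in the program above (it also needs an import of org.elasticsearch.action.delete.DeleteResponse):

  // delete the tweet indexed above by its id
  DeleteResponse dresponse = client.prepareDelete("testindex", "tweets", "1")
        .execute().actionGet();
  System.out.println("deleted document id: " + dresponse.getId());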

Monday, 1 July 2013

XML parsing with Mapreduce

Recently I worked with XML data stored in HDFS and wrote MapReduce code to write the data to HBase. To work with XML input data we can use the XmlInputFormat class from Mahout (no need to have Mahout installed, we just need the class from the mahout-integration jar). The XML file had the structure below:

 <Response>
  <RowID>....</RowID>
  <ResponseID>....</ResponseID>
  <IPAddress>....</IPAddress>
  <Status>....</Status>
  <StartDate>....</StartDate>
  <EndDate>....</EndDate>
 </Response>

To hold this XML record we created the xmlDef class:
package com.rishav.xml;

public class xmlDef {
 public static String xmlDef[][] = new String[][]{
      {"xmlTest", "xmlTest", "xmlTest", "xmlTest", "xmlTest", "xmlTest"},     //HBase table name
      {"Y", "N", "N","N","N","N"},                                            //is column a key column?
      {"cf1", "cf1","cf2","cf2","cf2","cf2"},                                 //column family
      {"RowID", "ResponseID", "IPAddress", "Status", "StartDate", "EndDate"}, //column name in HBase
      {"RowID", "ResponseID", "IPAddress", "Status", "StartDate", "EndDate"}, //xml tag
      {"", "", "", "", "", ""}                                                // place holder for xml value
      };
}

The mapper class is configured to read a complete XML record enclosed in start/end tags, and these tags are defined in the driver class. Each map call receives one XML record as input, which we can parse in the normal manner; in this case we have used XMLStreamReader. The code for the mapper class is given below:

package com.rishav.hbase;

import com.rishav.xml.xmlDef;

import static javax.xml.stream.XMLStreamConstants.CHARACTERS;
import static javax.xml.stream.XMLStreamConstants.START_ELEMENT;
import javax.xml.stream.XMLInputFactory;
import javax.xml.stream.XMLStreamReader;

import java.io.ByteArrayInputStream;
import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.client.HTable;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

public class commonTableMapper 
 extends Mapper<LongWritable, Text, Text, Text> {
  private HTable htable;
  
 // create the HBase connection once per mapper
 @Override
 protected void setup(Context context) 
    throws IOException, InterruptedException {
    Configuration conf = HBaseConfiguration.create();
    htable = new HTable(conf, xmlDef.xmlDef[0][0]);
    // buffer puts client-side and flush them in cleanup()
    htable.setAutoFlush(false);
    htable.setWriteBufferSize(1024 * 1024 * 12);
    }
   
 @Override
 public void map(LongWritable key, Text value, Context context) 
  throws IOException, InterruptedException {
   String currLine = value.toString();
   try {
    XMLStreamReader reader = XMLInputFactory.newInstance()
    .createXMLStreamReader(
      new ByteArrayInputStream(currLine.getBytes()));
 
    String currentElement = "";
    int col = 0;
    
    // initialize all xml value to blank string
    for (String xmlTag : xmlDef.xmlDef[3]) {
     xmlDef.xmlDef[5][col] = "";
     col++;
    }
    
    
    // read xml tags and store values in xmlDef
    while (reader.hasNext()) {
     int code = reader.next();
     switch (code) {
     case START_ELEMENT:
      currentElement = reader.getLocalName();
      break;
     case CHARACTERS:
      col = 0;
      for (String xmlTag : xmlDef.xmlDef[3]) {
       if (currentElement.equalsIgnoreCase(xmlTag)) {
        xmlDef.xmlDef[5][col] += reader.getText().trim(); 
       }
       col++;
      }
     }
    }
    
    
    // write the parsed record to the mapper output file as well
    // (optional - this context.write can be removed if only the HBase put is needed)
    context.write(new Text(xmlDef.xmlDef[5][0] + "#" + xmlDef.xmlDef[5][1] + "#" + xmlDef.xmlDef[5][2] + "#" + xmlDef.xmlDef[5][3] + "#" + xmlDef.xmlDef[5][4]), new Text("1"));
    
    // put record in HBase
    Put insHBase = new Put(Bytes.toBytes(xmlDef.xmlDef[5][0]));
    col = 0;
    for (String xmlTag : xmlDef.xmlDef[3]) {
     insHBase.add(Bytes.toBytes(xmlDef.xmlDef[2][col]), Bytes.toBytes(xmlDef.xmlDef[3][col]), Bytes.toBytes(xmlDef.xmlDef[5][col]));
     col++;
    }
    htable.put(insHBase);
  
  } catch (Exception e) {
    e.printStackTrace();
  }
 }
  @Override
  public void cleanup(Context context) 
   throws IOException, InterruptedException {
   htable.flushCommits();
   htable.close();
   
 }
}

In the driver we set the boundaries for the XML record using xmlinput.start and xmlinput.end (these must match the tags used in the data, <Response> and </Response> in our case). The driver class is:

package com.rishav.hbase;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

import org.apache.mahout.text.wikipedia.XmlInputFormat;

public final class commonTableDriver {
 public static void main(String[] args) throws Exception {
  commonRunJob(args[0], args[1]);
 }

 public static void commonRunJob(String input,String output) throws Exception  {
  Configuration conf = new Configuration();
  conf.set("xmlinput.start", "<response>");
  conf.set("xmlinput.end", "</response>");
  
  Job job = new Job(conf);
  job.setJarByClass(commonTableMapper.class);
  
  job.setInputFormatClass(XmlInputFormat.class);
  
  job.setMapperClass(commonTableMapper.class);
  job.setMapOutputKeyClass(Text.class);
  job.setMapOutputValueClass(Text.class);
  
  job.setNumReduceTasks(0);
  
  FileInputFormat.setInputPaths(job, new Path(input));
  Path outPath = new Path(output);
  FileOutputFormat.setOutputPath(job, outPath);
  
  outPath.getFileSystem(conf).delete(outPath, true);
  job.waitForCompletion(true);
  
 }
}

We have exported the project from Eclipse as a jar package; let's call it hbasetest.jar. To run this jar in Hadoop use the command below:

hadoop jar hbasetest.jar com.rishav.hbase.commonTableDriver input/sample.xml out1

Note: Before running this MapReduce job, create the xmlTest table in HBase with column families cf1 and cf2.
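If you prefer creating the table from Java rather than the HBase shell, a minimal sketch using the HBaseAdmin client API of that era (the class name is mine) would be:

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.HColumnDescriptor;
import org.apache.hadoop.hbase.HTableDescriptor;
import org.apache.hadoop.hbase.client.HBaseAdmin;

public class createXmlTestTable {
 public static void main(String[] args) throws Exception {
  Configuration conf = HBaseConfiguration.create();
  HBaseAdmin admin = new HBaseAdmin(conf);

  // create table xmlTest with the two column families used by the mapper
  HTableDescriptor desc = new HTableDescriptor("xmlTest");
  desc.addFamily(new HColumnDescriptor("cf1"));
  desc.addFamily(new HColumnDescriptor("cf2"));

  if (!admin.tableExists("xmlTest")) {
   admin.createTable(desc);
  }
  admin.close();
 }
}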

After executing this job we can check data in HBase table xmlTest.

Friday, 15 March 2013

MapReduce Fundamentals

MapReduce is a programming model for efficient distributed computing.
The core concept of MapReduce in Hadoop is that input may be split into logical chunks, and each chunk may be initially processed independently by a map task. The results of these individual chunks can be physically partitioned into distinct sets, which are then sorted. Each sorted partition is passed to a reduce task.
Every MapReduce program must specify a Mapper and typically a Reducer.
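As a minimal illustrative sketch (the names are mine and not tied to any of the jobs above), a word-count style Mapper and Reducer in the org.apache.hadoop.mapreduce API look like this:

import java.io.IOException;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;

// Mapper: one (key, value) pair in, any number of intermediate (key', value') pairs out
class TokenMapper extends Mapper<LongWritable, Text, Text, IntWritable> {
    private static final IntWritable ONE = new IntWritable(1);

    @Override
    protected void map(LongWritable offset, Text line, Context context)
            throws IOException, InterruptedException {
        for (String token : line.toString().split("\\s+")) {
            if (token.length() > 0) {
                context.write(new Text(token), ONE);
            }
        }
    }
}

// Reducer: one intermediate key with all its values in, any number of (key'', value'') pairs out
class TokenReducer extends Reducer<Text, IntWritable, Text, IntWritable> {
    @Override
    protected void reduce(Text key, Iterable<IntWritable> values, Context context)
            throws IOException, InterruptedException {
        int sum = 0;
        for (IntWritable value : values) {
            sum += value.get();
        }
        context.write(key, new IntWritable(sum));
    }
}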


Input and Output Formats

A MapReduce job may specify how its input is to be read by specifying an InputFormat, which provides:
  • InputSplit - the logical chunks of input handed to individual map tasks
  • RecordReader - the reader that turns a split into (key, value) pairs
A MapReduce job may specify how its output is to be written by specifying an OutputFormat.
These default to TextInputFormat and TextOutputFormat, which process line-based text data.
For SequenceFiles there are SequenceFileInputFormat and SequenceFileOutputFormat.
All of these are file-based, but InputFormats and OutputFormats in general are not required to be.
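For example, a job can be switched from the text defaults to SequenceFile input/output like this (a minimal sketch; the job object is assumed to be configured elsewhere):

// assume 'job' is an org.apache.hadoop.mapreduce.Job that is otherwise configured
job.setInputFormatClass(org.apache.hadoop.mapreduce.lib.input.SequenceFileInputFormat.class);
job.setOutputFormatClass(org.apache.hadoop.mapreduce.lib.output.SequenceFileOutputFormat.class);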

How many Maps and Reduces

Maps
  • Usually as many as the number of HDFS blocks being processed; this is the default.
  • Otherwise the number of maps can be specified as a hint.
  • The number of maps can also be controlled by specifying the minimum split size.
  • The actual size of each map input is computed by:
    max(min(block_size, data/#maps), min_split_size)
Reduces
  • A common rule of thumb is 0.95 * num_nodes * mapred.tasktracker.reduce.tasks.maximum,
    unless the amount of data being processed is small, in which case fewer (or zero) reduces are fine (see the sketch below).
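These knobs map onto configuration and job settings; a minimal sketch with hypothetical values (the property name is from the classic MRv1 configuration):

// assume 'conf' and 'job' are the Configuration and Job used to submit the work
conf.setLong("mapred.min.split.size", 128L * 1024 * 1024);   // raise the minimum split size to 128 MB
int numNodes = 10;                                           // hypothetical cluster size
int reduceSlotsPerNode = 2;                                  // hypothetical mapred.tasktracker.reduce.tasks.maximum
job.setNumReduceTasks((int) (0.95 * numNodes * reduceSlotsPerNode));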

vi/vim Cheatsheet

There are many ways to edit files in Unix, and for me one of the best is the screen-oriented text editor vi. There is also an improved version of vi called vim (vi improved).

Starting the vi editor:

Command            Description
vi filename        Creates a new file if it does not already exist, otherwise opens the existing file.
vi -R filename     Opens an existing file in read-only mode.

Operation Modes:
vi has two modes of operation:
  • Command mode: This mode enables you to perform administrative tasks such as saving files, executing commands, moving the cursor, copying (yanking), cutting and pasting lines or words, and finding and replacing. In this mode, whatever you type is interpreted as a command.
  • Insert mode: This mode enables you to insert text into the file. Everything typed in this mode is interpreted as input and put into the file.
vi always starts in command mode. To enter text, you must be in insert mode; to enter insert mode, simply type i. To get out of insert mode, press the Esc key, which will put you back into command mode.
Hint: If you are not sure which mode you are in, press the Esc key twice and you will be in command mode. Open a file in vi, type a few characters, and then return to command mode to see the difference.


vi cheatsheet:


Wednesday, 13 March 2013

MapReduce with AWK

In my last post Lets MapReduce with Pentaho I wrote a MapReduce program in Pentaho Data Integration. Now I am writing some MapReduce code in AWK to:
  • Calculate the number/percentage of flights delayed by over 15 minutes, aggregated at day level (on the airline dataset).
I will be using the Hadoop Streaming package to run my AWK MapReduce task.

Any MapReduce code should follow the common basics listed below:
  • The Mapper has a map method that transforms input (key, value) pairs into any number of intermediate (key’, value’) pairs.
  • The Reducer has a reduce method that transforms intermediate (key’, value’*) aggregates into any number of output (key’’, value’’) pairs.


Number/percentage of flights delayed by over 15 minutes aggregated at day level

I write map code to emit the date in yyyyMMdd format, a constant value 1 for each flight, and a 0/1 flag indicating whether DepDelay (column 16) is greater than 15 minutes.
#!/usr/bin/awk -f
BEGIN {
        FS=",";
}
{
        if ($16 > 15)
        {       printf("%4s%02s%02s,%d,%d\n", $1,$2,$3,1,1);}
        else
        {       printf("%4s%02s%02s,%d,%d\n", $1,$2,$3,1,0);}
}
   
The intermediate output of Map code will look like this:
......
19871001,1,1
19871001,1,1
19871002,1,0
19871002,1,0
19871002,1,0
..... 
  
Note that the mapper output is sorted by key by the framework before it reaches the reducer.
The reduce code keeps a count of flights per day and of delayed flights.
To aggregate (sum) the data at day level I use associative arrays in awk.
#!/bin/awk -f

BEGIN {FS=",";
}
{       A[$1]+=$2;      # Add $2 to the array A having index of $1
        B[$1]+=$3;      # Add $3 to the array B having index of $1
}
END {
        for(i in A)    # Get all the indexes of array A in i
        {printf "%s,%d,%d,%5.2f\n", i,A[i],B[i],B[i]*100/A[i]}
}
 
Run this MapReduce code with hadoop streaming:
hadoop jar /usr/lib/hadoop-0.20-mapreduce/contrib/streaming/hadoop-streaming-2.0.0-mr1-cdh4.0.0.jar \
  -input /airline/nh_1987.csv \
  -output /airline/output \
  -mapper path_to/airline2_m.awk \
  -reducer path_to/airline2_r.awk \
  -inputformat org.apache.hadoop.mapred.TextInputFormat \
  -outputformat org.apache.hadoop.mapred.TextOutputFormat \
  -file path_to/airline2_m.awk \
  -file path_to/airline2_r.awk
  
We can check the output of the above code by using the cat command or by getting the file from HDFS to the local filesystem:
hadoop fs -cat /airline/output/part-00000
hadoop fs -get /airline/output/part-00000