Showing posts with label hadoop. Show all posts
Showing posts with label hadoop. Show all posts

Tuesday, 9 December 2014

Implementing Apriori Algorithm In Hadoop-HBase - Part 1 : Introduction to Apriori Algorithm

Apriori algorithm is a frequent item set mining algorithm used over transactional databases, proposed by Rakesh Agrawal and Ramakrishnan Srikant in 1993. This algorithm proceeds by identifying the frequent individual items in the database and extending them to larger and larger item sets as long as those item sets appear sufficiently often in the database. The frequent item sets determined by Apriori can be used to determine association rules which highlight general trends in the database.

Before we go further and see how this algorithm works it is better to be familiar terminologies used in this algorithm-

Tid  | Items
1     | Bread, Milk
2     | Bread, Diaper, Beer, Milk
3     | Milk, Diaper, Beer, Coke
4     | Bread, Milk, Diaper, Beer
5     | Bread, Milk, Diaper,Coke
    • Itemset    
A collection of one or more items
Example: {Milk, Bread, Diaper}
k-itemset
An itemset that contains k items
  • Support count ()
Frequency of occurrence of an itemset
E.g.   ({Milk, Bread, Diaper}) = 2
  • Support
Fraction of transactions that contain an itemset
E.g.   s( {Milk, Bread, Diaper} ) = 2/5
  • Frequent Itemset
An itemset whose support is greater than or equal to a minsup threshold.

  • Association Rule
An implication expression of the form X  Y, where X and Y are itemsets.
Example: {Milk, Diaper}  {Beer}
  • Rule Evaluation Metrics
Support (s) - Fraction of transactions that contain both X and Y
Confidence (c) - Measures how often items in Y  appear in transactions that
contain X.


In next few post I will describe how to implement this algorithm in HBase and MapReduce.

Tuesday, 25 February 2014

Implementing Custom WritableComparable

In one of my previous post I wrote about Implementing Custom Writable which can be used as values in MapReduce program. For using customized key in MapReduce we need to implement WritableComparable interface.

WritableComparable interface is just a subinterface of the Writable and java.lang.Comparable interfaces. For implementing a WritableComparable we must have compareTo method apart from readFields and write methods, as shown below:
public interface WritableComparable extends Writable, Comparable
{
    void readFields(DataInput in);
    void write(DataOutput out);
    int compareTo(WritableComparable o)
}
Comparison of types is crucial for MapReduce, where there is a sorting phase during which keys are compared with one another.

The code for IntPair class which is used in In-mapper Combiner Program to Calculate Average post is given below:


As you can see in compareTo(IntPair tp) of above class that IntPair needs to be deserialized for comparison to take place, we can implement a RawComparator which can compare two keys by just checking their serialized representation. More on RawComparator is available in Hadoop: The Definitive Guide.

Thursday, 26 December 2013

In-mapper Combiner Program to Calculate Average

In-mapper Combiner Program to Calculate Average

In my previous post I described how we can use in-mapper combiner to make our M/R program more efficient. In the post, we also saw both M/R algorithm for average calculation with/without using in-mapper combiner optimization.

In this post I am posting codes for both the algorithm.

The M/R program to calculate average without in-mapper combiner is given below:

The M/R program to calculate average with in-mapper combiner is given below:


The programs took 56sec and 42sec respectively for execution on my laptop for 10million records. So we can see a 33% improvement in time while using in-mapper combiner program.

In-Mapper Combiner

In-Mapper Combiner

Recently I read a book on Map/Reduce algorithms by Lin and Dyer. This books gives a deep insight in designing efficient M/R algoriths. Today in this post I will post about in-mapper combining alogrithm and a sample M/R program using this algorithm.

Advantages of in-mapper combiner over traditional combiner:

When a mapper with a traditional combiner (the mini-reducer) emits the key-value pair, they are collected in the memory buffer and then the combiner aggregates a batch of these key-value pairs before sending them to the reducer. The drawbacks of this approach are:
  1. The execution of combiner is not guaranteed; so MapReduce jobs cannot depend on the combiner execution.
  2. Hadoop may store the key-value pairs in local filesystem, and run the combiner later which will cause expensive disk IO.
  3. A combiner only combines data in the same buffer. Thus, we may still generate a lot of network traffic during the shuffle phase even if most of the keys from a single mapper are the same. To see this, consider the word count example, assuming that buffer size is 3, and we have <key, value> = <Stanford, 3>, <Berkeley, 1>, <Stanford, 7>, <Berkeley, 7>, and <Stanford, 2> emitted from one mapper. The first three items will be in one buffer, and last two will be in the the other buffer; as a result, the combiner will emit <Stanford, 10>, <Berkeley, 1>, <Berkeley, 7>, <Stanford, 2>. If we use in-mapper combiner, we will get <Stanford, 12>, <Berkeley, 8>.



Consider the case of calculating average marks for student. Let us consider we have below dataset

s_id    c_id    marks
8001    101    78
8001    102    88
8002    101    56
8002    102    77

The pseudo code for a basic M/R algorithm which computes average marks is as given:
class Mapper
    method Map(integer s_id, integer m)
        Emit(integer s_id, integer m)

class Reducer
    method Reduce(integer s_id, integer [m1 , m2 , . . .])
        sum ← 0
        cnt ← 0
        for all integer m ∈ integer [m1 , m2 , . . .] do
            sum ← sum + m
            cnt ← cnt + 1
        avg_m ← sum/cnt
        Emit(integer s_id, float avg_m )

If we have a large number of input records then the same number of records emitted from map task will be shuffled and sorted before being passed on to  reducer. This large amount of data transfer could be deterrent in the speed of execution of overall M/R job.

We can make this algorithm faster by decreasing the number of records emitted by the mapper. To achieve this we can use an associative array to store partial sums of marks, and another associative array to store the count of marks and finally emit these values in close method. The pseduo code for in-mapper combiner is shown below:

class Mapper
    method Initialize
        S ← new AssociativeArray
        C ← new AssociativeArray
    method Map(integer s_id, integer m)
        S{s_id} ← S{s_id} + m
        C{s_id} ← C{s_id} + 1
    method Close
        for all integer s_id ∈ S do
            Emit(integer s_id, pair (S{s_id}, C{s_id}))

class Reducer
    method Reduce(integer s_id, pairs [(s1 , c1 ), (s2 , c2 ) . . .])
        sum ← 0
        cnt ← 0
    for all pair (s, c) ∈ pairs [(s1 , c1 ), (s2 , c2 ) . . .] do
            sum ← sum + s
            cnt ← cnt + c
    avg_m ← sum/cnt
    Emit(integer s_id, float avg_m )

Using this algorithm we can improve the performance of M/R job by reducing the number of intermediary key-value pairs emitted from mappers to reducers.

In my next post I shall post M/R program using in-mapper combiner and also compare the performance of this with M/R program without using any optimization.

Thursday, 31 October 2013

Implementing Custom Writable

Implementing Custom Writable

Implementing Custom Writable

Hadoop MapReduce uses implementations of Writables for interacting with user-provided Mappers and Reducers. Hadoop provides a lot of implementations of Writables which are listed here, but sometimes we need to pass custom objects and these custom objects should implement Hadoop's Writable interface. In this post we are going to describe a custom class IntPair. To implement the Writable interface we require two methods:

public interface Writable {
void readFields(DataInput in);
void write(DataOutput out);
}
The code for IntPair is given below:

public class IntPair implements Writable{
 private IntWritable first;
 private IntWritable second;
 
 public IntPair() {
  set(new IntWritable(), new IntWritable());
 }
 
 public IntPair(Integer first, Integer second) {
  set(new IntWritable(first), new IntWritable(second));
 }
 
 public void set(IntWritable first, IntWritable second) {
  this.first = first;
  this.second = second;
 }
 
 public IntWritable getFirst() {
  return first;
 }
 
 public Integer getFirstInt() {
  return new Integer(first.toString());
 }
 
 public Integer getSecondInt() {
  return new Integer(second.toString());
 }
 public IntWritable getSecond() {
  return second;
 }
 
 public void write(DataOutput out) throws IOException {
 first.write(out);
 second.write(out);
 }
 
 public void readFields(DataInput in) throws IOException {
 first.readFields(in);
 second.readFields(in);
 }
}
Now we can use this IntPair class in Hadoop MapReduce as value type. If we want to use IntPair as key in MapReduce then it needs to implement WritableComparable, which we shall cover in a different post.

Monday, 1 July 2013

XML parsing with Mapreduce

XML parsing with Mapreduce

XML parsing with Mapreduce


Recently I worked with XML data stored in HDFS and wrote a map reduce code to write data to HBase. To work with xml type input data we can use XmlInputFormat class from mahout (No need to have mahout installed, we just need the class from mahout-integration jar). The xml file was having below structure:

 <Response>
  <RowID>....</RowID>
  <ResponseID>....</ResponseID>
  <IPAddress>....</IPAddress>
  <Status>....</Status>
  <StartDate>....</StartDate>
  <EndDate>....</EndDate>
 </Response>

To hold this xml record we created xmlDef class:
package com.rishav.xml;

public class xmlDef {
 public static String xmlDef[][] = new String[][]{
      {"xmlTest", "xmlTest", "xmlTest", "xmlTest", "xmlTest", "xmlTest"},     //HBase table name
      {"Y", "N", "N","N","N","N"},                                            //is column a key column?
      {"cf1", "cf1","cf2","cf2","cf2","cf2"},                                 //column family
      {"RowID", "ResponseID", "IPAddress", "Status", "StartDate", "EndDate"}, //column name in HBase
      {"RowID", "ResponseID", "IPAddress", "Status", "StartDate", "EndDate"}, //xml tag
      {"", "", "", "", "", ""}                                                // place holder for xml value
      };
}

The mapper class is configured to read complete xml record enclosed in tags and these tags are defined in driver class. Each map reads one xml record at a time as inpur and we can parse this in a normal manner. In this case we have used XMLStreamReader. The code for mapper class is given below:

package com.rishav.hbase;

import com.rishav.xml.xmlDef;

import static javax.xml.stream.XMLStreamConstants.CHARACTERS;
import static javax.xml.stream.XMLStreamConstants.START_ELEMENT;
import javax.xml.stream.XMLInputFactory;
import javax.xml.stream.XMLStreamReader;

import java.io.ByteArrayInputStream;
import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.client.HTable;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Mapper.Context;
import org.apache.hadoop.mapreduce.lib.input.FileSplit;

public class commonTableMapper 
 extends org.apache.hadoop.mapreduce.Mapper {
  private HTable htable;
  
 // create HBase connection
 protected void setup(Context context) 
    throws IOException, InterruptedException {
    Configuration conf = HBaseConfiguration.create();
    htable = new HTable(conf, xmlDef.xmlDef[0][0]);
    htable.setAutoFlush(true);
    htable.setWriteBufferSize(1024 * 1024 * 12);
    }
   
 @Override
 public void map(LongWritable key, Text value, Mapper.Context context) 
  throws IOException, InterruptedException {
   String currLine = value.toString();
   try {
    XMLStreamReader reader = XMLInputFactory.newInstance()
    .createXMLStreamReader(
      new ByteArrayInputStream(currLine.getBytes()));
 
    String currentElement = "";
    int col = 0;
    
    // initialize all xml value to blank string
    for (String xmlTag : xmlDef.xmlDef[3]) {
     xmlDef.xmlDef[5][col] = "";
     col++;
    }
    
    
    // read xml tags and store values in xmlDef
    while (reader.hasNext()) {
     int code = reader.next();
     switch (code) {
     case START_ELEMENT:
      currentElement = reader.getLocalName();
      break;
     case CHARACTERS:
      col = 0;
      for (String xmlTag : xmlDef.xmlDef[3]) {
       if (currentElement.equalsIgnoreCase(xmlTag)) {
        xmlDef.xmlDef[5][col] += reader.getText().trim(); 
       }
       col++;
      }
     }
    }
    
    
    // writing values to mapper output file
    // can remove this context.write
    context.write(xmlDef.xmlDef[5][0]+"#"+xmlDef.xmlDef[5][1]+"#"+xmlDef.xmlDef[5][2]+"#"+xmlDef.xmlDef[5][3]+"#"+xmlDef.xmlDef[5][4],1);
    
    // put record in HBase
    Put insHBase = new Put(Bytes.toBytes(xmlDef.xmlDef[5][0]));
    col = 0;
    for (String xmlTag : xmlDef.xmlDef[3]) {
     insHBase.add(Bytes.toBytes(xmlDef.xmlDef[2][col]), Bytes.toBytes(xmlDef.xmlDef[3][col]), Bytes.toBytes(xmlDef.xmlDef[5][col]));
     col++;
    }
    htable.put(insHBase);
  
  } catch (Exception e) {
    e.printStackTrace();
  }
 }
  @Override
  public void cleanup(Context context) 
   throws IOException, InterruptedException {
   htable.flushCommits();
   htable.close();
   
 }
}

In the driver we set the boundries for xml record using xmlinput.start and xmlinput.end . The driver class is:

package com.rishav.hbase;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.client.HTable;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.mapreduce.HFileOutputFormat;
import org.apache.hadoop.hbase.mapreduce.PutSortReducer;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

import org.apache.mahout.text.wikipedia.XmlInputFormat;

public final class commonTableDriver {
 public static void main(String[] args) throws Exception {
  commonRunJob(args[0], args[1]);
 }

 public static void commonRunJob(String input,String output) throws Exception  {
  Configuration conf = new Configuration();
  conf.set("xmlinput.start", "<response>");
  conf.set("xmlinput.end", "</response>");
  
  Job job = new Job(conf);
  job.setJarByClass(commonTableMapper.class);
  
  job.setInputFormatClass(XmlInputFormat.class);
  
  job.setMapperClass(commonTableMapper.class);
  job.setMapOutputKeyClass(Text.class);
  job.setMapOutputValueClass(Text.class);
  
  job.setNumReduceTasks(0);
  
  FileInputFormat.setInputPaths(job, new Path(input));
  Path outPath = new Path(output);
  FileOutputFormat.setOutputPath(job, outPath);
  
  outPath.getFileSystem(conf).delete(outPath, true);
  job.waitForCompletion(true);
  
 }
}

We have exported the project from eclipse in jar package. Lets call this hbasetest.jar To run this jar in hadoop use below command:

hadoop jar hbasetest.jar com.rishav.hbase.commonTableDriver input/sample.xml out1

Note: Before running this map reduce job create xmlTest table in HBase with column families cf1 and cf2.

After executing this job we can check data in HBase table xmlTest.

Wednesday, 13 March 2013

MapReduce with AWK

MapReduce in AWK

MapReduce with AWK


In my last post Lets MapReduce with Pentaho I had written a MapReduce program in Pentaho Data Integration. Now I am writing some MapReduce code in AWK to:
  • Calculate number/percentage of flights delayed by over 15 minutes aggregated at day level (on airline dataset).
I will be using hadoop streaming package to run my AWK MapReduce task.

Any MapReduce code should follow the common basics listed below:
  • The Mapper has a map method that transforms input (key, value) pairs into any number of intermediate (key’, value’) pairs.
  • The Reducer has a reduce method that transforms intermediate (key’, value’*) aggregates into any number of output (key’’, value’’) pairs.


Number/percentage of flights delayed by over 15 minutes aggregated at day level

I write a Map code to emit date in yyyyMMdd format, constant value 1 for each flight and a boolean value for flights with DepDelay (column#16) > 15.
#!/usr/bin/awk -f
BEGIN {
        FS=",";
}
{
        if ($16 > 15)
        {       printf("%4s%02s%02s,%d,%d\n", $1,$2,$3,1,1);}
        else
        {       printf("%4s%02s%02s,%d,%d\n", $1,$2,$3,1,0);}
}
   
The intermediate output of Map code will look like this:
......
19871001,1,1
19871001,1,1
19871002,1,0
19871002,1,0
19871002,1,0
..... 
  
Note that the output of mapper is sorted.
The reduce code will keep a count of flights per day and delayed flights.
To aggregate data (sum) by day level I will use array in awk.
#!/bin/awk -f

BEGIN {FS=",";
}
{       A[$1]+=$2;      # Add $2 to the array A having index of $1
        B[$1]+=$3;      # Add $3 to the array B having index of $1
}
END {
        for(i in A)    # Get all the indexes of array A in i
        {printf "%s,%d,%d,%5.2f\n", i,A[i],B[i],B[i]*100/A[i]}
}
 
Run this MapReduce code with hadoop streaming:
hadoop jar /usr/lib/hadoop-0.20-mapreduce/contrib/streaming/hadoop-streaming-2.0.0-mr1-cdh4.0.0.jar \
  -input /airline/nh_1987.csv \
  -output /airline/output \
  -mapper path_to/airline2_m.awk \
  -reducer path_to/airline2_r.awk \
  -inputformat org.apache.hadoop.mapred.TextInputFormat \
  -outputformat org.apache.hadoop.mapred.TextOutputFormat \
  -file path_to/airline2_m.awk \
  -file path_to/airline2_r.awk
  
We can check the output of above code by using cat command or getting the file from hdfs to local filesystem:
hadoop fs -cat /airline/output/part-00000
hadoop fs -get /airline/output/part-00000
  

Tuesday, 26 February 2013

Lets MapReduce with Pentaho

Lets MapReduce with Pentaho Data Integrator

I was exploring Pentaho Data Integrator for quite some time, and always wanted to see how to work with BigData using Pentaho. Today I got a chance to do some simple MapReduce with Pentaho on the Airline dataset which I had dumped into my Cloudera Hadoop cluster (Sample Datasets).
Before doing MR in Pentaho, we need it to configure it to work with Cloudera (by default it is configured to work with Apache). It can be easily done in 5 minutes by following this link:

Configure Pentaho for Cloudera CDH4

After doing all the listed steps make sure you are able to connect to HDFS through Pentaho. For this:

1. Create a new transformation.

2. Add a Hadoop File Input step.

3. Browse HDFS, give address of your namenode, port as 8020 , user id as hdfs and by default there is no password.

4. If you are able to browse HDFS then you have successfully configured Pentaho.

5. You can discard this transformation. Was just checking the connectivity J.

To do a simple MapReduce in Pentaho we need to create:

1. Mapper transformation

2. Combiner transformation (optional)

3. Reducer transformation

4. MapReduce job

I am going to create a simple MR task which counts the number to flights for each month for our entire airline dataset.

First create a Mapper transformation:

1. Create a new transformation in Pentaho, let us call it “airline_mapper”.

2. Add a MapReduce Input step and configure it as shown:

3. Add a Split Fields Step to split csv file to different fields and connect it with previous step, configure current step as shown:

4. Add a String Operations step to pad month field with leading 0 (to make all month fields as 2 digit integers), connect it with previous step. It should be configure as shown:

 

 5. Add a User Defined Java Expression step to concatenate year with month, connect it with previous step. It should be configure as shown:

 6. Add a MapReduce Output step and connect it with previous step. It should be configure as shown:

 

I have used Carrier field to count number of flights for each month, we can use any other field also for doing same.

7. Now save this transformation. It should like this:

 

Now create a Reducer transformation:

1. Create a new transformation in Pentaho, let us call it “airline_reducer”.

2. Add a MapReduce Input step and configure it as shown:

 3. Add a Group By step and connect it with previous step. It should be configure as shown:

 4. Add a MapReduce Output step, connect it with previous step. It should be configure as shown:

 5. Now save this transformation. It should like this:

 Our mapper and reducer transformation are complete, now let us combine them in a MR job.

Create a MapReduce job:

1. Create a new job in Pentaho, let us call it “airline_job”.

2. Add a Start step and a Pentaho MapReduce step and connect them. It should look like:

 3. Configure Pentaho MapReduce as shown in below screenshots:

 You can give any name in Hadoop Job Name, let us call our job as airline_agg.
Mapper transformation should point to airline_mapper transformation.
Mapper Input/Output Step Name should match the step names with our mapper transformation.

4. We don’t have combiner step, so leave next tab empty.

5. Configure Reducer tab like this:

 Again Reducer Input/Output Step Name should match the step names with our reducer transformation.

6. Configure Job Setup step as shown:

 7. Configure Cluster tab like this:

 8. Save and run this job.

While the job is running we can see its progress in Pentaho and also in JobTracker UI (http://jobtracker_ip:50030/).

 You can also see the details of a running job by clicking on it:

 On my Cloudera setup it took around 9 minutes to execute this job.

I shall try to do some complex analysis on this dataset using Pentaho and post it very soon.