Wednesday, 13 March 2013

MapReduce with AWK


In my last post, "Lets MapReduce with Pentaho", I wrote a MapReduce program in Pentaho Data Integration. In this post I write MapReduce code in AWK to:
  • Calculate the number and percentage of flights delayed by more than 15 minutes, aggregated by day (on the airline dataset).
I will use the Hadoop Streaming package to run my AWK MapReduce job.

Any MapReduce code should follow the common basics listed below:
  • The Mapper has a map method that transforms input (key, value) pairs into any number of intermediate (key’, value’) pairs.
  • The Reducer has a reduce method that transforms intermediate (key’, value’*) aggregates into any number of output (key’’, value’’) pairs.
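
As a rough local illustration of these two roles, the sketch below imitates a MapReduce job with an ordinary shell pipeline: one awk program plays the mapper, sort stands in for the grouping the framework does between the two phases, and a second awk program plays the reducer. The carrier codes and the idea of counting records per carrier are made up for this example, and the pipeline only mimics the data flow; it is not how Hadoop itself runs a job.

```shell
#!/bin/sh
# Hypothetical input: one carrier code per line (made-up sample data).
input='AA
UA
AA
DL
AA'

# map:    (line)                  -> (carrier, 1)
# sort:   brings identical keys together, as the framework would
# reduce: (carrier, [1, 1, ...])  -> (carrier, count)
counts=$(printf '%s\n' "$input" |
    awk '{ print $1 "," 1 }' |
    sort |
    awk -F, '
        $1 != key { if (NR > 1) print key "," n; key = $1; n = 0 }
        { n += $2 }
        END { if (NR > 0) print key "," n }')

printf '%s\n' "$counts"
```

For this input the pipeline prints AA,3, DL,1 and UA,1, one per line. The reducer here relies on its input being grouped by key, which is exactly what the sort step provides.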


Number/percentage of flights delayed by over 15 minutes aggregated at day level

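The map code emits, for each flight, the date in yyyyMMdd format, a constant 1, and a flag that is 1 when DepDelay (column 16) is greater than 15 and 0 otherwise. The comparison is forced to be numeric so that a non-numeric DepDelay (for example NA) is treated as 0 rather than compared as a string.
#!/usr/bin/awk -f
# Mapper: emit "yyyyMMdd,1,delayed" for every flight record.
BEGIN {
        FS = ","
}
{
        # $16 + 0 forces a numeric comparison, so a non-numeric DepDelay
        # such as "NA" counts as 0 instead of being compared as a string.
        delayed = (($16 + 0) > 15) ? 1 : 0
        # %d conversions zero-pad reliably; the 0 flag is not defined for %s.
        printf("%04d%02d%02d,%d,%d\n", $1, $2, $3, 1, delayed)
}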
   
The intermediate output of Map code will look like this:
......
19871001,1,1
19871001,1,1
19871002,1,0
19871002,1,0
19871002,1,0
..... 
  
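Note that Hadoop sorts the mapper output by key before handing it to the reducer, which is why the intermediate records appear in date order.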
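
The map step can be tried locally before involving Hadoop. The sketch below is an inline variant of the mapper (it uses %d conversions and a forced numeric comparison) fed with a few made-up records, with sort standing in for the framework's ordering. The row helper and all field values are hypothetical; only columns 1 to 3 and 16 matter, the rest are placeholders.

```shell
#!/bin/sh
# row YEAR MONTH DAY DEPDELAY: builds a hypothetical 16-column record.
# Columns 4-15 are placeholders that the mapper never reads.
row() {
    printf '%s,%s,%s,0,0,0,0,0,XX,0,NA,0,0,NA,0,%s\n' "$1" "$2" "$3" "$4"
}

# Map each record to "yyyyMMdd,1,delayed", then sort to imitate the shuffle.
mapped=$(
    { row 1987 10 2 5; row 1987 10 1 20; row 1987 10 2 NA; row 1987 10 1 16; } |
    awk -F, '{ printf("%04d%02d%02d,%d,%d\n", $1, $2, $3, 1, (($16 + 0) > 15)) }' |
    sort
)
printf '%s\n' "$mapped"
```

With these four records the result is two lines of 19871001,1,1 followed by two lines of 19871002,1,0. The record whose delay is NA is counted as not delayed, because NA + 0 evaluates to 0 in awk.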
The reduce code keeps a count of all flights and of delayed flights for each day.
To aggregate (sum) the data at day level, I use awk associative arrays indexed by date.
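#!/bin/awk -f
# Reducer: sum flights and delayed flights per day, then print the percentage.
BEGIN {
        FS = ","
}
{
        flights[$1] += $2       # total flights for the day in $1
        delayed[$1] += $3       # flights delayed by more than 15 minutes
}
END {
        # for (day in flights) visits the days in no guaranteed order
        for (day in flights)
                printf "%s,%d,%d,%5.2f\n", day, flights[day], delayed[day], delayed[day] * 100 / flights[day]
}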
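
To see what the reduce step produces, the following sketch runs the same aggregation locally on the five sample intermediate records shown earlier. The array names flights and delayed are chosen for this example, and the trailing sort is an addition to make the order predictable, since awk does not guarantee the order in which array indexes are visited.

```shell
#!/bin/sh
# The five sample intermediate records from the mapper output.
intermediate='19871001,1,1
19871001,1,1
19871002,1,0
19871002,1,0
19871002,1,0'

# Sum column 2 (flights) and column 3 (delayed) per day, then print
# day,flights,delayed,percentage and sort the lines by day.
summary=$(printf '%s\n' "$intermediate" |
    awk -F, '
        { flights[$1] += $2; delayed[$1] += $3 }
        END {
            for (day in flights)
                printf "%s,%d,%d,%5.2f\n", day, flights[day], delayed[day], delayed[day] * 100 / flights[day]
        }' |
    sort)
printf '%s\n' "$summary"
```

For 1 October, 2 of 2 flights are delayed, so the percentage is 2 * 100 / 2 = 100.00; for 2 October, 0 of 3 are delayed, giving 0.00. The %5.2f conversion pads the second value to five characters, so the lines read 19871001,2,2,100.00 and 19871002,3,0, 0.00.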
 
Run this MapReduce code with Hadoop Streaming:
hadoop jar /usr/lib/hadoop-0.20-mapreduce/contrib/streaming/hadoop-streaming-2.0.0-mr1-cdh4.0.0.jar \
  -input /airline/nh_1987.csv \
  -output /airline/output \
  -mapper path_to/airline2_m.awk \
  -reducer path_to/airline2_r.awk \
  -inputformat org.apache.hadoop.mapred.TextInputFormat \
  -outputformat org.apache.hadoop.mapred.TextOutputFormat \
  -file path_to/airline2_m.awk \
  -file path_to/airline2_r.awk
  
We can check the output of the above job with the cat command, or by copying the file from HDFS to the local filesystem:
hadoop fs -cat /airline/output/part-00000
hadoop fs -get /airline/output/part-00000
  
