Wednesday 5 March 2014

MapReduce on Avro data files

In this post we are going to write a MapReduce program to consume Avro input data and also produce data in Avro format.
We will write a program to calculate average of student marks.

 

Data Preparation

The schema for the records is:
student.avsc
{
  "type" : "record",
  "name" : "student_marks",
  "namespace" : "com.rishav.avro",
  "fields" : [ {
    "name" : "student_id",
    "type" : "int"
  }, {
    "name" : "subject_id",
    "type" : "int"
  }, {
    "name" : "marks",
    "type" : "int"
  } ]
}

And some sample records are:
student.json
{"student_id":1,"subject_id":63,"marks":19}
{"student_id":2,"subject_id":64,"marks":74}
{"student_id":3,"subject_id":10,"marks":94}
{"student_id":4,"subject_id":79,"marks":27}
{"student_id":1,"subject_id":52,"marks":95}
{"student_id":2,"subject_id":34,"marks":16}
{"student_id":3,"subject_id":81,"marks":17}
{"student_id":4,"subject_id":60,"marks":52}
{"student_id":1,"subject_id":11,"marks":66}
{"student_id":2,"subject_id":84,"marks":39}
{"student_id":3,"subject_id":24,"marks":39}
{"student_id":4,"subject_id":16,"marks":0}
{"student_id":1,"subject_id":65,"marks":75}
{"student_id":2,"subject_id":5,"marks":52}
{"student_id":3,"subject_id":86,"marks":50}
{"student_id":4,"subject_id":55,"marks":42}
{"student_id":1,"subject_id":30,"marks":21}

Now we will convert the above sample records to avro format and upload the avro data file to HDFS:
java -jar avro-tools-1.7.5.jar fromjson student.json --schema-file student.avsc > student.avro
hadoop fs -put student.avro student.avro

Avro MapReduce Program

In my program I have used Avro Java class for student_marks schema. To generate Java class from the schema file use below command:
java -jar avro-tools-1.7.5.jar compile schema student.avsc .
Then add the generated Java class to IDE.

I have written a MapReduce program which reads Avro data file student.avro (passed as argument) and calculates average marks for each student and store the output also in Avro format. The program is given below:


  • In the program the input key to mapper is AvroKey<student_marks> and the input value is null. The output key of map method is student_id and output value is an IntPair having marks and 1.
  • We have a combiner also which aggregates partial sums for each student_id.
  • Finally reducer takes student_id and partial sums and counts and uses them to calculate average for each student_id. The reducer writes the output in Avro format.

For Avro job setup we have added these properties:
// set InputFormatClass to AvroKeyInputFormat and define input schema
    job.setInputFormatClass(AvroKeyInputFormat.class);
    AvroJob.setInputKeySchema(job, student_marks.getClassSchema());

// set OutputFormatClass to AvroKeyValueOutputFormat and key as INT type and value as FLOAT type
    job.setOutputFormatClass(AvroKeyValueOutputFormat.class);
    AvroJob.setOutputKeySchema(job, Schema.create(Schema.Type.INT));
    AvroJob.setOutputValueSchema(job, Schema.create(Schema.Type.FLOAT));

Job Execution

We package our Java program to avro_mr.jar and add Avro jars to libjars and hadoop classpath using below commands:

export LIBJARS=avro-1.7.5.jar,avro-mapred-1.7.5-hadoop1.jar,paranamer-2.6.jar
export HADOOP_CLASSPATH=avro-1.7.5.jar:avro-mapred-1.7.5-hadoop1.jar:paranamer-2.6.jar
hadoop jar avro_mr.jar com.rishav.avro.mapreduce.AvroAverageDriver -libjars ${LIBJARS} student.avro output
You can verify the output using avro-tool command.

To enable snappy compression for output add below lines to run method and add snappy-java jar to libjars and hadoop classpath:
        FileOutputFormat.setCompressOutput(job, true);
        FileOutputFormat.setOutputCompressorClass(job, SnappyCodec.class);

7 comments:

  1. Hello and thank you for this example. i'm actually having problems defining the custom classes wrapped by AvroKey, in your case it's student_marks class. Can you share the student_marks code ? Thanks !

    ReplyDelete
    Replies
    1. You can generate custom class by using below command on your json schema file -
      java -jar avro-tools-1.7.5.jar compile schema student.avsc

      Hope this slove your problem.

      Delete
    2. Thank you. This means that your com.rishav.avro.student_marks (for which you made an "import") is not extending/implementing (Specific)RecordBase and (Specific)RecordBuilder ?

      Delete
    3. I am not having access to com.rishav.avro.student_marks class now, but I believe it would extend/implement some Avro classes.
      Are you trying to write Avro schema java class by yourelf? I would suggest you to first write a json schema and then use the above command to generate java class.

      Delete
    4. I'm actually not a "good" developper but I'm willing to make my tests work :-) Yes indeed, I'm trying to write my own classes wrapped by e.g. AvroKey. I want to be able to do the "key.datum().getStudentId()" above and much more ... but I'm having a tough time making them work. Whenever you have time to access the student_marks class, can you please share it ?

      Delete
    5. Here you go - https://gist.github.com/rishav-rohit/2352c44d19875e2ca5e5

      Delete
  2. Thank you Rishav ! indeed I didn't well get the custom class creation using java -jar avro-tools-1.7.5.jar compile schema student.avsc ... now I understand that I don't have to write this class "manually". Your code worked very well. I made a simple modification in order for the IntPair.set() method to work:

    // my bgn modifications:
    IntWritable intW_p_count = new IntWritable(0);
    IntWritable intW_p_sum = new IntWritable(0);
    ...
    intW_p_count.set(p_count);
    intW_p_sum.set(p_sum);

    //p_sum_count.set(p_sum, p_count);
    p_sum_count.set(intW_p_sum, intW_p_count);

    ReplyDelete