Wednesday, 19 February 2014

Avro Schema Evolution

Avro can use different schemas for serialization and deserialization, and it can handle removed, added and modified fields. Thus it helps in building decoupled and robust systems.

In this post we will serialize data using this schema:
{
"namespace": "com.rishav.avro",
"type": "record",
"name": "StudentActivity",
"fields": [
{
"name": "id",
"type": "string"
},
{
"name": "student_id",
"type": "int"
},
{
"name": "university_id",
"type": "int"
},
{
"name": "course_details",
"type": {
"name": "Activity",
"type": "record",
"fields": [
{
"name": "course_id",
"type": "int"
},
{
"name": "enroll_date",
"type": "string"
},
{
"name": "verb",
"type": "string"
},
{
"name": "result_score",
"type": "double"
}
]
}
}
]
}

and deserialize it using a different schema:
{
"namespace": "com.rishav.avro",
"type": "record",
"name": "StudentActivity",
"fields": [
{
"name": "id",
"type": "string"
},
{
"name": "student_id",
"type": "int"
},
{
"name": "age",
"type": ["null", "int"],
"default": null
},
{
"name": "course_details",
"type": {
"name": "Activity",
"type": "record",
"fields": [
{
"name": "course_id",
"type": "int"
},
{
"name": "enroll_date",
"type": "string"
},
{
"name": "verb",
"type": "string"
},
{
"name": "score",
"type": "double",
"aliases": ["result_score"]
}
]
}
}
]
}
which has the following modifications:
  1. university_id field is removed.
  2. age field is added.
  3. result_score field is renamed to score.
Before we see how Avro handles these modifications, I would like to mention the following points:
  • If a new field is added then it must have a default value. Also specify its type as an array of types (a union) starting with null, e.g. "type": ["null", "string"]; otherwise you will get this error:
    Exception in thread "main" java.lang.NoSuchMethodError: org.codehaus.jackson.node.ValueNode.asText()Ljava/lang/String;
  • If a field is renamed then the old name must be listed in the "aliases" of the renamed field in the new schema.

In this Java program we serialize data using the StudentActivity.avsc schema and deserialize it using the StudentActivityNew.avsc schema:
package com.rishav.avro;
import java.io.File;
import java.io.FileInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.util.Iterator;
import java.util.LinkedHashMap;
import org.apache.avro.Schema;
import org.apache.avro.file.DataFileReader;
import org.apache.avro.file.DataFileWriter;
import org.apache.avro.generic.GenericData;
import org.apache.avro.generic.GenericDatumReader;
import org.apache.avro.generic.GenericDatumWriter;
import org.apache.avro.generic.GenericRecord;
import org.apache.avro.io.BinaryDecoder;
import org.apache.avro.io.DatumReader;
import org.apache.avro.io.DatumWriter;
import org.codehaus.jackson.JsonFactory;
import org.codehaus.jackson.JsonParseException;
import org.codehaus.jackson.JsonProcessingException;
import org.codehaus.jackson.map.ObjectMapper;
import org.json.simple.JSONObject;
/**
 * Demonstrates Avro schema evolution: records are written to an Avro data file
 * with one schema ({@code StudentActivity.avsc}) and read back with a different
 * one ({@code StudentActivityNew.avsc}).
 */
public class AvroSchemaEvolution {

    /** File containing the JSON objects that are converted to Avro records. */
    private static final String JSON_INPUT_PATH = "resources/StudentActivity.json";

    /** Schema used when writing the Avro data file. */
    private static final String WRITER_SCHEMA_PATH = "resources/StudentActivity.avsc";

    /** Schema used when reading the Avro data file back. */
    private static final String READER_SCHEMA_PATH = "resources/StudentActivityNew.avsc";

    /** Avro data file produced by {@link #serialize()} and consumed by {@link #deserialize()}. */
    private static final String AVRO_DATA_PATH = "resources/StudentActivity.avro";

    /**
     * Reads every JSON object from the input file and appends it, as an Avro
     * record conforming to the writer schema, to the Avro data file.
     *
     * @throws JsonParseException      if the input is not well-formed JSON
     * @throws JsonProcessingException if a JSON object cannot be mapped
     * @throws IOException             if any of the files cannot be read or written
     */
    public void serialize() throws JsonParseException, JsonProcessingException, IOException {
        // Parse the schema before opening any stream so a bad schema cannot leak a handle.
        Schema schema = new Schema.Parser().parse(new File(WRITER_SCHEMA_PATH));

        // Both records are reused for every input object; each field is
        // overwritten before the record is appended.
        GenericRecord studentRecord = new GenericData.Record(schema);
        GenericRecord courseRecord =
                new GenericData.Record(schema.getField("course_details").schema());

        DatumWriter<GenericRecord> datumWriter = new GenericDatumWriter<GenericRecord>(schema);

        InputStream in = new FileInputStream(JSON_INPUT_PATH);
        try {
            DataFileWriter<GenericRecord> dataFileWriter =
                    new DataFileWriter<GenericRecord>(datumWriter);
            try {
                dataFileWriter.create(schema, new File(AVRO_DATA_PATH));

                // Iterate over the JSON objects present in the input file.
                Iterator<JSONObject> jsonRecords = new ObjectMapper().readValues(
                        new JsonFactory().createJsonParser(in), JSONObject.class);
                while (jsonRecords.hasNext()) {
                    JSONObject jsonRecord = jsonRecords.next();

                    studentRecord.put("id", jsonRecord.get("id"));
                    studentRecord.put("student_id", jsonRecord.get("student_id"));
                    studentRecord.put("university_id", jsonRecord.get("university_id"));

                    LinkedHashMap<?, ?> courseDetails =
                            (LinkedHashMap<?, ?>) jsonRecord.get("course_details");
                    courseRecord.put("course_id", courseDetails.get("course_id"));
                    courseRecord.put("enroll_date", courseDetails.get("enroll_date"));
                    courseRecord.put("verb", courseDetails.get("verb"));

                    // A whole-number score in the JSON is parsed as an Integer, which
                    // the generic writer cannot store in a "double" field; widen it.
                    Object score = courseDetails.get("result_score");
                    courseRecord.put("result_score",
                            score instanceof Number ? ((Number) score).doubleValue() : score);

                    studentRecord.put("course_details", courseRecord);
                    dataFileWriter.append(studentRecord);
                }
            } finally {
                // Closing the writer also flushes any buffered records to disk.
                dataFileWriter.close();
            }
        } finally {
            in.close();
        }
    }

    /**
     * Reads the Avro data file using the reader schema and prints every record
     * to standard output.
     *
     * @throws IOException if the schema or the data file cannot be read
     */
    public void deserialize() throws IOException {
        Schema schema = new Schema.Parser().parse(new File(READER_SCHEMA_PATH));

        // Instance handed to the reader for reuse between records.
        GenericRecord studentRecord = new GenericData.Record(schema);

        DatumReader<GenericRecord> datumReader = new GenericDatumReader<GenericRecord>(schema);
        DataFileReader<GenericRecord> dataFileReader =
                new DataFileReader<GenericRecord>(new File(AVRO_DATA_PATH), datumReader);
        try {
            System.out.println("Deserialized data is :");
            while (dataFileReader.hasNext()) {
                studentRecord = dataFileReader.next(studentRecord);
                System.out.println(studentRecord);
            }
        } finally {
            dataFileReader.close();
        }
    }

    /**
     * Writes the sample data with the original schema, then reads it back with
     * the evolved schema.
     *
     * @param args unused
     */
    public static void main(String[] args)
            throws JsonParseException, JsonProcessingException, IOException {
        AvroSchemaEvolution example = new AvroSchemaEvolution();
        example.serialize();
        example.deserialize();
    }
}

On executing this code we see that Avro handles the modifications without any issues and our data is deserialized properly.

No comments:

Post a Comment