In this post we will serialize data using this schema:
{
  "namespace": "com.rishav.avro",
  "type": "record",
  "name": "StudentActivity",
  "fields": [
    {
      "name": "id",
      "type": "string"
    },
    {
      "name": "student_id",
      "type": "int"
    },
    {
      "name": "university_id",
      "type": "int"
    },
    {
      "name": "course_details",
      "type": {
        "name": "Activity",
        "type": "record",
        "fields": [
          {
            "name": "course_id",
            "type": "int"
          },
          {
            "name": "enroll_date",
            "type": "string"
          },
          {
            "name": "verb",
            "type": "string"
          },
          {
            "name": "result_score",
            "type": "double"
          }
        ]
      }
    }
  ]
}
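For reference, a record in resources/StudentActivity.json (the input file read by the program further below) might look like this; the values here are purely illustrative:

{
  "id": "stu-001",
  "student_id": 101,
  "university_id": 42,
  "course_details": {
    "course_id": 7,
    "enroll_date": "2014-01-15",
    "verb": "completed",
    "result_score": 8.5
  }
}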
and then deserialize it using a different, evolved schema:
{
  "namespace": "com.rishav.avro",
  "type": "record",
  "name": "StudentActivity",
  "fields": [
    {
      "name": "id",
      "type": "string"
    },
    {
      "name": "student_id",
      "type": "int"
    },
    {
      "name": "age",
      "type": ["null", "int"],
      "default": null
    },
    {
      "name": "course_details",
      "type": {
        "name": "Activity",
        "type": "record",
        "fields": [
          {
            "name": "course_id",
            "type": "int"
          },
          {
            "name": "enroll_date",
            "type": "string"
          },
          {
            "name": "verb",
            "type": "string"
          },
          {
            "name": "score",
            "type": "double",
            "aliases": ["result_score"]
          }
        ]
      }
    }
  ]
}
The new schema differs from the old one in three ways:
- The university_id field is removed.
- The age field is added.
- The result_score field is renamed to score.
- If a new field is added, it must have a default value, and its type must be declared as a union of types starting with null, e.g. "type": ["null", "string"]; otherwise you will get this error:
Exception in thread "main" java.lang.NoSuchMethodError: org.codehaus.jackson.node.ValueNode.asText()Ljava/lang/String;
- If a field is renamed, its old name must be listed in aliases.
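These rules can also be checked programmatically before deserializing. Below is a minimal sketch, assuming an Avro version (1.7.2 or later) that ships the org.apache.avro.SchemaCompatibility helper, which reports whether data written with the old (writer) schema can be read with the new (reader) schema:

import java.io.File;
import org.apache.avro.Schema;
import org.apache.avro.SchemaCompatibility;

public class CheckCompatibility {
  public static void main(String[] args) throws Exception {
    // writer schema: the schema the data was serialized with
    Schema writer = new Schema.Parser().parse(new File("resources/StudentActivity.avsc"));
    // reader schema: the evolved schema we want to deserialize with
    Schema reader = new Schema.Parser().parse(new File("resources/StudentActivityNew.avsc"));

    // COMPATIBLE means every record written with the writer schema can be read with the reader schema
    SchemaCompatibility.SchemaPairCompatibility result =
        SchemaCompatibility.checkReaderWriterCompatibility(reader, writer);
    System.out.println("Compatibility: " + result.getType());
  }
}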
In the following Java program we serialize data using the StudentActivity.avsc schema and deserialize it using the StudentActivityNew.avsc schema:
package com.rishav.avro;

import java.io.File;
import java.io.FileInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.util.Iterator;
import java.util.LinkedHashMap;

import org.apache.avro.Schema;
import org.apache.avro.file.DataFileReader;
import org.apache.avro.file.DataFileWriter;
import org.apache.avro.generic.GenericData;
import org.apache.avro.generic.GenericDatumReader;
import org.apache.avro.generic.GenericDatumWriter;
import org.apache.avro.generic.GenericRecord;
import org.apache.avro.io.DatumReader;
import org.apache.avro.io.DatumWriter;
import org.codehaus.jackson.JsonFactory;
import org.codehaus.jackson.JsonParseException;
import org.codehaus.jackson.JsonProcessingException;
import org.codehaus.jackson.map.ObjectMapper;
import org.json.simple.JSONObject;

public class AvroSchemaEvolution {

  public void serialize() throws JsonParseException, JsonProcessingException, IOException {
    InputStream in = new FileInputStream("resources/StudentActivity.json");

    // parse the writer schema
    Schema schema = new Schema.Parser().parse(new File("resources/StudentActivity.avsc"));
    // create a record to hold the JSON input
    GenericRecord AvroRec = new GenericData.Record(schema);
    // create a record to hold course_details
    GenericRecord CourseRec = new GenericData.Record(schema.getField("course_details").schema());
    // this file will hold the Avro output data
    File AvroFile = new File("resources/StudentActivity.avro");
    // create a writer to serialize the record
    DatumWriter<GenericRecord> datumWriter = new GenericDatumWriter<GenericRecord>(schema);
    DataFileWriter<GenericRecord> dataFileWriter = new DataFileWriter<GenericRecord>(datumWriter);
    dataFileWriter.create(schema, AvroFile);

    // iterate over JSON records in the input file and write them to the Avro output file
    for (Iterator it = new ObjectMapper().readValues(
        new JsonFactory().createJsonParser(in), JSONObject.class); it.hasNext();) {
      JSONObject JsonRec = (JSONObject) it.next();
      AvroRec.put("id", JsonRec.get("id"));
      AvroRec.put("student_id", JsonRec.get("student_id"));
      AvroRec.put("university_id", JsonRec.get("university_id"));

      LinkedHashMap CourseDetails = (LinkedHashMap) JsonRec.get("course_details");
      CourseRec.put("course_id", CourseDetails.get("course_id"));
      CourseRec.put("enroll_date", CourseDetails.get("enroll_date"));
      CourseRec.put("verb", CourseDetails.get("verb"));
      CourseRec.put("result_score", CourseDetails.get("result_score"));

      AvroRec.put("course_details", CourseRec);
      dataFileWriter.append(AvroRec);
    } // end of for loop

    in.close();
    dataFileWriter.close();
  } // end of serialize method

  public void deserialize() throws IOException {
    // parse the reader (new) schema
    Schema schema = new Schema.Parser().parse(new File("resources/StudentActivityNew.avsc"));
    // Schema schema = new Schema.Parser().parse(new File("resources/StudentActivity.avsc"));
    // create a record using the reader schema
    GenericRecord AvroRec = new GenericData.Record(schema);
    File AvroFile = new File("resources/StudentActivity.avro");
    DatumReader<GenericRecord> datumReader = new GenericDatumReader<GenericRecord>(schema);
    DataFileReader<GenericRecord> dataFileReader = new DataFileReader<GenericRecord>(AvroFile, datumReader);
    System.out.println("Deserialized data is :");
    while (dataFileReader.hasNext()) {
      AvroRec = dataFileReader.next(AvroRec);
      System.out.println(AvroRec);
    }
  }

  public static void main(String[] args) throws JsonParseException, JsonProcessingException, IOException {
    AvroSchemaEvolution AvroEx = new AvroSchemaEvolution();
    AvroEx.serialize();
    AvroEx.deserialize();
  }
}
On executing this code we see that Avro handles the schema changes without any issues and the data is deserialized correctly against the new schema.
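Note that DataFileReader picks up the writer schema from the Avro container file's header, which is why only the reader schema needs to be supplied above. When decoding raw Avro binary that is not wrapped in a container file, both schemas have to be passed explicitly to GenericDatumReader. Here is a minimal sketch of that variant, reusing the two schema files from above; the record values are illustrative:

import java.io.ByteArrayOutputStream;
import java.io.File;
import org.apache.avro.Schema;
import org.apache.avro.generic.GenericData;
import org.apache.avro.generic.GenericDatumReader;
import org.apache.avro.generic.GenericDatumWriter;
import org.apache.avro.generic.GenericRecord;
import org.apache.avro.io.BinaryDecoder;
import org.apache.avro.io.BinaryEncoder;
import org.apache.avro.io.DecoderFactory;
import org.apache.avro.io.EncoderFactory;

public class RawBinaryEvolution {
  public static void main(String[] args) throws Exception {
    Schema writerSchema = new Schema.Parser().parse(new File("resources/StudentActivity.avsc"));
    Schema readerSchema = new Schema.Parser().parse(new File("resources/StudentActivityNew.avsc"));

    // build one record against the writer schema (illustrative values)
    GenericRecord rec = new GenericData.Record(writerSchema);
    rec.put("id", "stu-001");
    rec.put("student_id", 101);
    rec.put("university_id", 42);
    GenericRecord course = new GenericData.Record(writerSchema.getField("course_details").schema());
    course.put("course_id", 7);
    course.put("enroll_date", "2014-01-15");
    course.put("verb", "completed");
    course.put("result_score", 8.5);
    rec.put("course_details", course);

    // serialize to raw bytes with the writer schema
    ByteArrayOutputStream out = new ByteArrayOutputStream();
    BinaryEncoder encoder = EncoderFactory.get().binaryEncoder(out, null);
    new GenericDatumWriter<GenericRecord>(writerSchema).write(rec, encoder);
    encoder.flush();

    // deserialize with both schemas: writer schema first, reader schema second
    BinaryDecoder decoder = DecoderFactory.get().binaryDecoder(out.toByteArray(), null);
    GenericRecord evolved =
        new GenericDatumReader<GenericRecord>(writerSchema, readerSchema).read(null, decoder);
    System.out.println(evolved);
  }
}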