Let us assume that the transaction data we are getting is in CSV format like this - tId,pId
where tId is transaction Id
and pId is product Id
A single transaction can have one or more product Ids spread across one or multiple CSV records, e.g.
101,701
101,702
101,703
102,701
103,705
103,707
I have implemented the Apriori algorithm for 2-itemsets using 3 MapReduce jobs. The jobs and their functions are described below -
1. PopulateTranBasketAndProdPairJob - The mapper class of this job reads transaction records from the specified csv file and emits (tId, pId). This job's reducer class gets (tId, <pId1, pId2,..., pIdn>) as input, generates the product pairs for this tId, and writes the individual pId(s) and product pair(s) to the HBase table 'tranBasket' with tId as the rowkey (a sketch of this job is given after this list).
2. SupportCountJob - This job reads the 'tranBasket' table and calculates the support counts for all pIds and product pairs. The support counts of individual products are stored in the 'pCount' table with pId as the rowkey, and the support counts for product pairs are stored in the 'ppCount' table with the product pair as the rowkey. At the end of this job the transaction count is also printed to the screen, which acts as input to the next job.
3. CalcSupportConfidenceJob - This is the last job in this series and gives us the support, confidence and lift values for the different product pairs. This job takes the transaction count from the previous job as input to calculate the support values. This job has only a mapper, which reads the complete 'pCount' table into memory, then reads the 'ppCount' table row by row, calculates the different Apriori measures like support, confidence and lift, and writes the results to the HBase table 'apprOut'.
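A minimal sketch of the first job's mapper and reducer is given below. The column family name 'cf' and the '+' pair separator are illustrative assumptions; the actual table layout in the application may differ.
import java.io.IOException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.mapreduce.TableReducer;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

// mapper: reads "tId,pId" csv records and emits (tId, pId)
class TranBasketMapper extends Mapper<LongWritable, Text, Text, Text> {
    @Override
    protected void map(LongWritable key, Text value, Context context)
            throws IOException, InterruptedException {
        String[] parts = value.toString().split(",");
        if (parts.length == 2) {
            context.write(new Text(parts[0]), new Text(parts[1]));
        }
    }
}

// reducer: for each tId, writes the individual pIds and all product pairs to 'tranBasket'
class TranBasketReducer extends TableReducer<Text, Text, ImmutableBytesWritable> {
    @Override
    protected void reduce(Text tId, Iterable<Text> pIds, Context context)
            throws IOException, InterruptedException {
        List<String> products = new ArrayList<String>();
        for (Text pId : pIds) {
            products.add(pId.toString());
        }
        Collections.sort(products);
        Put put = new Put(Bytes.toBytes(tId.toString())); // tId is the rowkey
        for (String p : products) {
            put.add(Bytes.toBytes("cf"), Bytes.toBytes(p), Bytes.toBytes("1"));
        }
        for (int i = 0; i < products.size(); i++) {
            for (int j = i + 1; j < products.size(); j++) {
                // product pair column, e.g. "701+702"
                put.add(Bytes.toBytes("cf"), Bytes.toBytes(products.get(i) + "+" + products.get(j)),
                        Bytes.toBytes("1"));
            }
        }
        context.write(null, put);
    }
}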
To verify the results we can check the mapper sysout files, which have the values in a readable format.
Note - This is just a simple demo application and there is scope for improvement. Some modifications which I can think of now are -
Generally transaction ids are sequential numbers and if they are stored in HBase as such we will experience region hot spotting. Hence rowkey design has to be reworked.
HBase scanner caching value needs to be checked and optimised.
Currently pId and tId are stored as Text which can be changed to Long.
The Apriori algorithm is a frequent item set mining algorithm used over transactional databases, proposed by Rakesh Agrawal and Ramakrishnan Srikant in 1994. This algorithm proceeds by identifying the frequent individual items in the database and extending them to larger and larger item sets as long as those item sets appear sufficiently often in the database. The frequent item sets determined by Apriori can be used to determine association rules which highlight general trends in the database.
Before we go further and see how this algorithm works, it is better to be familiar with the terminologies used in this algorithm -
Writing Complex MongoDB Queries Using QueryBuilder
MongoDB provides a lot of query selectors for filtering documents from a collection. Writing complex queries for MongoDB in Java can be tricky sometimes.
Consider the below data present in the student_marks collection. Suppose we want to find all students whose lname is "Ford" and whose marks.english is greater than 35.
Written in Java using the basic driver API, this query will look something like this -
DBObject query = new BasicDBObject();
List<BasicDBObject> andQuery = new ArrayList<BasicDBObject>();
andQuery.add(new BasicDBObject("lname", "Ford"));
andQuery.add(new BasicDBObject("marks.english", new BasicDBObject("$gt", 35)));
query.put("$and", andQuery);
Using the MongoDB QueryBuilder we can rewrite the above query as -
DBObject query = new QueryBuilder()
.start()
.and(new QueryBuilder().start().put("lname").is("Ford").get(),
new QueryBuilder().start().put("marks.english")
.greaterThan(35).get()).get();
You can see that by using QueryBuilder we can write complex queries with ease. The QueryBuilder class provides many methods like and, not, greaterThan, exists, etc., which help in writing MongoDB queries more efficiently and with fewer errors.
The MongoDB Aggregation Pipeline is a framework for data aggregation, modelled on the concept of data processing pipelines. Documents enter a multi-stage pipeline that transforms them into aggregated results. It was introduced in MongoDB 2.2 to do aggregation operations without needing to use map-reduce.
Aggregation Pipeline
The $match and $sort pipeline operators can take advantage of an index when they occur at the beginning of the pipeline [Reference].
There are no restrictions on result size as a cursor is returned [Reference].
The output can be returned inline or written to a collection [Reference].
Pipeline stages have a limit of 100MB of RAM. To handle large datasets use the allowDiskUse option [Reference].
The Aggregation Pipeline has an optimization phase which attempts to reshape the pipeline for improved performance [Reference].
For most aggregation operations, the Aggregation Pipeline provides better performance and a more coherent interface. However, map-reduce operations provide some flexibility that is presently not available in the aggregation pipeline.
The syntax for the aggregation pipeline is
db.collection.aggregate([{<stage>},...])
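As an illustration, the same kind of pipeline can be built from Java using the DBCollection API. The collection and field names below (student_marks, lname, marks.english) are hypothetical, carried over from the QueryBuilder example above -
import com.mongodb.AggregationOutput;
import com.mongodb.BasicDBObject;
import com.mongodb.DB;
import com.mongodb.DBCollection;
import com.mongodb.DBObject;
import com.mongodb.MongoClient;

public class AggregationExample {
    public static void main(String[] args) throws Exception {
        DB db = new MongoClient("localhost", 27017).getDB("test");
        DBCollection collection = db.getCollection("student_marks");

        // $match stage: keep students with english marks greater than 35
        DBObject match = new BasicDBObject("$match",
                new BasicDBObject("marks.english", new BasicDBObject("$gt", 35)));
        // $group stage: average english marks per last name
        DBObject group = new BasicDBObject("$group",
                new BasicDBObject("_id", "$lname")
                        .append("avgEnglish", new BasicDBObject("$avg", "$marks.english")));

        AggregationOutput output = collection.aggregate(match, group);
        for (DBObject doc : output.results()) {
            System.out.println(doc);
        }
    }
}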
Stages
The MongoDB aggregation pipeline consists of stages. Each stage transforms the
documents as they pass through the pipeline. Pipeline stages do not
need to produce one output document for every input document; e.g.,
some stages may generate new documents or filter out documents.
Pipeline stages can appear multiple times in the pipeline.
Various stage operators supported by MongoDB are listed below-
$geoNear - Returns an ordered stream of documents based on the proximity to a geospatial point. Incorporates the functionality of $match, $sort, and $limit for geospatial data. The output documents include an additional distance field and can include a location identifier field.
$group - Groups input documents by a specified identifier expression and applies the accumulator expression(s), if specified, to each group. Consumes all input documents and outputs one document per each distinct group. The output documents only contain the identifier field and, if specified, accumulated fields.
$limit - Passes the first n documents unmodified to the pipeline, where n is the specified limit. For each input document, outputs either one document (for the first n documents) or zero documents (after the first n documents).
$match - Filters the document stream to allow only matching documents to pass unmodified into the next pipeline stage. $match uses standard MongoDB queries. For each input document, outputs either one document (a match) or zero documents (no match).
$redact - Reshapes each document in the stream by restricting the content for each document based on information stored in the documents themselves. Incorporates the functionality of $project and $match. Can be used to implement field level redaction. For each input document, outputs either one or zero documents.
$skip - Skips the first n documents, where n is the specified skip number, and passes the remaining documents unmodified to the pipeline. For each input document, outputs either zero documents (for the first n documents) or one document (if after the first n documents).
$sort - Reorders the document stream by a specified sort key. Only the order changes; the documents remain unmodified. For each input document, outputs one document.
$unwind - Deconstructs an array field from the input documents to output a document for each element. Each output document replaces the array with an element value. For each input document, outputs n documents where n is the number of array elements and can be zero for an empty array.
Different expressions supported by MongoDB are listed here.
In MongoDB there are multiple guarantee levels available for reporting the success of a write operation, called Write Concerns. The strength of the write concern determines the level of guarantee. A weak Write Concern has better performance at the cost of a weaker guarantee, while a strong Write Concern gives a higher guarantee because clients wait to confirm the write operations.
MongoDB provides different levels of write concern to better address
the specific needs of applications. Clients may adjust write concern to
ensure that the most important operations persist successfully to an
entire MongoDB deployment. For other less critical operations, clients
can adjust the write concern to ensure faster performance rather than
ensure persistence to the entire deployment.
Write Concern Levels
MongoDB has the following levels of conceptual write
concern, listed from weakest to strongest:
Unacknowledged
With an unacknowledged write concern, MongoDB does not acknowledge
the receipt of write operations. Unacknowledged is similar to
errors ignored; however, drivers will attempt to receive and handle
network errors when possible. The driver’s ability to detect network
errors depends on the system’s networking configuration.
Acknowledged
With a receipt acknowledged write concern, the mongod confirms the receipt of the write operation. Acknowledged write concern allows clients to catch network, duplicate key, and other errors. This is the default write concern.
Journaled
With a journaled write concern, MongoDB acknowledges the write
operation only after committing the data to the journal. This
write concern ensures that MongoDB can recover the data following a
shutdown or power interruption.
You must have journaling enabled to use this write concern.
Replica Acknowledged
Replica sets present additional considerations with regard to write concern. The default write concern only requires acknowledgement from the primary.
With replica acknowledged write concern, you can guarantee that the
write operation propagates to additional members of the replica set.
A write operation to a replica set with a write concern level of w:2 requires acknowledgement from the primary and at least one secondary.
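With the Java driver, the write concern can be set per operation (or on the client, database or collection). A small illustrative sketch, assuming a hypothetical 'orders' collection -
import com.mongodb.BasicDBObject;
import com.mongodb.DB;
import com.mongodb.DBCollection;
import com.mongodb.MongoClient;
import com.mongodb.WriteConcern;

public class WriteConcernExample {
    public static void main(String[] args) throws Exception {
        DB db = new MongoClient("localhost", 27017).getDB("test");
        DBCollection orders = db.getCollection("orders");

        // Unacknowledged: fire and forget
        orders.insert(new BasicDBObject("item", "book"), WriteConcern.UNACKNOWLEDGED);
        // Acknowledged (the default): mongod confirms receipt of the write
        orders.insert(new BasicDBObject("item", "pen"), WriteConcern.ACKNOWLEDGED);
        // Journaled: acknowledged only after the write is committed to the journal
        orders.insert(new BasicDBObject("item", "ink"), WriteConcern.JOURNALED);
        // Replica acknowledged: w:2, i.e. the primary plus at least one secondary
        orders.insert(new BasicDBObject("item", "pad"), new WriteConcern(2));
    }
}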
In my previous post I explained Hive GenericUDF.
In this post I will give an example of a Hive GenericUDF to get the latitude and longitude of a given location using the Google Geocoding API. Let's call this Hive function GeoEncodeUDF. The GeoEncodeUDF function takes a String location and returns an array of Float containing the latitude and longitude.
For obtaining the latitude and longitude information I am using the Google Geocoding API, which is available at http://maps.googleapis.com/maps/api/geocode/json?address=<address>; this returns a JSON object containing matching places and their latitude and longitude. It might return multiple addresses, but for the sake of simplicity I am taking the first address's latitude and longitude. I have created a helper method getLatLng in the class GeoLatLng which takes a location string and returns the latitude and longitude in an array of floats. This class is given below -
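A minimal sketch of the GeoLatLng helper is given below; the exact implementation may differ, but the JSON field names follow the Geocoding API response (results[0].geometry.location.lat/lng) -
import java.io.InputStream;
import java.net.URL;
import java.net.URLEncoder;
import org.codehaus.jackson.JsonNode;
import org.codehaus.jackson.map.ObjectMapper;

public class GeoLatLng {
    private static final String GEOCODE_URL = "http://maps.googleapis.com/maps/api/geocode/json?address=";

    // returns {latitude, longitude} of the first matching address, or null if nothing matched
    public static float[] getLatLng(String location) {
        try {
            URL url = new URL(GEOCODE_URL + URLEncoder.encode(location, "UTF-8"));
            InputStream in = url.openStream();
            JsonNode root = new ObjectMapper().readTree(in);
            in.close();
            JsonNode results = root.path("results");
            if (results.size() == 0) {
                return null;
            }
            JsonNode loc = results.get(0).path("geometry").path("location");
            return new float[] { (float) loc.path("lat").getDoubleValue(), (float) loc.path("lng").getDoubleValue() };
        } catch (Exception e) {
            return null;
        }
    }
}
The GeoEncodeUDF class itself is given below -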
@Description(name = "GeoEncodeUDF", value = "Get Lat-Lng", extended = "fetches location co-ordinates for given location from Google geocode Api and returns an ARRAY of 2 floats [lat,lng]")
@UDFType(deterministic = true)
public class GeoEncodeUDF extends GenericUDF {
private ArrayList<FloatWritable> result;
// Verify the input is of the required type.
@Override
public ObjectInspector initialize(ObjectInspector[] arguments)
throws UDFArgumentException {
// Exactly one input argument
if( arguments.length != 1 ) {
throw new UDFArgumentLengthException(GeoEncodeUDF.class.getSimpleName() + " accepts exactly one argument.");
}
// Is the input a String
if (((PrimitiveObjectInspector) arguments[0]).getPrimitiveCategory() != PrimitiveObjectInspector.PrimitiveCategory.STRING) {
throw new UDFArgumentTypeException(0, "The single argument to " + GeoEncodeUDF.class.getSimpleName() + " should be String but " + arguments[0].getTypeName() + " is found");
}
// the UDF returns an array (list) of floats: [latitude, longitude]
return ObjectInspectorFactory.getStandardListObjectInspector(PrimitiveObjectInspectorFactory.writableFloatObjectInspector);
}
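The remaining methods can be sketched as below, assuming the GeoLatLng.getLatLng helper above returns a float array of [latitude, longitude] -
@Override
public Object evaluate(DeferredObject[] arguments) throws HiveException {
if (arguments[0].get() == null) {
return null;
}
String location = arguments[0].get().toString();
// GeoLatLng.getLatLng is assumed to return float[]{lat, lng}, as described above
float[] latLng = GeoLatLng.getLatLng(location);
if (latLng == null) {
return null;
}
result = new ArrayList<FloatWritable>();
result.add(new FloatWritable(latLng[0]));
result.add(new FloatWritable(latLng[1]));
return result;
}
@Override
public String getDisplayString(String[] children) {
return "GeoEncodeUDF(" + children[0] + ")";
}
}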
Apache Hive comes with a
set of pre-defined User Defined Functions (UDFs). A
complete listing of Hive UDFs is available here. Some common UDFs are unix_timestamp(), to_date(string timestamp), concat(string|binary A, string|binary B...), etc. However, sometimes a custom UDF is needed to solve specific problems.
In this post I will go through the process of creating custom UDFs.
Difference between UDF and GenericUDF
Hive UDFs are written in Java. In order to create a Hive UDF you need to derive from one of two classes: UDF or GenericUDF. GenericUDF is a bit more complex to develop than UDF, but it offers better performance and it supports all non-primitive parameters as input parameters and return types.
For writing custom UDFs by extending GenericUDF we need to override 3 methods: initialize(), evaluate() and getDisplayString().
initialize() This method gets called only once per JVM at the beginning to initialize the UDF. initialize() is used to assert and validate the number and type of parameters that a UDF takes and the type of value it returns. It also returns an ObjectInspector corresponding to the return type of the UDF.
evaluate() This method is called once for every row of data being processed. Here the actual logic for transformation/processing of each row is written. It returns an object containing the result of the processing logic.
getDisplayString() A simple method for returning the display string for the UDF when explain is used.
Apart from these, we can also have the following annotations -
@UDFType(deterministic = true)
A deterministic UDF is one which always gives the same result when passed the same parameters. Examples of such UDFs are length(string input), regexp_replace(string initial_string, string pattern, string replacement), etc. A non-deterministic UDF, on the other hand, can return a different result for the same set of parameters. For example, unix_timestamp() returns the current timestamp using the default time zone. Therefore, when unix_timestamp() is invoked with the same parameters (no parameters) at different times, different results are obtained, making it non-deterministic. This annotation allows Hive to perform some optimizations if the UDF is deterministic.
@Description(name="my_udf", value="This will be the result
returned by explain statement.", extended="This will be result returned by the
explain extended statement.")
This annotation tells Hive the
name of your UDF. It will also be used to populate the result of queries
like `DESCRIBE FUNCTION MY_UDF` or `DESCRIBE FUNCTION EXTENDED MY_UDF`.
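Putting these pieces together, a minimal sketch of a hypothetical GenericUDF (my_upper, which simply upper-cases a string argument) might look like this -
import org.apache.hadoop.hive.ql.exec.Description;
import org.apache.hadoop.hive.ql.exec.UDFArgumentException;
import org.apache.hadoop.hive.ql.exec.UDFArgumentLengthException;
import org.apache.hadoop.hive.ql.metadata.HiveException;
import org.apache.hadoop.hive.ql.udf.UDFType;
import org.apache.hadoop.hive.ql.udf.generic.GenericUDF;
import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
import org.apache.hadoop.hive.serde2.objectinspector.primitive.PrimitiveObjectInspectorFactory;

@Description(name = "my_upper", value = "_FUNC_(str) - returns str in upper case",
        extended = "Example: SELECT my_upper(name) FROM employees;")
@UDFType(deterministic = true)
public class MyUpperUDF extends GenericUDF {

    @Override
    public ObjectInspector initialize(ObjectInspector[] arguments) throws UDFArgumentException {
        // validate the number of parameters and declare the return type (a Java String)
        if (arguments.length != 1) {
            throw new UDFArgumentLengthException("my_upper accepts exactly one argument.");
        }
        return PrimitiveObjectInspectorFactory.javaStringObjectInspector;
    }

    @Override
    public Object evaluate(DeferredObject[] arguments) throws HiveException {
        // called once per row; null-safe upper-casing of the single argument
        Object arg = arguments[0].get();
        return (arg == null) ? null : arg.toString().toUpperCase();
    }

    @Override
    public String getDisplayString(String[] children) {
        return "my_upper(" + children[0] + ")";
    }
}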
In my next post I will give an example of a GenericUDF to get the latitude and longitude of a location.
Starting with version 0.94.5, HBase supports reading multiple tables as input to MapReduce jobs using the MultiTableInputFormat class.
In this post I am giving an example of a MapReduce job which reads from two HBase tables, performs some aggregation on one table, merges it (an SQL UNION ALL operation) with the content of the second table, and stores the result in an output table.
The first table is the 'storeSales' table and it has store-wise sales for each date. The statements to create and populate it are -
create 'storeSales', 'cf1'
put 'storeSales', '20130101#1', 'cf1:sSales', '100'
put 'storeSales', '20130101#2', 'cf1:sSales', '110'
put 'storeSales', '20130102#1', 'cf1:sSales', '200'
put 'storeSales', '20130102#2', 'cf1:sSales', '210'
The second table is the 'onlineSales' table and it has online sales for each date. The statements to create and populate it are -
create 'onlineSales', 'cf2'
put 'onlineSales', '20130101', 'cf2:oSales', '400'
put 'onlineSales', '20130102', 'cf2:oSales', '130'
Using a MapReduce job I am going to merge aggregated (at date level) store sales with online sales.
Let's create an output table for the same -
create 'totalSales', 'cf1'
The mapper class for this job is -
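A sketch of such a mapper is given below. The intermediate key/value layout (date as key, values tagged with an 'S:' or 'O:' prefix) is an illustrative assumption, not necessarily the original design -
import java.io.IOException;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.mapreduce.TableMapper;
import org.apache.hadoop.hbase.mapreduce.TableSplit;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.io.Text;

public class SalesMapper extends TableMapper<Text, Text> {
    @Override
    protected void map(ImmutableBytesWritable row, Result value, Context context)
            throws IOException, InterruptedException {
        // find out which table the current split belongs to
        TableSplit split = (TableSplit) context.getInputSplit();
        String tableName = Bytes.toString(split.getTableName());
        String rowKey = Bytes.toString(row.get());

        if ("storeSales".equals(tableName)) {
            // rowkey is date#storeId; emit (date, S:<sales>) so store sales can be aggregated per date
            String date = rowKey.split("#")[0];
            String sales = Bytes.toString(value.getValue(Bytes.toBytes("cf1"), Bytes.toBytes("sSales")));
            context.write(new Text(date), new Text("S:" + sales));
        } else {
            // onlineSales: rowkey is the date itself; emit (date, O:<sales>)
            String sales = Bytes.toString(value.getValue(Bytes.toBytes("cf2"), Bytes.toBytes("oSales")));
            context.write(new Text(rowKey), new Text("O:" + sales));
        }
    }
}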
Note that in the mapper I am getting the table name of the current split and using a different context.write based on the table name. If your source tables have rowkeys with different prefixes, you can also use those prefixes for the different context.write logic.
The reducer class for this job is -
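A matching sketch of the reducer, which sums the store sales per date and writes them together with the online sales to 'totalSales' (again, the exact output layout is an assumption) -
import java.io.IOException;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.mapreduce.TableReducer;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.io.Text;

public class SalesReducer extends TableReducer<Text, Text, ImmutableBytesWritable> {
    @Override
    protected void reduce(Text date, Iterable<Text> values, Context context)
            throws IOException, InterruptedException {
        long storeTotal = 0;
        long onlineSales = 0;
        for (Text v : values) {
            String[] parts = v.toString().split(":");
            if ("S".equals(parts[0])) {
                storeTotal += Long.parseLong(parts[1]);   // aggregate store sales at date level
            } else {
                onlineSales += Long.parseLong(parts[1]);  // online sales for the date
            }
        }
        Put put = new Put(Bytes.toBytes(date.toString()));
        put.add(Bytes.toBytes("cf1"), Bytes.toBytes("sSales"), Bytes.toBytes(String.valueOf(storeTotal)));
        put.add(Bytes.toBytes("cf1"), Bytes.toBytes("oSales"), Bytes.toBytes(String.valueOf(onlineSales)));
        context.write(null, put);
    }
}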
Based on the intermediate key value I am doing the aggregation in the reducer.
Finally, the driver class for this job is -
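A sketch of the driver, which attaches one Scan per source table and wires up the multi-table input by passing a list of Scans to TableMapReduceUtil.initTableMapperJob -
import java.util.ArrayList;
import java.util.List;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.mapreduce.TableMapReduceUtil;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;

public class TotalSalesDriver {
    public static void main(String[] args) throws Exception {
        Configuration conf = HBaseConfiguration.create();
        Job job = new Job(conf, "totalSales");
        job.setJarByClass(TotalSalesDriver.class);

        // one Scan per source table; the table name is attached to each Scan
        List<Scan> scans = new ArrayList<Scan>();
        Scan storeScan = new Scan();
        storeScan.setAttribute(Scan.SCAN_ATTRIBUTES_TABLE_NAME, Bytes.toBytes("storeSales"));
        scans.add(storeScan);
        Scan onlineScan = new Scan();
        onlineScan.setAttribute(Scan.SCAN_ATTRIBUTES_TABLE_NAME, Bytes.toBytes("onlineSales"));
        scans.add(onlineScan);

        TableMapReduceUtil.initTableMapperJob(scans, SalesMapper.class, Text.class, Text.class, job);
        TableMapReduceUtil.initTableReducerJob("totalSales", SalesReducer.class, job);
        System.exit(job.waitForCompletion(true) ? 0 : 1);
    }
}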
Recently I worked on a project which uses MongoDB as the source data system, R for analysis, and MongoDB again for output storage.
In this project we faced a different problem. We were using R to process source data present in MongoDB, and if we gave a large number of documents to R for analysis it became slow and a bottleneck. To avoid this bottleneck we had to make R process a fixed number of documents per batch.
To achieve this we needed some kind of record number in MongoDB, but MongoDB, being a distributed database, does not support generating sequential numbers. Also, our MongoDB source was being populated by a distributed real-time stream, so implementing such logic on the application side was also a deterrent.
To have a batchId field for a fixed number of documents in MongoDB we implemented the below algorithm:
1. Find documents which don't have a batchId field.
2. Sort them by some timestamp field.
3. Limit the number of documents (say 10000).
4. Append the batchId field to these documents and save them (get the value of batchId from an audit table).
The MongoDB shell command for this is:
db['collection1'].find({batchId:null}).sort({systemTime:1}).limit(10000).forEach(
  function (e) {
    // get value of batchId from audit table
    e.batchId = 1;
    db['collection1'].save(e);
  }
);
Using the above code we appended a batchId to the MongoDB documents and picked only the current batchId for analysis in R.
The Java code for the above MongoDB shell command is:
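A sketch of this Java code using the DBCollection API is given below; the database name and the hard-coded batchId value are placeholders -
import com.mongodb.BasicDBObject;
import com.mongodb.DB;
import com.mongodb.DBCollection;
import com.mongodb.DBCursor;
import com.mongodb.DBObject;
import com.mongodb.MongoClient;

public class BatchIdAppender {
    public static void main(String[] args) throws Exception {
        DB db = new MongoClient("localhost", 27017).getDB("test");
        DBCollection collection = db.getCollection("collection1");

        // documents that do not yet have a batchId, oldest first, limited to one batch
        DBCursor cursor = collection.find(new BasicDBObject("batchId", null))
                .sort(new BasicDBObject("systemTime", 1))
                .limit(10000);

        long batchId = 1; // get value of batchId from audit table
        for (DBObject doc : cursor) {
            doc.put("batchId", batchId);
            collection.save(doc);
        }
    }
}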
In this post we are going to write a MapReduce program to consume Avro input data and also produce data in Avro format.
We will write a program to calculate average of student marks.
I have written a MapReduce program which reads the Avro data file student.avro (passed as an argument), calculates the average marks for each student, and stores the output also in Avro format. The program is given below:
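A minimal sketch of the mapper and reducer is shown below. The student_marks accessors (getStudentId(), getMarks()) and the IntPair writable (described in a later post) are assumptions; the actual program may differ -
import java.io.IOException;
import org.apache.avro.mapred.AvroKey;
import org.apache.avro.mapred.AvroValue;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;

// mapper: input key is AvroKey<student_marks>, value is null; emits (student_id, IntPair(marks, 1))
class AvgMapper extends Mapper<AvroKey<student_marks>, NullWritable, IntWritable, IntPair> {
    @Override
    protected void map(AvroKey<student_marks> key, NullWritable value, Context context)
            throws IOException, InterruptedException {
        student_marks record = key.datum();
        // getStudentId()/getMarks() are assumed accessors of the generated student_marks class
        context.write(new IntWritable(record.getStudentId()), new IntPair(record.getMarks(), 1));
    }
}

// reducer: sums the partial sums and counts per student_id and writes the average in Avro format
// (a combiner with the same summing logic can aggregate the partial sums)
class AvgReducer extends Reducer<IntWritable, IntPair, AvroKey<Integer>, AvroValue<Float>> {
    @Override
    protected void reduce(IntWritable studentId, Iterable<IntPair> values, Context context)
            throws IOException, InterruptedException {
        int sum = 0;
        int count = 0;
        for (IntPair pair : values) {
            sum += pair.getFirst();
            count += pair.getSecond();
        }
        context.write(new AvroKey<Integer>(studentId.get()), new AvroValue<Float>((float) sum / count));
    }
}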
In the program the input key to mapper is AvroKey<student_marks> and the input value is null. The output key of map method is student_id and output value is an IntPair having marks and 1.
We have a combiner also which aggregates partial sums for each student_id.
Finally, the reducer takes the student_id with the partial sums and counts and uses them to calculate the average for each student_id. The reducer writes the output in Avro format.
For Avro job setup we have added these properties:
// set InputFormatClass to AvroKeyInputFormat and define input schema
job.setInputFormatClass(AvroKeyInputFormat.class);
AvroJob.setInputKeySchema(job, student_marks.getClassSchema());
// set OutputFormatClass to AvroKeyValueOutputFormat and key as INT type and value as FLOAT type
job.setOutputFormatClass(AvroKeyValueOutputFormat.class);
AvroJob.setOutputKeySchema(job, Schema.create(Schema.Type.INT));
AvroJob.setOutputValueSchema(job, Schema.create(Schema.Type.FLOAT));
Job Execution
We package our Java program to avro_mr.jar and add Avro jars to libjars and hadoop classpath using below commands:
In one of my previous posts I explained how we can convert JSON data to Avro data and vice versa using the avro-tools command line options. Today I was trying to see what options we have for converting CSV data to Avro format; as of now we don't have any avro-tools option to accomplish this. Now, we can either write our own Java program (a MapReduce program or a simple Java program) or we can use the various SerDes available with Hive to do this quickly and without writing any code :)
To convert CSV data to Avro data using Hive we need to follow the below steps:
Create a Hive table stored as textfile and specify your csv delimiter also.
Load the csv file into the above table using the "load data" command.
Create another Hive table using AvroSerDe.
Insert data from the former table into the new Avro Hive table using the "insert overwrite" command.
To demonstrate this I will use the below data (student.csv):
0,38,91
0,65,28
0,78,16
1,34,96
1,78,14
1,11,43
Now execute the below queries in Hive:
Now you can get the data in Avro format from the Hive warehouse folder. To dump this file to the local file system use the below command:
hadoop fs -cat /path/to/warehouse/test.db/avro_table/* > student.avro
If you want to get JSON data from this Avro file you can use the avro-tools command:
java -jar avro-tools-1.7.5.jar tojson student.avro > student.json
So we can easily convert CSV to Avro, and CSV to JSON as well, by just writing 4 HQL statements.
In one of my previous posts I wrote about implementing a custom Writable, which can be used as a value in a MapReduce program. For using a custom key in MapReduce we need to implement the WritableComparable interface.
The WritableComparable interface is just a sub-interface of the Writable and java.lang.Comparable interfaces. For implementing a WritableComparable we must have a compareTo method apart from the readFields and write methods, as shown below:
public interface WritableComparable extends Writable, Comparable
{
void readFields(DataInput in) throws IOException;
void write(DataOutput out) throws IOException;
int compareTo(WritableComparable o);
}
Comparison of types is crucial for MapReduce, where there is a sorting phase during which keys are compared with one another.
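A sketch of such an IntPair key, with two int fields serialized via write()/readFields() and compared in compareTo(), could look like this -
import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
import org.apache.hadoop.io.WritableComparable;

// a pair of ints that can be used as a key (or value) in MapReduce jobs
public class IntPair implements WritableComparable<IntPair> {
    private int first;
    private int second;

    public IntPair() {
        // default constructor required by Hadoop for deserialization
    }

    public IntPair(int first, int second) {
        this.first = first;
        this.second = second;
    }

    public int getFirst() {
        return first;
    }

    public int getSecond() {
        return second;
    }

    @Override
    public void write(DataOutput out) throws IOException {
        out.writeInt(first);
        out.writeInt(second);
    }

    @Override
    public void readFields(DataInput in) throws IOException {
        first = in.readInt();
        second = in.readInt();
    }

    @Override
    public int compareTo(IntPair tp) {
        // compare on first, break ties on second
        if (first != tp.first) {
            return (first < tp.first) ? -1 : 1;
        }
        if (second != tp.second) {
            return (second < tp.second) ? -1 : 1;
        }
        return 0;
    }

    @Override
    public boolean equals(Object o) {
        if (!(o instanceof IntPair)) {
            return false;
        }
        IntPair other = (IntPair) o;
        return first == other.first && second == other.second;
    }

    @Override
    public int hashCode() {
        return first * 163 + second;
    }

    @Override
    public String toString() {
        return first + "\t" + second;
    }
}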
As you can see in compareTo(IntPair tp) of the above class, IntPair needs to be deserialized for the comparison to take place. We can instead implement a RawComparator, which can compare two keys by just checking their serialized representations. More on RawComparator is available in Hadoop: The Definitive Guide.
Avro can use different schemas for serialization and deserialization, and it can handle removed, added and modified fields. Thus it helps in building decoupled and robust systems.
In this post we will serialize data using the StudentActivity.avsc schema and deserialize it using a modified StudentActivityNew.avsc schema.
Before we actually see how Avro handles these modifications I would like to mention the below points:
If a new field is added then it must have a default value. Also, specify its type as an array of types (a union) starting with null, e.g. "type": ["null", "string"], otherwise you will get this error:
Exception in thread "main" java.lang.NoSuchMethodError: org.codehaus.jackson.node.ValueNode.asText()Ljava/lang/String;
If a field is renamed then the old name must be present as aliases.
In this Java program we serialize data using the StudentActivity.avsc schema and deserialize the data using the StudentActivityNew.avsc schema -
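The essential part is that GenericDatumReader can be constructed with both the writer's and the reader's schema. A sketch, assuming the data has already been written to StudentActivity.avro with the old schema -
import java.io.File;
import org.apache.avro.Schema;
import org.apache.avro.file.DataFileReader;
import org.apache.avro.generic.GenericDatumReader;
import org.apache.avro.generic.GenericRecord;
import org.apache.avro.io.DatumReader;

public class SchemaEvolutionRead {
    public static void main(String[] args) throws Exception {
        // writer's schema: the schema the data was serialized with
        Schema writerSchema = new Schema.Parser().parse(new File("StudentActivity.avsc"));
        // reader's schema: the (evolved) schema the application expects
        Schema readerSchema = new Schema.Parser().parse(new File("StudentActivityNew.avsc"));

        DatumReader<GenericRecord> datumReader =
                new GenericDatumReader<GenericRecord>(writerSchema, readerSchema);
        DataFileReader<GenericRecord> fileReader =
                new DataFileReader<GenericRecord>(new File("StudentActivity.avro"), datumReader);
        while (fileReader.hasNext()) {
            // renamed fields are resolved via aliases, new fields get their default values
            System.out.println(fileReader.next());
        }
        fileReader.close();
    }
}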
In the previous post we used avro-tools commands to serialize and deserialize data. In this post we will use the Avro Java API to achieve the same. We will use the same sample data and schema from our previous post.
The Java code for serializing and deserializing data without generating the code for the schema is given below:
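A sketch of this round trip using GenericRecord is given below; the field names used (student_id, course) are assumptions about the sample schema -
import java.io.File;
import org.apache.avro.Schema;
import org.apache.avro.file.DataFileReader;
import org.apache.avro.file.DataFileWriter;
import org.apache.avro.generic.GenericData;
import org.apache.avro.generic.GenericDatumReader;
import org.apache.avro.generic.GenericDatumWriter;
import org.apache.avro.generic.GenericRecord;
import org.apache.avro.io.DatumReader;
import org.apache.avro.io.DatumWriter;

public class AvroWithoutCodeGeneration {
    public static void main(String[] args) throws Exception {
        Schema schema = new Schema.Parser().parse(new File("StudentActivity.avsc"));

        // serialization: build a GenericRecord and append it to an Avro data file
        GenericRecord record = new GenericData.Record(schema);
        record.put("student_id", 1);          // field names here are assumptions about the schema
        record.put("course", "Mathematics");
        DatumWriter<GenericRecord> datumWriter = new GenericDatumWriter<GenericRecord>(schema);
        DataFileWriter<GenericRecord> fileWriter = new DataFileWriter<GenericRecord>(datumWriter);
        fileWriter.create(schema, new File("StudentActivity.avro"));
        fileWriter.append(record);
        fileWriter.close();

        // deserialization: read the records back; the schema is also stored in the data file itself
        DatumReader<GenericRecord> datumReader = new GenericDatumReader<GenericRecord>(schema);
        DataFileReader<GenericRecord> fileReader =
                new DataFileReader<GenericRecord>(new File("StudentActivity.avro"), datumReader);
        while (fileReader.hasNext()) {
            System.out.println(fileReader.next());
        }
        fileReader.close();
    }
}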
The output path can be the source folder for the project, or we can add the generated Java class files to the Eclipse IDE manually.
The Java code for serializing and deserializing data with code generation for the schema is similar to the above code, except that in the previous code we were assigning values to a GenericRecord and in this one we are assigning values to the generated Avro object:
In our previous post we got a basic idea about Avro; in this post we will use Avro for serializing and deserializing data.
We will look at these 3 ways in which we can use Avro for serialization/deserialization:
Using Avro command line tools.
Using Avro Java API without code generation.
Using Avro Java API with code generation.
Sample Data
We will use below sample data (StudentActivity.json):
Avro schemas are defined using JSON. The Avro schema for our sample data is defined as below (StudentActivity.avsc):
1. Serialization/Deserialization using Avro command line tools
Avro provides a jar file named avro-tools-<version>.jar which provides many command line tools, as listed below:
Apache Avro is a popular data serialization format and is gaining more users as many Hadoop-based tools natively support Avro for serialization and deserialization.
In this post we will understand some basics about Avro.
What is Avro?
Data serialization system
Uses JSON based schemas
Uses RPC calls to send data
Schemas are sent during data exchange
Avro provides:
Rich data structures.
A compact, fast, binary data format.
A container file, to store persistent data.
Simple integration with many languages. Code generation is not required to read or write data files nor to use or implement RPC protocols. Code generation as an optional optimization, only worth implementing for statically typed languages.
Avro APIs exist for these languages: Java, C, C++, C#, Python and Ruby.
Avro Schema:
Avro relies on schemas for serialization/deserialization.
When Avro data is stored in a file, its schema is stored with it, so that files may be processed later by any program.
If the program reading the data expects a different schema this can be easily resolved, since both schemas are present.
Avro supports a wide range of datatypes, which are listed below:
Primitive Types
null: no value
boolean: a binary value
int: 32-bit signed integer
long: 64-bit signed integer
float: single precision (32-bit) IEEE 754 floating-point number
double: double precision (64-bit) IEEE 754 floating-point number
bytes: sequence of 8-bit unsigned bytes
string: unicode character sequence
Complex Types
Avro supports six kinds of complex types: records, enums, arrays, maps, unions and fixed. Detailed information on these complex types is available here.
Schema Resolution:
A reader of Avro data, whether from an RPC or a file, can always parse that data because its schema is provided. But that schema may not be exactly the schema that was expected. For example, if the data was written with a different version of the software than it is read, then records may have had fields added or removed.
We call the schema used to write the data the writer's schema, and the schema that the application expects the reader's schema. Differences between these should be resolved as follows:
It is an error if the two schemas do not match.
To match, one of the following must hold:
both schemas are arrays whose item types match
both schemas are maps whose value types match
both schemas are enums whose names match
both schemas are fixed whose sizes and names match
both schemas are records with the same name
either schema is a union
both schemas have the same primitive type
the writer's schema may be promoted to the
reader's as follows:
int is promotable to long, float, or double
long is promotable to float or double
float is promotable to double
if both are records:
the ordering of fields may be different: fields are
matched by name.
schemas for fields with the same name in both records
are resolved recursively.
if the writer's record contains a field with a name
not present in the reader's record, the writer's value
for that field is ignored.
if the reader's record schema has a field that
contains a default value, and writer's schema does not
have a field with the same name, then the reader should
use the default value from its field.
if the reader's record schema has a field with no
default value, and writer's schema does not have a field
with the same name, an error is signalled.
if both are enums:
if the writer's symbol is not present in the reader's
enum, then an error is signalled.
if both are arrays:
This resolution algorithm is applied recursively to the reader's and
writer's array item schemas.
if both are maps:
This resolution algorithm is applied recursively to the reader's and
writer's value schemas.
if both are unions:
The first schema in the reader's union that matches the
selected writer's union schema is recursively resolved
against it. If none match, an error is signalled.
if reader's is a union, but writer's is not
The first schema in the reader's union that matches the
writer's schema is recursively resolved against it. If none
match, an error is signalled.
if writer's is a union, but reader's is not
If the reader's schema matches the selected writer's schema,
it is recursively resolved against it. If they do not
match, an error is signalled.
In the next post we will see a program to serialize/deserialize some data using Avro and also see how Avro handles schema evolution.