Tuesday, 9 December 2014

Implementing Apriori Algorithm In Hadoop-HBase - Part 2 : Conversion to MapReduce Codes

Let us assume that the transaction data we receive is in CSV format like this - tId,pId
where tId is the transaction Id
and pId is the product Id.
A single transaction can have one or more product Ids, spread across one or more CSV records.
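
To make the input format concrete, here is a minimal sketch in plain Java (no Hadoop dependency) that groups such records by transaction. The sample records, the class name TransactionCsv and the method groupByTransaction are made up for illustration and are not taken from the actual data or source code. In the real job this grouping is done by the MapReduce shuffle, not by hand.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

final class TransactionCsv {
    // Groups "tId,pId" records by transaction id, keeping first-seen order.
    static Map<String, List<String>> groupByTransaction(List<String> lines) {
        Map<String, List<String>> baskets = new LinkedHashMap<>();
        for (String line : lines) {
            String[] fields = line.trim().split(",");
            if (fields.length != 2) {
                continue; // skip blank or malformed records
            }
            baskets.computeIfAbsent(fields[0].trim(), k -> new ArrayList<>())
                   .add(fields[1].trim());
        }
        return baskets;
    }

    public static void main(String[] args) {
        // Hypothetical sample: transaction t1 is spread across three records.
        List<String> lines = Arrays.asList("t1,p1", "t1,p2", "t2,p1", "t1,p3", "t2,p3");
        System.out.println(groupByTransaction(lines));
        // prints {t1=[p1, p2, p3], t2=[p1, p3]}
    }
}
```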

I have implemented the Apriori algorithm for 2-itemsets using three MapReduce jobs. The jobs and their functions are described below -

1. PopulateTranBasketAndProdPairJob - The mapper class of this job reads transaction records from the specified CSV file and emits (tId, pId). The reducer class receives (tId, <pId1, pId2,..., pIdn>) as input, builds the product pairs that can be formed from the pIds of this tId, and writes the individual pId(s) and product pair(s) to the HBase table 'tranBasket' with tId as rowkey.
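
The pair-building step of this reducer can be sketched in plain Java as below. This is only an illustration under assumptions: the class name BasketPairs, the "|" separator and the choice to sort and de-duplicate the pIds (so that the same two products always yield the same key) are mine and may differ from the actual source code. The HBase write is left out.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.List;
import java.util.TreeSet;

final class BasketPairs {
    // Hypothetical separator for the pair key; the real job may encode pairs differently.
    static final String PAIR_SEPARATOR = "|";

    // Returns every unordered pair of distinct pIds in one transaction.
    static List<String> pairs(Collection<String> pIds) {
        // TreeSet removes duplicates and sorts, so (p1, p2) and (p2, p1) map to one key.
        List<String> items = new ArrayList<>(new TreeSet<>(pIds));
        List<String> result = new ArrayList<>();
        for (int i = 0; i < items.size(); i++) {
            for (int j = i + 1; j < items.size(); j++) {
                result.add(items.get(i) + PAIR_SEPARATOR + items.get(j));
            }
        }
        return result;
    }

    public static void main(String[] args) {
        System.out.println(pairs(Arrays.asList("p3", "p1", "p2", "p1")));
        // prints [p1|p2, p1|p3, p2|p3]
    }
}
```

A transaction with n distinct products produces n(n-1)/2 pairs, so very large baskets make the 'tranBasket' rows grow quickly.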

2. SupportCountJob - This job reads the 'tranBasket' table and calculates the support counts for all pIds and product pairs. The support counts of individual products are stored in the 'pCount' table with pId as rowkey, and the support counts of product pairs are stored in the 'ppCount' table with the product pair as rowkey. At the end of this job the transaction count is also printed to the screen; it is the input to the next job.
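
The counting done by this job amounts to the following, shown here as an in-memory Java sketch. The class SupportCounts and its addRow method are hypothetical names, each call stands in for one 'tranBasket' row, and the two maps stand in for the 'pCount' and 'ppCount' tables. The real job does this with a mapper and reducer over HBase.

```java
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.Map;
import java.util.TreeMap;

final class SupportCounts {
    final Map<String, Long> pCount = new TreeMap<>();   // stand-in for table 'pCount'
    final Map<String, Long> ppCount = new TreeMap<>();  // stand-in for table 'ppCount'
    long transactionCount = 0;

    // Processes one transaction row: its products and its product pairs.
    void addRow(Collection<String> pIds, Collection<String> pairs) {
        transactionCount++;
        for (String pId : pIds) {
            pCount.merge(pId, 1L, Long::sum);
        }
        for (String pair : pairs) {
            ppCount.merge(pair, 1L, Long::sum);
        }
    }

    public static void main(String[] args) {
        SupportCounts counts = new SupportCounts();
        counts.addRow(Arrays.asList("p1", "p2", "p3"), Arrays.asList("p1|p2", "p1|p3", "p2|p3"));
        counts.addRow(Arrays.asList("p1", "p3"), Arrays.asList("p1|p3"));
        counts.addRow(Arrays.asList("p2"), Collections.<String>emptyList());
        System.out.println(counts.pCount);
        System.out.println(counts.ppCount);
        System.out.println("transactions = " + counts.transactionCount);
    }
}
```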

3. CalcSupportConfidenceJob - This is the last job in the series and gives us the support, confidence and lift values for the different product pairs. It takes the transaction count from the previous job as input to calculate the support values. This job has only a mapper, which reads the complete 'pCount' table into memory, then reads the 'ppCount' table row by row, calculates the Apriori measures (support, confidence and lift) and writes the result to the HBase table 'apprOut'.
To verify the results we can check the mapper sysout files, which have the values in readable format.
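
For reference, the measures for a pair (A, B) follow the standard definitions: support = count(A,B) / N, confidence(A -> B) = count(A,B) / count(A), and lift = confidence(A -> B) / (count(B) / N), where N is the transaction count. A minimal Java sketch of that arithmetic is below. The class name PairMeasures is hypothetical, and which direction of the rule the actual job reports is not stated here, so the sketch computes A -> B only.

```java
final class PairMeasures {
    final double support;
    final double confidence; // for the rule A -> B
    final double lift;

    // pairCount comes from 'ppCount', countA and countB from 'pCount',
    // and n is the transaction count printed by the previous job.
    PairMeasures(long pairCount, long countA, long countB, long n) {
        if (n <= 0 || countA <= 0 || countB <= 0) {
            throw new IllegalArgumentException("counts must be positive");
        }
        this.support = (double) pairCount / n;
        this.confidence = (double) pairCount / countA;
        this.lift = this.confidence / ((double) countB / n);
    }

    public static void main(String[] args) {
        PairMeasures m = new PairMeasures(2, 2, 2, 4);
        System.out.println(m.support + " " + m.confidence + " " + m.lift);
        // prints 0.5 1.0 2.0
    }
}
```

A lift above 1 suggests the two products occur together more often than they would if they were independent. Swapping countA and countB gives the confidence of the reverse rule B -> A, while support and lift stay the same.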

The source code is available here.

Note - This is just a simple demo application and there is scope for improvement. Some modifications which I can think of now are -
  1. Generally transaction ids are sequential numbers, and if they are stored in HBase as such, writes will keep landing on the same region (region hotspotting). Hence the rowkey design has to be reworked.
  2. The HBase scanner caching value needs to be checked and optimised.
  3. Currently pId and tId are stored as Text, which can be changed to Long.
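
For the first point, one common way to rework the rowkey is salting: prefixing the key with a small bucket number derived from a hash of the transaction id. The sketch below is an assumption about how that could look, not something the demo application does; the class SaltedRowKey, the two-digit prefix and the bucket count of 16 are all hypothetical choices.

```java
import java.util.Locale;

final class SaltedRowKey {
    // Builds "<bucket>-<tId>", e.g. a two-digit bucket prefix followed by the id.
    static String salt(String tId, int buckets) {
        if (buckets <= 0 || buckets > 100) {
            throw new IllegalArgumentException("buckets must be between 1 and 100");
        }
        // floorMod keeps the bucket non-negative even for negative hash codes.
        int bucket = Math.floorMod(tId.hashCode(), buckets);
        return String.format(Locale.ROOT, "%02d-%s", bucket, tId);
    }

    public static void main(String[] args) {
        // Consecutive ids end up under different prefixes instead of side by side.
        for (int id = 1000; id < 1005; id++) {
            System.out.println(salt(String.valueOf(id), 16));
        }
    }
}
```

The trade-off is that a row can no longer be found from the tId alone without recomputing the prefix, and a scan over a range of tIds has to be issued once per bucket.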

References -
  • http://rakesh.agrawal-family.com/papers/vldb94apriori.pdf
  • http://www.slideshare.net/deepti92pawar/the-comparative-study-of-apriori-and-fpgrowth-algorithm
  • http://www3.cs.stonybrook.edu/~cse634/lecture_notes/07apriori.pdf


  1. http://java.dzone.com/articles/mapper-combiner-program I was trying to implement this program. I copied it into Eclipse and made a jar file, but I don't know how to run it. Can you give some guidance on that?

    1. you can use -
      hadoop jar jar_file_name fully_qualified_class_name input_file_or_folder output_folder