Saturday, 12 October 2013

MapReduce On Hive Tables Using HCatalog

MapReduce On Hive Tables Using HCatalog

MapReduce On Hive Tables Using HCatalog

In my last post Introduction To Hive's Partitioning I described how we can load csv data to a partitioned hive table. Today we shall see how we can use HCatalog to run MapReduce on Hive table and store the output in another Hive table.

HCatalog makes Hive metadata available to users of other Hadoop tools like Pig, MapReduce and Hive. It provides connectors for MapReduce and Pig so that users of those tools can read data from and write data to Hive’s warehouse.
HCatalog’s table abstraction presents users with a relational view of data in the (HDFS) and ensures that users need not worry about where or in what format their data is stored, so users don't need to know if data is stored in RCFile format, text files, or sequence files.
It also provides a notification service so that workflow tools, such as Oozie, can be notified when new data becomes available in the warehouse.

HCatalog provides HCatInputFormat/HCatOutputFormat to enable MapReduce users to read/write data in Hive’s data warehouse. It allows users to read only the partitions of tables and columns that they need. And it provides the records in a convenient list format so that users do not need to parse them.

We shall see how we can use HCatalog to do a count of records for each year-month combination.
In mapper class we get table schema and use this schema information to get the required columns and their values.
Note - In context.write I am using a custom writable IntPair which I will describe in a separate post.

public class onTimeMapper extends Mapper {
    protected void map(WritableComparable key, HCatRecord value,
     org.apache.hadoop.mapreduce.Mapper.Context context)
     throws IOException, InterruptedException {

     // Get table schema
     HCatSchema schema = HCatBaseInputFormat.getTableSchema(context);

     Integer year = new Integer(value.getString("year", schema));
     Integer month = new Integer(value.getString("month", schema));
     Integer DayofMonth = value.getInteger("dayofmonth", schema);

     context.write(new IntPair(year, month), new IntWritable(DayofMonth));
In reducer class we create a record schema for holding the columns and their values, which is written to target Hive table.

public class onTimeReducer extends Reducer {
 public void reduce (IntPair key, Iterable value, Context context) 
  throws IOException, InterruptedException{
  int count = 0; // records counter for particular year-month
  for (IntWritable s:value) {
  // define output record schema
  List columns = new ArrayList(3);
  columns.add(new HCatFieldSchema("year", HCatFieldSchema.Type.INT, ""));
  columns.add(new HCatFieldSchema("month", HCatFieldSchema.Type.INT, ""));
  columns.add(new HCatFieldSchema("flightCount", HCatFieldSchema.Type.INT,""));
  HCatSchema schema = new HCatSchema(columns);
  HCatRecord record = new DefaultHCatRecord(3);
  record.setInteger("year", schema, key.getFirstInt()); 
  record.set("month", schema, key.getSecondInt());
  record.set("flightCount", schema, count);
  context.write(null, record);
Finally we write driver class with input/output schema and table details:

public class onTimeDriver extends Configured implements Tool{
    private static final Log log = LogFactory.getLog( onTimeDriver.class );

    public int run( String[] args ) throws Exception{
     Configuration conf = new Configuration();
     Job job = new Job(conf, "OnTimeCount");

     HCatInputFormat.setInput(job, "airline", "ontimeperf");
     HCatOutputFormat.setOutput(job, OutputJobInfo.create("airline", "flight_count", null));
     HCatSchema s = HCatOutputFormat.getTableSchema(job);
     HCatOutputFormat.setSchema(job, s);
     return (job.waitForCompletion(true)? 0:1);
    public static void main(String[] args) throws Exception{
  int exitCode = onTimeDriver(), args);
Before running this code we need to create the output table in Hive using below commands:
create table airline.flight_count
(Year INT ,
Month INT ,
flightCount INT)
After compiling and making jar file we can run this code using hadoop jar command without any argument. You can check output using HQL by querying airline.flight_count table.

Note - I faced some hive errors while running the jar from any folder except $HIVE_HOME with apache setup, so if you face any issue it better to give a try by placing and running the jar from $HIVE_HOME folder.
Just for verifying that our MapReduce program is independent of output table schema we shall alter airline.flight_count to have a dummy column:
ALTER TABLE airline.flight_count1 ADD COLUMNS (dummy STRING);
After truncating this table we will start up our MapReduce program again and we can see that our program runs in same manner.


  1. Hi Rishav,
    we successfully deployed a Map Reduce job which writes output into a Hive table.
    Now we are trying to run our Map Reduce application using Oozie (or hue workflow manager) but we are facing an issue with HCatOutput initialization.
    It says HCAtOutput needs to be iniziatlised, but we do not know to initialize it through workflow.xml properties.
    See below our workflow.xml.
    Can you help us?
    Regards Davide

    1. I haven't worked much with Oozie. you can reach out to oozie mailing list for help.