Monday 1 July 2013

XML parsing with Mapreduce

XML parsing with Mapreduce

XML parsing with Mapreduce


Recently I worked with XML data stored in HDFS and wrote a MapReduce job to load that data into HBase. To work with XML input data we can use the XmlInputFormat class from Mahout (there is no need to have Mahout installed; we just need the class from the mahout-integration jar). The XML file had the following structure:

 <Response>
  <RowID>....</RowID>
  <ResponseID>....</ResponseID>
  <IPAddress>....</IPAddress>
  <Status>....</Status>
  <StartDate>....</StartDate>
  <EndDate>....</EndDate>
 </Response>

To hold this xml record we created xmlDef class:
package com.rishav.xml;

/**
 * Column-wise description of how one XML record maps onto an HBase row.
 *
 * <p>The table is indexed as {@code xmlDef[attribute][column]}: every inner
 * array has one entry per column, and the same index in each inner array
 * refers to the same column.
 */
public class xmlDef {
 /**
  * Mapping table, one row per attribute:
  * <ul>
  *   <li>row 0 - HBase table name</li>
  *   <li>row 1 - "Y" if the column is a key column, otherwise "N"</li>
  *   <li>row 2 - column family</li>
  *   <li>row 3 - column name in HBase</li>
  *   <li>row 4 - xml tag</li>
  *   <li>row 5 - place holder for the xml value</li>
  * </ul>
  */
 public static String[][] xmlDef = {
      // HBase table name
      {"xmlTest", "xmlTest", "xmlTest", "xmlTest", "xmlTest", "xmlTest"},
      // is column a key column?
      {"Y", "N", "N", "N", "N", "N"},
      // column family
      {"cf1", "cf1", "cf2", "cf2", "cf2", "cf2"},
      // column name in HBase
      {"RowID", "ResponseID", "IPAddress", "Status", "StartDate", "EndDate"},
      // xml tag
      {"RowID", "ResponseID", "IPAddress", "Status", "StartDate", "EndDate"},
      // place holder for xml value
      {"", "", "", "", "", ""}
 };
}

The mapper class is configured to read one complete XML record enclosed in start and end tags; these tags are defined in the driver class. Each map call receives one XML record as input, and we can parse it in the usual manner. In this case we have used XMLStreamReader. The code for the mapper class is given below:

package com.rishav.hbase;

import com.rishav.xml.xmlDef;

import static javax.xml.stream.XMLStreamConstants.CHARACTERS;
import static javax.xml.stream.XMLStreamConstants.START_ELEMENT;
import javax.xml.stream.XMLInputFactory;
import javax.xml.stream.XMLStreamReader;

import java.io.ByteArrayInputStream;
import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.client.HTable;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Mapper.Context;
import org.apache.hadoop.mapreduce.lib.input.FileSplit;

/**
 * Mapper that parses one XML record per call and stores it as a row in the
 * HBase table named in {@code xmlDef.xmlDef[0][0]}.
 *
 * <p>Input: byte offset ({@link LongWritable}) and the text of one complete
 * XML record ({@link Text}). Output: a '#'-separated debug line of the first
 * five column values as key and the literal "1" as value, both {@link Text}.
 *
 * <p>Parsed values are kept in a per-call local array; the place-holder row
 * {@code xmlDef.xmlDef[5]} is no longer used as scratch space, so records
 * cannot leak into each other through shared static state.
 */
public class commonTableMapper
 extends org.apache.hadoop.mapreduce.Mapper<LongWritable, Text, Text, Text> {

  /** Index of the column whose value is used as the HBase row key. */
  private static final int KEY_COLUMN = 0;
  /** Counter group under which skipped records are reported. */
  private static final String COUNTER_GROUP = "commonTableMapper";

  private HTable htable;
  private XMLInputFactory xmlFactory;

 /**
  * Opens the HBase table and prepares the XML parser factory once per task.
  *
  * @param context task context (unused)
  * @throws IOException if the HBase table cannot be opened
  */
 @Override
 protected void setup(Context context)
    throws IOException, InterruptedException {
    Configuration conf = HBaseConfiguration.create();
    htable = new HTable(conf, xmlDef.xmlDef[0][0]);
    // Buffer puts on the client side: with auto-flush enabled every put is sent
    // immediately, so the write buffer configured below and the explicit
    // flushCommits() in cleanup() would have no effect.
    htable.setAutoFlush(false);
    htable.setWriteBufferSize(1024 * 1024 * 12);

    // The factory is reusable; creating it per record is wasted work.
    xmlFactory = XMLInputFactory.newInstance();
    // Records come from data files: do not process DTDs or external entities.
    xmlFactory.setProperty(XMLInputFactory.SUPPORT_DTD, Boolean.FALSE);
    xmlFactory.setProperty(XMLInputFactory.IS_SUPPORTING_EXTERNAL_ENTITIES, Boolean.FALSE);
    }

 /**
  * Parses one XML record, emits a debug line and writes the record to HBase.
  *
  * <p>Records that are not well-formed XML, or that carry no row key, are
  * counted under the {@code commonTableMapper} counter group and skipped.
  * HBase write failures are not swallowed; they fail the task.
  *
  * @param key     byte offset of the record in the input
  * @param value   text of one complete XML record
  * @param context task context used for output and counters
  * @throws IOException if writing the output or the HBase put fails
  */
 @Override
 public void map(LongWritable key, Text value, Context context)
  throws IOException, InterruptedException {
   String[] values;
   try {
    values = parseRecord(value);
   } catch (javax.xml.stream.XMLStreamException e) {
    context.getCounter(COUNTER_GROUP, "MALFORMED_XML").increment(1);
    System.err.println("Skipping malformed XML record at offset " + key + ": " + e.getMessage());
    return;
   }

   // writing values to mapper output file
   // can remove this context.write
   context.write(
     new Text(values[0] + "#" + values[1] + "#" + values[2] + "#" + values[3] + "#" + values[4]),
     new Text("1"));

   String rowKey = values[KEY_COLUMN];
   if (rowKey.isEmpty()) {
    // A record without a key value cannot be stored as an HBase row.
    context.getCounter(COUNTER_GROUP, "MISSING_ROW_KEY").increment(1);
    return;
   }

   // put record in HBase: family from row 2, qualifier from row 3
   Put insHBase = new Put(Bytes.toBytes(rowKey));
   for (int col = 0; col < values.length; col++) {
    insHBase.add(
      Bytes.toBytes(xmlDef.xmlDef[2][col]),
      Bytes.toBytes(xmlDef.xmlDef[3][col]),
      Bytes.toBytes(values[col]));
   }
   htable.put(insHBase);
 }

 /**
  * Extracts the text of every configured XML tag from a single record.
  *
  * @param record one complete XML record
  * @return one value per column, in the column order of {@code xmlDef};
  *         tags that do not occur in the record yield an empty string
  * @throws javax.xml.stream.XMLStreamException if the record is not well-formed
  */
 private String[] parseRecord(Text record) throws javax.xml.stream.XMLStreamException {
   // Row 4 holds the XML tag names (row 3 holds the HBase column names).
   String[] tags = xmlDef.xmlDef[4];
   String[] values = new String[tags.length];
   for (int col = 0; col < values.length; col++) {
    values[col] = "";
   }

   // Feed the parser the raw bytes of the Text instead of re-encoding the
   // string with the platform default charset. Only the first getLength()
   // bytes of the backing array are valid.
   XMLStreamReader reader = xmlFactory.createXMLStreamReader(
     new ByteArrayInputStream(record.getBytes(), 0, record.getLength()));
   try {
    String currentElement = "";
    while (reader.hasNext()) {
     int code = reader.next();
     if (code == START_ELEMENT) {
      currentElement = reader.getLocalName();
     } else if (code == CHARACTERS) {
      // Text may arrive in several CHARACTERS events, hence the append.
      String text = reader.getText().trim();
      for (int col = 0; col < tags.length; col++) {
       if (currentElement.equalsIgnoreCase(tags[col])) {
        values[col] += text;
       }
      }
     }
    }
   } finally {
    reader.close();
   }
   return values;
 }

 /**
  * Flushes buffered puts and closes the HBase table.
  *
  * @param context task context (unused)
  * @throws IOException if flushing or closing the table fails
  */
  @Override
  public void cleanup(Context context)
   throws IOException, InterruptedException {
   if (htable == null) {
    // setup() failed before the table was opened; nothing to release.
    return;
   }
   try {
    htable.flushCommits();
   } finally {
    htable.close();
   }
 }
}

In the driver we set the boundaries of an XML record using xmlinput.start and xmlinput.end. These values must match the record's opening and closing tags exactly, including case. The driver class is:

package com.rishav.hbase;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.client.HTable;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.mapreduce.HFileOutputFormat;
import org.apache.hadoop.hbase.mapreduce.PutSortReducer;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

import org.apache.mahout.text.wikipedia.XmlInputFormat;

/**
 * Driver for the map-only job that loads XML records into HBase through
 * {@code commonTableMapper}.
 */
public final class commonTableDriver {
 /**
  * Command-line entry point.
  *
  * @param args {@code args[0]} is the input path, {@code args[1]} the output path
  * @throws Exception if the job cannot be set up or does not succeed
  */
 public static void main(String[] args) throws Exception {
  if (args.length < 2) {
   System.err.println("Usage: commonTableDriver <input path> <output path>");
   System.exit(2);
  }
  commonRunJob(args[0], args[1]);
 }

 /**
  * Configures and runs the job, blocking until it finishes.
  *
  * <p>Any existing content at {@code output} is deleted before the job starts.
  *
  * @param input  path of the XML input file or directory
  * @param output path of the job output directory
  * @throws IllegalStateException if the job finishes unsuccessfully
  * @throws Exception if the job cannot be configured or submitted
  */
 public static void commonRunJob(String input,String output) throws Exception  {
  Configuration conf = new Configuration();
  // Record boundaries are matched literally, so the case must be the same as
  // in the documents, which use <Response> ... </Response>. With the wrong
  // case no record is ever found and the job reads zero input records.
  conf.set("xmlinput.start", "<Response>");
  conf.set("xmlinput.end", "</Response>");

  Job job = new Job(conf);
  job.setJarByClass(commonTableMapper.class);

  job.setInputFormatClass(XmlInputFormat.class);

  job.setMapperClass(commonTableMapper.class);
  job.setMapOutputKeyClass(Text.class);
  job.setMapOutputValueClass(Text.class);

  // Map-only job: the mapper writes to HBase itself.
  job.setNumReduceTasks(0);

  FileInputFormat.setInputPaths(job, new Path(input));
  Path outPath = new Path(output);
  FileOutputFormat.setOutputPath(job, outPath);

  // Remove output left over from a previous run; the job refuses to start
  // if the output directory already exists.
  outPath.getFileSystem(conf).delete(outPath, true);

  // Surface a failed job to the caller instead of returning as if it worked.
  if (!job.waitForCompletion(true)) {
   throw new IllegalStateException("Job failed for input " + input);
  }
 }
}

We have exported the project from Eclipse as a jar package. Let's call it hbasetest.jar. To run this jar in Hadoop, use the command below:

hadoop jar hbasetest.jar com.rishav.hbase.commonTableDriver input/sample.xml out1

Note: Before running this map reduce job create xmlTest table in HBase with column families cf1 and cf2.

After executing this job we can check data in HBase table xmlTest.

20 comments:

  1. Good one....May i know how you interact with Hbase from mapreduce...what jars should we have to include before and should i need to create a table before in HBase?

    Thank you for your tutorial

    ReplyDelete
  2. Rajesh,
    1. This M/R program interacts with HBase using HTable client, if you are doing M/R on hbase tables source/targets then you can refer to rishavrohitblog.blogspot.com/2013/10/mapreduce-on-hbase-table.html .
    2. You need to include hbase jars. you can use this command - export HADOOP_CLASSPATH=`hbase classpath`
    3. yes you need to create hbase tables before running the program.
    HTH.

    -Rishav

    ReplyDelete
  3. Found this error while executing above program. I have compiled all java files and created jar file in linux. Any inputs?

    Exception in thread "main" java.lang.NoClassDefFoundError: org/apache/mahout/text/wikipedia/XmlInputFormat
    at com.rishav.hbase.commonTableDriver.commonRunJob(commonTableDriver.java:30)

    ReplyDelete
    Replies
    1. You have missed adding mahout jar

      Delete
    2. very nice tutorial and helpful for the beginners.i tried executing the program but getting the above same problem even after adding mahout integration jar file...please help me in finding the solution..

      thanks in advance
      Pooja B R

      Delete
    3. This comment has been removed by the author.

      Delete
  4. This comment has been removed by the author.

    ReplyDelete
  5. This comment has been removed by the author.

    ReplyDelete
  6. How can we do parse the XML files which are not consistent structure;
    for eg;
    -----File 1-----
    < start >
    < profile >
    < name >Rishav < /name >
    < gender >M < /gender >
    < /profile >
    < interest >
    < blogging >Yes< /blogging >
    < technology >Yes< /technology >
    < /interest >
    < certified >Yes< /certified >
    < /start >
    ----------------

    -----File 2-----
    < start >
    < profile >
    < name >Awi< /name >
    < gender >M< /gender >
    < /profile >
    < interest >
    < technology >Yes< /technology >
    < /interest >
    < designation >Yes< /designation >
    < /start >
    ----------------
    Also, what should be the approach for storing such data in HBase?

    Thanx in advance

    ReplyDelete
    Replies
    1. You can parse nested XML with inconsistent structure. In your mapper you need to check for XML tags like interest and then go inside those to see what all tags like technology, blogging, etc are there.
      Approach for storing them in HBase depends entirely on your data access(read) pattern.

      Delete
  7. I've used following in case if table is not present:
    // Create the table on first use; the admin handle is always closed,
    // including when the table already exists.
    HBaseAdmin hba = new HBaseAdmin(conf);
    try {
        if(!hba.tableExists("SCTest")){
            System.out.println("Table not present");
            HTableDescriptor ht = new HTableDescriptor("SCTest");
            ht.addFamily( new HColumnDescriptor("RowID"));
            ht.addFamily( new HColumnDescriptor("ResponseID"));
            ht.addFamily( new HColumnDescriptor("Data"));
            ht.addFamily( new HColumnDescriptor("Status"));
            ht.addFamily( new HColumnDescriptor("StartDate"));
            ht.addFamily( new HColumnDescriptor("EndDate"));
            System.out.println( "connecting" );

            System.out.println( "Creating Table" );
            hba.createTable(ht);
            System.out.println("Done......");
        }
    } finally {
        hba.close();
    }

    ReplyDelete
  8. I've used following code to see if table is present or not. If not it'll create one.

    /**
     * Creates the "SCTest" table if it does not exist yet, then opens the
     * HBase table used by the mapper.
     *
     * @param context task context (unused)
     * @throws IOException if HBase cannot be reached or the table cannot be created
     */
    protected void setup(Context context)
    throws IOException, InterruptedException {
        Configuration conf = HBaseConfiguration.create();
        HBaseAdmin hba = new HBaseAdmin(conf);
        try {
            if(!hba.tableExists("SCTest")){
                System.out.println("Table not present");
                HTableDescriptor ht = new HTableDescriptor("SCTest");
                ht.addFamily( new HColumnDescriptor("RowID"));
                ht.addFamily( new HColumnDescriptor("ResponseID"));
                ht.addFamily( new HColumnDescriptor("Data"));
                ht.addFamily( new HColumnDescriptor("Status"));
                ht.addFamily( new HColumnDescriptor("StartDate"));
                ht.addFamily( new HColumnDescriptor("EndDate"));
                System.out.println( "connecting" );

                System.out.println( "Creating Table" );
                hba.createTable(ht);
                System.out.println("Done......");
            }
        } finally {
            // Release the admin connection even when the table already exists
            // or table creation fails.
            hba.close();
        }
        // NOTE(review): the table checked/created above is "SCTest", while the
        // table opened here is whatever xmlDef.xmlDef[0][0] names - confirm
        // both refer to the same table.
        htable = new HTable(conf, xmlDef.xmlDef[0][0]);
        htable.setAutoFlush(true);
        htable.setWriteBufferSize(1024 * 1024 * 12);
    }

    ReplyDelete
    Replies
    1. Hi Surendra,
      You can use the above code in demo applications but in production applications which are running 100s of maps daily this will be additional overhead, as table creation is just a one time activity.

      Delete
  9. This comment has been removed by the author.

    ReplyDelete
  10. Can we merge xml files(lots of small xml files) and then process using mahout xml input format

    ReplyDelete
    Replies
    1. Yes, you can. Try exploring HAR file storage which is a good choice for storing small files.

      Delete
  11. This comment has been removed by the author.

    ReplyDelete
  12. I have run the job and it has Completed successfully
    but hbase table is empty ???

    xml looks like that



    the rowid value
    resomseid 123
    192.168.15
    not good
    20150101
    20150505


    2015-09-30 04:00:47,792 INFO [Thread-10] mapred.LocalJobRunner (LocalJobRunner.java:run(395)) - Map task executor complete.
    2015-09-30 04:00:48,666 INFO [main] mapreduce.Job (Job.java:monitorAndPrintJob(1374)) - Job job_local215646258_0001 completed successfully
    2015-09-30 04:00:48,684 INFO [main] mapreduce.Job (Job.java:monitorAndPrintJob(1381)) - Counters: 18
    File System Counters
    FILE: Number of bytes read=4559
    FILE: Number of bytes written=256346
    FILE: Number of read operations=0
    FILE: Number of large read operations=0
    FILE: Number of write operations=0
    Map-Reduce Framework
    Map input records=0
    Map output records=0
    Input split bytes=221
    Spilled Records=0
    Failed Shuffles=0
    Merged Map outputs=0
    GC time elapsed (ms)=23
    CPU time spent (ms)=0
    Physical memory (bytes) snapshot=0
    Virtual memory (bytes) snapshot=0
    Total committed heap usage (bytes)=249430016
    File Input Format Counters
    Bytes Read=2490
    File Output Format Counters
    Bytes Written=16


    ReplyDelete
    Replies
    1. looks like you are not giving properly formed XML. you can see the above counters too "Map input records=0" which clearly indicates that there was not even a single input record.

      Delete
  13. Exception in thread "main" java.lang.NoClassDefFoundError: org/apache/mahout/text/wikipedia/XmlInputFormat
    at com.rishav.xml.commonTableDriver.commonRunJob(commonTableDriver.java:31)
    at com.rishav.xml.commonTableDriver.main(commonTableDriver.java:20)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:606)
    at org.apache.hadoop.util.RunJar.main(RunJar.java:212)
    Caused by: java.lang.ClassNotFoundException: org.apache.mahout.text.wikipedia.XmlInputFormat
    at java.net.URLClassLoader$1.run(URLClassLoader.java:366)
    at java.net.URLClassLoader$1.run(URLClassLoader.java:355)
    at java.security.AccessController.doPrivileged(Native Method)
    at java.net.URLClassLoader.findClass(URLClassLoader.java:354)
    at java.lang.ClassLoader.loadClass(ClassLoader.java:425)
    at java.lang.ClassLoader.loadClass(ClassLoader.java:358)
    ... 7 more


    I tried importing the jar of mahout still this error appears. Do you think is this a issue with eclipse version??

    ReplyDelete