XML Parsing with MapReduce
Recently I worked with XML data stored in HDFS and wrote a MapReduce job to load that data into HBase. To work with XML input we can use the XmlInputFormat class from Mahout (no need to have Mahout installed; we just need this class from the mahout-integration jar). The XML file had the following structure:
<Response>
<RowID>....</RowID>
<ResponseID>....</ResponseID>
<IPAddress>....</IPAddress>
<Status>....</Status>
<StartDate>....</StartDate>
<EndDate>....</EndDate>
</Response>
To hold this XML record we created the xmlDef class:
package com.rishav.xml;

public class xmlDef {
  public static String xmlDef[][] = new String[][] {
    {"xmlTest", "xmlTest", "xmlTest", "xmlTest", "xmlTest", "xmlTest"},      // HBase table name
    {"Y", "N", "N", "N", "N", "N"},                                          // is column a key column?
    {"cf1", "cf1", "cf2", "cf2", "cf2", "cf2"},                              // column family
    {"RowID", "ResponseID", "IPAddress", "Status", "StartDate", "EndDate"},  // column name in HBase
    {"RowID", "ResponseID", "IPAddress", "Status", "StartDate", "EndDate"},  // xml tag
    {"", "", "", "", "", ""}                                                 // placeholder for xml value
  };
}
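The rows of this lookup table are positional: row 0 holds the HBase table name, row 1 flags the key column, row 2 the column family, row 3 the HBase column name, row 4 the XML tag, and row 5 is scratch space for the parsed value. As a quick illustration (this snippet is mine, not part of the original project), a small loop makes the mapping explicit:

package com.rishav.xml;

// Illustration only: print the column mapping encoded in the xmlDef table.
public class PrintXmlDefMapping {
  public static void main(String[] args) {
    System.out.println("HBase table: " + xmlDef.xmlDef[0][0]);
    for (int col = 0; col < xmlDef.xmlDef[3].length; col++) {
      System.out.println("xml tag <" + xmlDef.xmlDef[4][col] + "> -> "
          + xmlDef.xmlDef[2][col] + ":" + xmlDef.xmlDef[3][col]   // family:qualifier
          + ("Y".equals(xmlDef.xmlDef[1][col]) ? " (row key)" : ""));
    }
  }
}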
The mapper class is configured to read a complete XML record enclosed in the start and end tags defined in the driver class. Each map call receives one XML record as input, which we can parse in the normal way; in this case we use XMLStreamReader. The code for the mapper class is given below:
package com.rishav.hbase;
import com.rishav.xml.xmlDef;
import static javax.xml.stream.XMLStreamConstants.CHARACTERS;
import static javax.xml.stream.XMLStreamConstants.START_ELEMENT;
import javax.xml.stream.XMLInputFactory;
import javax.xml.stream.XMLStreamReader;
import java.io.ByteArrayInputStream;
import java.io.IOException;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.client.HTable;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;
public class commonTableMapper
    extends Mapper<LongWritable, Text, Text, Text> {

  private HTable htable;

  // create HBase connection
  @Override
  protected void setup(Context context)
      throws IOException, InterruptedException {
    Configuration conf = HBaseConfiguration.create();
    htable = new HTable(conf, xmlDef.xmlDef[0][0]);
    htable.setAutoFlush(true);
    htable.setWriteBufferSize(1024 * 1024 * 12);
  }

  @Override
  public void map(LongWritable key, Text value, Context context)
      throws IOException, InterruptedException {
    String currLine = value.toString();
    try {
      XMLStreamReader reader = XMLInputFactory.newInstance()
          .createXMLStreamReader(
              new ByteArrayInputStream(currLine.getBytes()));
      String currentElement = "";
      int col = 0;
      // initialize all xml values to blank strings
      for (String xmlTag : xmlDef.xmlDef[3]) {
        xmlDef.xmlDef[5][col] = "";
        col++;
      }
      // read xml tags and store values in xmlDef
      while (reader.hasNext()) {
        int code = reader.next();
        switch (code) {
        case START_ELEMENT:
          currentElement = reader.getLocalName();
          break;
        case CHARACTERS:
          col = 0;
          for (String xmlTag : xmlDef.xmlDef[3]) {
            if (currentElement.equalsIgnoreCase(xmlTag)) {
              xmlDef.xmlDef[5][col] += reader.getText().trim();
            }
            col++;
          }
        }
      }
      // write the parsed values to the mapper output file
      // (this context.write can be removed if only the HBase put is needed)
      context.write(
          new Text(xmlDef.xmlDef[5][0] + "#" + xmlDef.xmlDef[5][1] + "#"
              + xmlDef.xmlDef[5][2] + "#" + xmlDef.xmlDef[5][3] + "#"
              + xmlDef.xmlDef[5][4]),
          new Text("1"));
      // put record in HBase
      Put insHBase = new Put(Bytes.toBytes(xmlDef.xmlDef[5][0]));
      col = 0;
      for (String xmlTag : xmlDef.xmlDef[3]) {
        insHBase.add(Bytes.toBytes(xmlDef.xmlDef[2][col]),
            Bytes.toBytes(xmlDef.xmlDef[3][col]),
            Bytes.toBytes(xmlDef.xmlDef[5][col]));
        col++;
      }
      htable.put(insHBase);
    } catch (Exception e) {
      e.printStackTrace();
    }
  }

  @Override
  public void cleanup(Context context)
      throws IOException, InterruptedException {
    htable.flushCommits();
    htable.close();
  }
}
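To sanity-check the StAX parsing logic outside Hadoop, a minimal standalone sketch (the class name and the hard-coded sample record are mine, purely for illustration) could look like this:

import java.io.ByteArrayInputStream;
import javax.xml.stream.XMLInputFactory;
import javax.xml.stream.XMLStreamConstants;
import javax.xml.stream.XMLStreamReader;

// Illustration only: parse one <Response> record with StAX, outside MapReduce.
public class ResponseParseTest {
  public static void main(String[] args) throws Exception {
    String record = "<Response><RowID>1</RowID><ResponseID>101</ResponseID>"
        + "<IPAddress>10.0.0.1</IPAddress><Status>OK</Status>"
        + "<StartDate>20140101</StartDate><EndDate>20140102</EndDate></Response>";
    XMLStreamReader reader = XMLInputFactory.newInstance()
        .createXMLStreamReader(new ByteArrayInputStream(record.getBytes("UTF-8")));
    String currentElement = "";
    while (reader.hasNext()) {
      switch (reader.next()) {
      case XMLStreamConstants.START_ELEMENT:
        currentElement = reader.getLocalName();
        break;
      case XMLStreamConstants.CHARACTERS:
        String text = reader.getText().trim();
        if (!text.isEmpty()) {
          System.out.println(currentElement + " = " + text);
        }
        break;
      }
    }
    reader.close();
  }
}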
In the driver we set the boundaries of an XML record using the xmlinput.start and xmlinput.end properties. The driver class is:
package com.rishav.hbase;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.client.HTable;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.mapreduce.HFileOutputFormat;
import org.apache.hadoop.hbase.mapreduce.PutSortReducer;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.mahout.text.wikipedia.XmlInputFormat;
public final class commonTableDriver {
  public static void main(String[] args) throws Exception {
    commonRunJob(args[0], args[1]);
  }

  public static void commonRunJob(String input, String output) throws Exception {
    Configuration conf = new Configuration();
    // record boundaries must match the XML tags exactly (they are case-sensitive)
    conf.set("xmlinput.start", "<Response>");
    conf.set("xmlinput.end", "</Response>");

    Job job = new Job(conf);
    job.setJarByClass(commonTableMapper.class);
    job.setInputFormatClass(XmlInputFormat.class);
    job.setMapperClass(commonTableMapper.class);
    job.setMapOutputKeyClass(Text.class);
    job.setMapOutputValueClass(Text.class);
    job.setNumReduceTasks(0);

    FileInputFormat.setInputPaths(job, new Path(input));
    Path outPath = new Path(output);
    FileOutputFormat.setOutputPath(job, outPath);
    outPath.getFileSystem(conf).delete(outPath, true);
    job.waitForCompletion(true);
  }
}
We exported the project from Eclipse as a jar package; let's call it hbasetest.jar. To run this jar on Hadoop, use the command below (the mahout-integration and HBase jars must be available on the classpath, for example via HADOOP_CLASSPATH):
hadoop jar hbasetest.jar com.rishav.hbase.commonTableDriver input/sample.xml out1
Note: Before running this MapReduce job, create the xmlTest table in HBase with column families cf1 and cf2.
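If you prefer creating the table from code rather than the HBase shell, a minimal sketch using the old HBaseAdmin API (the class name is mine, for illustration) would be:

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.HColumnDescriptor;
import org.apache.hadoop.hbase.HTableDescriptor;
import org.apache.hadoop.hbase.client.HBaseAdmin;

// Illustration only: create the xmlTest table with column families cf1 and cf2.
public class CreateXmlTestTable {
  public static void main(String[] args) throws Exception {
    Configuration conf = HBaseConfiguration.create();
    HBaseAdmin admin = new HBaseAdmin(conf);
    if (!admin.tableExists("xmlTest")) {
      HTableDescriptor desc = new HTableDescriptor("xmlTest");
      desc.addFamily(new HColumnDescriptor("cf1"));
      desc.addFamily(new HColumnDescriptor("cf2"));
      admin.createTable(desc);
    }
    admin.close();
  }
}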
After executing this job we can check the data in the HBase table xmlTest.
Good one.... May I know how you interact with HBase from MapReduce? What jars do we have to include, and do I need to create the table in HBase beforehand?
Thank you for your tutorial.
Rajesh,
1. This M/R program interacts with HBase using the HTable client. If you are doing M/R with HBase tables as source/target then you can refer to rishavrohitblog.blogspot.com/2013/10/mapreduce-on-hbase-table.html .
2. You need to include the HBase jars. You can use this command - export HADOOP_CLASSPATH=`hbase classpath`
3. Yes, you need to create the HBase tables before running the program.
HTH.
-Rishav
I found this error while executing the above program. I compiled all the java files and created the jar file on Linux. Any inputs?
Exception in thread "main" java.lang.NoClassDefFoundError: org/apache/mahout/text/wikipedia/XmlInputFormat
at com.rishav.hbase.commonTableDriver.commonRunJob(commonTableDriver.java:30)
You have missed adding the mahout jar.
Very nice tutorial and helpful for beginners. I tried executing the program but I am getting the same problem as above even after adding the mahout-integration jar file... please help me find the solution.
Thanks in advance,
Pooja B R
How can we parse XML files which do not have a consistent structure?
For example:
-----File 1-----
<start>
<profile>
<name>Rishav</name>
<gender>M</gender>
</profile>
<interest>
<blogging>Yes</blogging>
<technology>Yes</technology>
</interest>
<certified>Yes</certified>
</start>
----------------
-----File 2-----
<start>
<profile>
<name>Awi</name>
<gender>M</gender>
</profile>
<interest>
<technology>Yes</technology>
</interest>
<designation>Yes</designation>
</start>
----------------
Also, what should be the approach for storing such data in HBase?
Thanx in advance
You can parse nested XML with an inconsistent structure. In your mapper you need to check for XML tags like interest and then go inside them to see which tags (technology, blogging, etc.) are present.
The approach for storing them in HBase depends entirely on your data access (read) pattern.
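As a rough illustration of that idea (this sketch is mine, not from the original thread): track the element path while streaming, so optional nested tags are picked up only when they are present:

import java.io.ByteArrayInputStream;
import java.util.ArrayDeque;
import java.util.Deque;
import java.util.Iterator;
import java.util.LinkedHashMap;
import java.util.Map;
import javax.xml.stream.XMLInputFactory;
import javax.xml.stream.XMLStreamConstants;
import javax.xml.stream.XMLStreamReader;

// Illustration only: flatten one record with optional nested tags into
// dotted keys (e.g. start.interest.blogging); absent tags produce no entry.
public class NestedXmlFlattener {
  public static Map<String, String> flatten(String xmlRecord) throws Exception {
    Map<String, String> fields = new LinkedHashMap<String, String>();
    Deque<String> path = new ArrayDeque<String>();   // stack of enclosing element names
    XMLStreamReader reader = XMLInputFactory.newInstance()
        .createXMLStreamReader(new ByteArrayInputStream(xmlRecord.getBytes("UTF-8")));
    while (reader.hasNext()) {
      switch (reader.next()) {
      case XMLStreamConstants.START_ELEMENT:
        path.push(reader.getLocalName());
        break;
      case XMLStreamConstants.CHARACTERS:
        String text = reader.getText().trim();
        if (!text.isEmpty()) {
          // build a dotted key from the element path, outermost element first
          StringBuilder key = new StringBuilder();
          for (Iterator<String> it = path.descendingIterator(); it.hasNext();) {
            key.append(it.next());
            if (it.hasNext()) key.append('.');
          }
          fields.put(key.toString(), text);
        }
        break;
      case XMLStreamConstants.END_ELEMENT:
        path.pop();
        break;
      }
    }
    reader.close();
    return fields;
  }
}

In the mapper you could then look up only the keys you care about (for example start.interest.technology) and write whatever is present to HBase; columns for missing tags simply stay empty.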
I've used the following code to check whether the table is present; if not, it creates one:
protected void setup(Context context)
    throws IOException, InterruptedException {
  Configuration conf = HBaseConfiguration.create();
  HBaseAdmin hba = new HBaseAdmin(conf);
  if (!hba.tableExists("SCTest")) {
    System.out.println("Table not present");
    HTableDescriptor ht = new HTableDescriptor("SCTest");
    ht.addFamily(new HColumnDescriptor("RowID"));
    ht.addFamily(new HColumnDescriptor("ResponseID"));
    ht.addFamily(new HColumnDescriptor("Data"));
    ht.addFamily(new HColumnDescriptor("Status"));
    ht.addFamily(new HColumnDescriptor("StartDate"));
    ht.addFamily(new HColumnDescriptor("EndDate"));
    System.out.println("connecting");
    System.out.println("Creating Table");
    hba.createTable(ht);
    hba.close();
    System.out.println("Done......");
  }
  htable = new HTable(conf, xmlDef.xmlDef[0][0]);
  htable.setAutoFlush(true);
  htable.setWriteBufferSize(1024 * 1024 * 12);
}
Hi Surendra,
You can use the above code in demo applications, but in production applications running hundreds of maps daily this adds overhead, as table creation is just a one-time activity.
Can we merge XML files (lots of small XML files) and then process them using the mahout XML input format?
Yes, you can. Try exploring HAR (Hadoop Archive) storage, which is a good choice for storing small files.
I have run the job and it completed successfully, but the HBase table is empty???
My XML looks like this (the tags appear to have been stripped by the comment form; only the values remain):
the rowid value
resomseid 123
192.168.15
not good
20150101
20150505
2015-09-30 04:00:47,792 INFO [Thread-10] mapred.LocalJobRunner (LocalJobRunner.java:run(395)) - Map task executor complete.
2015-09-30 04:00:48,666 INFO [main] mapreduce.Job (Job.java:monitorAndPrintJob(1374)) - Job job_local215646258_0001 completed successfully
2015-09-30 04:00:48,684 INFO [main] mapreduce.Job (Job.java:monitorAndPrintJob(1381)) - Counters: 18
File System Counters
FILE: Number of bytes read=4559
FILE: Number of bytes written=256346
FILE: Number of read operations=0
FILE: Number of large read operations=0
FILE: Number of write operations=0
Map-Reduce Framework
Map input records=0
Map output records=0
Input split bytes=221
Spilled Records=0
Failed Shuffles=0
Merged Map outputs=0
GC time elapsed (ms)=23
CPU time spent (ms)=0
Physical memory (bytes) snapshot=0
Virtual memory (bytes) snapshot=0
Total committed heap usage (bytes)=249430016
File Input Format Counters
Bytes Read=2490
File Output Format Counters
Bytes Written=16
It looks like you are not providing properly formed XML. You can also see in the above counters that "Map input records=0", which clearly indicates that there was not even a single input record.
Exception in thread "main" java.lang.NoClassDefFoundError: org/apache/mahout/text/wikipedia/XmlInputFormat
at com.rishav.xml.commonTableDriver.commonRunJob(commonTableDriver.java:31)
at com.rishav.xml.commonTableDriver.main(commonTableDriver.java:20)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:606)
at org.apache.hadoop.util.RunJar.main(RunJar.java:212)
Caused by: java.lang.ClassNotFoundException: org.apache.mahout.text.wikipedia.XmlInputFormat
at java.net.URLClassLoader$1.run(URLClassLoader.java:366)
at java.net.URLClassLoader$1.run(URLClassLoader.java:355)
at java.security.AccessController.doPrivileged(Native Method)
at java.net.URLClassLoader.findClass(URLClassLoader.java:354)
at java.lang.ClassLoader.loadClass(ClassLoader.java:425)
at java.lang.ClassLoader.loadClass(ClassLoader.java:358)
... 7 more
I tried importing the mahout jar, but this error still appears. Do you think this is an issue with the Eclipse version?