XML Parsing with MapReduce
Recently I worked with XML data stored in HDFS and wrote a MapReduce job to load that data into HBase.
To work with XML input data we can use the XmlInputFormat class from Mahout (there is no need to have Mahout installed; we just need this one class from the mahout-integration jar).
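If you build with Maven, a dependency along these lines should pull in that jar (the version shown is only an example; use whichever Mahout release is available on your cluster):

<dependency>
  <groupId>org.apache.mahout</groupId>
  <artifactId>mahout-integration</artifactId>
  <version>0.9</version> <!-- example version -->
</dependency>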
The XML file had the following structure:
<Response>
  <RowID>....</RowID>
  <ResponseID>....</ResponseID>
  <IPAddress>....</IPAddress>
  <Status>....</Status>
  <StartDate>....</StartDate>
  <EndDate>....</EndDate>
</Response>
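For illustration, a single populated record might look like this (the values here are made up):

<Response>
  <RowID>1</RowID>
  <ResponseID>R_100</ResponseID>
  <IPAddress>10.0.0.1</IPAddress>
  <Status>Complete</Status>
  <StartDate>2014-01-01</StartDate>
  <EndDate>2014-01-02</EndDate>
</Response>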
To hold this XML record we created the xmlDef class. Each column of the 2D array below describes one field: row 0 holds the HBase table name, row 1 flags whether the field is the row key, row 2 gives the column family, row 3 the HBase column qualifier, row 4 the XML tag to read, and row 5 is a placeholder for the parsed value:
package com.rishav.xml;

public class xmlDef {
  public static String xmlDef[][] = new String[][] {
    {"xmlTest", "xmlTest", "xmlTest", "xmlTest", "xmlTest", "xmlTest"},       // HBase table name
    {"Y", "N", "N", "N", "N", "N"},                                           // is this column the row key?
    {"cf1", "cf1", "cf2", "cf2", "cf2", "cf2"},                               // column family
    {"RowID", "ResponseID", "IPAddress", "Status", "StartDate", "EndDate"},   // column qualifier in HBase
    {"RowID", "ResponseID", "IPAddress", "Status", "StartDate", "EndDate"},   // XML tag
    {"", "", "", "", "", ""}                                                  // placeholder for parsed XML value
  };
}
The mapper is configured to receive one complete XML record enclosed between start and end tags, which are defined in the driver class. Each map call therefore gets a single XML record as its input value, and we can parse it in the normal way; here we use StAX's XMLStreamReader.
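Before looking at the full mapper, here is a minimal standalone sketch of just the StAX parsing step (the class name and the sample record are only for illustration):

import java.io.ByteArrayInputStream;
import javax.xml.stream.XMLInputFactory;
import javax.xml.stream.XMLStreamConstants;
import javax.xml.stream.XMLStreamReader;

public class StaxParseDemo {
  public static void main(String[] args) throws Exception {
    // one XML record, as the mapper would receive it in its Text value
    String xml = "<Response><RowID>1</RowID><Status>OK</Status></Response>";
    XMLStreamReader reader = XMLInputFactory.newInstance()
        .createXMLStreamReader(new ByteArrayInputStream(xml.getBytes("UTF-8")));
    String currentElement = "";
    while (reader.hasNext()) {
      switch (reader.next()) {
        case XMLStreamConstants.START_ELEMENT:
          currentElement = reader.getLocalName();      // remember the tag we are inside
          break;
        case XMLStreamConstants.CHARACTERS:
          if (!reader.isWhiteSpace()) {                 // skip whitespace between tags
            System.out.println(currentElement + " = " + reader.getText().trim());
          }
          break;
      }
    }
    reader.close();
  }
}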
The code for the mapper class is given below:
package com.rishav.hbase;

import com.rishav.xml.xmlDef;

import static javax.xml.stream.XMLStreamConstants.CHARACTERS;
import static javax.xml.stream.XMLStreamConstants.START_ELEMENT;

import javax.xml.stream.XMLInputFactory;
import javax.xml.stream.XMLStreamReader;

import java.io.ByteArrayInputStream;
import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.client.HTable;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

public class commonTableMapper extends Mapper<LongWritable, Text, Text, Text> {

  private HTable htable;

  // create the HBase connection once per mapper
  @Override
  protected void setup(Context context) throws IOException, InterruptedException {
    Configuration conf = HBaseConfiguration.create();
    htable = new HTable(conf, xmlDef.xmlDef[0][0]);
    htable.setAutoFlush(false);                 // buffer puts; they are flushed in cleanup()
    htable.setWriteBufferSize(1024 * 1024 * 12);
  }

  @Override
  public void map(LongWritable key, Text value, Context context)
      throws IOException, InterruptedException {
    String currLine = value.toString();
    try {
      XMLStreamReader reader = XMLInputFactory.newInstance()
          .createXMLStreamReader(new ByteArrayInputStream(currLine.getBytes("UTF-8")));

      String currentElement = "";
      int col;

      // initialize all xml values to blank strings
      for (col = 0; col < xmlDef.xmlDef[5].length; col++) {
        xmlDef.xmlDef[5][col] = "";
      }

      // read xml tags and store values in xmlDef
      while (reader.hasNext()) {
        int code = reader.next();
        switch (code) {
          case START_ELEMENT:
            currentElement = reader.getLocalName();
            break;
          case CHARACTERS:
            col = 0;
            for (String xmlTag : xmlDef.xmlDef[4]) {
              if (currentElement.equalsIgnoreCase(xmlTag)) {
                xmlDef.xmlDef[5][col] += reader.getText().trim();
              }
              col++;
            }
            break;
        }
      }
      reader.close();

      // write the parsed values to the mapper output file
      // (optional; this context.write can be removed if only the HBase load is needed)
      context.write(
          new Text(xmlDef.xmlDef[5][0] + "#" + xmlDef.xmlDef[5][1] + "#" + xmlDef.xmlDef[5][2]
              + "#" + xmlDef.xmlDef[5][3] + "#" + xmlDef.xmlDef[5][4]),
          new Text("1"));

      // put the record in HBase, using the first field as the row key
      Put insHBase = new Put(Bytes.toBytes(xmlDef.xmlDef[5][0]));
      col = 0;
      for (String xmlTag : xmlDef.xmlDef[3]) {
        insHBase.add(Bytes.toBytes(xmlDef.xmlDef[2][col]),
            Bytes.toBytes(xmlDef.xmlDef[3][col]),
            Bytes.toBytes(xmlDef.xmlDef[5][col]));
        col++;
      }
      htable.put(insHBase);
    } catch (Exception e) {
      e.printStackTrace();
    }
  }

  @Override
  public void cleanup(Context context) throws IOException, InterruptedException {
    htable.flushCommits();
    htable.close();
  }
}
In the driver we set the boundaries of an XML record with the xmlinput.start and xmlinput.end properties. Note that XmlInputFormat matches these markers literally, so they must match the case of the tags in the input file (here <Response> and </Response>).
The driver class is:
package com.rishav.hbase;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.mahout.text.wikipedia.XmlInputFormat;

public final class commonTableDriver {

  public static void main(String[] args) throws Exception {
    commonRunJob(args[0], args[1]);
  }

  public static void commonRunJob(String input, String output) throws Exception {
    Configuration conf = new Configuration();
    // record boundaries; must match the tag case used in the input file
    conf.set("xmlinput.start", "<Response>");
    conf.set("xmlinput.end", "</Response>");

    Job job = new Job(conf);
    job.setJarByClass(commonTableMapper.class);
    job.setInputFormatClass(XmlInputFormat.class);
    job.setMapperClass(commonTableMapper.class);

    job.setMapOutputKeyClass(Text.class);
    job.setMapOutputValueClass(Text.class);

    job.setNumReduceTasks(0);   // map-only job

    FileInputFormat.setInputPaths(job, new Path(input));
    Path outPath = new Path(output);
    FileOutputFormat.setOutputPath(job, outPath);
    outPath.getFileSystem(conf).delete(outPath, true);

    job.waitForCompletion(true);
  }
}
We exported the project from Eclipse as a jar; let's call it hbasetest.jar.
To run this jar on Hadoop, use the command below:
hadoop jar hbasetest.jar com.rishav.hbase.commonTableDriver input/sample.xml out1
Note: Before running this MapReduce job, create the xmlTest table in HBase with column families cf1 and cf2.
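The table can be created from the HBase shell like this:
create 'xmlTest', 'cf1', 'cf2'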
After the job finishes we can check the data in the xmlTest HBase table.
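The loaded rows can be inspected with a quick scan from the HBase shell:
scan 'xmlTest'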