Tuesday, 30 September 2014

What is Write Concern in MongoDB?

In MongoDB, there are multiple guarantee levels available for reporting the success of a write operation, called write concerns. The strength of the write concern determines the level of guarantee. A weak write concern gives better performance at the cost of a weaker guarantee, while a strong write concern gives a stronger guarantee because clients wait for the write operation to be confirmed.

MongoDB provides different levels of write concern to better address the specific needs of applications. Clients may adjust write concern to ensure that the most important operations persist successfully to an entire MongoDB deployment. For other less critical operations, clients can adjust the write concern to ensure faster performance rather than ensure persistence to the entire deployment.

Write Concern Levels

MongoDB has the following levels of conceptual write concern, listed from weakest to strongest:

Unacknowledged
With an unacknowledged write concern, MongoDB does not acknowledge the receipt of write operations. Unacknowledged is similar to errors ignored; however, drivers will attempt to receive and handle network errors when possible. The driver’s ability to detect network errors depends on the system’s networking configuration.
Write operation to a ``mongod`` instance with write concern of ``unacknowledged``. The client does not wait for any acknowledgment.

Acknowledged
With a receipt acknowledged write concern, the ``mongod`` confirms the receipt of the write operation. Acknowledged write concern allows clients to catch network, duplicate key, and other errors. This is the default write concern.
Write operation to a ``mongod`` instance with write concern of ``acknowledged``. The client waits for acknowledgment of success or exception.

Journaled
With a journaled write concern, MongoDB acknowledges the write operation only after committing the data to the journal. This write concern ensures that MongoDB can recover the data following a shutdown or power interruption.
You must have journaling enabled to use this write concern.
Write operation to a ``mongod`` instance with write concern of ``journaled``. The ``mongod`` sends acknowledgment after it commits the write operation to the journal.

Replica Acknowledged
Replica sets present additional considerations with regard to write concern. The default write concern only requires acknowledgment from the primary. With replica acknowledged write concern, you can guarantee that the write operation propagates to additional members of the replica set.
Write operation to a replica set with write concern level of ``w:2``, that is, a write to the primary and at least one secondary.
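
The four conceptual levels differ only in the `w` and `j` options that a client sends along with a write. The following is a minimal, driver-independent sketch in plain Java that summarizes this mapping. The enum `WriteConcernLevel`, its fields and the `toOptions()` helper are hypothetical names made up for illustration; they are not part of any MongoDB driver API.

```java
// Hypothetical, driver-independent model of the four write concern levels.
enum WriteConcernLevel {
    UNACKNOWLEDGED(0, false),       // w: 0 -> client does not wait for acknowledgment
    ACKNOWLEDGED(1, false),         // w: 1 -> mongod confirms receipt (the default)
    JOURNALED(1, true),             // w: 1, j: true -> acknowledged after the journal commit
    REPLICA_ACKNOWLEDGED(2, false); // w: 2 -> primary plus at least one secondary

    final int w;      // number of members that must acknowledge the write
    final boolean j;  // whether to wait for the journal commit

    WriteConcernLevel(int w, boolean j) {
        this.w = w;
        this.j = j;
    }

    // Render the options in the style of a write concern document.
    String toOptions() {
        return j ? "{ w: " + w + ", j: true }" : "{ w: " + w + " }";
    }
}

class WriteConcernDemo {
    public static void main(String[] args) {
        // Print the levels from weakest to strongest.
        for (WriteConcernLevel level : WriteConcernLevel.values()) {
            System.out.println(level + " -> " + level.toOptions());
        }
    }
}
```

In a real application the same options would be passed through the driver's own write concern type rather than a hand-written enum like this one.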

Hive UDF to get Latitude and Longitude

In my previous post I explained Hive GenericUDF.
In this post I will give an example of a Hive GenericUDF that gets the latitude and longitude of a given location using the Google Geocoding API. Let's call this Hive function GeoEncodeUDF. GeoEncodeUDF takes a String location and returns an array of Float containing the latitude and longitude.

To obtain the latitude and longitude I am using the Google geocode API, which is available at http://maps.googleapis.com/maps/api/geocode/json?address=<address>. It returns a JSON object containing the matching places and their latitude and longitude. It might return multiple addresses, but for the sake of simplicity I am taking the latitude and longitude of the first address. I have created a helper method getLatLng in class GeoLatLng which takes a location string and returns the latitude and longitude in an array of Float. This class is given below -
package com.rishav.hadoop.hive.ql.udf.utils;

import java.io.IOException;

import org.apache.commons.httpclient.DefaultHttpMethodRetryHandler;
import org.apache.commons.httpclient.HttpClient;
import org.apache.commons.httpclient.HttpException;
import org.apache.commons.httpclient.HttpStatus;
import org.apache.commons.httpclient.URIException;
import org.apache.commons.httpclient.methods.GetMethod;
import org.apache.commons.httpclient.params.HttpMethodParams;
import org.apache.commons.httpclient.util.URIUtil;
import org.json.JSONException;
import org.json.JSONObject;

public class GeoLatLng {

    public static Float[] getLatLng(String location) {
        Float[] geoPoints = new Float[] {null, null};
        // If the input is null, return null
        if (location == null) return null;
        String loc_uri = null;
        try {
            loc_uri = URIUtil.encodeQuery("http://maps.googleapis.com/maps/api/geocode/json?address=" + location);
        } catch (URIException e) {
            System.err.println("ERROR: URI encoding failed");
            // Without a valid URI there is nothing to request
            return geoPoints;
        }
        // Create an instance of HttpClient.
        HttpClient client = new HttpClient();
        // Create a method instance.
        GetMethod method = new GetMethod(loc_uri);
        // Provide a custom retry handler if necessary
        method.getParams().setParameter(HttpMethodParams.RETRY_HANDLER,
                new DefaultHttpMethodRetryHandler(3, false));
        try {
            // Execute the method.
            int statusCode = client.executeMethod(method);
            if (statusCode != HttpStatus.SC_OK) {
                System.err.println("Method failed: " + method.getStatusLine());
            }
            // Read the response body.
            byte[] responseBody = method.getResponseBody();
            String responseStr = new String(responseBody);
            JSONObject response = new JSONObject(responseStr);
            // Take the geometry.location object of the first matching result
            JSONObject latlng = response.getJSONArray("results").getJSONObject(0)
                    .getJSONObject("geometry").getJSONObject("location");
            try {
                geoPoints[0] = Float.valueOf(latlng.get("lat").toString());
                geoPoints[1] = Float.valueOf(latlng.get("lng").toString());
            } catch (Exception e) {
                // Never return a half-filled pair
                geoPoints[0] = null;
                geoPoints[1] = null;
                return geoPoints;
            }
        } catch (HttpException e) {
            System.err.println("Fatal protocol violation: " + e.getMessage());
        } catch (IOException e) {
            System.err.println("Fatal transport error: " + e.getMessage());
        } catch (JSONException e) {
            // No usable result in the response; fall through and return nulls
            return geoPoints;
        } finally {
            // Release the connection.
            method.releaseConnection();
        }
        return geoPoints;
    }
}
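
The request URL has to be encoded before it is sent, because a location such as "New Delhi, India" contains spaces and commas. As a rough, standalone illustration of that step, here is a sketch that uses only the JDK's `URLEncoder` instead of Commons HttpClient's `URIUtil`. The class name `GeocodeUrlSketch` and the method `buildUrl` are hypothetical and exist only for this example.

```java
import java.io.UnsupportedEncodingException;
import java.net.URLEncoder;

// Hypothetical helper: builds the geocode request URL for a location string.
class GeocodeUrlSketch {
    static final String BASE = "http://maps.googleapis.com/maps/api/geocode/json?address=";

    static String buildUrl(String location) {
        try {
            // URLEncoder applies form encoding: spaces become '+', commas become %2C
            return BASE + URLEncoder.encode(location, "UTF-8");
        } catch (UnsupportedEncodingException e) {
            // UTF-8 is always available on a JVM, so this should not happen
            throw new IllegalStateException(e);
        }
    }

    public static void main(String[] args) {
        System.out.println(buildUrl("New Delhi, India"));
    }
}
```

Note that `URLEncoder` encodes a single parameter value, which is why only the address is passed to it and not the whole URL.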
The GenericUDF class, GeoEncodeUDF, is given below -
package com.rishav.hadoop.hive.ql.udf.generic;

import java.util.ArrayList;

import org.apache.hadoop.hive.ql.exec.Description;
import org.apache.hadoop.hive.ql.exec.UDFArgumentException;
import org.apache.hadoop.hive.ql.exec.UDFArgumentLengthException;
import org.apache.hadoop.hive.ql.exec.UDFArgumentTypeException;
import org.apache.hadoop.hive.ql.metadata.HiveException;
import org.apache.hadoop.hive.ql.udf.UDFType;
import org.apache.hadoop.hive.ql.udf.generic.GenericUDF;
import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorFactory;
import org.apache.hadoop.hive.serde2.objectinspector.PrimitiveObjectInspector;
import org.apache.hadoop.hive.serde2.objectinspector.primitive.PrimitiveObjectInspectorFactory;
import org.apache.hadoop.io.FloatWritable;

import com.rishav.hadoop.hive.ql.udf.utils.GeoLatLng;

@Description(name = "GeoEncodeUDF", value = "Get Lat-Lng", extended = "fetches location co-ordinates for given location from Google geocode Api and returns an ARRAY of 2 floats [lat,lng]")
@UDFType(deterministic = true)
public class GeoEncodeUDF extends GenericUDF {

    // Verify the input is of the required type.
    @Override
    public ObjectInspector initialize(ObjectInspector[] arguments)
            throws UDFArgumentException {
        // Exactly one input argument
        if (arguments.length != 1) {
            throw new UDFArgumentLengthException(GeoEncodeUDF.class.getSimpleName() + " accepts exactly one argument.");
        }
        // Is the input a String? Check the category first so the cast below is safe.
        if (arguments[0].getCategory() != ObjectInspector.Category.PRIMITIVE
                || ((PrimitiveObjectInspector) arguments[0]).getPrimitiveCategory() != PrimitiveObjectInspector.PrimitiveCategory.STRING) {
            throw new UDFArgumentTypeException(0, "The single argument to " + GeoEncodeUDF.class.getSimpleName() + " should be String but " + arguments[0].getTypeName() + " is found");
        }
        // The UDF returns an array (list) of floats
        return ObjectInspectorFactory.getStandardListObjectInspector(PrimitiveObjectInspectorFactory.writableFloatObjectInspector);
    }

    @Override
    public Object evaluate(DeferredObject[] arguments) throws HiveException {
        // Should be exactly one argument
        if (arguments.length != 1) {
            return null;
        }
        // If passed a null, return a null
        if (arguments[0].get() == null) {
            return null;
        }
        Float[] tmpLatLng = GeoLatLng.getLatLng(arguments[0].get().toString());
        // If the lookup failed, return null instead of unboxing a null Float
        if (tmpLatLng == null || tmpLatLng[0] == null || tmpLatLng[1] == null) {
            return null;
        }
        ArrayList<FloatWritable> result = new ArrayList<FloatWritable>();
        result.add(new FloatWritable(tmpLatLng[0]));
        result.add(new FloatWritable(tmpLatLng[1]));
        return result;
    }

    // Returns the string that will be shown when explain is used
    @Override
    public String getDisplayString(String[] arg0) {
        return "geo_points";
    }
}
I have overridden the initialize(), evaluate() and getDisplayString() methods, which I have already described in my previous post.

Now to use this UDF in Hive we need to create a jar file of this UDF and add it to Hive. The commands to add this UDF to Hive are -
ADD JAR /path/to/HiveUDF.jar;
CREATE TEMPORARY FUNCTION geo_points AS 'com.rishav.hadoop.hive.ql.udf.generic.GeoEncodeUDF';
Now we can use the geo_points function on any table that has an address string, like this -
hive> select geo_points("india") from test.x limit 1;
This HQL will return an array containing the latitude and longitude. To get them as separate columns use -
hive> select latlng[0], latlng[1] FROM (select geo_points("india") as latlng from test.x) tmp limit 1;
20.593683    78.96288

Introduction to Hive UDFs

Apache Hive comes with a set of built-in functions (UDFs). A complete listing of Hive UDFs is available in the Hive Language Manual. Some common UDFs are unix_timestamp(), to_date(string timestamp), concat(string|binary A, string|binary B...), etc. However, sometimes a custom UDF is needed to solve a specific problem.

In this post I will go through the process of creating custom UDFs.

Difference between UDF and GenericUDF
Hive UDFs are written in Java. In order to create a Hive UDF you need to derive from one of two classes: UDF or GenericUDF. GenericUDF is a bit more complex to develop than UDF, but it offers better performance and it supports non-primitive types (such as arrays, maps and structs) as input parameters and return types.

For writing custom UDFs by extending GenericUDF we need to override 3 methods: initialize(), evaluate() and getDisplayString().

initialize()
This method is called only once per JVM, at the beginning, to initialize the UDF. initialize() is used to assert and validate the number and type of parameters that a UDF takes and the type of value it returns. It also returns an ObjectInspector corresponding to the return type of the UDF.

evaluate()
This method is called once for every row of data being processed. Here the actual logic for transformation/processing of each row is written. It returns an object containing the result of the processing logic.

getDisplayString()
A simple method that returns the display string for the UDF when explain is used.

Apart from these methods, we can also use the following annotations -
  • @UDFType(deterministic = true)
A deterministic UDF is one which always gives the same result when passed the same parameters. Examples of such UDFs are length(string input), regexp_replace(string initial_string, string pattern, string replacement), etc. A non-deterministic UDF, on the other hand, can return different results for the same set of parameters. For example, unix_timestamp() returns the current timestamp using the default time zone. Therefore, when unix_timestamp() is invoked with the same parameters (no parameters) at different times, different results are obtained, making it non-deterministic. This annotation allows Hive to perform some optimizations if the UDF is deterministic.
  • @Description(name="my_udf", value="This will be the result returned by the describe function statement.", extended="This will be the result returned by the describe function extended statement.")
This annotation tells Hive the name of your UDF. It is also used to populate the result of queries like `DESCRIBE FUNCTION MY_UDF` or `DESCRIBE FUNCTION EXTENDED MY_UDF`.
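
To make the deterministic/non-deterministic distinction concrete, here is a small plain-Java sketch. It does not show how Hive optimizes queries internally; it only illustrates, under that stated assumption, why a result may be safely reused when a function is deterministic. The class `DeterminismSketch` and its methods are hypothetical names for this example.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.function.Function;

class DeterminismSketch {
    static int calls = 0; // counts how often the wrapped function really runs

    // Deterministic: the same input always produces the same output.
    static int countedLength(String s) {
        calls++;
        return s.length();
    }

    // Non-deterministic: the result depends on the wall clock, not on parameters.
    static long unixTimestamp() {
        return System.currentTimeMillis() / 1000L;
    }

    // Caching wrapper; reusing results like this is only correct for deterministic functions.
    static <A, R> Function<A, R> memoize(Function<A, R> f) {
        Map<A, R> cache = new HashMap<>();
        return a -> cache.computeIfAbsent(a, f);
    }

    public static void main(String[] args) {
        Function<String, Integer> cachedLength = memoize(DeterminismSketch::countedLength);
        cachedLength.apply("hive");
        cachedLength.apply("hive"); // served from the cache, countedLength is not called again
        System.out.println("countedLength ran " + calls + " time(s)");
    }
}
```

Caching `unixTimestamp()` the same way would return a stale value on later calls, which is why a non-deterministic UDF must not be marked as deterministic.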

In my next post I will give an example of a GenericUDF that gets the latitude and longitude of a location.

Tuesday, 16 September 2014

HBase: MapReduce On Multiple Input Table

Starting with version 0.94.5, HBase supports reading multiple tables as input to MapReduce jobs using the MultiTableInputFormat class.
In this post I am giving an example of a MapReduce job which reads from two HBase tables, performs some aggregation on one table, merges the result (a SQL UNION ALL operation) with the content of the second table, and stores the result in an output table.

The first table is the 'storeSales' table and it has store-wise sales for each date. The row key is the date followed by '#' and the store number. The create statements are -

create 'storeSales', 'cf1'
put 'storeSales', '20130101#1', 'cf1:sSales', '100'
put 'storeSales', '20130101#2', 'cf1:sSales', '110'
put 'storeSales', '20130102#1', 'cf1:sSales', '200'
put 'storeSales', '20130102#2', 'cf1:sSales', '210'

The second table is the 'onlineSales' table and it has online sales for each date. The create statements are -
create 'onlineSales', 'cf2'
put 'onlineSales', '20130101', 'cf2:oSales', '400'
put 'onlineSales', '20130102', 'cf2:oSales', '130'

Using a MapReduce job I am going to merge the aggregated (at date level) store sales with the online sales.
Let's create an output table for the same -
create 'totalSales', 'cf1'

The mapper class for this job is -
package com.rishav.hbase.union;

import java.util.Arrays;

import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.mapreduce.TableMapper;
import org.apache.hadoop.hbase.mapreduce.TableSplit;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;

public class UnionMapper extends TableMapper<Text, IntWritable> {

    private static final byte[] storeSalesTable = Bytes.toBytes("storeSales");
    private static final byte[] onlineSalesTable = Bytes.toBytes("onlineSales");

    @Override
    public void map(ImmutableBytesWritable rowKey, Result columns, Context context) {
        // Get the name of the table that the current split belongs to
        TableSplit currentSplit = (TableSplit) context.getInputSplit();
        byte[] tableName = currentSplit.getTableName();
        try {
            if (Arrays.equals(tableName, storeSalesTable)) {
                // Row key is date#store; keep only the date so sales aggregate per date
                String date = Bytes.toString(rowKey.get()).split("#")[0];
                byte[] sales = columns.getValue(Bytes.toBytes("cf1"), Bytes.toBytes("sSales"));
                int sSales = Integer.parseInt(Bytes.toString(sales));
                context.write(new Text("s#" + date), new IntWritable(sSales));
            } else if (Arrays.equals(tableName, onlineSalesTable)) {
                // Row key is already the date
                String date = Bytes.toString(rowKey.get());
                byte[] sales = columns.getValue(Bytes.toBytes("cf2"), Bytes.toBytes("oSales"));
                int oSales = Integer.parseInt(Bytes.toString(sales));
                context.write(new Text("o#" + date), new IntWritable(oSales));
            }
        } catch (Exception e) {
            // TODO : exception handling logic
            e.printStackTrace();
        }
    }
}

Note that in the mapper I am getting the table name of the current split and using a different context.write based on the table name. If your source tables have row keys with different prefixes, you can also use those prefixes to choose the context.write logic.
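
The key-building rule used by the mapper can be tried out without a cluster. The sketch below is plain Java with no HBase dependency; the class `MapperKeySketch` and the method `mapperKey` are hypothetical names used only to illustrate how the intermediate key is derived from the table name and the row key.

```java
// Hypothetical standalone version of the mapper's key logic (no HBase required).
class MapperKeySketch {

    static String mapperKey(String tableName, String rowKey) {
        if (tableName.equals("storeSales")) {
            // Store row keys look like "20130101#1" (date#store); keep only the date
            return "s#" + rowKey.split("#")[0];
        } else if (tableName.equals("onlineSales")) {
            // Online row keys are already just the date
            return "o#" + rowKey;
        }
        throw new IllegalArgumentException("Unexpected table: " + tableName);
    }

    public static void main(String[] args) {
        System.out.println(mapperKey("storeSales", "20130101#1"));
        System.out.println(mapperKey("storeSales", "20130101#2"));
        System.out.println(mapperKey("onlineSales", "20130101"));
    }
}
```

Both store rows for the same date map to the same key, which is what lets the reducer aggregate them, while the "s#" and "o#" prefixes keep store and online totals apart.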

The reducer class for this job is -
package com.rishav.hbase.union;

import java.io.IOException;

import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.mapreduce.TableReducer;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;

public class UnionReducer extends TableReducer<Text, IntWritable, ImmutableBytesWritable> {

    @Override
    public void reduce(Text key, Iterable<IntWritable> values, Context context) {
        if (key.toString().startsWith("s")) {
            // Store sales: sum the values of all stores for this date
            int dayStoreSales = 0;
            for (IntWritable storeSale : values) {
                dayStoreSales = dayStoreSales + storeSale.get();
            }
            Put put = new Put(Bytes.toBytes(key.toString()));
            // The total is stored as a 4-byte int
            put.add(Bytes.toBytes("cf1"), Bytes.toBytes("tSales"), Bytes.toBytes(dayStoreSales));
            try {
                context.write(null, put);
            } catch (IOException e) {
                e.printStackTrace();
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        } else {
            // Online sales: one value per date is expected, summed the same way
            int dayOnlineSales = 0;
            for (IntWritable onlineSale : values) {
                dayOnlineSales = dayOnlineSales + onlineSale.get();
            }
            Put put = new Put(Bytes.toBytes(key.toString()));
            put.add(Bytes.toBytes("cf1"), Bytes.toBytes("tSales"), Bytes.toBytes(dayOnlineSales));
            try {
                context.write(null, put);
            } catch (IOException e) {
                e.printStackTrace();
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
    }
}

Based on the prefix of the intermediate key, the reducer aggregates the values and writes one total per key to the output table.
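
To see what the reducer produces for the sample rows, the aggregation can be simulated in plain Java. This is only a sketch of the sum-per-key step, assuming the intermediate keys and values emitted by the mapper for the sample data above; `ReduceSketch` and `sumByKey` are hypothetical names and no Hadoop classes are involved.

```java
import java.util.Map;
import java.util.TreeMap;

// Hypothetical simulation of the reducer's sum-per-key step.
class ReduceSketch {

    static Map<String, Integer> sumByKey(String[] keys, int[] values) {
        // TreeMap keeps keys sorted, similar to the row order of an HBase scan
        Map<String, Integer> totals = new TreeMap<>();
        for (int i = 0; i < keys.length; i++) {
            totals.merge(keys[i], values[i], Integer::sum);
        }
        return totals;
    }

    public static void main(String[] args) {
        // Intermediate pairs for the sample rows of storeSales and onlineSales
        String[] keys = {"s#20130101", "s#20130101", "s#20130102", "s#20130102",
                         "o#20130101", "o#20130102"};
        int[] values = {100, 110, 200, 210, 400, 130};
        System.out.println(sumByKey(keys, values));
        // prints {o#20130101=400, o#20130102=130, s#20130101=210, s#20130102=410}
    }
}
```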

Finally, the driver class for this job is -
package com.rishav.hbase.union;

import java.util.ArrayList;
import java.util.List;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.mapreduce.TableMapReduceUtil;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.util.Tool;

public class UnionJob extends Configured implements Tool {

    @Override
    public int run(String[] arg0) throws Exception {
        List<Scan> scans = new ArrayList<Scan>();

        // One Scan per input table; the table name is passed as a scan attribute
        Scan scan1 = new Scan();
        scan1.setAttribute("scan.attributes.table.name", Bytes.toBytes("storeSales"));
        scans.add(scan1);

        Scan scan2 = new Scan();
        scan2.setAttribute("scan.attributes.table.name", Bytes.toBytes("onlineSales"));
        scans.add(scan2);

        Configuration conf = new Configuration();
        Job job = new Job(conf);
        job.setJarByClass(UnionJob.class);
        // Read from both tables and write the totals to 'totalSales'
        TableMapReduceUtil.initTableMapperJob(scans, UnionMapper.class, Text.class, IntWritable.class, job);
        TableMapReduceUtil.initTableReducerJob("totalSales", UnionReducer.class, job);
        return job.waitForCompletion(true) ? 0 : 1;
    }

    public static void main(String[] args) throws Exception {
        UnionJob runJob = new UnionJob();
        System.exit(runJob.run(args));
    }
}
In the driver there are 2 HBase Scans, one for each input table, and I am passing these scans in a list to the TableMapReduceUtil.initTableMapperJob method.

Package the classes into a jar file (hbase-union.jar) and execute the commands below to invoke the MapReduce job -
export HADOOP_CLASSPATH=`hbase classpath`
hadoop jar hbase-union.jar com.rishav.hbase.union.UnionJob

Once the job is complete use HBase shell to verify output results -
hbase(main):034:0> scan 'totalSales'
ROW                                        COLUMN+CELL                                                                                                               
 o#20130101                                column=cf1:tSales, timestamp=1410848221034, value=\x00\x00\x01\x90                                                        
 o#20130102                                column=cf1:tSales, timestamp=1410848221034, value=\x00\x00\x00\x82                                                        
 s#20130101                                column=cf1:tSales, timestamp=1410848221034, value=\x00\x00\x00\xD2                                                        
 s#20130102                                column=cf1:tSales, timestamp=1410848221034, value=\x00\x00\x01\x9A                                                        
4 row(s) in 0.0410 seconds
hbase(main):035:0> org.apache.hadoop.hbase.util.Bytes.toInt("\x00\x00\x01\x90".to_java_bytes)
=> 400
hbase(main):036:0> org.apache.hadoop.hbase.util.Bytes.toInt("\x00\x00\x00\x82".to_java_bytes)
=> 130
hbase(main):037:0> org.apache.hadoop.hbase.util.Bytes.toInt("\x00\x00\x00\xD2".to_java_bytes)
=> 210
hbase(main):038:0> org.apache.hadoop.hbase.util.Bytes.toInt("\x00\x00\x01\x9A".to_java_bytes)
=> 410
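
The values in the scan output look like escaped bytes because the reducer stores each total as a 4-byte, big-endian integer. The same decoding can be sketched in plain Java with `java.nio.ByteBuffer`, which uses big-endian byte order by default. The class `SalesBytesSketch` and its methods are hypothetical stand-ins for illustration and are not the HBase `Bytes` utility itself.

```java
import java.nio.ByteBuffer;

// Hypothetical stand-in for the int <-> byte[] conversion seen in the scan output.
class SalesBytesSketch {

    // Decode a 4-byte big-endian value such as \x00\x00\x01\x90
    static int toInt(byte[] bytes) {
        return ByteBuffer.wrap(bytes).getInt();
    }

    // Encode an int as 4 big-endian bytes
    static byte[] toBytes(int value) {
        return ByteBuffer.allocate(4).putInt(value).array();
    }

    public static void main(String[] args) {
        byte[] stored = {0x00, 0x00, 0x01, (byte) 0x90};
        System.out.println(toInt(stored)); // prints 400
    }
}
```

Here 0x0190 is 1 * 256 + 144 = 400, which matches the online sales for 20130101.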

MultiTableInputFormat can also be used for HBase table joins; I shall try that some time.