Tuesday, 30 September 2014

Hive UDF to get Latitude and Longitude

In my previous post I explained about Hive GenericUDF.
In this post I will give an example of a Hive GenericUDF to get the latitude and longitude of a given location using the Google Geocoding API. Let's call this Hive function GeoEncodeUDF. The GeoEncodeUDF function takes a String location and returns an array of Float containing the latitude and longitude.


For obtaining latitude and longitude information I am using the Google Geocoding API which is available here http://maps.googleapis.com/maps/api/geocode/json?address=<address>; this returns a JSON object containing matching places and their latitude and longitude. This might return multiple addresses, but for the sake of simplicity I am taking the first address's latitude and longitude. I have created a helper method getLatLng in class GeoLatLng which takes a location string and returns the latitude and longitude in an array of float. This class is given below -
package com.rishav.hadoop.hive.ql.udf.utils;
import java.io.IOException;
import org.apache.commons.httpclient.DefaultHttpMethodRetryHandler;
import org.apache.commons.httpclient.HttpClient;
import org.apache.commons.httpclient.HttpException;
import org.apache.commons.httpclient.HttpStatus;
import org.apache.commons.httpclient.URIException;
import org.apache.commons.httpclient.methods.GetMethod;
import org.apache.commons.httpclient.params.HttpMethodParams;
import org.apache.commons.httpclient.util.URIUtil;
import org.json.JSONException;
import org.json.JSONObject;
public class GeoLatLng {
    /**
     * Looks up the latitude and longitude of a location via the Google
     * Geocoding API, taking only the first match for simplicity.
     *
     * @param location free-form address or place name; may be null
     * @return a two-element array {lat, lng}; both elements are null when the
     *         lookup or response parsing fails, and the array itself is null
     *         when {@code location} is null
     */
    public static Float[] getLatLng(String location) {
        Float[] geoPoints = new Float[] {null, null};
        // if input is null return null array
        if (location == null) {
            return null;
        }
        String locUri;
        try {
            locUri = URIUtil.encodeQuery(
                "http://maps.googleapis.com/maps/api/geocode/json?address=" + location);
        } catch (URIException e) {
            // Never System.exit() from a library method: this code runs inside
            // a Hive task JVM and exiting would kill the whole task. Report and
            // return the null-filled array instead.
            System.err.println("ERROR: URI encoding failed for location: " + location);
            e.printStackTrace();
            return geoPoints;
        }
        // Create an instance of HttpClient.
        HttpClient client = new HttpClient();
        // Create a method instance.
        GetMethod method = new GetMethod(locUri);
        // Retry up to 3 times on transient failures; never retry requests that
        // have already been sent (second argument false).
        method.getParams().setParameter(HttpMethodParams.RETRY_HANDLER,
            new DefaultHttpMethodRetryHandler(3, false));
        try {
            // Execute the method.
            int statusCode = client.executeMethod(method);
            if (statusCode != HttpStatus.SC_OK) {
                // A non-200 body is not valid geocode JSON; do not try to
                // parse it (the original fell through and parsed the error body).
                System.err.println("Method failed: " + method.getStatusLine());
                return geoPoints;
            }
            // Read the response body, decoding explicitly as UTF-8 rather than
            // the platform default charset.
            String responseStr = new String(method.getResponseBody(), "UTF-8");
            JSONObject response = new JSONObject(responseStr);
            // Only the first result's coordinates are used.
            JSONObject latlng = response.getJSONArray("results")
                .getJSONObject(0).getJSONObject("geometry")
                .getJSONObject("location");
            geoPoints[0] = Float.valueOf(latlng.get("lat").toString());
            geoPoints[1] = Float.valueOf(latlng.get("lng").toString());
        } catch (HttpException e) {
            System.err.println("Fatal protocol violation: " + e.getMessage());
            e.printStackTrace();
        } catch (IOException e) {
            System.err.println("Fatal transport error: " + e.getMessage());
            e.printStackTrace();
        } catch (JSONException e) {
            // Response had no results or an unexpected shape (e.g. zero
            // matches); fall through and return the null-filled array.
        } catch (NumberFormatException e) {
            // lat/lng present but not parseable as floats; leave both null.
            geoPoints[0] = null;
            geoPoints[1] = null;
        } finally {
            // Release the connection.
            method.releaseConnection();
        }
        return geoPoints;
    }
}
view raw GeoLatLng.java hosted with ❤ by GitHub
The GenericUDF is GeoEncodeUDF
package com.rishav.hadoop.hive.ql.udf.generic;
import java.util.ArrayList;
import org.apache.hadoop.hive.ql.exec.Description;
import org.apache.hadoop.hive.ql.exec.UDFArgumentException;
import org.apache.hadoop.hive.ql.exec.UDFArgumentLengthException;
import org.apache.hadoop.hive.ql.exec.UDFArgumentTypeException;
import org.apache.hadoop.hive.ql.metadata.HiveException;
import org.apache.hadoop.hive.ql.udf.UDFType;
import org.apache.hadoop.hive.ql.udf.generic.GenericUDF;
import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorFactory;
import org.apache.hadoop.hive.serde2.objectinspector.PrimitiveObjectInspector;
import org.apache.hadoop.hive.serde2.objectinspector.primitive.PrimitiveObjectInspectorFactory;
import org.apache.hadoop.io.FloatWritable;
import com.rishav.hadoop.hive.ql.udf.utils.GeoLatLng;
@Description(name = "GeoEncodeUDF", value = "Get Lat-Lng", extended = "fetches location co-ordinates for given location from Google geocode Api and returns an ARRAY of 2 floats [lat,lng]")
@UDFType(deterministic = true)
public class GeoEncodeUDF extends GenericUDF {
private ArrayList<FloatWritable> result;
// Verify the input is of the required type.
@Override
public ObjectInspector initialize(ObjectInspector[] arguments)
throws UDFArgumentException {
// Exactly one input argument
if( arguments.length != 1 ) {
throw new UDFArgumentLengthException(GeoEncodeUDF.class.getSimpleName() + " accepts exactly one argument.");
}
// Is the input a String
if (((PrimitiveObjectInspector)arguments[0]).getPrimitiveCategory() != PrimitiveObjectInspector.PrimitiveCategory.STRING ) {
throw new UDFArgumentTypeException(0,"The single argument to " +GeoEncodeUDF.class.getSimpleName() + " should be String but " + arguments[0].getTypeName() + " is found");
}
return ObjectInspectorFactory.getStandardListObjectInspector(PrimitiveObjectInspectorFactory.writableFloatObjectInspector);
}
@Override
public Object evaluate(DeferredObject[] arguments) throws HiveException {
// Should be exactly one argument
if( arguments.length!=1 ) {
return null;
}
// If passed a null, return a null
if( arguments[0].get()==null ) {
return null;
}
// System.out.println("arguments[0].toString() is " + arguments[0].toString());
// System.out.println("arguments[0] is " + arguments[0].get());
Float[] tmpLatLng = GeoLatLng.getLatLng(arguments[0].get().toString());
// System.out.println("LatLong are " + tmpLatLng[0] + "#" + tmpLatLng[1]);
ArrayList<FloatWritable> result = new ArrayList<FloatWritable>();
result.add(new FloatWritable(tmpLatLng[0]));
result.add(new FloatWritable(tmpLatLng[1]));
return result;
}
// returns the string that will be returned when explain is used
@Override
public String getDisplayString(String[] arg0) {
return new String("geo_points");
}
}
I have overridden the initialize(), evaluate() and getDisplayString() methods which I have already described in my previous post.

Now to use this UDF in Hive we need to create a jar file of this UDF and add it to Hive. The commands to add this UDF to Hive are -
ADD JAR /path/to/HiveUDF.jar;
CREATE TEMPORARY FUNCTION geo_points AS 'com.rishav.hadoop.hive.ql.udf.generic.GeoEncodeUDF';
Now we can use geo_points function on any table having address string like this -
hive> select geo_points("india") from test.x limit 1;
[20.593683,78.96288]
This HQL will return an array containing lat-lng, to get them as separate columns use -
hive> select latlng[0], latlng[1] FROM (select geo_points("india") as latlng from test.x) tmp limit 1;
20.593683    78.96288

No comments:

Post a Comment