In this post I will give an example of a Hive GenericUDF that gets the latitude and longitude of a given location using the Google Geocoding API. Let's call this Hive function GeoEncodeUDF. The GeoEncodeUDF function takes a String location and returns an array of Float containing the latitude and longitude.
For obtaining latitude and longitude information I am using the Google Geocoding API, which is available at http://maps.googleapis.com/maps/api/geocode/json?address=&lt;address&gt;; this returns a JSON object containing matching places and their latitudes and longitudes. It might return multiple addresses, but for the sake of simplicity I am taking the first address's latitude and longitude. I have created a helper method getLatLng in class GeoLatLng which takes a location string and returns the latitude and longitude in an array of Float. This class is given below -
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
package com.rishav.hadoop.hive.ql.udf.utils; | |
import java.io.IOException; | |
import org.apache.commons.httpclient.DefaultHttpMethodRetryHandler; | |
import org.apache.commons.httpclient.HttpClient; | |
import org.apache.commons.httpclient.HttpException; | |
import org.apache.commons.httpclient.HttpStatus; | |
import org.apache.commons.httpclient.URIException; | |
import org.apache.commons.httpclient.methods.GetMethod; | |
import org.apache.commons.httpclient.params.HttpMethodParams; | |
import org.apache.commons.httpclient.util.URIUtil; | |
import org.json.JSONException; | |
import org.json.JSONObject; | |
public class GeoLatLng { | |
public static Float[] getLatLng(String location) { | |
// String geoPoints = null; | |
Float[] geoPoints = new Float[] {null, null}; | |
// if input is null return null array | |
if (location == null ) return null; | |
String loc_uri = null; | |
try { | |
loc_uri = URIUtil.encodeQuery("http://maps.googleapis.com/maps/api/geocode/json?address=" + location); | |
} catch (URIException e) { | |
System.err.println("ERROR: URI encoding failed"); | |
e.printStackTrace(); | |
System.exit(1); | |
} | |
// Create an instance of HttpClient. | |
HttpClient client = new HttpClient(); | |
// Create a method instance. | |
GetMethod method = new GetMethod(loc_uri); | |
// Provide custom retry handler is necessary | |
method.getParams().setParameter(HttpMethodParams.RETRY_HANDLER, | |
new DefaultHttpMethodRetryHandler(3, false)); | |
try { | |
// Execute the method. | |
int statusCode = client.executeMethod(method); | |
if (statusCode != HttpStatus.SC_OK) { | |
System.err.println("Method failed: " + method.getStatusLine()); | |
} | |
// Read the response body. | |
byte[] responseBody = method.getResponseBody(); | |
String responseStr = new String(responseBody); | |
JSONObject response = new JSONObject(responseStr); | |
JSONObject latlng = response.getJSONArray("results") | |
.getJSONObject(0).getJSONObject("geometry") | |
.getJSONObject("location"); | |
try { | |
geoPoints[0] = new Float(latlng.get("lat").toString()); | |
geoPoints[1] = new Float(latlng.get("lng").toString()); | |
} catch (Exception e) { | |
geoPoints[0] = null; | |
geoPoints[1] = null; | |
} | |
return geoPoints; | |
} catch (HttpException e) { | |
System.err.println("Fatal protocol violation: " + e.getMessage()); | |
e.printStackTrace(); | |
} catch (IOException e) { | |
System.err.println("Fatal transport error: " + e.getMessage()); | |
e.printStackTrace(); | |
} catch (JSONException e) { | |
return geoPoints; | |
} finally { | |
// Release the connection. | |
method.releaseConnection(); | |
} | |
return geoPoints; | |
} | |
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
package com.rishav.hadoop.hive.ql.udf.generic;
import java.util.ArrayList;
import org.apache.hadoop.hive.ql.exec.Description;
import org.apache.hadoop.hive.ql.exec.UDFArgumentException;
import org.apache.hadoop.hive.ql.exec.UDFArgumentLengthException;
import org.apache.hadoop.hive.ql.exec.UDFArgumentTypeException;
import org.apache.hadoop.hive.ql.metadata.HiveException;
import org.apache.hadoop.hive.ql.udf.UDFType;
import org.apache.hadoop.hive.ql.udf.generic.GenericUDF;
import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorFactory;
import org.apache.hadoop.hive.serde2.objectinspector.PrimitiveObjectInspector;
import org.apache.hadoop.hive.serde2.objectinspector.primitive.PrimitiveObjectInspectorFactory;
import org.apache.hadoop.io.FloatWritable;
import com.rishav.hadoop.hive.ql.udf.utils.GeoLatLng;

/**
 * Hive GenericUDF that maps a location string to an ARRAY&lt;FLOAT&gt; of
 * [latitude, longitude] using the Google Geocoding API (via {@link GeoLatLng}).
 * Returns NULL for a NULL input or when the lookup fails.
 */
@Description(name = "GeoEncodeUDF", value = "Get Lat-Lng", extended = "fetches location co-ordinates for given location from Google geocode Api and returns an ARRAY of 2 floats [lat,lng]")
// NOTE(review): deterministic=true allows Hive to cache/fold results, but the
// underlying Google API response can change between calls — confirm intended.
@UDFType(deterministic = true)
public class GeoEncodeUDF extends GenericUDF {

  /**
   * Verifies there is exactly one STRING argument and declares the return
   * type as a standard list of writable floats.
   *
   * @throws UDFArgumentLengthException if not exactly one argument
   * @throws UDFArgumentTypeException   if the argument is not a STRING
   */
  @Override
  public ObjectInspector initialize(ObjectInspector[] arguments)
      throws UDFArgumentException {
    // Exactly one input argument.
    if (arguments.length != 1) {
      throw new UDFArgumentLengthException(
          GeoEncodeUDF.class.getSimpleName() + " accepts exactly one argument.");
    }
    // Check instanceof BEFORE casting: a complex type (array/map/struct)
    // would otherwise raise ClassCastException instead of the intended
    // UDFArgumentTypeException.
    if (!(arguments[0] instanceof PrimitiveObjectInspector)
        || ((PrimitiveObjectInspector) arguments[0]).getPrimitiveCategory()
            != PrimitiveObjectInspector.PrimitiveCategory.STRING) {
      throw new UDFArgumentTypeException(0, "The single argument to "
          + GeoEncodeUDF.class.getSimpleName() + " should be String but "
          + arguments[0].getTypeName() + " is found");
    }
    return ObjectInspectorFactory.getStandardListObjectInspector(
        PrimitiveObjectInspectorFactory.writableFloatObjectInspector);
  }

  /**
   * Resolves the input location to [lat, lng].
   *
   * @return an ArrayList of two FloatWritables, or null for null input /
   *         failed lookup
   */
  @Override
  public Object evaluate(DeferredObject[] arguments) throws HiveException {
    // Defensive: wrong arity or NULL input yields NULL.
    if (arguments.length != 1 || arguments[0].get() == null) {
      return null;
    }
    Float[] tmpLatLng = GeoLatLng.getLatLng(arguments[0].get().toString());
    // A failed lookup leaves null elements; returning NULL here avoids the
    // NullPointerException that auto-unboxing into FloatWritable would throw.
    if (tmpLatLng == null || tmpLatLng[0] == null || tmpLatLng[1] == null) {
      return null;
    }
    ArrayList<FloatWritable> result = new ArrayList<FloatWritable>();
    result.add(new FloatWritable(tmpLatLng[0]));
    result.add(new FloatWritable(tmpLatLng[1]));
    return result;
  }

  /** Returned by EXPLAIN to describe this expression. */
  @Override
  public String getDisplayString(String[] arg0) {
    // No need for new String(...); the literal suffices.
    return "geo_points";
  }
}
Now, to use this UDF in Hive, we need to build a jar file of this UDF and register it with Hive. The commands to add this UDF to Hive are -
ADD JAR /path/to/HiveUDF.jar;
After also registering the function (next line), we can use geo_points on any table having an address string, like this -
CREATE TEMPORARY FUNCTION geo_points AS 'com.rishav.hadoop.hive.ql.udf.generic.GeoEncodeUDF';
hive> select geo_points("india") from test.x limit 1;
This HQL will return an array containing lat-lng; to get them as separate columns use -
[20.593683,78.96288]
hive> select latlng[0], latlng[1] FROM (select geo_points("india") as latlng from test.x) tmp limit 1;
20.593683 78.96288
No comments:
Post a Comment