Monday, 14 October 2013

Introduction To Hive's Partitioning

On a non-partitioned table, a simple Hive query reads the entire dataset even if it has a WHERE clause filter. This becomes a bottleneck when running MapReduce jobs over a large table. We can overcome this issue by partitioning the table in Hive. Hive makes partitioning easy: the partition columns are declared when the table is created, and Hive lays out the partitions automatically. In Hive’s implementation of partitioning, the data within a table is split across multiple partitions. Each partition corresponds to a particular value (or combination of values) of the partition column(s) and is stored as a sub-directory within the table’s directory on HDFS. When a query filters on the partition columns, only the required partitions are read, which reduces the I/O and the time the query takes.

Today we are going to see how to load a CSV file into a partitioned table, using the Airline OnTime dataset. Loading CSV data into a partitioned table involves the following two steps:
  1. Load the CSV file into a non-partitioned table.
  2. Load the non-partitioned table's data into the partitioned table.
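To make the sub-directory layout and partition pruning concrete, here is a small Python simulation. It is not Hive code. The warehouse path is an assumed default location, the rows are made-up values, and `prune` stands in for the way Hive skips any partition directory whose values cannot match the WHERE clause. Directory names use the lower-case `column=value` form that Hive normally uses.

```python
from collections import defaultdict

# Assumed warehouse location; the real path depends on your Hive configuration.
TABLE_DIR = "/user/hive/warehouse/airline.db/ontimeperf"

# Hypothetical sample rows (year, month, flight number), not taken from the real dataset.
ROWS = [
    (1987, 10, 1451),
    (1987, 10, 1453),
    (1987, 11, 1451),
    (1987, 12, 1460),
]


def partition_path(table_dir, year, month):
    """Sub-directory holding one partition, using the column=value naming."""
    return f"{table_dir}/year={year}/month={month}"


def build_partitions(rows):
    """Group rows by their (year, month) partition key, one group per sub-directory."""
    partitions = defaultdict(list)
    for year, month, flight_num in rows:
        partitions[(year, month)].append(flight_num)
    return dict(partitions)


def prune(partitions, predicate):
    """Keep only the partitions whose key satisfies the WHERE predicate on year/month."""
    return {key: data for key, data in partitions.items() if predicate(*key)}


if __name__ == "__main__":
    parts = build_partitions(ROWS)
    for year, month in sorted(parts):
        print(partition_path(TABLE_DIR, year, month))
    # Equivalent of: ... WHERE year = 1987 AND month = 10
    selected = prune(parts, lambda year, month: year == 1987 and month == 10)
    print(f"read {len(selected)} of {len(parts)} partitions: {selected}")
```

Only one of the three partition directories is read for the October 1987 filter. A non-partitioned table would have to scan every row.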
We shall partition the Airline OnTime data on two columns: year and month.

1. Load the CSV file into a non-partitioned table.

We shall create a staging table to hold the data from the CSV file. The Hive commands to create the schema and the table are given below:
create schema stg_airline;
use stg_airline;

create table stg_airline.onTimePerf
(Year INT ,
Month INT ,
DayofMonth INT ,
DayOfWeek INT ,
DepTime INT ,
CRSDepTime INT ,
ArrTime INT ,
CRSArrTime INT ,
UniqueCarrier STRING ,
FlightNum INT ,
TailNum STRING ,
ActualElapsedTime INT ,
CRSElapsedTime INT ,
AirTime STRING ,
ArrDelay INT ,
DepDelay INT ,
Origin STRING ,
Dest STRING ,
Distance INT ,
TaxiIn STRING ,
TaxiOut STRING ,
Cancelled INT ,
CancellationCode STRING ,
Diverted INT ,
CarrierDelay STRING ,
WeatherDelay STRING ,
NASDelay STRING ,
SecurityDelay STRING ,
LateAircraftDelay STRING)
ROW FORMAT DELIMITED FIELDS TERMINATED BY ',' 
STORED AS TEXTFILE;
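To see what `ROW FORMAT DELIMITED FIELDS TERMINATED BY ','` means for each line of the file, here is a simplified Python model. It is an assumption about how Hive's default text format behaves, not Hive's actual code. It assumes a field that cannot be parsed as INT is read as NULL, missing trailing fields are NULL, and extra fields are ignored. Only the first few staging columns are modelled. The model also shows why the header row matters.

```python
# Simplified model (an assumption, not Hive source code) of reading one delimited
# text line against a table schema: split on ',', then convert by declared type.

# Only the first few columns of stg_airline.onTimePerf, to keep the example short.
SCHEMA = [
    ("Year", "INT"),
    ("Month", "INT"),
    ("DayofMonth", "INT"),
    ("UniqueCarrier", "STRING"),
    ("AirTime", "STRING"),
]


def parse_field(text, hive_type):
    """Convert one text field to a Python value for a (simplified) Hive type."""
    if hive_type == "INT":
        try:
            return int(text)
        except ValueError:
            return None  # e.g. "NA" or a header label such as "Year" becomes NULL
    return text


def read_row(line, schema):
    """Map one delimited line onto the schema; missing trailing fields become None."""
    fields = line.rstrip("\r\n").split(",")
    row = {}
    for index, (name, hive_type) in enumerate(schema):
        row[name] = parse_field(fields[index], hive_type) if index < len(fields) else None
    return row


if __name__ == "__main__":
    print(read_row("1987,10,14,PS,NA", SCHEMA))
    # A header line that was not deleted turns into a row with NULL INT columns:
    print(read_row("Year,Month,DayofMonth,UniqueCarrier,AirTime", SCHEMA))
```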
After creating the table, load the CSV data into it using the Hive command below (note: delete the header row from the CSV file first; otherwise it is loaded as an ordinary data row):
LOAD DATA LOCAL INPATH "1987.csv" OVERWRITE INTO TABLE stg_airline.onTimePerf;
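If you prefer not to edit the file by hand, a short script can remove the header row. This is a sketch. The output name `1987_noheader.csv` is hypothetical, and after running it on the real file you would point `LOAD DATA LOCAL INPATH` at that output. Hive releases newer than this post also offer a `skip.header.line.count` table property that may make this step unnecessary; check the documentation for your version.

```python
import shutil
import tempfile
from pathlib import Path


def strip_header(src_path, dst_path):
    """Copy src_path to dst_path without its first line; return that header line."""
    with open(src_path, "rb") as src, open(dst_path, "wb") as dst:
        header = src.readline()       # the column-name row
        shutil.copyfileobj(src, dst)  # copy the remaining data rows byte for byte
    return header.decode("utf-8").rstrip("\r\n")


def demo():
    """Run strip_header on a tiny made-up CSV inside a temporary directory."""
    with tempfile.TemporaryDirectory() as tmp:
        src = Path(tmp) / "1987.csv"
        dst = Path(tmp) / "1987_noheader.csv"  # hypothetical output name
        src.write_text("Year,Month,DayofMonth\n1987,10,14\n1987,10,15\n")
        header = strip_header(src, dst)
        return header, dst.read_text()


if __name__ == "__main__":
    print(demo())
```

The data rows are copied as raw bytes rather than re-written through a CSV parser, so their content is left exactly as it was.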

2. Load the non-partitioned table's data into the partitioned table.

We shall now create a table partitioned by the year and month columns. The commands for this are given below:
create schema airline;
use airline;

create table airline.onTimePerf
(DayofMonth INT ,
DayOfWeek INT ,
DepTime INT ,
CRSDepTime INT ,
ArrTime INT ,
CRSArrTime INT ,
UniqueCarrier STRING ,
FlightNum INT ,
TailNum STRING ,
ActualElapsedTime INT ,
CRSElapsedTime INT ,
AirTime STRING ,
ArrDelay INT ,
DepDelay INT ,
Origin STRING ,
Dest STRING ,
Distance INT ,
TaxiIn STRING ,
TaxiOut STRING ,
Cancelled INT ,
CancellationCode STRING ,
Diverted INT ,
CarrierDelay STRING ,
WeatherDelay STRING ,
NASDelay STRING ,
SecurityDelay STRING ,
LateAircraftDelay STRING)
PARTITIONED BY (Year INT, Month INT )
ROW FORMAT DELIMITED FIELDS TERMINATED BY ',' 
STORED AS TEXTFILE;
To load the partitioned table, we use the commands below:
SET hive.exec.dynamic.partition = true;
SET hive.exec.dynamic.partition.mode = nonstrict;
INSERT OVERWRITE TABLE airline.onTimePerf PARTITION(Year, Month) SELECT DayofMonth, DayOfWeek, DepTime, CRSDepTime, ArrTime, CRSArrTime, UniqueCarrier, FlightNum, TailNum, ActualElapsedTime, CRSElapsedTime, AirTime, ArrDelay, DepDelay, Origin, Dest, Distance, TaxiIn, TaxiOut, Cancelled, CancellationCode, Diverted, CarrierDelay, WeatherDelay, NASDelay, SecurityDelay, LateAircraftDelay, Year, Month FROM stg_airline.onTimePerf;
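When writing an INSERT statement for a partitioned table, make sure the partition columns come last in the SELECT clause, in the same order as in the PARTITION clause. Hive matches them to the partition columns by position, not by name. The two SET commands enable dynamic partitioning in non-strict mode, so Hive derives each row's partition from those trailing column values instead of requiring a literal value in the PARTITION clause. If you don't execute the two SET commands (hive.exec.dynamic.partition.mode defaults to strict), you will get the following error: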
FAILED: SemanticException [Error 10096]: Dynamic partition strict mode requires at least one static partition column. To turn this off set hive.exec.dynamic.partition.mode=nonstrict
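The positional rule and the strict-mode check can be mimicked in a few lines of Python. This is a simulation, not Hive's implementation. `partition_spec` is a hypothetical stand-in for the PARTITION clause: `None` marks a dynamic column and a literal marks a static one. The sample rows use a trimmed, made-up column set. The sketch shows that the trailing SELECT values are matched to the dynamic partition columns purely by position. It also shows that strict mode rejects a clause with no static value, a check that, as I understand it, guards against accidentally overwriting every partition.

```python
from collections import defaultdict


def check_strict_mode(partition_spec, mode="strict"):
    """Mimic Hive's rule: strict mode needs at least one static (literal) partition value."""
    if mode == "strict" and all(value is None for value in partition_spec.values()):
        raise ValueError("strict mode requires at least one static partition column")


def select_list(staging_cols, partition_spec):
    """Regular columns first, then the dynamic partition columns in PARTITION-clause order."""
    regular = [col for col in staging_cols if col not in partition_spec]
    dynamic = [col for col, value in partition_spec.items() if value is None]
    return regular + dynamic


def route_rows(rows, partition_spec, mode="strict"):
    """Assign each SELECT row to a partition; trailing values fill the dynamic columns."""
    check_strict_mode(partition_spec, mode)
    n_dynamic = sum(1 for value in partition_spec.values() if value is None)
    partitions = defaultdict(list)
    for row in rows:
        cut = len(row) - n_dynamic  # matched by position only, never by column name
        dynamic_values = iter(row[cut:])
        path = "/".join(
            f"{col.lower()}={value if value is not None else next(dynamic_values)}"
            for col, value in partition_spec.items()
        )
        partitions[path].append(tuple(row[:cut]))
    return dict(partitions)


if __name__ == "__main__":
    staging_cols = ["Year", "Month", "DayofMonth", "FlightNum"]
    both_dynamic = {"Year": None, "Month": None}    # PARTITION(Year, Month)
    print(select_list(staging_cols, both_dynamic))  # ['DayofMonth', 'FlightNum', 'Year', 'Month']

    rows = [(14, 1451, 1987, 10), (15, 1451, 1987, 10), (1, 1460, 1987, 11)]
    print(route_rows(rows, both_dynamic, mode="nonstrict"))

    try:
        route_rows(rows, both_dynamic)  # default strict mode, no static value
    except ValueError as err:
        print("FAILED:", err)

    year_static = {"Year": 1987, "Month": None}     # PARTITION(Year=1987, Month)
    print(route_rows([(14, 1451, 10), (1, 1460, 11)], year_static))
```

The last call corresponds to the mixed form PARTITION(Year=1987, Month). Here Year is static, so only Month has to appear at the end of the SELECT list, and strict mode accepts it.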
In my next post, I shall describe using HCatalog in a MapReduce program.

11 comments:

  1. This is a very nice post, thank you.
    Can you explain how bucketing and partitioning differ?

    1. Hive partitioning and bucketing are very nicely explained here http://www.bidn.com/blogs/cprice1979/ssas/4646/partitions-amp-buckets-in-hive

  2. It's a very nice explanation and very helpful. Can you explain views and indexes, with commands, please?

  3. How can I rename tables based on partitions?
    For example:

    2013 data in the table 2013_airline.onTimePerf
    2014 data in the table 2014_airline.onTimePerf
    2015 data in the table 2015_airline.onTimePerf

    1. I am not sure whether that can be achieved directly with Hive queries, but you can easily write a shell script to do it.
      You can also look into multi-table insert to achieve this.

  4. Hi, I want to partition by date. How can I alter the table format?

  5. Really good blog on Hive partitioning. See also our blog on Hive partitioning and bucketing: http://www.geoinsyssoft.com/hive-partition-bucketing/

  6. Where is the data? Can you please forward it?

  7. INSERT OVERWRITE TABLE airline.onTimePerf PARTITION(Year, Month) SELECT DayofMonth, DayOfWeek, DepTime, CRSDepTime, ArrTime, CRSArrTime, UniqueCarrier, FlightNum, TailNum, ActualElapsedTime, CRSElapsedTime, AirTime, ArrDelay, DepDelay, Origin, Dest, Distance, TaxiIn, TaxiOut, Cancelled, CancellationCode, Diverted, CarrierDelay, WeatherDelay, NASDelay, SecurityDelay, LateAircraftDelay, Year, Month FROM stg_airline.onTimePerf;
    This is not working for me.
    I had to give year='1987' because I have data for many years, from 1987 to 2008. Also, I am not supposed to mention year in the select query.
