Wednesday, 30 September 2015

Apache Falcon - Defining a process dependent on Multiple Hive Tables

For some requirements we might need to use two or more Hive tables as input to a single Apache Falcon process. In this post I am going to explain how we can define a Falcon process which takes two input tables and stores the output in one Hive table.

Let us assume that there are two input tables, each partitioned by a feed_date column. For each feed_date we want to get the record count of both input tables and store each table name along with its record count in an output table which is also partitioned by feed_date.

The Falcon feed for the first Hive table is defined as -

And the Falcon feed for the second Hive table is defined as -

Our output Hive table is defined as -

Note: The DDL for the output Hive table is -
USE tmp_rishav;
CREATE TABLE rec_count_tbl (tbl_name STRING, cnt INT)
PARTITIONED BY (feed_date DATE)
STORED AS TEXTFILE;

The Falcon process which uses two input tables and writes output to one table is -

The above Falcon process invokes a Hive script which is given below -

You might have noticed that we are using variables like falcon_output_database and falcon_input1_database. These variables are defined by Falcon. For a complete list of the variables set by Falcon, check the log for your process in the Falcon UI and then the "Action Configuration" tab for the corresponding Oozie workflow.


Falcon sets up these variables for an input Hive feed -
  <param>falcon_input1_storage_type=TABLE</param>
  <param>falcon_input1_catalog_url=thrift://localhost:9083</param>
  <param>falcon_input1_table=table1</param>
  <param>falcon_input1_database=tmp_rishav</param>
  <param>falcon_input1_filter=(feed_date='2015-09-28')</param>
The input1 part of each variable name comes from the name which we give to the input feed in the Falcon process.
If you have defined multiple Hive input feeds with names like input1, input2, input3, etc., you can refer to each of them by replacing input1 in the variable names above.
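
To make the naming rule concrete, here is a small sketch that prints the variable names we would expect for a given input name. The helper falcon_input_vars is hypothetical (it is not part of Falcon), and the five suffixes are simply the ones listed above; your Falcon version may set more.

```shell
# Print the Falcon-style variable names for one Hive input feed.
# $1 is the input name used in the process definition (e.g. input1).
# Hypothetical helper: the suffix list is copied from the variables shown above.
falcon_input_vars() {
    name="$1"
    for suffix in storage_type catalog_url table database filter; do
        printf 'falcon_%s_%s\n' "$name" "$suffix"
    done
}

# List the names to expect for a second input called input2.
falcon_input_vars input2
```

Inside the Hive script these names are then used as ordinary Hive variables, in the same way as falcon_input1_database.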

And for the output Hive feed the following variables are defined -
  <param>falcon_output_table=rec_count_tbl</param>
  <param>falcon_output_partitions_java='feed_date=2015-09-28'</param>
  <param>falcon_output_catalog_url=thrift://localhost:9083</param>
  <param>falcon_output_partitions_pig='feed_date=2015-09-28'</param>
  <param>falcon_output_partitions_hive=feed_date='2015-09-28'</param>
  <param>falcon_output_dated_partition_value_feed_date=2015-09-28</param>
  <param>falcon_output_storage_type=TABLE</param>
  <param>falcon_output_database=tmp_rishav</param>
  <param>falcon_output_partitions=feed_date='2015-09-28'</param>
Again, the output part of each variable name comes from the name which we give to the output feed in the Falcon process.

Before submitting/scheduling these Falcon entities we need to upload the Hive script to HDFS.
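
As a rough sketch of that upload step: the script name multi-table-count.hql and the HDFS directory /tmp/falcon/hive_scripts below are placeholders I made up for illustration, so use whatever path your process definition actually points to. The function only prints the hadoop commands so they can be reviewed first.

```shell
# Placeholder locations (assumptions, not taken from the process definition).
HIVE_SCRIPT_LOCAL="multi-table-count.hql"
HIVE_SCRIPT_HDFS_DIR="/tmp/falcon/hive_scripts"

# Print the commands that create the HDFS directory and upload the script.
# -f on put overwrites an older copy of the script if one exists.
upload_cmds() {
    echo "hadoop fs -mkdir -p $HIVE_SCRIPT_HDFS_DIR"
    echo "hadoop fs -put -f $HIVE_SCRIPT_LOCAL $HIVE_SCRIPT_HDFS_DIR/"
}

upload_cmds
```

Once the printed commands look right, they can be run with upload_cmds | sh on a box that has the Hadoop client installed.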
Now we can submit and schedule these Falcon entities using the commands below -
falcon entity -type feed -submit -file input-table-1-feed.xml
falcon entity -type feed -submit -file input-table-2-feed.xml
falcon entity -type feed -submit -file rec-count-tbl-feed.xml
falcon entity -type process -submit -file multi-table-process.xml

falcon entity -type feed -name input-table-1 -schedule
falcon entity -type feed -name input-table-2 -schedule
falcon entity -type feed -name rec-count-tbl -schedule
falcon entity -type process -name multi-table-process -schedule

Monday, 14 September 2015

Apache Falcon – Scheduling First Process



In my last post I described how we can define our first Apache Falcon process. In this post I will describe how we can schedule (execute) that process.

As a first step towards scheduling this process we need to submit our cluster to Falcon using the command below –

$ falcon entity -type cluster -submit -file test-primary-cluster.xml
falcon/default/Submit successful (cluster) test-primary-cluster

We can list all the clusters registered with Falcon using the command below -

$ falcon entity -type cluster -list
(CLUSTER) test-primary-cluster

After the cluster is submitted we need to submit our feed and then our process -

$ falcon entity -type feed -submit -file feed-01-trigger.xml
falcon/default/Submit successful (feed) feed-01-trigger

$ falcon entity -type process -submit -file process-01.xml
falcon/default/Submit successful (process) process-01

Now we have to upload the Oozie workflow which is referenced by the Falcon process to HDFS –

$ hadoop fs -mkdir -p /tmp/oozie_workflow
$ hadoop fs -put workflow.xml /tmp/oozie_workflow/

After submitting all the Falcon entities we need to schedule our Falcon process and feed –

$ falcon entity -type feed -name feed-01-trigger -schedule
$ falcon entity -type process -name process-01 -schedule

Once the Falcon process is scheduled, its instances will be in the waiting state because the feed folder is not yet present on HDFS, as shown in the screenshot below -

Let us create one instance of this feed -

$ hadoop fs -mkdir -p /tmp/feed-01/2015-09-07

On refreshing the Falcon UI for this process we can see that the first instance has been triggered and is in the running state, and after some time its status changes to successful, as shown in the screenshot below -
 


We can also check the /tmp/demo.out file to confirm that our script has executed successfully.

If you want to delete these Falcon feed and process entities, execute the commands below –


$ falcon entity -type process -name process-01 -delete
$ falcon entity -type feed -name feed-01-trigger -delete

Friday, 11 September 2015

Apache Falcon – Defining First Process

In the last two posts (post 1, post 2) I provided a basic introduction to Apache Falcon. In this post I will describe how we can write a basic Falcon data pipeline.
The Falcon process which I am going to describe is triggered when two conditions are met –
  1. The process start time (i.e. 15:00 UTC) is reached.
  2. A trigger folder named ${YEAR}-${MONTH}-${DAY} is created under /tmp/feed-01/.
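
To illustrate the second condition, here is a minimal sketch that builds the trigger folder path for a given date. The helper trigger_dir is hypothetical, and I am assuming the date in the folder name is the UTC date, since the process start time is expressed in UTC.

```shell
# Build the trigger folder path for a date given as YYYY-MM-DD.
# Hypothetical helper; /tmp/feed-01/ is the location named in condition 2.
trigger_dir() {
    echo "/tmp/feed-01/$1"
}

# Today's date in UTC (assumption: the folder name follows the UTC date).
today_utc=$(date -u +%Y-%m-%d)

# Print the command that would create today's trigger folder.
echo "hadoop fs -mkdir -p $(trigger_dir "$today_utc")"
```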

Once the Falcon process is triggered it invokes an Oozie workflow, which runs a script over SSH. The script just writes its two input parameters to the /tmp/demo.out file on the local file system of the SSH box.

The code for Falcon cluster (test-primary-cluster) is –

  
One important thing to note here is that you need to create the staging and working directories on HDFS with the proper permissions and ownership. The permissions and ownership below are needed on a Hortonworks cluster –

hadoop fs -mkdir -p /apps/falcon/test-primary-cluster/staging/
hadoop fs -chmod 777 /apps/falcon/test-primary-cluster/staging/
hadoop fs -mkdir -p /apps/falcon/test-primary-cluster/working/
hadoop fs -chmod 755 /apps/falcon/test-primary-cluster/working/
hadoop fs -chown -R falcon:hadoop /apps/falcon/test-primary-cluster

The code for Falcon feed (feed-01-trigger) is –


For this feed -
  • The retention limit is set to 9999 months.
  • Late arrival limit is set to 20 hours.
  • And frequency is set to daily.

The code for Falcon process (process-01) is –

For this process -
  • The start time is set at 15:00 UTC.
  • Dependency is set to input feed feed-01-trigger.
  • Retry policy is set to 2 times with a gap of 15 minutes.
  • This process also uses an EL expression to set the input2 variable to yesterday's date.

The Oozie workflow with the SSH action is defined as below –

This Oozie workflow -
  • Gets the input1, input2 and workflowName variables from the Falcon process process-01.
  • And invokes the shell script on the poc001 box with input1 and input2 as parameters.

And the demo.bash script called by the Oozie SSH action is given below –

demo.bash is a simple script which echoes the current date and the input1 and input2 variables to the /tmp/demo.out file.
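
A script of this kind could look roughly like the sketch below. This is my own reconstruction from the description, not necessarily the exact script used with the workflow. The DEMO_OUT variable is an addition for illustration so that the output file can be changed while testing; by default it writes to /tmp/demo.out.

```shell
# Append the current date and the two arguments to the output file.
# DEMO_OUT is a hypothetical override; the default matches /tmp/demo.out.
write_demo_line() {
    out_file="${DEMO_OUT:-/tmp/demo.out}"
    echo "$(date) input1=$1 input2=$2" >> "$out_file"
}
```

Calling write_demo_line "$1" "$2" as the last line of the script passes on the two parameters supplied by the Oozie SSH action.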


In my next post I will explain how we can submit and schedule this Falcon process.

Tuesday, 8 September 2015

Apache Falcon – Basic Concepts



Apache Falcon uses only three types of entities to describe all data management policies and pipelines. These entities are:

  • Cluster: Represents the “interfaces” to a Hadoop cluster
  • Feed: Defines a “dataset”, which can be a file, a Hive table or a stream
  • Process: Consumes feeds, invokes processing logic & produces feeds



Using only these three types of entities we can manage replication, archival and retention of data, and also handle job/process failures and late data arrival.

These Falcon entities–

  • Are easy and simple to define using XML.
  • Are modular: clusters, feeds & processes are defined separately, then linked together, and are easy to re-use across multiple pipelines.
  • Can be configured for replication, late data arrival, archival and retention.


Using Falcon, a complicated data pipeline like the one below


can be simplified to a few Falcon entities (which are further converted to multiple Oozie workflows by the Falcon engine itself).

In my next post I will explain how we can define a Falcon process and the prerequisites for that.

Reference - http://www.slideshare.net/Hadoop_Summit/driving-enterprise-data-governance-for-big-data-systems-through-apache-falcon