Define and Process Data Pipelines in Hadoop with Apache Falcon

Apache Falcon is a framework for simplifying data governance and pipeline processing

If you run into any errors while completing this tutorial, please ask questions or notify us on Hortonworks Community Connection!

Apache Falcon is a framework to simplify data pipeline processing and management on Hadoop clusters.

It makes it much simpler to onboard new workflows/pipelines, with support for late data handling and retry policies. It allows you to easily define relationships between various data and processing elements and to integrate with a metastore/catalog such as Hive/HCatalog. Finally, it also lets you capture lineage information for feeds and processes. In this tutorial we are going to walk through the process of:

  • Defining the feeds and processes
  • Defining and executing a job to mirror data between two clusters
  • Defining and executing a data pipeline to ingest, process and persist data continuously

Prerequisite

Download Hortonworks Sandbox

Once you have downloaded the Hortonworks Sandbox and started the VM, navigate to the Ambari interface on port 8080 of your Sandbox VM's IP address (typically http://127.0.0.1:8080 when running the sandbox locally). Log in with the username admin and the password admin:

Scenario

In this tutorial we will walk through a scenario where email data lands hourly on a cluster. In our example:

  • This cluster is the primary cluster located in the Oregon data center.
  • Data arrives from all the West Coast production servers. The input data feeds are often late by up to 4 hours.

The goal is to clean the raw data to remove sensitive information like credit card numbers and make it available to our marketing data science team for customer churn analysis.

To simulate this scenario, we have a Pig script that grabs the freely available Enron emails from the internet and feeds them into the pipeline.

Starting Falcon

By default, Falcon is not started on the sandbox. To start it, click on the Falcon icon in the left-hand bar:

Then click on the Service Actions button on the top right:

Then click on Start:

Once Falcon starts, Ambari should clearly indicate, as below, that the service has started:

Downloading and staging the dataset

Now let’s stage the dataset using the command line. Although we perform many of these file operations below using the command line, you can also do the same with the HDFS Files View in Ambari.

First, SSH into the Hortonworks Sandbox.
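
Assuming the default sandbox port forwarding, the command is typically:

ssh root@127.0.0.1 -p 2222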

The default password is hadoop

Then log in as the hdfs user:

su - hdfs

Then download the file falcon.zip with the following command:

wget http://hortonassets.s3.amazonaws.com/tutorial/falcon/falcon.zip

and then unzip it with the command:

unzip falcon.zip

Now let’s give ourselves permission to upload files:

hadoop fs -chmod -R 777 /user/ambari-qa

Then let’s create a folder called falcon under /user/ambari-qa with the command:

hadoop fs -mkdir /user/ambari-qa/falcon

Now let’s upload the decompressed folder with the command:

hadoop fs -copyFromLocal demo /user/ambari-qa/falcon/
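
If you want to confirm the upload, you can list the target directory; you should see the demo folder we just copied:

hadoop fs -ls /user/ambari-qa/falcon/demo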

Creating the cluster entities

Before creating the cluster entities, we need to create the directories on HDFS representing the two clusters that we are going to define, namely primaryCluster and backupCluster.

Use hadoop fs -mkdir commands to create the /apps/falcon/primaryCluster and /apps/falcon/backupCluster directories on HDFS.

hadoop fs -mkdir /apps/falcon/primaryCluster
hadoop fs -mkdir /apps/falcon/backupCluster

Next, create a staging directory inside each of the directories we created above:

hadoop fs -mkdir /apps/falcon/primaryCluster/staging
hadoop fs -mkdir /apps/falcon/backupCluster/staging

Next, we need to create the working directories for primaryCluster and backupCluster:

hadoop fs -mkdir /apps/falcon/primaryCluster/working
hadoop fs -mkdir /apps/falcon/backupCluster/working

Finally, set the proper permissions and ownership on the staging and working directories:

hadoop fs -chmod 777 /apps/falcon/primaryCluster/staging
hadoop fs -chmod 755 /apps/falcon/primaryCluster/working
hadoop fs -chmod 777 /apps/falcon/backupCluster/staging
hadoop fs -chmod 755 /apps/falcon/backupCluster/working
hadoop fs -chown -R falcon /apps/falcon/*
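
Before moving on, you can verify the directory layout, permissions and ownership with a recursive listing:

hadoop fs -ls -R /apps/falcon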

Let’s open the Falcon Web UI. You can easily launch the Falcon Web UI from Ambari:

You can also navigate to the Falcon Web UI directly in your browser. The Falcon UI listens on port 15000 by default (e.g. http://127.0.0.1:15000). The default username is ambari-qa and the password is admin.

This UI allows us to create and manage the various entities such as Cluster, Feed, Process and Mirror. Each of these entities is represented by an XML file, which you can either upload directly or generate by filling in the various fields.

You can also search for existing entities and then edit, change state, etc.

Let’s first create a couple of cluster entities. To create a cluster entity, click on the Cluster button at the top.

Then click on the edit button over the XML Preview area on the right-hand side of the screen and replace the XML content with the XML document below:

<?xml version="1.0" encoding="UTF-8" standalone="yes"?>
<cluster name="primaryCluster" description="this is primary cluster" colo="primaryColo" xmlns="uri:falcon:cluster:0.1">
    <tags>primaryKey=primaryValue</tags>
    <interfaces>
        <interface type="readonly" endpoint="hftp://sandbox.hortonworks.com:50070" version="2.2.0"/>
        <interface type="write" endpoint="hdfs://sandbox.hortonworks.com:8020" version="2.2.0"/>
        <interface type="execute" endpoint="sandbox.hortonworks.com:8050" version="2.2.0"/>
        <interface type="workflow" endpoint="http://sandbox.hortonworks.com:11000/oozie/" version="4.0.0"/>
        <interface type="messaging" endpoint="tcp://sandbox.hortonworks.com:61616?daemon=true" version="5.1.6"/>
    </interfaces>
    <locations>
        <location name="staging" path="/apps/falcon/primaryCluster/staging"/>
        <location name="temp" path="/tmp"/>
        <location name="working" path="/apps/falcon/primaryCluster/working"/>
    </locations>
    <ACL owner="ambari-qa" group="users" permission="0x755"/>
    <properties>
        <property name="test" value="value1"/>
    </properties>
</cluster>

Click Finish on top of the XML Preview area to save the XML.

The Falcon UI should have automatically parsed the values from the XML and populated the corresponding fields. Once you have verified that these are the correct values, press Next.

Click Save to persist the entity.
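
If you prefer the command line, the same cluster entity can be submitted with the falcon CLI instead of the wizard. This assumes you have saved the XML above to a local file (named primaryCluster.xml here purely as an example) and are running as the ambari-qa user:

falcon entity -type cluster -submit -file primaryCluster.xml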

Similarly, we will create the backupCluster entity. Again, click on the Cluster button at the top to open the form to create the cluster entity.

Then click on the edit button over the XML Preview area on the right-hand side of the screen and replace the XML content with the XML document below:

<?xml version="1.0" encoding="UTF-8" standalone="yes"?>
<cluster name="backupCluster" description="this is backup colo" colo="backupColo" xmlns="uri:falcon:cluster:0.1">
    <tags>backupTag=backupTagValue</tags>
    <interfaces>
        <interface type="readonly" endpoint="hftp://sandbox.hortonworks.com:50070" version="2.2.0"/>
        <interface type="write" endpoint="hdfs://sandbox.hortonworks.com:8020" version="2.2.0"/>
        <interface type="execute" endpoint="sandbox.hortonworks.com:8050" version="2.2.0"/>
        <interface type="workflow" endpoint="http://sandbox.hortonworks.com:11000/oozie/" version="4.0.0"/>
        <interface type="messaging" endpoint="tcp://sandbox.hortonworks.com:61616?daemon=true" version="5.1.6"/>
    </interfaces>
    <locations>
        <location name="staging" path="/apps/falcon/backupCluster/staging"/>
        <location name="temp" path="/tmp"/>
        <location name="working" path="/apps/falcon/backupCluster/working"/>
    </locations>
    <ACL owner="ambari-qa" group="users" permission="0x755"/>
    <properties>
        <property name="key1" value="val1"/>
    </properties>
</cluster>

Click Finish on top of the XML Preview area to save the XML and then the Next button to verify the values.

Click Save to persist the backupCluster entity.

Defining the rawEmailFeed entity

To create a feed entity click on the Feed button on the top of the main page on the Falcon Web UI.

Then click on the edit button over the XML Preview area on the right-hand side of the screen and replace the XML content with the XML document below:

<?xml version="1.0" encoding="UTF-8" standalone="yes"?>
<feed name="rawEmailFeed" description="Raw customer email feed" xmlns="uri:falcon:feed:0.1">
    <tags>externalSystem=USWestEmailServers</tags>
    <groups>churnAnalysisDataPipeline</groups>
    <frequency>hours(1)</frequency>
    <timezone>UTC</timezone>
    <late-arrival cut-off="hours(1)"/>
    <clusters>
        <cluster name="primaryCluster" type="source">
            <validity start="2015-07-22T01:00Z" end="2015-07-22T10:00Z"/>
            <retention limit="days(90)" action="delete"/>
        </cluster>
    </clusters>
    <locations>
        <location type="data" path="/user/ambari-qa/falcon/demo/primary/input/enron/${YEAR}-${MONTH}-${DAY}-${HOUR}"/>
        <location type="stats" path="/"/>
        <location type="meta" path="/"/>
    </locations>
    <ACL owner="ambari-qa" group="users" permission="0x755"/>
    <schema location="/none" provider="/none"/>
</feed>

Click Finish on the top of the XML Preview area

The Falcon UI should have automatically parsed the values from the XML and populated the corresponding fields. Once you have verified that these are the correct values, press Next.

On the Clusters page ensure you modify the validity to a time slice which is in the very near future.

Click Next

Save the feed
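
Alternatively, the feed can be submitted from the CLI, again assuming the XML above is saved locally (here as rawEmailFeed.xml, an example name):

falcon entity -type feed -submit -file rawEmailFeed.xml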

Defining the rawEmailIngestProcess entity

Now let’s define the rawEmailIngestProcess.

To create a process entity click on the Process button on the top of the main page on the Falcon Web UI.

Then click on the edit button over the XML Preview area on the right-hand side of the screen and replace the XML content with the XML document below:

<?xml version="1.0" encoding="UTF-8" standalone="yes"?>
<process name="rawEmailIngestProcess" xmlns="uri:falcon:process:0.1">
    <tags>email=testemail</tags>
    <clusters>
        <cluster name="primaryCluster">
            <validity start="2015-07-22T01:00Z" end="2015-07-22T10:00Z"/>
        </cluster>
    </clusters>
    <parallel>1</parallel>
    <order>FIFO</order>
    <frequency>hours(1)</frequency>
    <timezone>UTC</timezone>
    <outputs>
        <output name="output" feed="rawEmailFeed" instance="now(0,0)"/>
    </outputs>
    <workflow name="emailIngestWorkflow" version="4.0.1" engine="oozie" path="/user/ambari-qa/falcon/demo/apps/ingest/fs"/>
    <retry policy="exp-backoff" delay="minutes(3)" attempts="3"/>
    <ACL owner="ambari-qa" group="users" permission="0x755"/>
</process>

Click Finish on the top of the XML Preview area

Accept the default values and click Next

On the Clusters page ensure you modify the validity to a time slice which is in the very near future and then click Next

Accept the default values and click Next

Let’s Save the process.
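
As with the other entities, the process can also be submitted from the CLI, assuming the XML above is saved locally (here as rawEmailIngestProcess.xml, an example name):

falcon entity -type process -submit -file rawEmailIngestProcess.xml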

Defining the cleansedEmailFeed

Again, to create a feed entity click on the Feed button on the top of the main page on the Falcon Web UI.

Then click on the edit button over the XML Preview area on the right-hand side of the screen and replace the XML content with the XML document below:

<?xml version="1.0" encoding="UTF-8" standalone="yes"?>
<feed name="cleansedEmailFeed" description="Cleansed customer emails" xmlns="uri:falcon:feed:0.1">
    <tags>cleanse=cleaned</tags>
    <groups>churnAnalysisDataPipeline</groups>
    <frequency>hours(1)</frequency>
    <timezone>UTC</timezone>
    <late-arrival cut-off="hours(4)"/>
    <clusters>
        <cluster name="primaryCluster" type="source">
            <validity start="2015-07-22T01:00Z" end="2015-07-22T10:00Z"/>
            <retention limit="hours(90)" action="delete"/>
            <locations>
                <location type="data" path="/user/ambari-qa/falcon/demo/primary/processed/enron/${YEAR}-${MONTH}-${DAY}-${HOUR}"/>
                <location type="stats" path="/"/>
                <location type="meta" path="/"/>
            </locations>
        </cluster>
        <cluster name="backupCluster" type="target">
            <validity start="2015-07-22T01:00Z" end="2015-07-22T10:00Z"/>
            <retention limit="hours(90)" action="delete"/>
            <locations>
                <location type="data" path="/falcon/demo/bcp/processed/enron/${YEAR}-${MONTH}-${DAY}-${HOUR}"/>
                <location type="stats" path="/"/>
                <location type="meta" path="/"/>
            </locations>
        </cluster>
    </clusters>
    <locations>
        <location type="data" path="/user/ambari-qa/falcon/demo/processed/enron/${YEAR}-${MONTH}-${DAY}-${HOUR}"/>
        <location type="stats" path="/"/>
        <location type="meta" path="/"/>
    </locations>
    <ACL owner="ambari-qa" group="users" permission="0x755"/>
    <schema location="/none" provider="/none"/>
</feed>

Click Finish on the top of the XML Preview area

Accept the default values and click Next

Accept the default values and click Next

On the Clusters page ensure you modify the validity to a time slice which is in the very near future and then click Next

Accept the default values and click Save

Defining the cleanseEmailProcess

Now let’s define the cleanseEmailProcess.

Again, to create a process entity click on the Process button on the top of the main page on the Falcon Web UI.

Then click on the edit button over the XML Preview area on the right-hand side of the screen and replace the XML content with the XML document below:

<?xml version="1.0" encoding="UTF-8" standalone="yes"?>
<process name="cleanseEmailProcess" xmlns="uri:falcon:process:0.1">
    <tags>cleanse=yes</tags>
    <clusters>
        <cluster name="primaryCluster">
            <validity start="2015-07-22T01:00Z" end="2015-07-22T10:00Z"/>
        </cluster>
    </clusters>
    <parallel>1</parallel>
    <order>FIFO</order>
    <frequency>hours(1)</frequency>
    <timezone>UTC</timezone>
    <inputs>
        <input name="input" feed="rawEmailFeed" start="now(0,0)" end="now(0,0)"/>
    </inputs>
    <outputs>
        <output name="output" feed="cleansedEmailFeed" instance="now(0,0)"/>
    </outputs>
    <workflow name="emailCleanseWorkflow" version="pig-0.13.0" engine="pig" path="/user/ambari-qa/falcon/demo/apps/pig/id.pig"/>
    <retry policy="exp-backoff" delay="minutes(5)" attempts="3"/>
    <ACL owner="ambari-qa" group="users" permission="0x755"/>
</process>

Click Finish on the top of the XML Preview area

Accept the default values and click Next

On the Clusters page ensure you modify the validity to a time slice which is in the very near future and then click Next

Select the Input and Output Feeds as shown below and Save

Running the feeds

From the Falcon Web UI home page search for the Feeds we created

Select the rawEmailFeed by clicking on the checkbox

Then click on the Schedule button on the top of the search results

Next run the cleansedEmailFeed in the same way
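
If you prefer the command line, the same feeds can be scheduled with the falcon CLI (run as the ambari-qa user):

falcon entity -type feed -schedule -name rawEmailFeed
falcon entity -type feed -schedule -name cleansedEmailFeed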

Running the processes

From the Falcon Web UI home page search for the Process we created

Select the cleanseEmailProcess by clicking on the checkbox

Then click on the Schedule button on the top of the search results

Next run the rawEmailIngestProcess in the same way
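
Equivalently, the processes can be scheduled from the command line:

falcon entity -type process -schedule -name cleanseEmailProcess
falcon entity -type process -schedule -name rawEmailIngestProcess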

If you visit the Oozie process page, you can see the processes running

Input and Output of the pipeline

Now that the feeds and processes are running, we can check the dataset being ingested into the pipeline and the dataset being written out to HDFS.

Here is the data being ingested:

and here is the data coming out of the pipeline:
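
If you would rather check from the command line, you can list the input and processed locations defined in the feed entities above; the timestamped subdirectories you see will depend on the validity window you chose:

hadoop fs -ls /user/ambari-qa/falcon/demo/primary/input/enron
hadoop fs -ls /user/ambari-qa/falcon/demo/primary/processed/enron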

Summary

In this tutorial we walked through a scenario in which we defined a data pipeline with Apache Falcon to clean raw data of sensitive information such as credit card numbers and make it available to our marketing data science team for customer churn analysis.

We hope you enjoyed the tutorial! If you’ve had any trouble completing this tutorial or require assistance, please head on over to Hortonworks Community Connection where hundreds of Hadoop experts are ready to help!

Comments

September 26, 2014 at 12:43 pm

Very fascinating, quite promising and spectacular!!

Rizwan Mian, December 3, 2014 at 12:36 pm

Works as it says on the tin.

    Kiran, July 16, 2015 at 8:58 am

    Rizwan,
    I need some help with Falcon. I set up Falcon without errors, but I don’t see the processing going as suggested. If you don’t mind, can you share some details on the same?

    Thanks
    Kiran Jilla

Snehil Suresh Wakchaure, January 17, 2015 at 9:09 pm

Thank you so much! This is a very nice introductory tutorial!

April 16, 2015 at 10:21 pm

The tutorial is missing the following steps for creating the working/staging HDFS directories. Make sure that you manually create these needed directories, or create another "prepare-step" shell script to create them.
hdfs dfs -mkdir -p /apps/falcon/primaryCluster/staging
hdfs dfs -mkdir -p /apps/falcon/primaryCluster/working
hdfs dfs -mkdir -p /apps/falcon/backupCluster/working
hdfs dfs -mkdir -p /apps/falcon/backupCluster/staging

April 19, 2015 at 9:32 pm

After the run succeeded, I tried to figure out how to kill the workflow jobs, since they will keep running periodically per the specifications that Falcon scheduling submitted to Oozie. You can’t kill those jobs using the Oozie web UI; if you do, you will get something like "… HTTP authentication error". You have to go back to the shell (as ambari-qa) and use the commands below to kill the "scheduled" jobs in Oozie. But you can leave these entities up in Falcon for later reuse or to re-run the demo.

falcon instance -type process -name rawEmailIngestProcess -kill -start "2014-02-28T00:00Z" -end "2016-03-31T00:00Z"
falcon instance -type process -name cleanseEmailProcess -kill -start "2014-02-28T00:00Z" -end "2016-03-31T00:00Z"
falcon instance -type feed -name rawEmailFeed -kill -start "2014-02-28T00:00Z" -end "2016-03-31T00:00Z"
falcon instance -type feed -name cleansedEmailFeed -kill -start "2014-02-28T00:00Z" -end "2016-03-31T00:00Z"

Just be careful with the start and end time format, since the CLI is very picky about it. Otherwise it will throw back an exception with something like "… not a valid UTC string" (in fact, it should say not a valid ISO 8601 string, since UTC is not a format).

Also, you can use the REST API for doing the above, but I haven’t tried it yet.

    Ray Sheu, January 11, 2016 at 10:36 am

    Correction in how to kill:
    1.) Log in as the "falcon" user using "sudo su falcon"
    2.) The sequence of killing the Falcon entities matters; follow this order:
    falcon instance -type process -name cleanseEmailProcess -kill -start "2014-02-28T00:00Z" -end "2016-03-31T00:00Z"
    falcon instance -type feed -name cleansedEmailFeed -kill -start "2014-02-28T00:00Z" -end "2016-03-31T00:00Z"
    falcon instance -type process -name rawEmailIngestProcess -kill -start "2014-02-28T00:00Z" -end "2016-03-31T00:00Z"
    falcon instance -type feed -name rawEmailFeed -kill -start "2014-02-28T00:00Z" -end "2016-03-31T00:00Z"

jaipal reddy, June 17, 2015 at 10:15 pm

How do I configure Falcon to send a simple email when a workflow starts and ends?

    Andrew Ahn, July 28, 2015 at 9:22 am

    The default notification is via JMS messages. Email notification is available for HDFS mirroring and Hive DR entity creation. Generic notification for feeds and processes will be addressed in the next maintenance release.

xin, July 7, 2015 at 7:03 pm

What’s the user/password of the falcon Web UI?

Vik1, July 27, 2015 at 3:49 pm

Does not work with the HDP 2.3 sandbox.

    Chakra, August 7, 2015 at 4:56 am

    I just tested this tutorial in the HDP 2.3 sandbox and it works.
    However, you have to do a few additional things in terms of directory creation and permissions. Before you submit the entities, you need to create the following directories and set the following permissions.
    Log in to the sandbox and change to the falcon user:
    su falcon

    Then create the following directories:
    hdfs dfs -mkdir -p /apps/falcon/primaryCluster/staging
    hdfs dfs -mkdir -p /apps/falcon/primaryCluster/working
    hdfs dfs -mkdir -p /apps/falcon/backupCluster/working
    hdfs dfs -mkdir -p /apps/falcon/backupCluster/staging

    Then change the permissions using the following commands:

    hdfs dfs -chmod 777 /apps/falcon/primaryCluster/staging
    hdfs dfs -chmod 755 /apps/falcon/primaryCluster/working
    hdfs dfs -chmod 777 /apps/falcon/backupCluster/staging
    hdfs dfs -chmod 755 /apps/falcon/backupCluster/working

    Rest of the tutorial is perfect.

Andrew Ahn, July 28, 2015 at 9:06 am

Not correct. Please verify the service is running through Ambari. It is off by default in the sandbox to reduce VM load. The steps will be updated to use the new UI, but the current tutorial is still valid.

    Andrew Ahn, July 28, 2015 at 9:59 am

    To access Ambari from the Sandbox first confirm the VM is running. Then browse to:

    1) http://127.0.0.1:8080 with user: admin / pass: admin
    2) Select Falcon on left margin and enable the Falcon server and Client service.

August 20, 2015 at 12:09 am

Pretty good data governance tool, but the Falcon Web UI does not reveal much about the pipeline path and lineage. Does anyone know where to look for the graphical tree?

Marko, October 27, 2015 at 1:50 am

[ambari-qa@sandbox]$ falcon entity -type process -schedule -name rawEmailIngestProcess
ERROR: Bad Request;Entity schedule failed for process: rawEmailIngestProcess

Marko, October 27, 2015 at 1:53 am

[ambari-qa@sandbox falconChurnDemo]$ falcon entity -type cluster -submit -file oregonCluster.xml
ERROR: Bad Request;Path [/apps/falcon/primaryCluster/staging] on the cluster [primaryCluster] has owner [ambari-qa]. Current user [falcon] is not the owner of the path

Ben Speed, November 18, 2015 at 7:01 am

I’m struggling to schedule the process entities. Using either the UI or the CLI I get a message:

[ambari-qa@sandbox falconChurnDemo]$ falcon entity -type process -schedule -name rawEmailIngestProcess
ERROR: Bad Request;default/org.apache.falcon.FalconWebException::org.apache.falcon.FalconException: Entity schedule failed for process: rawEmailIngestProcess

from the CLI and similar from the UI. I’ve followed the instructions on a fresh boot with Hortonworks Sandbox 2.3.2

Please can you help with this

Avijeet, December 15, 2015 at 4:49 am

oozie job failed at Eviction action with : JA006: Call From sandbox.hortonworks.com to sandbox.hortonworks.com:10020 failed on connection exception: java.net.ConnectException: Connection refused; For more details see: http://wiki.apache.org/hadoop/ConnectionRefused

Isaac, December 16, 2015 at 8:44 am

I cannot schedule any of the processes. I constantly got:

ERROR: Bad Request;default/org.apache.falcon.FalconWebException::org.apache.falcon.FaconException: Entity schedule failed for process: rawEmailIngestProcess

Thanks

avinash, December 22, 2015 at 12:36 am

I have tried the same tutorial, but I am getting an error when saving the XML specification: "invalid oozie server or port".
I am able to log in to the Oozie web UI, but it still gives the same error.

mohit, January 20, 2016 at 1:00 pm

A couple of things:
1. I think this is only a workflow management tool and not for visualizing lineage or impact assessment. I believe Apache Falcon is purposed for that.
2. I guess the advantage of using it over Oozie is that it is end-to-end workflow management and also takes care of data retention, mirroring, etc.
3. A complete graphical tutorial can be found at hortonworks.com/hadoop-tutorial/processing-data-pipeline-with-apache-falcon/
