Introduction to Apache Falcon: Data Governance for Hadoop

Fly, Data, Fly

Apache Falcon is a data governance engine that defines, schedules, and monitors data management policies. Falcon allows Hadoop administrators to centrally define their data pipelines, and then Falcon uses those definitions to auto-generate workflows in Apache Oozie.

InMobi is one of the largest Hadoop users in the world, and their team began the project 2 years ago. At the time, InMobi was processing billions of ad-server events in Hadoop every day. The InMobi team started the project to meet their need for policies to manage how that data flowed into and through their cluster—specifically for replication, lifecycle management, and data lineage and traceability.

This became the Apache Falcon incubator project in April 2013. Since it became an Apache incubator project, three Falcon releases have resolved nearly 400 JIRAs with contributions from many individuals in the open source community.

Apache Falcon will ship with Hortonworks Data Platform version 2.1. This is the first time that Falcon will be included in a top-tier Hadoop distribution.

Although Falcon is a newer addition to the Hadoop ecosystem, it has already been running in production at InMobi for nearly two years. InMobi uses it for various processing pipelines and data management functions, including SLA feedback pipelines and revenue pipelines.

What does Apache Falcon do?

Apache Falcon simplifies complicated data management workflows into generalized entity definitions. Falcon makes it far easier to:

  • Define data pipelines
  • Monitor data pipelines in coordination with Ambari, and
  • Trace pipelines for dependencies, tagging, audits and lineage.

This architecture diagram gives a high-level view of how Falcon interacts with the Hadoop cluster to define, monitor and trace pipelines:

[Figure: High-level Falcon architecture and its interaction with the Hadoop cluster]

Apache Oozie is Hadoop’s workflow scheduler, but mature Hadoop clusters can have hundreds to thousands of Oozie coordinator jobs. At that level of complexity, it becomes difficult to manage so many data set and process definitions.

This results in some common mistakes. Processes might use the wrong copies of data sets. Data sets and processes may be duplicated, and it becomes increasingly difficult to track down where a particular data set originated.

Falcon addresses these data governance challenges with high-level and reusable “entities” that can be defined once and re-used many times. Data management policies are defined in Falcon entities and manifested as Oozie workflows, as shown in the following diagram.

[Figure: Falcon entity definitions manifested as Oozie workflows]

Apache Falcon defines three types of entities that can be combined to describe all data management policies and pipelines. These entities are:

  • Cluster
  • Feed (i.e. Data Set)
  • Process

[Figure: The three Falcon entity types - Cluster, Feed and Process]

Getting Started with Apache Falcon

To better illustrate how Apache Falcon manages data, let’s take a look at a few hands-on examples in the Hortonworks Sandbox. For these examples, we will assume that you have a Falcon server running.

(See the “Hortonworks Technical Preview for Apache Falcon” for instructions on installing the Falcon server.)
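
Before defining any entities, it is worth confirming that the Falcon server is up. A quick check from the Sandbox shell (assuming the falcon client is on your PATH) might look like this:

# Verify the Falcon server is reachable and report its version
falcon admin -status
falcon admin -version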

Example #1: A Simple Pipeline

Let’s start with a simple data pipeline. We want to take a data set that comes in every hour, process it with a Pig script, and then land the output for further use with other processes.

[Figure: A simple pipeline - an hourly raw input feed filtered by a Pig process into a daily filtered feed]

To orchestrate this simple process with Falcon, we need to define the required entities in Falcon’s XML format. The XML is straightforward and makes it easy to see how each entity is defined.

Cluster Entity

First we define a Cluster entity. This includes all of the service interfaces used by Falcon. The Feed and Process entities (to follow) both depend on the Cluster definition.

<?xml version="1.0" encoding="UTF-8"?>
<cluster colo="toronto" description="Primary Cluster"
         name="primary-cluster"
         xmlns="uri:falcon:cluster:0.1">
    <tags>class=production,site=canada-east</tags>
    <interfaces>    
        <interface type="readonly" endpoint="hdfs://sandbox.hortonworks.com:8020" version="2.4.0"/>
        <interface type="write" endpoint="hdfs://sandbox.hortonworks.com:8020" version="2.4.0"/>
        <interface type="execute" endpoint="sandbox.hortonworks.com:8050" version="2.4.0"/>
        <interface type="workflow" endpoint="http://sandbox.hortonworks.com:11000/oozie/" version="4.1.0"/> 
        <interface type="registry" endpoint="thrift://sandbox.hortonworks.com:9083" version="0.13.0"/>
        <interface type="messaging" endpoint="tcp://sandbox.hortonworks.com:61616?daemon=true" version="5.4.3"/>    </interfaces>
    <locations>
        <location name="staging" path="/tmp"/>
        <location name="working" path="/tmp"/>
        <location name="temp" path="/tmp"/>
    </locations>
</cluster>

Submit this Cluster Entity to Falcon:

falcon entity -type cluster -submit -file primary-cluster.xml
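
To confirm that the entity was registered, you can list the known clusters and print the stored definition back out (a quick sanity check; output formatting varies by Falcon version):

# List registered clusters, then dump the definition we just submitted
falcon entity -type cluster -list
falcon entity -type cluster -name primary-cluster -definition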

Feed Entities

The “raw input feed” and “filtered feed” must also be defined. They are both considered Feed entities in Falcon.

For both of the Feed entities, we define the following key parameters:

  • Feed Frequency. How often this feed will land. This should coincide with the parameterized input directories (“${YEAR}…”). In our example, the raw-input-feed has data landing every hour and the filtered-feed aggregates that data into a daily file.
  • Cluster. The cluster(s) where this feed will land. The primary-cluster entity is cited by name. Multiple clusters can be specified for replication policies.
  • Retention Policy. How long the data will remain on the cluster. This creates a job that periodically deletes files older than the retention limit (90 days for the raw input feed).
  • Data Set Locations. The HDFS path to the location of the files. This example has a parameterized path, allowing the flexibility to define one Feed entity for sets of data that are from the same source but “partitioned” by folder as they land.

Create the raw-input-feed.xml:

<?xml version="1.0" encoding="UTF-8"?>
<feed description="raw input feed" name="raw-input-feed" xmlns="uri:falcon:feed:0.1">
    <tags>owner=landing,pipeline=adtech,category=click-logs</tags>
    <frequency>minutes(60)</frequency>
    <clusters>
        <cluster name="primary-cluster" type="source">
            <validity start="2014-03-01T00:00Z" end="2016-01-01T00:00Z"/>
            <retention limit="days(90)" action="delete"/>
        </cluster>
    </clusters>
    <locations>
        <location type="data" path="/landing/raw-input-feed/${YEAR}-${MONTH}-${DAY}-${HOUR}"/>
    </locations>   
    <ACL owner="adam" group="landing" permission="0x755"/>
	<schema location="/none" provider="none" />
</feed>
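
The parameterized location means Falcon expects hourly data in directories such as /landing/raw-input-feed/2014-03-02-00. For a quick test in the Sandbox, you could land a sample hour of data by hand (the sample file name below is hypothetical):

# Create one hourly landing directory and upload a test file
# sample-clicks.csv is a hypothetical local file with id,value,errcode columns
hdfs dfs -mkdir -p /landing/raw-input-feed/2014-03-02-00
hdfs dfs -put sample-clicks.csv /landing/raw-input-feed/2014-03-02-00/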

Create the filtered-feed.xml:

<?xml version="1.0" encoding="UTF-8"?>
<feed description="filtered feed" name="filtered-feed" xmlns="uri:falcon:feed:0.1">
    <tags>owner=landing,pipeline=adtech,category=click-logs</tags>
    <frequency>days(1)</frequency>
    <clusters>
        <cluster name="primary-cluster" type="source">
            <validity start="2014-03-01T00:00Z" end="2016-01-01T00:00Z"/>
            <retention limit="months(36)" action="delete"/>
        </cluster>
    </clusters>
    <locations>
        <location type="data" path="/landing/filtered-feed/${YEAR}-${MONTH}-${DAY}"/>
    </locations>   
    <ACL owner="bizuser" group="analytics" permission="0x755"/>
	<schema location="/none" provider="none" />
</feed>

Submit these entities:

falcon entity -type feed -submit -file raw-input-feed.xml
falcon entity -type feed -submit -file filtered-feed.xml

Schedule the Feeds:

falcon entity -type feed -schedule -name raw-input-feed
falcon entity -type feed -schedule -name filtered-feed

Unlike the Cluster definition, the Feed entities have policies attached to them that need to be explicitly scheduled by Falcon. Falcon takes the retention, replication, feed frequency, and delays and creates Oozie Coordinator jobs to automate all of these actions for you.
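
You can verify that the feed policies were scheduled by asking Falcon for their status (a minimal check; the exact response text varies by version):

falcon entity -type feed -name raw-input-feed -status
falcon entity -type feed -name filtered-feed -status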

Note: To submit the entities you may have to specify the correct proxy user groups in core-site.xml. You can set these to the explicit groups for your users, or set them to “*” to grant proxy rights to everyone. The two relevant properties are:

hadoop.proxyuser.oozie.groups
hadoop.proxyuser.falcon.groups
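
For example, the permissive Sandbox-style settings in core-site.xml would look like this (in production you would list explicit groups instead of "*"):

<!-- Allow the Oozie and Falcon service users to proxy on behalf of all groups -->
<property>
    <name>hadoop.proxyuser.oozie.groups</name>
    <value>*</value>
</property>
<property>
    <name>hadoop.proxyuser.falcon.groups</name>
    <value>*</value>
</property>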

Process Entity

The Process entity in Falcon is the high-level abstraction of any data processing task in your pipeline. A Falcon Process can execute any of the following built-in action types:

  • Any Oozie workflow (which includes many actions)
  • A Hive script
  • A Pig script

Parameterized values can be passed to these actions, so one script can serve many different Falcon-defined processes. This helps reduce code complexity and makes it easier to manage multiple workflows.
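
As a sketch of how this works, a Process entity can carry a properties block whose name/value pairs are handed to the workflow. The errorCode property below is hypothetical, and its exact placement within the process definition is governed by the Falcon process schema:

<!-- Hypothetical user-defined property passed through to the workflow -->
<properties>
    <property name="errorCode" value="200"/>
</properties>

The simplefilter.pig script shown later could then reference such a value as a parameter rather than hard-coding it.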

While you can call complicated Oozie workflows with one Falcon job, we recommend that you split up the complex Oozie workflows into modular steps. This helps Falcon better manage retention of any intermediary data sets. It also allows for reuse of Processes and Feeds.

The following process defines a Pig Script as the execution engine. In this process, we:

  1. Take the data from the HDFS file specified in “raw-input-feed”,
  2. Call the “simplefilter.pig” script to filter a specific subset of the data, and
  3. Land the output data in the feed specified in “filtered-feed”.

The frequency of the input files is every hour and we aggregate that into a daily, filtered data set.

For the Process entity, we define the following key parameters:

  • Clusters – the site or sites where the process is executed
  • Frequency – the frequency of the job execution. This usually corresponds to the named output feed frequency.
  • Inputs – We specify the name of the Feed entity (“raw-input-feed”) and the time range of the data to use in the process.
  • Outputs – We specify the name of the Feed entity (“filtered-feed”) for output and the instance time that this output corresponds to. This produces a single daily output instance covering all of the one-hour input instances.
  • Retry – We can control the number of retry attempts and the delay between each attempt.

Create filter-process.xml:

<?xml version="1.0" encoding="UTF-8"?>
<process name="filter-process" xmlns="uri:falcon:process:0.1">
    <tags>owner=landing,pipeline=adtech,category=click-logs</tags>
    <clusters>
        <cluster name="primary-cluster">
            <validity start="2014-03-02T00:00Z" end="2016-04-30T00:00Z"/>
        </cluster>
    </clusters>
    <parallel>1</parallel>
    <order>FIFO</order>
    <frequency>days(1)</frequency>
    <inputs>
        <input name="input" feed="raw-input-feed" start="yesterday(0,0)" end="today(-1,0)" />
    </inputs>
    <outputs>
        <output name="output" feed="filtered-feed" instance="yesterday(0,0)" />
    </outputs>
    <workflow engine="pig" path="/landing/scripts/simplefilter.pig" />
    <retry policy="periodic" delay="minutes(30)" attempts="2" />
</process>

Example Pig script (simplefilter.pig):

-- Load an hourly input instance; $input is supplied by Falcon from the raw-input-feed location
A = LOAD '$input' USING PigStorage(',') AS (id:chararray, value:chararray, errcode:int);
-- Keep only the records with error code 200
B = FILTER A BY (errcode == 200);
-- Write the filtered records to the filtered-feed location supplied by Falcon as $output
STORE B INTO '$output' USING PigStorage('|');
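
The $input and $output parameters are supplied by Falcon at run time. To try the script outside of Falcon, you could pass them explicitly (the paths below are illustrative and assume a local copy of the script):

# Run the filter against one hourly directory and write to a scratch location
pig -param input=/landing/raw-input-feed/2014-03-01-00 \
    -param output=/tmp/filtered-feed-test \
    simplefilter.pig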

Submit and Schedule the process:

falcon entity -type process -submit -file filter-process.xml
falcon entity -type process -schedule -name filter-process

Note: the Expression Language functions used in the Process entity (today(), yesterday()) are defined in the Falcon documentation. For an instance with nominal time 2014-03-02T00:00Z, yesterday(0,0) resolves to 2014-03-01T00:00Z and today(-1,0) to 2014-03-01T23:00Z, so the input range covers the 24 hourly instances of the previous day.

Once we schedule our Feeds and Processes we can take a look at the Oozie GUI to see the jobs automatically generated. We will see a job for each retention policy.

[Figure: Oozie UI showing the coordinator jobs automatically generated by Falcon]
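
The same coordinators are visible from the Oozie command line, which can be handy on a headless cluster (adjust the Oozie URL for your environment):

# List coordinator jobs managed by the Oozie server defined in the Cluster entity
oozie jobs -oozie http://sandbox.hortonworks.com:11000/oozie -jobtype coordinator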

Example #2: Late Data Arrival

[Figure: Handling late-arriving data in the pipeline]

A late data arrival policy can be added to the Process entity. This allows for an alternate workflow to handle input data that does not arrive within the defined times.

A late cut-off time can be supplied on the Feed entities. This uses the Falcon Expression Language to specify the amount of time before a Feed is considered late. We have set this to 10 minutes in our “raw-input-feed”.

Edit raw-input-feed.xml:

<?xml version="1.0" encoding="UTF-8"?>
<feed description="raw input feed" name="raw-input-feed" xmlns="uri:falcon:feed:0.1">
    <tags>owner=landing,pipeline=adtech,category=click-logs</tags>
    <frequency>minutes(60)</frequency>
    <late-arrival cut-off="minutes(10)"/>
    <clusters>
        <cluster name="primary-cluster" type="source">
            <validity start="2014-03-01T00:00Z" end="2016-01-01T00:00Z"/>
            <retention limit="days(90)" action="delete"/>
        </cluster>
    </clusters>
    <locations>
        <location type="data" path="/landing/raw-input-feed/${YEAR}-${MONTH}-${DAY}-${HOUR}"/>
    </locations>   
    <ACL owner="adam" group="landing" permission="0x755"/>
	<schema location="/none" provider="none" />
</feed>

The late arrival policy on the Process entity can be specified as: time-delayed back-off, exponential back-off, or final (no delay or back-off). For our modified filter-process, we have specified a final policy that invokes an alternative workflow if a feed arrives later than its cut-off time (10 minutes, in the case of “raw-input-feed”). The alternative workflow is defined as an Oozie action located in hdfs://landing/late/workflow (workflow not shown below).

Edit filter-process.xml:

<?xml version="1.0" encoding="UTF-8"?>
<process name="filter-process" xmlns="uri:falcon:process:0.1">
    <tags>owner=landing,pipeline=adtech,category=click-logs</tags>
    <clusters>
        <cluster name="primary-cluster">
            <validity start="2014-03-02T00:00Z" end="2016-04-30T00:00Z"/>
        </cluster>
    </clusters>
    <parallel>1</parallel>
    <order>FIFO</order>
    <frequency>days(1)</frequency>
    <inputs>
        <input name="input" feed="raw-input-feed" start="yesterday(0,0)" end="today(-1,0)" />
    </inputs>
    <outputs>
        <output name="output" feed="filtered-feed" instance="yesterday(0,0)" />
    </outputs>
    <workflow engine="pig" path="/landing/scripts/simplefilter.pig" />
    <retry policy="periodic" delay="minutes(30)" attempts="2" />
    <late-process policy="final" delay="minutes(1)">
        <late-input input="input" workflow-path="hdfs://landing/late/workflow"/>
    </late-process>
</process>

Update the relevant Entities:

falcon entity -type feed -name raw-input-feed -file raw-input-feed.xml -update
falcon entity -type process -name filter-process -file filter-process.xml -update

Note: We updated the active entities in place rather than deleting and re-scheduling them. For other Falcon CLI commands, please see the Apache Falcon documentation.
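
After an update, you can confirm that instances are being materialized against the new definitions with the instance API (a rough check; adjust the time window to match your validity range):

# Show the status of filter-process instances over a two-day window
falcon instance -type process -name filter-process -status \
    -start 2014-03-02T00:00Z -end 2014-03-04T00:00Z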

Example #3: Cross-Cluster Replication

[Figure: Cross-cluster replication of a Feed from the primary to the secondary cluster]

In this final example, we will illustrate Cross-Cluster Replication with a Feed entity. This is a simple way to enforce Disaster Recovery policies or aggregate data from multiple clusters to a single cluster for enterprise reporting. To further illustrate Apache Falcon’s capabilities, we will use an HCatalog/Hive table as the Feed entity.

First, we need to create a second Cluster entity to replicate to. If you are running this as an example, you can start up another Hortonworks Sandbox virtual machine or simply point the second Cluster entity back to the same Sandbox you were already using. If you try this with a separate cluster, specify that second cluster’s endpoints appropriately.

Create secondary-cluster.xml:

<?xml version="1.0" encoding="UTF-8"?>
<cluster colo="toronto" description="Secondary Cluster"
         name="secondary-cluster"
         xmlns="uri:falcon:cluster:0.1">
    <tags>class=production,site=canada-east</tags>
    <interfaces>    
        <interface type="readonly" endpoint="hdfs://sandbox.hortonworks.com:8020" version="2.4.0"/>
        <interface type="write" endpoint="hdfs://sandbox.hortonworks.com:8020" version="2.4.0"/>
        <interface type="execute" endpoint="sandbox.hortonworks.com:8050" version="2.4.0"/>
        <interface type="workflow" endpoint="http://sandbox.hortonworks.com:11000/oozie/" version="4.1.0"/> 
        <interface type="registry" endpoint="thrift://sandbox.hortonworks.com:9083" version="0.13.0"/>
        <interface type="messaging" endpoint="tcp://sandbox.hortonworks.com:61616?daemon=true" version="5.4.3"/>
    </interfaces>
    <locations>
        <location name="staging" path="/tmp"/>
        <location name="working" path="/tmp"/>
        <location name="temp" path="/tmp"/>
    </locations>
</cluster>

Submit Cluster entity:

falcon entity -type cluster -submit -file secondary-cluster.xml

After we have a secondary cluster to replicate to, we can define the database and tables on both the primary and secondary clusters.  This is a prerequisite for defining a Table Feed entity; Falcon will validate the existence of the tables before allowing them to be defined in a Feed.

Create database and tables in Apache Hive:

-- Run on primary cluster
create database landing_db;
use landing_db;
CREATE TABLE summary_table(id int, value string) PARTITIONED BY (ds string);
ALTER TABLE summary_table ADD PARTITION (ds = '2014-01');
ALTER TABLE summary_table ADD PARTITION (ds = '2014-02');
ALTER TABLE summary_table ADD PARTITION (ds = '2014-03');
 
-- Run on secondary cluster 
create database archive_db;
use archive_db;
CREATE TABLE summary_archive_table(id int, value string) PARTITIONED BY (ds string);

With the tables created, we can define the Feed entity. Two clusters are defined as a source and target for replication. The tables are referred to by a URI that denotes their database and their table name. This Feed could later be used with Falcon processes for Hive and Pig to create larger workflows.

In this example, there is no process required because the replication policy is defined on the Feed itself.

Finally, the retention policy would remove partitions past the retention limit set on each cluster. This last example Feed entity demonstrates the following:

  • Cross-cluster replication of a Data Set
  • The native use of a Hive/HCatalog table in Falcon
  • The definition of a separate retention policy for the source and target tables in replication

Create replication-feed.xml:

<?xml version="1.0" encoding="UTF-8"?>
<feed description="Monthly Analytics Summary" name="replication-feed"
      xmlns="uri:falcon:feed:0.1">
    <tags>owner=landing,pipeline=adtech,category=monthly-report</tags>
    <frequency>months(1)</frequency>
    <clusters>
        <cluster name="primary-cluster" type="source">
            <validity start="2014-03-01T00:00Z" end="2016-01-31T00:00Z"/>
            <retention limit="months(36)" action="delete"/>
        </cluster>
        <cluster name="secondary-cluster" type="target">
            <validity start="2014-03-01T00:00Z" end="2016-01-31T00:00Z"/>
            <retention limit="months(180)" action="delete"/>
            <table uri="catalog:archive_db:summary_archive_table#ds=${YEAR}-${MONTH}" />
        </cluster>
    </clusters>
    <table uri="catalog:landing_db:summary_table#ds=${YEAR}-${MONTH}" />
    <ACL owner="etluser" group="landing" permission="0755"/>
    <schema location="hcat" provider="hcat"/>
</feed>

Submit and Schedule Feed entity:

falcon entity -type feed -submit -file replication-feed.xml
falcon entity -type feed -schedule -name replication-feed
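
Once a replication instance has run, you could confirm that partitions are arriving in the archive table by querying Hive on the secondary cluster (a simple spot check):

-- Run on the secondary cluster
use archive_db;
SHOW PARTITIONS summary_archive_table;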

What’s Next?

The Apache Falcon project is expanding rapidly, and this post has covered only a portion of its capabilities. We plan future posts to cover:

  • Distributed Cluster Management with Apache Falcon Prism
  • Management UI
  • Data Lineage
  • Data Management Alerting and Monitoring
  • Falcon Integration and Extensibility via the REST API
  • Secure Data Management
