July 15, 2014

Four Steps Strategy for Incremental Updates in Apache Hive on Hadoop

Update August 2017: Why update Hive Tables in four steps when you can do it in one! Check out this updated guide for updating Hive Tables the easy way.


Incremental Updates

Hadoop and Hive are quickly evolving to outgrow previous limitations for integration and data access.
On the near-term development roadmap, we expect to see Hive supporting full CRUD operations (Insert, Select, Update, Delete). As we wait for these advancements, there is still a need to work with the current options, OVERWRITE or APPEND, for Hive table integration.

The OVERWRITE option requires moving the complete record set from source to Hadoop. While this approach may work for smaller data sets, it may be prohibitive at scale.

The APPEND option can limit data movement only to new or updated records. As true Inserts and Updates are not yet available in Hive, we need to consider a process of preventing duplicate records as Updates are appended to the cumulative record set.

In this blog, we will look at a four-step strategy for appending Updates and Inserts from delimited and RDBMS sources to existing Hive table definitions. While there are several options within the Hadoop platform for achieving this goal, our focus will be on a process that uses standard SQL within the Hive toolset.

IMPORTANT: For this blog, we assume that each source table will have a unique single or multi-key identifier and that a “modified_date” field is maintained for each record – either defined as part of the original source table or added as part of the ingest process.

Hive Table Definition Options: External, Local and View

External Tables are the combination of Hive table definitions and HDFS managed folders and files. The table definition exists independent from the data, so that, if the table is dropped, the HDFS folders and files remain in their original state.

Local Tables are Hive tables that are directly tied to the source data. The data is physically tied to the table definition and will be deleted if the table is dropped.

Views, as with traditional RDBMS, are stored SQL queries that support the same READ interaction as HIVE tables, yet they do not store any data of their own. Instead, the data are stored and sourced from the HIVE tables referenced in the stored SQL query.

The following process outlines a workflow that leverages all of the above in four steps:

[Diagram: the four-step workflow (Ingest, Reconcile, Compact, Purge)]
  • Ingest. Complete (base_table) table movement followed by Change (incremental_table) records only.
  • Reconcile. Creating a Single View of Base + Change records (reconcile_view) to reflect the most up-to-date record set.
  • Compact. Creating a Reporting table (reporting_table) from the reconciled view.
  • Purge. Replacing the Base table with Reporting table contents and deleting any previously processed Change records before the next Data Ingestion cycle.
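Taken together, the four steps form a simple cycle. The in-memory Python sketch below mimics that cycle on toy records; the table shapes and field values are illustrative assumptions, not part of the article:

```python
def reconcile(base, incremental):
    """Step 2: keep only the newest record per id across base + change records."""
    latest = {}
    for rec in base + incremental:  # UNION ALL of both record sets
        cur = latest.get(rec["id"])
        if cur is None or rec["modified_date"] > cur["modified_date"]:
            latest[rec["id"]] = rec
    return sorted(latest.values(), key=lambda r: r["id"])

def run_cycle(base_table, incremental_table):
    reconcile_view = reconcile(base_table, incremental_table)  # Step 2: Reconcile
    reporting_table = list(reconcile_view)                     # Step 3: Compact
    base_table = list(reporting_table)                         # Step 4: Purge (overwrite base)
    incremental_table = []                                     # Step 4: Purge (clear changes)
    return base_table, incremental_table, reporting_table

base = [{"id": 1, "field1": "a",  "modified_date": "2014-01-01"},
        {"id": 2, "field1": "b",  "modified_date": "2014-01-01"}]
incr = [{"id": 2, "field1": "b2", "modified_date": "2014-02-01"},
        {"id": 3, "field1": "c",  "modified_date": "2014-02-01"}]

base, incr, report = run_cycle(base, incr)
# base now holds ids 1, 2 (updated) and 3; incr is empty for the next ingest
```

The same ordering constraint applies in Hive: Purge must not run until Compact has materialized the reconciled records, or the change records are lost.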

The tables and views that will be a part of the Incremental Update Workflow are:

  • base_table: A HIVE Local table that initially holds all records from the source system. After the initial processing cycle, it will maintain a copy of the most up-to-date synchronized record set from the source. At the end of each processing cycle, it is overwritten by the reporting_table (as explained in the Step 4: Purge).
  • incremental_table: A HIVE External table that holds the incremental change records (INSERTS and UPDATES) from the source system. At the end of each processing cycle, it is cleared of content (as explained in the Step 4: Purge).
  • reconcile_view: A HIVE View that combines and reduces the base_table and incremental_table content to show only the most up-to-date records. It is used to populate the reporting_table (as explained in Step 3: Compact).
  • reporting_table: A HIVE Local table that holds the most up-to-date records for reporting purposes. It is also used to overwrite the base_table at the end of each processing run.

Step 1: Ingest

Depending on whether direct access is available to the RDBMS source system, you may opt for either a File Processing method (when no direct access is available) or RDBMS Processing (when database client access is available).

Regardless of the ingest option, the processing workflow in this article requires:

  1. One-time, initial load to move all data from source table to HIVE.
  2. On-going, “Change Only” data loads from the source table to HIVE.

Below, both File Processing and Database-direct (SQOOP) ingest will be discussed.

File Processing

For this blog, we assume that a file or set of files within a folder will have a delimited format and will have been generated from a relational system (i.e. records have unique keys or identifiers).

Files will need to be moved into HDFS using standard ingest options:

  • WebHDFS: Primarily used when integrating with applications, a Web URL provides an Upload end-point into a designated HDFS folder.
  • NFS: Appears as a standard network drive and allows end-users to use standard Copy-Paste operations to move files from standard file systems into HDFS.

Once the initial set of records are moved into HDFS, subsequent scheduled events can move files containing only new Inserts and Updates.
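As a rough illustration of the WebHDFS option above: creating a file is a two-step PUT, where the NameNode URL below answers with a redirect to a DataNode that accepts the file body. The host name, port and file name in this sketch are assumptions for illustration only:

```python
def webhdfs_create_url(host, path, user, port=50070):
    """Build the step-1 NameNode URL for WebHDFS op=CREATE.
    A PUT to this URL returns a redirect (Location header) pointing at the
    DataNode; a second PUT of the file body to that Location completes the upload."""
    base = "http://{h}:{p}/webhdfs/v1{path}?op=CREATE&user.name={u}&overwrite=false"
    return base.format(h=host, p=port, path=path, u=user)

url = webhdfs_create_url("namenode.example.com",
                         "/user/hive/incremental_table/changes_001.csv",
                         "hive")
# e.g. curl -i -X PUT "<url>", then PUT the file to the returned Location
```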

RDBMS Processing

SQOOP is the JDBC-based utility for integrating with traditional databases. A SQOOP Import allows for the movement of data into either HDFS (a delimited format can be defined as part of the Import definition) or directly into a Hive table.

The entire source table can be moved into HDFS or Hive using the “--table” parameter:

sqoop import --connect jdbc:teradata://{host name or ip address}/Database=retail --connection-manager org.apache.sqoop.teradata.TeradataConnManager --username dbc --password dbc --table SOURCE_TBL --target-dir /user/hive/incremental_table -m 1

After the initial import, subsequent imports can leverage SQOOP’s native support for “Incremental Import” by using the “check-column”, “incremental” and “last-value” parameters.

sqoop import --connect jdbc:teradata://{host name or ip address}/Database=retail --connection-manager org.apache.sqoop.teradata.TeradataConnManager --username dbc --password dbc --table SOURCE_TBL --target-dir /user/hive/incremental_table -m 1 --check-column modified_date --incremental lastmodified --last-value {last_import_date}

Alternately, you can leverage the “query” parameter, and have SQL select statements limit the import to new or changed records only.

sqoop import --connect jdbc:teradata://{host name or ip address}/Database=retail --connection-manager org.apache.sqoop.teradata.TeradataConnManager --username dbc --password dbc --target-dir /user/hive/incremental_table -m 1 --query 'select * from SOURCE_TBL where modified_date > {last_import_date} AND $CONDITIONS'

Note: For the initial load, substitute “base_table” for “incremental_table”. For all subsequent loads, use “incremental_table”.
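One practical detail the incremental import needs is carrying the last-value forward between runs. A hedged Python sketch that assembles the command shown above as an argument list; the dbhost name and the idea of a stored last-value are assumptions for illustration:

```python
def build_incremental_import(last_value,
                             table="SOURCE_TBL",
                             target_dir="/user/hive/incremental_table"):
    """Assemble the incremental SQOOP command from the persisted last-value."""
    return ["sqoop", "import",
            "--connect", "jdbc:teradata://dbhost/Database=retail",
            "--connection-manager",
            "org.apache.sqoop.teradata.TeradataConnManager",
            "--username", "dbc", "--password", "dbc",
            "--table", table,
            "--target-dir", target_dir,
            "-m", "1",
            "--check-column", "modified_date",
            "--incremental", "lastmodified",
            "--last-value", last_value]

cmd = build_incremental_import("2014-07-14 00:00:00")
# on a host with SQOOP installed: subprocess.run(cmd, check=True)
```

After each successful run, the maximum modified_date of the imported records would be written back to wherever last_value is stored, ready for the next cycle.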

Step 2: Reconcile

In order to support an on-going reconciliation between current records in HIVE and new change records, two tables should be defined: base_table and incremental_table.


The example below shows DDL for the Hive table “base_table” that will include any delimited files located in HDFS under the ‘/user/hive/base_table’ directory. This table will house the initial, complete record load from the source system. After the first processing run, it will house the on-going, most up-to-date set of records from the source system:

CREATE TABLE base_table (
id string,
field1 string,
field2 string,
field3 string,
field4 string,
field5 string,
modified_date string
)
ROW FORMAT DELIMITED
FIELDS TERMINATED BY ','
LOCATION '/user/hive/base_table';


The DDL below shows an external Hive table “incremental_table” that will include any delimited files with incremental change records, located in HDFS under the ‘/user/hive/incremental_table’ directory:

CREATE EXTERNAL TABLE incremental_table (
id string,
field1 string,
field2 string,
field3 string,
field4 string,
field5 string,
modified_date string
)
ROW FORMAT DELIMITED
FIELDS TERMINATED BY ','
LOCATION '/user/hive/incremental_table';


The reconcile_view combines record sets from both the Base (base_table) and Change (incremental_table) tables and is reduced to only the most recent record for each unique “id”. It is defined as follows:

CREATE VIEW reconcile_view AS
SELECT t1.* FROM
(SELECT * FROM base_table
UNION ALL
SELECT * FROM incremental_table) t1
JOIN
(SELECT id, max(modified_date) max_modified FROM
(SELECT * FROM base_table
UNION ALL
SELECT * FROM incremental_table) t2
GROUP BY id) s
ON t1.id = s.id AND t1.modified_date = s.max_modified;


The sample data below represents the UNION of both the base_table and incremental_table. Note, there are new updates for “id” values 1 and 2, which are found as the last two records in the table. The record for “id” 3 remains unchanged.

[Table: UNION ALL of base_table and incremental_table records]

The reconcile_view should only show one record for each unique “id”, based on the latest “modified_date” field value.

The result of “select * from reconcile_view” shows only three records, based on both unique “id” and “modified_date”.

[Table: reconcile_view output, one record per unique “id”]
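The reduction the view performs can be mimicked in a few lines of Python. The field values below are illustrative stand-ins for the sample data (updates for ids 1 and 2, id 3 unchanged), not values from the article:

```python
union_all = [
    {"id": 1, "field1": "old1", "modified_date": "2014-01-01"},  # base
    {"id": 2, "field1": "old2", "modified_date": "2014-01-01"},  # base
    {"id": 3, "field1": "c",    "modified_date": "2014-01-01"},  # base, unchanged
    {"id": 1, "field1": "new1", "modified_date": "2014-07-14"},  # incremental update
    {"id": 2, "field1": "new2", "modified_date": "2014-07-14"},  # incremental update
]

latest = {}
for rec in union_all:  # equivalent of GROUP BY id, keeping max(modified_date)
    cur = latest.get(rec["id"])
    if cur is None or rec["modified_date"] > cur["modified_date"]:
        latest[rec["id"]] = rec

reconcile_view = sorted(latest.values(), key=lambda r: r["id"])
# three records remain: ids 1 and 2 carry the incremental values, id 3 is unchanged
```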

Step 3: Compact

The reconcile_view now contains the most up-to-date set of records and is now synchronized with changes from the RDBMS source system. For BI Reporting and Analytical tools, a reporting_table can be generated from the reconcile_view. Before creating this table, any previous instances of the table should be dropped as in the example below.


DROP TABLE reporting_table;
CREATE TABLE reporting_table AS
SELECT * FROM reconcile_view;

Moving the Reconciled View (reconcile_view) to a Reporting Table (reporting_table) reduces the amount of processing needed for reporting queries.

Further, the data stored in the Reporting Table will also be static, unchanged until the next processing cycle. This provides consistency in reporting between processing cycles. In contrast, the Reconciled View (reconcile_view) is dynamic and will change as soon as new files (holding change records) are added to or removed from the Change table (incremental_table) folder /user/hive/incremental_table.

Step 4: Purge

To prepare for the next series of incremental records from the source, replace the Base table (base_table) with only the most up-to-date records (reporting_table). Also, delete the previously imported Change record content (incremental_table) by deleting the files located in the external table location (‘/user/hive/incremental_table’).

From a HIVE client:

DROP TABLE base_table;
CREATE TABLE base_table AS
SELECT * FROM reporting_table;

From an HDFS client:
hadoop fs -rm -r /user/hive/incremental_table/*

Final Thoughts

While there are several possible approaches to supporting incremental data feeds into Hive, this example has a few key advantages:

  1. By maintaining an External Table for updates only, the table contents can be refreshed by simply adding files to or deleting files from that folder.
  2. The four steps in the processing cycle (Ingest, Reconcile, Compact and Purge) can be coordinated in a single OOZIE workflow. The OOZIE workflow can be a scheduled event that corresponds to the data freshness SLA (e.g. daily, weekly or monthly).
  3. In addition to supporting INSERT and UPDATE synchronization, DELETES can be synchronized by adding either a DELETE_FLAG or DELETE_DATE field to the import source. Then, use this field as a filter in the Hive reporting table to hide deleted records. For example,

    CREATE VIEW reconcile_view AS
    SELECT t1.* FROM
    (SELECT * FROM base_table
    UNION ALL
    SELECT * FROM incremental_table) t1
    JOIN
    (SELECT id, max(modified_date) max_modified FROM
    (SELECT * FROM base_table
    UNION ALL
    SELECT * FROM incremental_table) t2
    GROUP BY id) s
    ON t1.id = s.id AND t1.modified_date = s.max_modified
    AND t1.delete_date IS NULL;
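The effect of that extra filter, sketched in Python with illustrative field values: after the newest record per id is selected, any id whose latest version carries a delete_date is hidden from reporting.

```python
# Newest-per-id output of the reconcile step (values illustrative):
records = [
    {"id": 1, "modified_date": "2014-07-01", "delete_date": None},
    {"id": 2, "modified_date": "2014-07-14", "delete_date": "2014-07-14"},  # deleted
]

# Equivalent of "AND t1.delete_date IS NULL": drop soft-deleted records.
visible = [r for r in records if r["delete_date"] is None]
# only id 1 remains visible; id 2's newest version is flagged deleted
```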

In essence, this four-step strategy enables incremental updates, as we await the near-term development
of Hive support for full CRUD operations (Insert, Select, Update, Delete).

Learn More



SB says:

How are you taking care of running jobs that are reading data from the Hive table when you drop the table and replace it with the reconcile table?

Greg Phillips says:

Unlike the upcoming CRUD support for HIVE, a process of this nature should be considered as a scheduled event (Oozie) that corresponds to the data freshness SLA (i.e. Daily, Weekly, Monthly, etc.). Similar to ETL jobs, this type of reconciliation should be co-ordinated for windows of time when the tables will not be accessed (e.g. nights, weekends). Sorry… not a better story around data freshness!

Note: If the table sizes and change rate were relatively small, you could simply report on the reconcile_view and never compact & purge. The obvious danger in this approach is degradation of performance over time.

Pavan Emani says:


Nice post! Have you tested these queries for performance? I see that you are doing a Union between the delta table and base table twice. Would that have any impact on performance if both the tables are huge? Would it help to store the Union results in an intermediate table?

Dhananjay says:


Nice post !!

I have a question about data loading time around “Step 3” and “Step 4”. Say we have around a billion records in base_table and we are doing an incremental load. Since it drops “base_table” and “reporting_table” and re-loads the entire data, how will it impact the data loading time in steps 3 and 4?


Vah says:


Excellent article!!
What if the Hive table is a partitioned table and needs to be updated?
Do I set nonstrict mode and update, or run separate updates on each partition?

Akbar Shaik says:

It should be a LEFT SEMI JOIN, not a regular join, in reconcile_view.

Duncan says:

Hi Greg,

This looks like a great solution for Hive managed tables. Do you have any advice around external tables? I’ll try replacing the DROP/CREATE statements in the Purge step with INSERT OVERWRITE, but I was wondering if you had any other suggestions.



Nimisha says:

We are in the process of upgrading Hadoop 2.1 to 2.2.

May I know what is the best method to migrate tables from user schemas/databases during upgrade?

Vedant Jain says:


Are you upgrading an existing cluster or moving to a new cluster?

Ratan says:

It is a nice article.
Hive 0.14 has the feature for insert and update operations. Can you please tell how they have managed that?


VK says:

Nice article.

One observation is that the reconcile_view query assumes that incremental_table always contains an updated modified_date.
In case modified_date has the same value in both tables for some ids, the t1.modified_date = s.max_modified condition will be true for both records (one from base_table and the other from incremental_table), and therefore duplicate records will be inserted.

I understand this should not happen ideally if the last-value is maintained properly (preferably in cache/rdbms), but this still needs to be taken care of; if such cases are possible, UNION DISTINCT should be used in place of UNION ALL.

Also, are there any best practices when implementing this strategy in a prod environment using an oozie action, where the base table size is around 20 GB (20 mn records) and the incremental data would be in the range of 50-100 MB?


Slavo says:

Nice article, thanks for sharing this idea.
A couple of remarks, though: Why do you use UNION? It includes DISTINCT starting with Hive 1.2.0, which increases the cost of the query unnecessarily.
And why the JOIN? Wouldn’t it be easier and more efficient to do just:
SELECT * FROM base_table WHERE id NOT IN (SELECT id FROM incremental_table)
UNION ALL
SELECT * FROM incremental_table
Remark: query is not tested

Adam says:

This could be a good optimization, but it still needs to account for multiple records possibly being in the incremental table with the same id, e.g. the same entity updated yesterday and today.

chris says:

Fantastic!!! Extraordinary article with every single detail!!! Keep it up

Anand says:

Great post. Thanks !


Siva says:

Great post, thank you so much

Saif Pathan says:

I want to import data directly from Oracle SQL into a Hive table (staging layer). Whenever I use the hive-import command, null values are being imported into the Hive table… Can you please help?

Michael Young says:

Have you tried using the following:

--null-string The string to be written for a null value for string columns
--null-non-string The string to be written for a null value for non-string columns

kishore says:

Great Article

Venkat says:

It’s very helpful. The link below also shows some information about sqoop import from MySQL to Hadoop and the Hadoop ecosystem. Kindly visit this site.

Anandh kumar says:

Hive doesn’t support DML queries by default.
To create an ACID table that does,
set these properties:

SET hive.enforce.bucketing=true;
SET hive.exec.dynamic.partition.mode=nonstrict;
SET hive.txn.manager=org.apache.hadoop.hive.ql.lockmgr.DbTxnManager;
SET hive.compactor.initiator.on=true;
SET hive.compactor.worker.threads=1;

Then create a Hive table using bucketing, stored as the ORC file format,
and run the update and delete queries in Hive.
For more information on Hive DML, go to this blog for a step-by-step process.

Puneet Sharma says:

Can anyone write the query to implement SCD Type II in HIVE? As I can see, the above article is helpful in implementing SCD Type I.

Craig Curtin says:

I’m with Puneet …. looking for a solution to SCD Type 2 (retain history)
Hive or HBase would be fine, we just need a solution for historical Time Series data.

think of a ticker like YHOO
YHOO, 40.96, 2016-09-01, 3000-12-31
YHOO, 41.88, 2016-09-02, 3000-12-31
YHOO, 42.25, 2016-09-03, 3000-12-31
YHOO, 41.50, 2016-09-03, 3000-12-31
YHOO, 44.50, 2016-09-04, 2016-09-28
#end is when bought by Verizon

Craig Curtin says:

Sorry … correction to the above data

YHOO, 40.96, 2016-09-01, 2016-09-02
YHOO, 41.88, 2016-09-02, 2016-09-03
YHOO, 42.25, 2016-09-03, 2016-09-04
YHOO, 41.50, 2016-09-04, 2016-09-05
YHOO, 44.50, 2016-09-05, 2016-09-06

Anisur Rehman says:

Best article on incremental table updates, and a great article on the Hadoop ecosystem.

vamsi says:

base_table is not an external table. How we are loading the data into base_table during the first run is not clear; could you please provide input on this? We are not using any LOAD or INSERT INTO statement.

I think after using the statement below, we have to manually load the data from the files present in /user/hive/incremental_table into table base_table.

sqoop import --connect jdbc:teradata://{host name or ip address}/Database=retail --connection-manager org.apache.sqoop.teradata.TeradataConnManager --username dbc --password dbc --table SOURCE_TBL --target-dir /user/hive/incremental_table -m 1

dmishra says:

Can you please elaborate on how to handle deletes in incremental loading? As per the above, we add delete_date and compare it to NULL. How is that possible without importing the whole table again?
Can you please give the whole design for deletes?

sasanka ghosh says:

I do not understand how this Hadoop stack is in demand except for unstructured data, log processing, or one big fat append-or-overwrite table mode. Is it just hype? If you have a 50 TB db, then to ingest 30-40 GB of data you will drop at least 15-20 TB and re-create it?

JPM Edu Solutions says:

Great Article Thanks for Sharing.


Carter Shanklin says:

For the modern take on updates in Hive, including Type 2 SCD, look at the new ACID MERGE capabilities introduced in HDP 2.6. For more info check out

Honey B says:

Great way to explain things in a simple manner. You just solved out my problem. Thanks

Karthik says:

Can we use MERGE statement instead of this?

furqan says:

How do we identify the lastmodified value?

Pramitha says:

Can we use a FULL OUTER JOIN instead of multiple UNION ALL(s)?

CREATE VIEW reconcile_view AS
SELECT NVL(B.id, A.id), NVL(B.field1, A.field1),
NVL(B.field2, A.field2), NVL(B.field3, A.field3),
NVL(B.field4, A.field4), NVL(B.field5, A.field5),
NVL(B.modified_date, A.modified_date)
FROM base_table A FULL OUTER JOIN incremental_table B
ON A.id = B.id;
