September 14, 2016

Change Data Capture using Apache NiFi

Originally posted in HCC – Hortonworks Community Connection


Prerequisites

  1. Download the HDP Sandbox
  2. MySQL database (should already be present in the sandbox)
  3. NiFi 0.6 or later (download and install a new version of NiFi, or use Ambari to install NiFi in the sandbox)

MySQL setup (Source Database)

In this setup we will create a table in MySQL and a few triggers on the table to emulate transactions.

  • These triggers will determine whether the change introduced was an insert or an update.
  • They will also update the time stamp on the updated/inserted row. (This is very important, as NiFi will be polling on this column to extract changes based on the time stamp.)
unix> mysql -u root -p
mysql> create database test_cdc;
mysql> create user 'test_cdc'@'localhost' identified by 'test_cdc';
mysql> grant all privileges on test_cdc.* to 'test_cdc'@'localhost';
mysql> flush privileges;
mysql> exit;
unix> mysql -u test_cdc -p test_cdc
mysql> create table CDC_TEST (
         Column_A int,
         Column_B text,
         Created_date datetime,
         Information text
       );

Create Triggers in MySQL

mysql> create trigger CDC_insert
       before insert on CDC_TEST
       for each row
       set NEW.Created_date = NOW(),
           NEW.Information = 'INSERT';

mysql> create trigger CDC_update
       before update on CDC_TEST
       for each row
       set NEW.Created_date = NOW(),
           NEW.Information = 'UPDATE';
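As a quick sanity check (the statements below assume the CDC_TEST table and triggers created above), insert a row without supplying Created_date or Information; the insert trigger should populate both columns:

```sql
mysql> insert into CDC_TEST (Column_A, Column_B) values (1, 'first');
mysql> select * from CDC_TEST;
```

The returned row should show the current time stamp in Created_date and the value 'INSERT' in Information.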

HIVE setup (Destination Database)

In Hive, we create an external table with exactly the same data structure as the MySQL table. NiFi will be used to capture changes from the source and insert them into the Hive table.

Using the Ambari Hive view or the Hive CLI, create the following table in the Hive default database.

I have used the Hive CLI to create the table:

unix> hive
hive> create external table CDC_TEST
      (  COLUMN_A int,
         COLUMN_B string,
         CREATED_DATE string,
         INFORMATION string )
      stored as avro
      location '/test-nifi/CDC/';

Note: I am not covering how to create a managed Hive table in ORC format; that will be covered in a different article.
NiFi Setup

This is a simple NiFi setup. The QueryDatabaseTable processor is available as part of the default processors from version 0.6 of NiFi.

QueryDatabaseTable Processor Configuration

It is very intuitive. The main things to configure are the DBCPConnectionPool controller service and the Maximum-value Columns property.

Set Maximum-value Columns to the date-time stamp column, which serves as a cumulative change-tracking column.
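As a sketch, the key QueryDatabaseTable properties for this example would look something like the following (the table and column names come from the setup above; treat the exact values as assumptions for this sandbox):

```
Database Connection Pooling Service : DBCPConnectionPool
Table Name                          : CDC_TEST
Maximum-value Columns               : Created_date
```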

This is the only limitation of this processor: it is not true CDC and relies on a single column. If data is reloaded into the table with older time stamps, it will not be replicated into HDFS or any other destination.

This processor does not rely on transaction logs or redo logs the way Attunity or Oracle GoldenGate do. For a complete CDC solution, please use Attunity or Oracle GoldenGate.
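The polling behaviour, and the limitation just described, can be sketched in plain Python (illustrative only: NiFi keeps the maximum observed value in its state management, not in a dict, and the rows here are simulated in memory):

```python
from datetime import datetime

def poll_new_rows(rows, state):
    """Return rows whose Created_date is later than the saved maximum
    value, and record the new maximum -- the same idea as polling on a
    Maximum-value Column."""
    last_max = state.get("Created_date")
    new_rows = [r for r in rows
                if last_max is None or r["Created_date"] > last_max]
    if new_rows:
        state["Created_date"] = max(r["Created_date"] for r in new_rows)
    return new_rows

rows = [
    {"Column_A": 3, "Column_B": "cdc3", "Created_date": datetime(2016, 9, 14, 10, 0)},
    {"Column_A": 4, "Column_B": "cdc3", "Created_date": datetime(2016, 9, 14, 10, 5)},
]
state = {}
first = poll_new_rows(rows, state)   # both rows are new on the first run
second = poll_new_rows(rows, state)  # nothing new: max value already seen

# A row re-inserted with an OLDER time stamp is silently skipped --
# exactly the limitation described above.
rows.append({"Column_A": 5, "Column_B": "old", "Created_date": datetime(2016, 9, 13)})
third = poll_new_rows(rows, state)
print(len(first), len(second), len(third))  # 2 0 0
```

This is why the triggers that stamp Created_date on every insert and update are essential: without a monotonically advancing column, changes are invisible to this style of polling.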

DBCPConnectionPool Configuration:
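A sketch of the DBCPConnectionPool controller service settings for the sandbox MySQL instance follows. The URL, driver location, and credentials are assumptions for this setup, and property names vary slightly across NiFi versions; adjust to your environment:

```
Database Connection URL     : jdbc:mysql://localhost:3306/test_cdc
Database Driver Class Name  : com.mysql.jdbc.Driver
Database Driver Location(s) : /usr/share/java/mysql-connector-java.jar
Database User               : test_cdc
Password                    : test_cdc
```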

putHDFS Processor

Configure the Hadoop core-site.xml and hdfs-site.xml, and the destination HDFS directory; in this case it is /test-nifi/CDC.

Make sure this directory is present in HDFS; otherwise, create it using the following command:

hadoop fs -mkdir -p /test-nifi/CDC

Make sure all the processors are running in NiFi.

Testing CDC

Run a bunch of insert statements on the MySQL database:

mysql -u test_cdc -p

At the mysql CLI, run the following inserts:

insert into cdc_test values (3, 'cdc3', null, null);
insert into cdc_test values (4, 'cdc3', null, null);
insert into cdc_test values (5, 'cdc3', null, null);
insert into cdc_test values (6, 'cdc3', null, null);
insert into cdc_test values (7, 'cdc3', null, null);
insert into cdc_test values (8, 'cdc3', null, null);
insert into cdc_test values (9, 'cdc3', null, null);
insert into cdc_test values (10, 'cdc3', null, null);
insert into cdc_test values (11, 'cdc3', null, null);
insert into cdc_test values (12, 'cdc3', null, null);
insert into cdc_test values (13, 'cdc3', null, null);

select * from cdc_test;

Go to Hive using the CLI and check whether the records were transferred over by NiFi:

hive> select * from cdc_test;



Pavel Savchenko says:

1. It becomes trend to post articles without goals defined.
2. It is not CDC – this is only select&insert in Hive. queryDatabaseProcessor does not have CDC query at all.. Call articles appropriate.
3. Purpose of triggers is vague.

Satish BOMMA says:

@Pavel: Thanks for reading my article and your feedback, I really appreciate it. With CDC, mean capturing of inserts and updates made to a RDBMS tables at source. I agree its not true CDC but a way to emulate data capture from RDBMS when changes happen in the source systems ( Also it does not capture deletes). In my article i do refer to use oracle goldengate or Attunity if you want to mine the redo logs or transaction logs.

Also the reasons for creating Triggers in mysql are

1) Triggers set up date and time stamp whenever a row is inserted or updated and NIFI processor is polling on the date and time column to pull the latest data from RDBMS into nifi to generate a flow file. Date and time field is critical.

2) Also, it helps to figure out if the record was inserted or updated in Mysql as well as in Hive. So we know the state of the record in the source system. This field is just being used for demo purpose, its not really required to set this data.

Doug S says:

I wanted to applaud you for your professionalism even though that guy was taking stabs at you. Kept your cool and even tried to help him understood. Major Respect.

Faij Ali says:


I followed this tutorial and everything works fine, except that all the records have null values for all the fields.

Could you please suggest me what is wrong here?


Satish Bomma says:

Sorry for the late reply, try using convertAvrotoOrc Processor and then connect that to putHDFS and it should work. with Most recent version of NiFi i had some encoding issues with avro data structures where i had NULL value issue myself. With ORC you will get the compression and performance on the data.

Robin Dong says:

Like to know if Nifi can get triggers info from RDBMS only, do we have to setup triggers for each table we need to ingest to hive?
What if I need to ingest over 100 tables from Oracle or Teradata? and assume that we dont have Oracle Goldengate.

Satish Bomma says:

You dont have to setup triggers if you have tables with timestamps or primary keys that are unique. You can use those fields to track the most recent changes and updates to be pushed down to your Hadoop instance.

Matt G says:

Hey this is great!
Just to let you know, someone stole your article as their own:

Satish Bomma says:

@Matt: I know, i saw that. that’s ok!.. as long as people are able solve their CDC problems using NiFi. I would be happy to help..

khanhvo says:

thank for your topic,
but I use an other hadoop server so how to config in putHDFS?

Satish Bomma says:

Sorry for the late reply. You could copy the core-site.xml from the hadoop on the other server to a local directory on the server that is running NiFi. You could then configure to point to that local file to be able to write to hadoop located on a different server. Also, if there are any network segmentation or sub-nets present, then routes have to be established to be able to connect to your remote hadoop. i hope this helps. Also, if you have further questions please use HCC ( hortonworks community connection) to post your questions.

Mel J says:

Is there a failover or data reprocessing example for QueryDatabaseTable processor ? If for whatever reasons the state (maximum-value column) is cleared namely the node goes down or in a scenario when there is an error in the downstream processors , I want to avoid pulling the already processed records again or have the ability to reset the maximum-value column field to the datetime i want it to be.

Mel J says:

Just to elaborate further , I would like to
set an initial state value for my columns max value, or I’d like to change
the value stored in State Management for that column, this comes up in
operational scenarios (“reload all data after this exact timestamp, that’s
when everything went bad”).
But in looking through the code, and even the rest API, I don’t see any
options to set/modify a processes state. Am I missing something? Is there
a way to set initial values like I want?

Satish Bomma says:

These are great questions. Yes you can manually reset the maximum value column to the most recently acquired value. Or you can create a custom processor to manage that. Please use HCC ( Hortonworks Community Connection), to ask questions in the NiFi SME group. Also, there are many articles on how to create custom processors to manage and persist tidemarks and process based on that.

Also, you have access to resources who contribute to Apache NiFi directly from this location.

Ishan Kumar says:

I have used your solution to pull data from Oracle DB as incremental. I am giving 2 columns one is id and another is timeStamp which can be null. Now i want to pull the records when the null values get modified to new date. But its not pulling the values… Any Idea

Sushma Onkar says:

Hello Satish, Could I use NiFi to capture changes on files rather than a database?

Satish says:

You could use the GetFile Processors to grab files. But then you would have to figure out the changes within the file using either regular expression or execute script processor ( with a script to detect changes.). You can also extend a processor to meet your custom needs. What kind of files are you looking at and what changes are you seeking to capture?
