September 14, 2016

Change Data Capture using Apache NiFi

Originally posted in HCC – Hortonworks Community Connection

Prerequisites

  1. Download the HDP Sandbox
  2. MySQL database (should already be present in the sandbox)
  3. NiFi 0.6 or later (download and install a new version of NiFi, or use Ambari to install NiFi in the sandbox)

MySQL setup (Source Database)

In this setup we will create a table in MySQL and a few triggers on that table to emulate transactions.

  • These triggers will record whether the change was an insert or an update.
  • They will also update the timestamp on the inserted/updated row. (This is very important, as NiFi will poll this column to extract changes based on the timestamp.)
unix> mysql -u root -p
Enter password:
mysql> create database test_cdc;
mysql> create user 'test_cdc'@'localhost' identified by 'test_cdc';
mysql> GRANT ALL PRIVILEGES ON *.* TO 'test_cdc'@'%' IDENTIFIED BY 'test_cdc' WITH GRANT OPTION;
mysql> FLUSH PRIVILEGES;
mysql> exit;
unix> mysql -u test_cdc -p test_cdc
mysql> create table CDC_TEST
       (
         Column_A int,
         Column_B text,
         Created_date datetime,
         INFORMATION text
       );

Create Triggers in MySQL

mysql> create trigger CDC_insert
       before insert on cdc_test
       for each row
       set NEW.created_date = NOW()
         , NEW.information = 'INSERT';

mysql> create trigger CDC_UPDATE
       before update on cdc_test
       for each row
       set NEW.created_date = NOW()
         , NEW.information = 'UPDATE';

HIVE setup (Destination Database)

In Hive, we create an external table with exactly the same data structure as the MySQL table. NiFi will be used to capture changes from the source and insert them into the Hive table.

Using the Ambari Hive view or the Hive CLI, create the following table in the Hive default database.

I have used the Hive CLI to create the table:

unix> hive
hive> create external table HIVE_TEST_CDC
      (
        COLUMN_A int,
        COLUMN_B string,
        CREATED_DATE string,
        INFORMATION string
      )
      stored as avro
      location '/test-nifi/CDC/';

Note: I am not covering how to create a managed Hive table in ORC format; that will be covered in a different article.

NiFi Setup

This is a simple NiFi setup; the QueryDatabaseTable processor is only available as part of the default processors from version 0.6 of NiFi onward.

QueryDatabaseTable Processor Configuration

It is very intuitive.

The main things to configure are the DBCP Connection Pool and the Maximum-value Columns property.

Set the maximum-value column to the date-time stamp column (Created_date in this example), since it acts as a cumulative change-tracking column.

This is the main limitation of this processor: it is not true CDC and relies on a single column. If data is reloaded into the table with older values in that column, those rows will not be replicated into HDFS or any other destination.
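To make the polling behavior concrete, the sketch below shows roughly the kind of incremental query the processor issues on each run; the :last_max placeholder is illustrative only (not an actual NiFi parameter) and stands for the maximum Created_date value the processor has stored in its state.

-- Illustrative sketch of the incremental poll against the source table;
-- :last_max represents the maximum-value-column state kept by the processor.
select Column_A, Column_B, Created_date, INFORMATION
from   cdc_test
where  Created_date > :last_max;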

This processor does not rely on transaction logs or redo logs the way Attunity or Oracle GoldenGate do. For a complete CDC solution, please use Attunity or Oracle GoldenGate.

DBCPConnectionPool Configuration
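The DBCPConnectionPool controller service points NiFi at the MySQL database created above. As a rough sketch (property names vary slightly between NiFi versions, and the connector JAR path below is an assumption about the sandbox layout):

Database Connection URL    : jdbc:mysql://localhost:3306/test_cdc
Database Driver Class Name : com.mysql.jdbc.Driver
Database Driver Jar Url    : /usr/share/java/mysql-connector-java.jar   (assumed path to the MySQL connector JAR)
Database User              : test_cdc
Password                   : test_cdc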

PutHDFS Processor Configuration

Configure the Hadoop core-site.xml and hdfs-site.xml resources and the destination HDFS directory; in this case it is /test-nifi/CDC.
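As a rough guide, a minimal PutHDFS configuration looks like the following; the configuration file paths are an assumption based on a typical sandbox layout, and exact property names may differ between NiFi versions:

Hadoop Configuration Resources : /etc/hadoop/conf/core-site.xml,/etc/hadoop/conf/hdfs-site.xml   (assumed paths)
Directory                      : /test-nifi/CDC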

Make sure this directory is present in HDFS; otherwise create it using the following command:

unix> hadoop fs -mkdir -p /test-nifi/CDC

Make sure all the processors are running in NiFi.


Testing CDC

Run a bunch of insert statements on the MySQL database.

unix> mysql -u test_cdc -p

At the mysql CLI, run the following inserts:

insert into cdc_test values (3, 'cdc3', null, null);
insert into cdc_test values (4, 'cdc3', null, null);
insert into cdc_test values (5, 'cdc3', null, null);
insert into cdc_test values (6, 'cdc3', null, null);
insert into cdc_test values (7, 'cdc3', null, null);
insert into cdc_test values (8, 'cdc3', null, null);
insert into cdc_test values (9, 'cdc3', null, null);
insert into cdc_test values (10, 'cdc3', null, null);
insert into cdc_test values (11, 'cdc3', null, null);
insert into cdc_test values (12, 'cdc3', null, null);
insert into cdc_test values (13, 'cdc3', null, null);

select * from cdc_test;

Go to Hive using the CLI and check whether the records were transferred over by NiFi.

hive> select * from hive_test_cdc;

Voila…
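To also exercise the CDC_UPDATE trigger, you can run an update such as the following (the values are illustrative); the trigger resets Created_date to NOW(), so the next QueryDatabaseTable poll will pick the row up again and another record will land in the Hive table:

mysql> update cdc_test set Column_B = 'cdc3_updated' where Column_A = 3;
mysql> select * from cdc_test where Column_A = 3;

In Hive you should then see both the original row marked 'INSERT' and a newly appended row marked 'UPDATE' for Column_A = 3.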


Comments

  • 1. It has become a trend to post articles without defined goals.
    2. It is not CDC – this is only select and insert into Hive. queryDatabaseProcessor does not have a CDC query at all. Name articles appropriately.
    3. The purpose of the triggers is vague.

  • @Pavel: Thanks for reading my article and for your feedback; I really appreciate it. By CDC, I mean capturing the inserts and updates made to RDBMS tables at the source. I agree it is not true CDC, but rather a way to emulate data capture from an RDBMS when changes happen in the source systems (it also does not capture deletes). In my article I do refer to using Oracle GoldenGate or Attunity if you want to mine the redo logs or transaction logs.

    Also, the reasons for creating the triggers in MySQL are:

    1) The triggers set the date and time stamp whenever a row is inserted or updated, and the NiFi processor polls on that date and time column to pull the latest data from the RDBMS into NiFi and generate a flow file. The date and time field is critical.

    2) They also make it possible to tell whether a record was inserted or updated, in MySQL as well as in Hive, so we know the state of the record in the source system. This field is just being used for demo purposes; it is not strictly required.

    • I wanted to applaud you for your professionalism even though that guy was taking stabs at you. You kept your cool and even tried to help him understand. Major respect.

  • Hi,

    I followed this tutorial and everything works fine, except that all the records show null values.

    Could you please suggest what is wrong here?

    Thanks

    • Sorry for the late reply. Try using the ConvertAvroToORC processor and then connect that to PutHDFS, and it should work. With the most recent version of NiFi I had some encoding issues with Avro data structures where I ran into the NULL value issue myself. With ORC you will also get compression and better query performance.

  • I'd like to know whether NiFi can get trigger info from an RDBMS only. Do we have to set up triggers for each table we need to ingest into Hive?
    What if I need to ingest over 100 tables from Oracle or Teradata, and assume we don't have Oracle GoldenGate?

    • @Robert
      You don't have to set up triggers if your tables have timestamps or unique primary keys. You can use those fields to track the most recent changes and updates to be pushed down to your Hadoop instance.

    • Sorry for the late reply. You could copy the core-site.xml from the Hadoop cluster on the other server to a local directory on the server that is running NiFi, and then configure NiFi to point to that local file so it can write to Hadoop on a different server. Also, if there is any network segmentation or there are subnets present, routes have to be established so you can connect to your remote Hadoop cluster. I hope this helps. If you have further questions, please use HCC (Hortonworks Community Connection) to post them.

  • Is there a failover or data-reprocessing example for the QueryDatabaseTable processor? If for whatever reason the state (maximum-value column) is cleared, for example when the node goes down, or when there is an error in the downstream processors, I want to avoid pulling the already processed records again, or have the ability to reset the maximum-value column to the datetime I want it to be.

  • Just to elaborate further, I would like to set an initial state value for my column's maximum value, or I'd like to change the value stored in State Management for that column. This comes up in operational scenarios ("reload all data after this exact timestamp, that's when everything went bad"). But looking through the code, and even the REST API, I don't see any options to set or modify a processor's state. Am I missing something? Is there a way to set initial values like I want?

  • These are great questions. Yes, you can manually reset the maximum-value column to the most recently acquired value, or you can create a custom processor to manage that. Please use HCC (Hortonworks Community Connection) to ask questions in the NiFi SME group. Also, there are many articles on how to create custom processors that manage and persist watermarks and process based on them.

    https://community.hortonworks.com/articles/4318/build-custom-nifi-processor.html

    Also, from that location you have access to people who contribute directly to Apache NiFi.
