Realtime Event Processing in Hadoop with NiFi, Kafka and Storm

Introduction

Welcome to a three-part tutorial series on real-time data ingestion and analysis. Today’s processing systems have moved from classical data-warehouse batch reporting to the realm of real-time processing and analytics. The result is real-time business intelligence. Real-time means near-zero latency and access to information whenever it is required. This tutorial will show how geolocation information from trucks can be combined with sensor data from trucks and roads. These sensors report real-time events like speeding, lane departure, unsafe tailgating, and unsafe following distances. We will capture these events in real time.

Pre-Requisites

  • Downloaded and Installed latest Hortonworks Sandbox
  • Learning the Ropes of the Hortonworks Sandbox
  • 8GB+ RAM (Assigning more is recommended) and preferably 4 processor cores, otherwise you may encounter errors in the third tutorial
  • Data sets used:
    • New York City Truck Routes from NYC DOT.
    • Truck Events Data generated using a custom simulator.
    • Weather Data, collected using APIs from Forecast.io.
    • Traffic Data, collected using APIs from MapQuest.

All data sets used in these tutorials are real data sets, but they have been modified to fit these use cases.

Tutorial Overview

The events generated by the sensors will be ingested and routed by Apache NiFi and captured through Apache Kafka, a distributed publish-subscribe messaging system. We will use Apache Storm to process this data from Kafka and eventually persist it into HDFS and HBase.

Goals of the tutorial

  • Understand Real-time Data Analysis
  • Understand Apache NiFi Architecture
  • Create NiFi DataFlow
  • Understand Apache Kafka Architecture
  • Create Consumers in Kafka
  • Understand Apache Storm Architecture
  • Create Spouts and Bolts in Storm
  • Persist data from Storm into Hive and HBase

Outline

  1. Tutorial Introduction
  2. Pre-Requisites
  3. Tutorial Overview
  4. Goals of the Tutorial
  5. Concepts
  6. Get Started with HDF labs
  7. Next Steps/Try these
  8. References

Concepts

Introduction

In this tutorial, you will strengthen your foundation in the technologies used in real-time event processing. You will learn in detail how Apache Kafka sends messages, how Apache Storm collects that data, and how HBase reads the resulting stream.

Pre-Requisites

Outline

1st Concept: Apache NiFi

Introduction:

NiFi works with Apache Kafka, Apache Storm, Apache HBase and Spark for real-time distributed messaging of streaming data. NiFi is an excellent platform for ingesting real-time streaming data sources, such as the Internet of Things, sensors and transactional systems. If the incoming data contains garbage, NiFi offers tools to filter out the undesired records. Additionally, NiFi can act as a messenger and deliver data to the desired location.

Goals of this module:

  • Understand how Apache NiFi works

How NiFi Works

NiFi’s system design can be thought of as an Automated Teller Machine, where incoming data is securely processed and written sequentially to disk. There are four main components involved in moving data in and out of NiFi:

  • FlowFile
  • Processor
  • Connections
  • Flow Controller

Image of NiFi Flow

In NiFi, a FlowFile is data brought into the flow from any data source; it moves through the dataflow. Connections are linkages between components that enable FlowFiles to move throughout the dataflow. A Flow Controller regulates the exchange of FlowFiles between processors. Processors are actions taken on FlowFiles to process their content and attributes so the desired data moves through the dataflow and is eventually stored in a secure location. In this architecture, NiFi also acts as a Producer that publishes messages to one or more Kafka topics; at a high level, producers send messages over the network to the Kafka cluster.

2nd Concept: Apache Kafka

Introduction:

In a modern data architecture built on YARN-enabled Apache Hadoop, Kafka works in combination with Apache Storm, Apache HBase and Apache Spark for real-time distributed messaging of streaming data. Kafka is an excellent low latency messaging platform for real-time streaming data sources, such as the internet of things, sensors, and transactional systems. Whatever the industry or use case, Kafka brokers massive message streams for low-latency analysis in Enterprise Apache Hadoop.
Kafka is fully supported and included in HDP today.

Goals of this module:

  • Understand Apache Kafka Architecture
  • Understand how Apache Kafka works

What Kafka Does

Apache Kafka supports a wide range of use cases as a general-purpose messaging system for scenarios where high throughput, reliable delivery, and horizontal scalability are important. Apache Storm and Apache Spark both work very well in combination with Kafka. Common use cases include:

  • Stream Processing
  • Website Activity Tracking
  • Metrics Collection and Monitoring
  • Log Aggregation
Some of the important characteristics that make Kafka such an attractive option for these use cases include the following:

  • Scalability – distributed messaging system scales easily with no downtime
  • Durability – persists messages on disk, and provides intra-cluster replication
  • Reliability – replicates data, supports multiple consumers, and automatically balances consumers in case of failure
  • Performance – high throughput for both publishing and subscribing, with disk structures that provide constant performance even with many terabytes of stored messages

How Kafka Works

Kafka’s system design can be thought of as that of a distributed commit log, where incoming data is written sequentially to disk. There are four main components involved in moving data in and out of Kafka:

  • Topics
  • Producers
  • Consumers
  • Brokers

Image of Kafka Flow

In Kafka, a Topic is a user-defined category to which messages are published. NiFi will act in the role of Producers to publish messages to one or more topics and Consumers subscribe to topics and process the published messages. At a high level, producers send messages over the network to the Kafka cluster, which in turn serves them up to consumers. Finally, a Kafka cluster consists of one or more servers, called Brokers that manage the persistence and replication of message data (i.e. the commit log).

Image of Kafka Partitions

One of the keys to Kafka’s high performance is the simplicity of the brokers’ responsibilities. In Kafka, topics consist of one or more Partitions that are ordered, immutable sequences of messages. Since writes to a partition are sequential, this design greatly reduces the number of hard disk seeks (with their resulting latency).
Another factor contributing to Kafka’s performance and scalability is the fact that Kafka brokers are not responsible for keeping track of what messages have been consumed – that responsibility falls on the consumer. In traditional messaging systems, such as JMS, the broker bore this responsibility, severely limiting the system’s ability to scale as the number of consumers increased.

Image of Brokers w/ Zookeeper

For Kafka consumers, keeping track of which messages have been consumed (processed) is simply a matter of keeping track of an Offset, which is a sequential id number that uniquely identifies a message within a partition. Because Kafka retains all messages on disk (for a configurable amount of time), consumers can rewind or skip to any point in a partition simply by supplying an offset value. Finally, this design eliminates the potential for back-pressure when consumers process messages at different rates.
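To make these ideas concrete, you can exercise a topic, a producer, and a consumer with Kafka’s command-line tools on the sandbox. This is only an illustrative sketch: the test topic below is hypothetical, and on the HDP sandbox the scripts live under /usr/hdp/current/kafka-broker/bin (broker port 6667, ZooKeeper port 2181).

# Create a throwaway topic to experiment with
./kafka-topics.sh --create --zookeeper sandbox.hortonworks.com:2181 --replication-factor 1 --partitions 1 --topic test

# Act as a producer: publish a message to the topic
echo "hello kafka" | ./kafka-console-producer.sh --broker-list sandbox.hortonworks.com:6667 --topic test

# Act as a consumer: read the topic from the beginning; rewinding is just a matter of choosing an offset
./kafka-console-consumer.sh --zookeeper sandbox.hortonworks.com:2181 --topic test --from-beginning

Press Ctrl+C to stop the console consumer when you are done.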

3rd Concept: Apache Storm

Introduction:

Apache Storm is a distributed real-time computation system for processing large volumes of high-velocity data in parallel and at scale. Storm is to real-time data processing as Apache Hadoop and MapReduce are to batch data processing. With its simple programming interface, Storm allows application developers to write applications that analyze streams of tuples of data; a tuple may contain objects of any type.

At the core of Storm’s data stream processing is a computational topology, which is discussed below. This topology of nodes dictates how tuples are processed, transformed, aggregated, stored, or re-emitted to other nodes in the topology for further processing.

Storm on Apache Hadoop YARN

Storm on YARN is powerful for scenarios requiring continuous analytics, real-time predictions, and continuous monitoring of operations. By eliminating the need for dedicated silos, enterprises using Storm on YARN benefit from cost savings (by accessing the same datasets as other engines and applications on the same cluster) and from simpler security, data governance, and operations (by employing the same compute resources managed by YARN).

Storm in the Enterprise

Some of the specific new business opportunities include: real-time customer service management, data monetization, operational dashboards, or cyber security analytics and threat detection.

Storm is extremely fast, with the ability to process over a million records per second per node on a cluster of modest size. Enterprises harness this speed and combine it with other data access applications in Hadoop to prevent undesirable events or to optimize positive outcomes.

Here are some typical “prevent” and “optimize” use cases for Storm.

  • Financial Services – Prevent: securities fraud, operational risks & compliance violations; Optimize: order routing, pricing
  • Telecom – Prevent: security breaches, network outages; Optimize: bandwidth allocation, customer service
  • Retail – Prevent: shrinkage, stock-outs; Optimize: offers, pricing
  • Manufacturing – Prevent: preventative maintenance, quality assurance; Optimize: supply chain optimization, reduced plant downtime
  • Transportation – Prevent: driver monitoring, predictive maintenance; Optimize: routes, pricing
  • Web – Prevent: application failures, operational issues; Optimize: personalized content

Now with Storm on YARN, a Hadoop cluster can efficiently process a full range of workloads from real-time to interactive to batch. Storm is simple, and developers can write Storm topologies using any programming language.
Five characteristics make Storm ideal for real-time data processing workloads. Storm is:

  • Fast – benchmarked as processing one million 100 byte messages per second per node
  • Scalable – with parallel calculations that run across a cluster of machines
  • Fault-tolerant – when workers die, Storm will automatically restart them. If a node dies, the worker will be restarted on another node.
  • Reliable – Storm guarantees that each unit of data (tuple) will be processed at least once or exactly once. Messages are only replayed when there are failures.
  • Easy to operate – standard configurations are suitable for production on day one. Once deployed, Storm is easy to operate.

How Storm Works

Storm Cluster Components
A Storm cluster has three sets of nodes:

  • Nimbus node (master node, similar to the Hadoop JobTracker):
    • Uploads computations for execution
    • Distributes code across the cluster
    • Launches workers across the cluster
    • Monitors computation and reallocates workers as needed
  • ZooKeeper nodes – coordinate the Storm cluster
  • Supervisor nodes – communicate with Nimbus through ZooKeeper, and start and stop workers according to signals from Nimbus

Image of Storm-Zookeeper

Five key abstractions help to understand how Storm processes data:

  • Tuples – an ordered list of elements. For example, a “4-tuple” might be (7, 1, 3, 7)
  • Streams – an unbounded sequence of tuples.
  • Spouts – sources of streams in a computation (e.g. a Twitter API)
  • Bolts – process input streams and produce output streams. They can run functions, filter, aggregate, or join data, or talk to databases.
  • Topologies – the overall calculation, represented visually as a network of spouts and bolts (as in the following diagram)

Img of Spouts and Bolts

Storm users define topologies for how to process the data when it comes streaming in from the spout. When the data comes in, it is processed and the results are passed on to other bolts or stored in Hadoop.

Learn more about how the community is working to integrate Storm with Hadoop and improve its readiness for the enterprise.

Storm Topologies

A Storm cluster is similar to a Hadoop cluster. Whereas on Hadoop you run “MapReduce jobs,” on Storm you run “topologies.” “Jobs” and “topologies” are different – one key difference is that a MapReduce job eventually finishes, whereas a topology processes messages forever (or until you kill it).

There are two kinds of nodes on a Storm cluster: the master node and the worker nodes. The master node runs a daemon called “Nimbus” that is similar to Hadoop’s “JobTracker”. Nimbus is responsible for distributing code around the cluster, assigning tasks to machines, and monitoring for failures.

Each worker node runs a daemon called the “Supervisor.” It listens for work assigned to its machine and starts and stops worker processes as dictated by Nimbus. Each worker process executes a subset of a topology; a running topology consists of many worker processes spread across many machines.

All coordination between Nimbus and the Supervisors is done through a ZooKeeper cluster. Additionally, the Nimbus daemon and Supervisor daemons are fail-fast and stateless; all state is kept in ZooKeeper or on local disk. This means you can kill -9 Nimbus or the Supervisors and they’ll start back up like nothing happened. Hence, Storm clusters are stable and fault-tolerant.

Streams Within Storm Topologies

The core abstraction in Storm is the “stream.” It is an unbounded sequence of tuples. Storm provides the primitives for transforming a stream into a new stream in a distributed and reliable way. For example, you may transform a stream of tweets into a stream of trending topics.

The basic primitives Storm provides for doing stream transformations are “spouts” and “bolts.” Spouts and bolts have interfaces that you, as an application developer, implement to run your application-specific logic.

A spout is a source of streams. For example, a spout may read tuples off a Kafka topic and emit them as a stream. Or a spout may connect to the Twitter API and emit a stream of tweets.

A bolt consumes any number of input streams, does some processing, and possibly emits new streams. Complex stream transformations, like computing a stream of trending topics from a stream of tweets, require multiple steps and thus multiple bolts. Bolts can run functions, filter tuples, perform streaming aggregations, perform streaming joins, talk to databases, and more.

Networks of spouts and bolts are packaged into a “topology,” which is the top-level abstraction that you submit to Storm clusters for execution. A topology is a graph of stream transformations where each node is a spout or bolt. Edges in the graph indicate which bolts are subscribing to which streams. When a spout or bolt emits a tuple to a stream, it sends the tuple to every bolt that subscribed to that stream.

Another Img of spouts and bolts

Links between nodes in your topology indicate how tuples should be passed around. For example, if there is a link between Spout A and Bolt B, a link from Spout A to Bolt C, and a link from Bolt B to Bolt C, then every time Spout A emits a tuple, it will send the tuple to both Bolt B and Bolt C. All of Bolt B’s output tuples will go to Bolt C as well.

Each node in a Storm topology executes in parallel. In your topology, you can specify how much parallelism you want for each node, and then Storm will spawn that number of threads across the cluster to do the execution.

A topology runs forever, or until you kill it. Storm will automatically reassign any failed tasks. Additionally, Storm guarantees that there will be no data loss, even if machines go down and messages are dropped.
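On an HDP cluster, this lifecycle maps directly onto the storm command-line client: you submit a packaged topology jar, list the topologies that are running, and kill one when you no longer need it. The jar, class, and topology names below are placeholders, not part of this tutorial’s project:

# Submit a topology packaged in a jar (names here are hypothetical)
storm jar mytopology.jar com.example.MyTopology my-topology-name

# Show running topologies and their status
storm list

# Stop a topology; Storm deactivates it, waits, then removes it from the cluster
storm kill my-topology-name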

4th Concept: Kafka on Storm

Introduction:

Hortonworks Data Platform’s YARN-based architecture enables multiple applications to share a common cluster and data set while ensuring consistent levels of response made possible by a centralized architecture. Hortonworks led the efforts to on-board open source data processing engines, such as Apache Hive, HBase, Accumulo, Spark, Storm and others, on Apache Hadoop YARN.

In this tutorial, we will focus on one of those data processing engines—Apache Storm—and its relationship with Apache Kafka. We will describe how Storm and Kafka form a multi-stage event processing pipeline, discuss some use cases, and explain Storm topologies.

Goals of this tutorial:

  • Understand Relationship between Apache Kafka and Apache Storm
  • Understand Storm topologies

Kafka on Storm:

An oil refinery takes crude oil, distills it, processes it and refines it into useful finished products such as the gas that we buy at the pump. We can think of Storm with Kafka as a similar refinery, but data is the input. A real-time data refinery converts raw streaming data into finished data products, enabling new use cases and innovative business models for the modern enterprise.

Apache Storm is a distributed real-time computation engine that reliably processes unbounded streams of data. While Storm processes stream data at scale, Apache Kafka processes messages at scale. Kafka is a distributed pub-sub real-time messaging system that provides strong durability and fault tolerance guarantees.
Storm and Kafka naturally complement each other, and their powerful cooperation enables real-time streaming analytics for fast-moving big data. HDP 2.4 contains the results of Hortonworks’ continuing focus on making the Storm-Kafka union even more powerful for stream processing.

Img of Cluster Layout

Conceptual Reference Architecture for Real-Time Processing in HDP 2.2

Image of Event Processing Pipeline

Conceptual Introduction to the Event Processing Pipeline

In an event processing pipeline, we can view each stage as a purpose-built step that performs some real-time processing against upstream event streams for downstream analysis. This produces increasingly richer event streams, as data flows through the pipeline:

  • raw events stream from many sources,
  • those are processed to create events of interest, and
  • events of interest are analyzed to detect significant events.

Here are some typical uses for this event processing pipeline:

  • High Speed Filtering and Pattern Matching
  • Contextual Enrichment on the Fly
  • Real-time KPIs, Statistical Analytics, Baselining and Notification
  • Predictive Analytics
  • Actions and Decisions

Build the Data Refinery with Topologies

To perform real-time computation on Storm, we create “topologies.” A topology is a graph of a computation, containing a network of nodes called “Spouts” and “Bolts.” In a Storm topology, a Spout is the source of data streams and a Bolt holds the business logic for analyzing and processing those streams.

Storm and Kafka Pipelines

Hortonworks’ focus for Apache Storm and Kafka has been to make it easier for developers to ingest and publish data streams from Storm topologies. The first topology ingests raw data streams from Kafka and fans out to HDFS, which serves as the persistent store for raw events. Next, a filter Bolt emits the enriched event to a downstream Kafka Bolt that publishes it to a Kafka Topic. As events flow through these stages, the system can keep track of data lineage, which allows drill-down from aggregated events to their constituents and can be used for forensic analysis. In a multi-stage pipeline architecture, giving the right cluster resources to the most intensive data processing stages is critical; Storm’s “Isolation Scheduler” provides the ability to easily and safely share a cluster among many topologies.
In summary, a refinery-style data processing architecture enables you to:

  • Incrementally add more topologies/use cases
  • Tap into raw or refined data streams at any stage of the processing
  • Allocate your key cluster resources to the most intensive processing phases of the pipeline

Further Reading

Tutorial 0: Set Up Simulator, Apache Services and IDE Environment

Introduction

In this tutorial, we are going to set up all the services required to run the Connected Car Application (Trucking Demo). We will install NiFi onto our Hortonworks Sandbox while activating Kafka and Storm for later use in the tutorial series. We will also walk through how to set up an IDE on our local machine for Storm development testing and how to deploy our Storm project to the Sandbox for further testing.

Pre-Requisites

  • Downloaded and Installed latest Hortonworks Sandbox
  • If you are new to the sandbox shell, refer to Learning the Ropes of the Hortonworks Sandbox
  • We recommend you have some experience with Java Concepts, Syntax and OOP, refer to this Intro to Java Programming Course if you are interested in building a strong foundation.
  • At least 8GB of RAM and preferably 4 processor cores; otherwise errors may occur in the fourth tutorial
  • For Windows users: to run the Linux terminal commands in these tutorials, download Git Bash.
  • Read Table 1 to figure out some basic details about your sandbox

Table 1: Virtual Machine Information

Parameter            Value (VirtualBox)   Value (VMware)    Value (MS Azure)
Host Name            127.0.0.1            172.16.110.129    23.99.9.233
Port                 2222                 2222              22
Terminal Username    root                 root              {username-of-azure}
Terminal Password    hadoop               hadoop            {password-of-azure}

Note: Host Name values for the VMware and Azure Sandboxes are unique to your environment and may differ from the table. For VMware and VirtualBox, the Host Name is located on the welcome screen. For Azure, the Host Name is located under Public IP Address on the Sandbox Dashboard. For Azure users, the terminal username and password are the ones you created while deploying the sandbox on Azure. For VMware and VirtualBox users, the terminal password changes after the first login.
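For example, plugging the Table 1 values into an SSH command gives roughly the following for each environment (your host name and credentials may differ):

# VirtualBox
ssh root@127.0.0.1 -p 2222

# VMware (example host name from Table 1)
ssh root@172.16.110.129 -p 2222

# Azure (use your own username and public IP)
ssh {username-of-azure}@23.99.9.233 -p 22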

  • Added sandbox.hortonworks.com to your /private/etc/hosts file (Mac and Linux users)
  • Added sandbox.hortonworks.com to your /c/Windows/System32/Drivers/etc/hosts file (Windows 7 users)

The terminal commands in the tutorial instructions are performed on the VirtualBox Sandbox from a Mac. For Windows users, download Git Bash to run the following terminal commands.

If you are on Mac or Linux, to add sandbox.hortonworks.com to your list of hosts, open the terminal and enter the following command, replacing {Host-Name} with the appropriate host for your sandbox:

echo '{Host-Name} sandbox.hortonworks.com' | sudo tee -a /private/etc/hosts

If you are on Windows 7, to add sandbox.hortonworks.com to your list of hosts, open Git Bash and enter the following command, replacing {Host-Name} with the appropriate host for your sandbox:

echo '{Host-Name} sandbox.hortonworks.com' | tee -a /c/Windows/System32/Drivers/etc/hosts

changing-hosts-file.png
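To confirm the entry was added, you can grep the hosts file you just modified (use whichever path applies to your operating system):

# Mac / Linux
grep sandbox.hortonworks.com /private/etc/hosts

# Windows 7 (Git Bash)
grep sandbox.hortonworks.com /c/Windows/System32/Drivers/etc/hosts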

Outline

Section 1: Overview

  • Install and Activate NiFi By Ambari Install Wizard

Section 1: Setup NiFi Environment

Step 1: Install NiFi

NiFi will be installed into the Ambari Stack of the Hortonworks Sandbox VirtualBox image because it
will be used to activate the truck events simulator and transport data to Kafka.

1. If you do not have NiFi installed on your sandbox, refer to Section 2: Setup NiFi on Sandbox by Ambari Wizard from Tutorial 0: Download, Install, and Start NiFi of
Learning the Ropes of Apache NiFi for step-by-step instructions.

Step 2: Start NiFi

1. To activate the NiFi service, refer to Section 2: Start NiFi via Ambari Service from Tutorial 0: Set Up NiFi Environment of
Learning the Ropes of Apache NiFi for step-by-step instructions.

Section 2: Overview

  • Activate Kafka and Configure with Zookeeper

Section 2: Setup Kafka Service

We need to set up Kafka because it will be used as the secure cluster location where NiFi transports the data. Storm will pull that data from the cluster and push it into its topology (dataflow).

Step 1: Start Kafka

1.1 Access Ambari

If you haven’t reset your Ambari admin password, refer to Section 2.2 SETUP AMBARI ADMIN PASSWORD MANUALLY from Learning the Ropes of the Hortonworks Sandbox. Log in to Ambari to activate Kafka. Enter the URL in your browser: http://sandbox.hortonworks.com:8080

login_page_ambari

Note: the admin username is admin. The password is the one you defined.
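If you prefer the command line, you can also confirm that Ambari is reachable through its REST API before continuing. This is just an optional check, and {your-admin-password} is a placeholder for the password you set:

# Should return JSON describing the Sandbox cluster
curl -u admin:{your-admin-password} http://sandbox.hortonworks.com:8080/api/v1/clusters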

1.2 Use Ambari to Activate Kafka

1. Click on Kafka located in the left sidebar list of installed services. (If Kafka is not installed, refer to Appendix A: Install Kafka instructions.):

kafka_service_on_off

2. Click on Service Actions -> Start located at the top left of the Kafka Services Page:

3. Check the Turn off Maintenance Mode for Kafka box and click on Confirm Start:

Screen Shot 2015-06-04 at 3.06.10 PM.png

Wait for Kafka to start.

Step 2: Configure Kafka with Zookeeper

ZooKeeper is the coordination interface between the Kafka broker and consumers:

Single Broker based Kafka cluster

The important Zookeeper properties can be checked in Ambari.

2.1 Configure ZooKeeper

Click on ZooKeeper in the list of services, then open the Configs tab. Verify ZooKeeper runs on port 2181:

zookeeper_port_config

If port 2181 is busy or consumed by other processes, you can change ZooKeeper’s default port to any other valid port number. If ZooKeeper is not running, you can start the ZooKeeper service from Ambari:

zookeeper_start_service_iot

2.2 Configure Kafka

From the Kafka page, click on the Configs tab. Verify the zookeeper.connect property points to your ZooKeeper server name and port:
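As an optional sanity check from the sandbox shell, you can confirm ZooKeeper is answering on that port before moving on (this sketch assumes the nc utility is available on the sandbox):

# ZooKeeper replies "imok" to the four-letter "ruok" command when it is healthy
echo ruok | nc sandbox.hortonworks.com 2181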

Section 3: Overview

  • Activate Storm & HBase

Section 3: Setup Storm & HBase Services

We need to set up the Storm & HBase services because Storm will deploy a topology that is configured to send data to HBase, and HBase is used to create the tables that Storm populates.

Step 1: Start Storm & HBase

1. View the HBase Services page

From the previous tutorials: HDFS, Hive, YARN and Kafka should already be running but HBase may be down. From the Dashboard page of Ambari, click on HBase from the list of installed services.

hbase_service_on_off_iot

1.1 Setup HBase

2. Start HBase

From the HBase page, click on Service Actions -> Start

start_hbase_service_iot

Check the box and click on Confirm Start:

confirm_hbase_start_iot

Wait for HBase to start (It may take a few minutes to turn green)

hbase_started_iot

1.2 Setup Storm

3. Start Storm the same way we started HBase in the previous steps. We will need it later for streaming real-time event data.

storm_service_on_off_iot

4. After starting storm, a green check symbol will be present:

storm_service_started_iot

Now that we have Storm activated, we need to download a Storm demo project for later, when we use Storm’s Visualization feature.

5. Let’s SSH into the sandbox by shell:

ssh root@sandbox.hortonworks.com -p 2222

6. We will download the demo project because it contains the necessary code and configuration properties required for Storm to deploy the topology.

cd ~
git clone -b hdp25experiment https://github.com/james94/iot-truck-streaming

7. Navigate to the iot-truck-streaming folder, which is the location where the
appropriate storm configuration files reside for configuring our topology.

cd ~/iot-truck-streaming

8. Since we are at the base of our project, let’s export our Storm demo configurations. We will create a new folder, /etc/storm_demo, as the location where Storm will look for instructions on configuring the topology:

sudo mkdir /etc/storm_demo
sudo cp config.properties /etc/storm_demo/
sudo cp -R storm-demo-webapp/routes/ /etc/storm_demo/

Note: Storm will refer to the properties in the config.properties file to configure the topology.
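As a quick check, you can list the new directory to confirm the configuration file and routes folder were copied as expected:

ls -lR /etc/storm_demo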

You can use the Ambari dashboard to check the status of other components too. If HDFS, Hive, or YARN are down, you can start them in the same way: select the service and then use Service Actions to start it. The remaining components do not have to be up. (Oozie can be stopped to save memory, as it is not needed for this tutorial.)

Section 4: Overview

  • Understand the Stream Simulator Functionality
  • Run the Simulator By Shell

Section 4: Run the Simulator by Terminal

The simulator must be set up so that NiFi can pull data from it and push that data into the dataflow.

Stream Simulator

The stream simulator is a lightweight framework that generates truck event data. The simulator uses New York City Truck Routes (kml), which define driver road paths with latitude and longitude information.

The simulator uses Akka to simplify concurrency, messaging and inheritance. It has two Plain Old Java Objects (POJOs), one for Trucks and another for Drivers, that generate the events. These objects extend the AbstractEventEmitter class, whose onReceive method generates events, creates new Actors, sends messages to Actors and delivers those events to an EventCollector class. That class’s purpose is to collect events generated from the domain objects and print them to standard output. Let’s run the simulator through the terminal and see the events that are generated.

Step 1: Run the Simulator By shell

Before we run the simulator, let’s download and install it. For VMware and Azure users, refer to Table 1 for more details on using SSH in that environment. For Windows users, download Git Bash to run the terminal commands.

1.1 Setup The Simulator

1. If you have not already SSHed into the sandbox shell, type the following command to access the sandbox:

ssh root@sandbox.hortonworks.com -p 2222

2. Install Apache Maven. We will use it to compile the simulator code so we can activate the simulator from the shell or from NiFi later. Execute the commands:

cd ~/iot-truck-streaming
./setup/bin/install_maven.sh

Note: You will be prompted to allow Maven to install; type ‘y’ for yes.

After the Maven package installs, you should see the message: Complete!

3. For Maven to run, it needs to detect the pom.xml file. Rename pom25.xml to pom.xml by copying and pasting the commands:

mv -f storm-streaming/pom25.xml storm-streaming/pom.xml

/usr/maven/bin/mvn clean package -DskipTests

Note: You should see that all sub-projects compiled successfully.

1.2 Run The Simulator

1. To test the simulator, run the generate.sh script:

cd stream-simulator
chmod 750 *.sh
./generate.sh

Note: press Ctrl+C to stop the simulator.

You should see message data being generated. The data in the image includes logs in the top portion and truck events in the bottom portion. We will use NiFi to separate this data.

generate_sh_data

Note: generate.sh runs Java source code located at iot-truck-streaming/stream-simulator/src/main/java/com/hortonworks/streaming/impl/collectors/StdOutEventCollector.java, in case you would like to view, modify or run the code.

Section 5: Overview

  • Setup Intellij For Hadoop Related Development
  • Run Topologies On Sandbox that were built on Local Machine

Section 5: Setup Intellij IDE Locally and Run Topologies on Sandbox

We will use Intellij as our editor to write and change storm code on our local computer and then send our storm project to the sandbox.

Step 1: Install Intellij Locally

If you have not installed Intellij, refer to JetBrains Installing and Launching instructions.

Step 2: Download Trucking Demo for Development with IDE

Earlier we cloned the iot-truck-streaming project from github onto our sandbox. Now we will clone it onto our local machine.

1. Perform the git clone command on the local machine. Feel free to clone it in any directory, just remember the location. In the tutorial, let’s clone it in our Documents folder.

cd ~/Documents
git clone -b hdp25experiment https://github.com/james94/iot-truck-streaming.git

Note: You may receive an error if you don’t have git installed on your local machine.

Step 3: Open Trucking Demo in Intellij

1. Open Intellij. On the welcome screen, click on the Open button. We will open our iot-truck-streaming project.

welcome_button_intellij

2. Select iot-truck-streaming project.

open_storm_project

3. Intellij will display the project in its IDE.

storm_project_not_recognized_initial

Step 4: Configure Intellij To Recognize Maven Project

You’ll notice the icons next to the code files have small red circles on them: Intellij does not recognize that the Trucking Demo is a Maven project. The solution we will use is to run mvn clean package from the command line, which causes Intellij to warn us that Maven projects need to be imported to be recognized by the IDE. When the Enable Auto-Import box appears in Intellij, we will enable it for Maven projects.

1. Let’s begin the process to run maven against our Trucking Demo Project.

4.1 Specify pom.xml for Maven

  • For Maven to work, we have to tell it which pom file to use from the storm-streaming directory. Let’s rename pom25.xml to pom.xml using the shell.
mv iot-truck-streaming/storm-streaming/pom25.xml iot-truck-streaming/storm-streaming/pom.xml

4.2 Compile and Package Demo Files into Jar

  • Now that we specified the pom for maven, it knows how to build the project. Let’s execute the command:
cd iot-truck-streaming
mvn clean package -DskipTests

Output should show success for each sub project within the overall project.

maven_success_for_IDE_storm_project

Apache Maven command:

mvn clean deletes everything in the target folder. For example, if this is the second time you run the mvn command from the iot-truck-streaming folder, the storm-streaming folder, as well as other folders that contain pom.xml files, will have their target folders affected. The clean part of the command removes the old target folder, while the package part compiles the code and packages it into jar files according to the pom file.

Note: packaging may take around 9 minutes. Add -DskipTests to the end of the mvn command to speed up the process.
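The combined command is equivalent to running the two phases separately, which makes the division of labor easier to see:

mvn clean                  # removes each sub-project's target folder
mvn package -DskipTests    # compiles the code and packages it into jar files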

Why is Maven important for development testing?

It enables developers to quickly compile and test large software projects that contain many Java files.

4.3 Instruct Intellij to Enable Auto-Import for Maven Projects

1. Switch to the Intellij IDE to see if it recognizes the Maven Project.

2. As you run Maven, you will see a box appear in the top right corner of Intellij stating that Maven projects need to be imported. Click on Enable Auto-Import.

enable_auto_import_maven

4.4 Intellij Recognizes Trucking Demo is Maven Project

  • Notice that the icons next to the code files changed to blue circles.

code_files_recognized_unlock_symbol

Note: If you want to make commits to github from Intellij, feel free to try out Intellij’s git feature, else ignore the “Add Files to Git” Window if it appears.

4.5 Intellij Setup To Develop Hadoop Projects Locally

1. Now we can develop our Storm code in the storm-streaming folder directly on our local machine. Feel free to explore the code, and to modify or add bolts, spouts, or the topology.

develop_hadoop_projects_locally

2. Once you’ve added or modified the code, we can run the mvn command used earlier to package our Storm project into a jar.

cd ~/Documents/iot-truck-streaming
mvn clean package -DskipTests

Note: If you want to add an enhancement to the demo, all you need to do is re-execute the above steps and the new modifications will be added to the jar.

3. As you will see, mvn generates a target folder in every sub-project folder. For instance, in the project we are working on, storm-streaming, the target folder contains:
ls -ltr storm-streaming/target

list_files_target_folder

Notice the target folder contains the storm-streaming-1.0-SNAPSHOT.jar file. This jar is a collection of Java classes for our Storm project. When you add an enhancement to the demo and Maven is executed, the jar file is removed and replaced with a new version.

4. Let’s send our jar to the sandbox for later use in tutorial 3.

scp -P 2222 ~/Documents/iot-truck-streaming/storm-streaming/target/storm-streaming-1.0-SNAPSHOT.jar root@sandbox.hortonworks.com:/root/iot-truck-streaming/storm-streaming/target

Note: Each time we update the demo, we have to transport the latest jar file to the sandbox.
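As an optional check, you can confirm the jar arrived on the sandbox (this assumes the same host and port used earlier):

ssh root@sandbox.hortonworks.com -p 2222 "ls -l /root/iot-truck-streaming/storm-streaming/target/storm-streaming-1.0-SNAPSHOT.jar"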

Summary

Since we can build jars on our local machine and have an instance of the iot-truck-streaming project on our sandbox with the appropriate configurations, we can make changes to our code locally and then send the jar to any directory on our sandbox. In our case, we will send it to the storm-streaming/target folder. This approach makes it easy to develop Storm topologies locally and test them in an HDP environment. We can then verify that our code performs as expected by viewing the topology visually through the Storm View’s Visualization (for example, did we connect our spouts and bolts properly?). We can also use HBase to check whether Storm is sending the proper data to the tables. We will perform these tests in tutorial 3.

Appendix A: Install Kafka

Follow these steps if your version of the Sandbox does not have Kafka installed:

1. From the Ambari Dashboard, select Actions -> Add Service:

2. Select Kafka from the list of Services and click Next:

3. Keep clicking Next with the selected defaults until you reach the following screen:

4. Set the value of logs.dir to  /tmp/kafka-logs

5. Click the Deploy button:

6. Wait for Kafka to install:

7. After Kafka is installed, you may be asked to restart some dependent Services. Please select the appropriate Services and click Restart.

Appendix B: Update iot-truck-streaming Project

  • Copy /etc/hbase/conf/hbase-site.xml to the src/main/resources/ directory:
[root@sandbox ~]# cd ~/iot-truck-streaming
[root@sandbox ~]# cp /etc/hbase/conf/hbase-site.xml src/main/resources/
  • Check pom.xml to ensure it includes the dependencies below (check after line 104)
    <dependency>
      <groupId>xerces</groupId>
      <artifactId>xercesImpl</artifactId>
      <version>2.9.1</version>
    </dependency>

    <dependency>
      <groupId>xalan</groupId>
      <artifactId>xalan</artifactId>
      <version>2.7.1</version>
    </dependency>

    <dependency>
      <groupId>org.htrace</groupId>
      <artifactId>htrace-core</artifactId>
      <version>3.0.4</version>
    </dependency>

    <dependency>
      <groupId>org.apache.hadoop</groupId>
      <artifactId>hadoop-hdfs</artifactId>
      <version>2.6.0</version>
    </dependency>
  • Recompile the Maven project. This may run for 10+ minutes.
[root@sandbox ~]# mvn clean package

The maven build should succeed.

Appendix C: Enable remote desktop on sandbox and set up Storm topology as Eclipse project

  1. Set up the Ambari VNC service on the sandbox to enable remote desktop via VNC, and install Eclipse using the steps here: https://github.com/hortonworks-gallery/ambari-vnc-service#setup-vnc-service
  2. Import code as Eclipse project using steps here:

https://github.com/hortonworks-gallery/ambari-vnc-service#getting-started-with-storm-and-maven-in-eclipse-environment

Further Reading

Refer to the readings below if you want to learn best practices for installing and configuring the NiFi, Kafka, Storm, simulator, and IDE environments.

Tutorial 1: Ingest, Route and Land Real Time Events with Apache NiFi

Introduction

Apache NiFi can collect and transport data from numerous sources and provides interactive command and control of live flows with full and automated data provenance. We will install NiFi onto our Hortonworks Sandbox and become familiar with the NiFi Web Interface. We will create a flow of data using Hortonworks DataFlow to activate the truck stream simulator to generate truck data, remove the log data, extract the live truck events data and store the events in a file. We will then inspect that file to verify that the correct data was written to it.

Pre-Requisites

Outline

Tutorial Overview

  • Understand the Stream Simulator
  • Run Stream Simulator From Terminal
  • Create NiFi DataFlow to Generate and Store Truck events

Apache NiFi

Apache NiFi is an open source tool for automating and managing the flow of data between systems. To create an effective dataflow, users must understand the various types of processors nifi_processor_mini. The processor is the most important building block available to NiFi because it enables NiFi to perform:

  • Data Transformation
  • Routing and Mediation
  • Database Access
  • Attribute Extraction
  • System Interaction
  • Data Ingestion
  • Data Egress/Sending Data
  • Splitting and Aggregation
  • HTTP
  • Amazon Web Services

NiFi is designed to help tackle modern dataflow challenges, such as system failures, data access that exceeds the capacity to consume, boundary conditions that are mere suggestions, systems that evolve at different rates, and compliance and security requirements.

Step 1: Explore NiFi Web Interface

NiFi’s web interface consists of 5 components for building data flows: the components toolbar, the actions toolbar, the management toolbar, the search bar and the help button. View the image below for a visualization of the user interface for orchestrating a dataflow. Near the top of the UI are multiple toolbars essential for building dataflows.

nifi_dataflow_html_interface

Step 2: Understand NiFi DataFlow Build Process

We can begin to build a data flow by adding, configuring and connecting the processors. We will also troubleshoot common problems that occur when creating data flows.
By the end of the IoT Tutorial Series, you will have built the following dataflow. Refer back to this image if you want to replicate the processors positions on the graph:

dataflow_withKafka_running_iot

Figure 1: This IoT_Lab_Series_DataFlow.xml dataflow performs System Interaction, Splitting and Aggregation, Attribute Extraction, Routing and Mediation and Data Egress/Sending Data.

2.1 Download, Import and Drop the Template onto the Graph (Optional)

If you want to view and run the dataflow from the template, follow the steps below, else skip these two steps and move forward.

1. To open the template xml in NiFi, hover over the management toolbar and click on the template icon template_icon_nifi_iot. Click on the Browse button, find the dataflow xml file that you downloaded and click open. The template should appear in your NiFi Flow Templates spreadsheet.

2. To display your dataflow template xml onto the screen, drag the template icon from the components toolbar onto the graph. The dataflow should appear as in the dataflow image above, except this dataflow will be missing the PutKafka processor. We will add it in the next tutorial.

2.2 Overview of Processors in NiFi DataFlow

Eight processors are needed to ingest, filter and store live stream data relating to truck events into your dataflow. Each processor holds a critical role in transporting the enriched data to a destination:

ExecuteProcess runs an operating system command to activate the stream simulator; its StdOut is redirected such that content written to StdOut becomes the content of the outbound FlowFile.

SplitText takes in one FlowFile whose content is textual and splits it into 1 or more FlowFiles based on the configured number of lines. Each FlowFile is 1 line.

UpdateAttribute updates each FlowFile with a unique attribute, filename, whose value is ${UUID()}.

RouteOnContent searches the content of each FlowFile to see if it matches a regular expression, such as (Normal), (Overspeed), (Lane Departure). The expressions are driving-event keywords. If the content matches, the FlowFile is routed to the processor with the configured relationship.

MergeContent(x2) merges many FlowFiles into a single FlowFile. In the tutorial, each FlowFile is merged by concatenating their content together. There will be two processors: one of them will merge all the truck events into a single FlowFile while the other merges all the log data together.

PutFile(x2) writes the contents of a FlowFile to a directory on the local file system. There will be two processors: one writes the filtered log data to the log_data folder; the other writes the filtered truck event data to the truck_events folder.

2.3 Troubleshoot Common Processor Issues

You will notice each time you add a new processor, it will have a warning symbol warning_symbol_nifi_iot in the upper left corner of the processor face. These warning symbols indicate the processors are invalid.

1. Hover over one of the processors to troubleshoot the issue. This message informs us of the requirements to make a processor valid, such as the ExecuteProcess.

warning_message_executeprocess_nifi_iot

The warning message indicates that we need to enter a command into the value field of the Command property, since it is empty, and connect this processor to another component to establish a relationship.
Each processor will have its own alert message. Let’s configure and connect each processor to remove all the warning messages, so we can have a complete data flow.

2.4 Add, Configure & Connect processors

We will build our NiFi DataFlow by adding, configuring and connecting processors. When adding processors, you have three ways to find your desired processor in the Add Processor window: the Tags section left of the table, the Processor List located in the table, and the filter bar positioned above the table.

After we add a processor, we can configure it from the Configure Processor window using its 4 tabs: Settings, Scheduling, Properties and Comments. For this tutorial, we will spend most of our time in the Properties tab. The properties in bold must contain default or updated values for the processor to run. If you are curious to learn more about a specific property, hover over the help icon next to the property name to read a description of that property.

Every processor has relationships that control how it transfers data to the next processor; another word for this is a connection. Relationships affect how data is transferred between processors. For instance, the splits relationship transfers the FlowFiles that were split from one large FlowFile to the next processor.

If you would like to read more about configuring and connecting processors, refer to the Hortonworks Apache NiFi User Guide, Building a DataFlow, sections 6.2 and 6.5.

Step 3: Build Stream Simulator DataFlow Section

3.1 ExecuteProcess

1. Drag and drop the processor icon onto the graph. Add the ExecuteProcess processor. Click the Add button.

2. Right click on ExecuteProcess processor and click the Configure button. Move to the Properties tab. Add the properties listed in Table 1 to the processor’s appropriate properties and if their original properties already have values, update them.

Table 1: Update ExecuteProcess Property Values

Property Value
Command sh
Command Arguments /root/iot-truck-streaming/stream-simulator/generate.sh
Batch Duration 10 sec

Command instructs the processor which type of command to run
Command Arguments informs the processor which particular directory to look in for script files
Batch Duration instructs the processor to run a task every 10 seconds

executeProcess_properties_config

Figure 1: ExecuteProcess Configuration Property Tab Window

3. Move to the Scheduling tab. Modify the Run Schedule field from 0 sec to 1 sec, which makes the processor run every 1 second. Click Apply.

3.2 SplitText

1. Add the SplitText processor below ExecuteProcess. Connect ExecuteProcess to SplitText processor. When the Create Connection window appears, verify success checkbox is checked, if not check it. Click Add.

2. Open SplitText Properties Tab, add the properties listed in Table 2 to the processor’s appropriate properties and if their original properties already have values, update them.

Table 2: Update SplitText Property Values

Property Value
Line Split Count 1
Remove Trailing Newlines false

Line Split Count adds 1 line to each split FlowFile
Remove Trailing Newlines controls whether newlines are removed at the end of each split file. With the value set to false, newlines are not removed.

splittext_property_config_nifi_iot

Figure 2: SplitText Configuration Property Tab Window

3. Move to the Settings tab. Check the failure and original checkbox under Auto terminate relationships. Click the Apply button.

3.3 UpdateAttribute

1. Add the UpdateAttribute processor below SplitText. Connect SplitText to UpdateAttribute processor. When the Create Connection window appears, verify splits checkbox is checked, if not check it. Click Add.

2. Open the UpdateAttribute Properties Tab. To add a new dynamic property for the NiFi expression, select the New property button. Insert the following property name and value into your Properties tab, as shown in Table 3 below:

Table 3: Update UpdateAttribute Property Value

Property Value
filename ${UUID()}

filename uses NiFi Expression language to assign each FlowFile a unique name

3. Click Apply.

Step 4: Build Filter Logs & Enrich TruckEvents DataFlow Section

4.1 RouteOnContent

1. Add the RouteOnContent processor onto the right of the ExecuteProcess. Connect the UpdateAttribute processor to RouteOnContent. In the Create Connection window, check the success checkbox for the relationship.

2. Open RouteOnContent Properties Tab, add the properties listed in Table 4 to the processor’s appropriate properties and if their original properties already have values, update them. For the second property and onward, add a new dynamic property for NiFi expression, select the New property button. Insert the following property name and value, refer to Table 4.

Table 4: Update RouteOnContent Property Values

Property Value
Match Requirement content must contain match
search_for_truck_event_data (Normal)|(Overspeed)|(Lane Departure)|(Unsafe tail distance)|(Unsafe following distance)

Match Requirement specifies the condition for a FlowFile to be transferred to the next processor
search_for_truck_event_data is the regular expression that searches each FlowFile for the truck event keywords

routeOnContent_filter_nifi_iot

Figure 3: RouteOnContent Configuration Property Tab Window

3. Click OK. Move to the Settings tab, under Auto terminate relationships, check the unmatched checkbox. Click Apply.

4.2 MergeContent(truck_events)

1. Add the MergeContent processor below RouteOnContent. Connect the RouteOnContent processor to this MergeContent processor. In the Create Connection window, check the search_for_truck_event_data checkbox for the relationship. All the FlowFiles sent to this processor are truck events.

2. Open MergeContent Properties Tab. Add the properties listed in Table 5 and if their original properties already have values, update them.

Table 5: Update MergeContent(truck_events) Property Values

Property Value
Minimum Number of Entries 50
Maximum Number of Entries 70

Minimum Number of Entries specifies minimum amount of FlowFiles to gather at the queue before FlowFiles merge together
Maximum Number of Entries specifies maximum amount of FlowFiles to gather at the queue before FlowFiles merge together

mergeContent_property_configs

Figure 4: MergeContent(truck_events) Configuration Property Tab Window

3. Navigate to the Settings tab, rename the processor: MergeContent(truck_events). Under Auto terminate relationships, check the failure and original checkboxes. Click Apply.

4.3 MergeContent(logs)

1. Right click the MergeContent processor created in the previous step and copy it. Move the mouse slightly to the right of MergeContent(truck_events) processor and paste it. Connect the RouteOnContent processor to this new MergeContent processor. In the Create Connection window, check the unmatched checkbox for the relationship. All the FlowFiles sent to this processor are logs and data we don’t want kafka to receive.

2. Open configure Settings tab, and rename the processor MergeContent(logs). Click Apply.

4.4 PutFile(truck_events)

1. Add the PutFile processor below MergeContent slightly to the left. Connect the MergeContent(truck_events) to this new PutFile processor. In the Create Connection window, check the merged checkbox for the relationship. All the FlowFiles sent to this processor are truck event data we do want kafka to receive.

2. Open PutFile Properties Tab. Add the properties listed in Table 6 and if their original properties already have values, update them.

Table 6: Update PutFile(truck_events) Property Values

Property Value
Directory /root/nifi_output/truck_events

Directory instructs processor which directory to store the output data files

putfile_properties_truck_events_config_nifi_iot

Figure 5: PutFile(truck_events) Configuration Property Tab Window

3. Open configure Settings tab, and rename the processor PutFile(truck_events). Then check the failure and success checkbox below the Auto terminated relationships. Click Apply.

4.5 PutFile(logs)

1. Right click the PutFile(truck_events) processor created in the previous step and copy it. Move the mouse slightly above MergeContent(logs) processor and paste it. Connect the MergeContent(logs) to this new PutFile processor. In the Create Connection window, check the merged checkbox for the relationship. All the FlowFiles sent to this processor are logs and data we don’t want kafka to receive.

2. Open PutFile Properties Tab. Add the properties listed in Table 7 and if their original properties already have values, update them.

Table 7: Update PutFile(logs) Property Values

Property Value
Directory /root/nifi_output/log_data

putfile_properties_config_logs_nifi_iot

Figure 6: PutFile(logs) Configuration Property Tab Window

3. Open configure Settings tab, and rename the processor PutFile(logs). Click Apply.

Now that we have added, configured and connected all processors, your NiFi DataFlow should look similar to the one below:

dataflow_lab0_complete_nifi_iot
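Before starting the flow, you may optionally pre-create the output directories that the two PutFile processors write to. PutFile can normally create missing directories itself, so this is only a safeguard; run it in the sandbox shell:

mkdir -p /root/nifi_output/truck_events /root/nifi_output/log_data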

Step 5: Run NiFi DataFlow

1. The processors are valid since the warning symbols disappeared. Notice the processors have a stop symbol stop_symbol_nifi_iot in the upper left corner and are ready to run. To select all processors, hold down the shift-key and drag your mouse across the entire data flow. This step is important if you have different dataflows on the same graph.

dataflow_selected_nifi_iot

2. Now that all processors are selected, go to the actions toolbar and click the start button start_button_nifi_iot. Your screen should look like the following:

run_dataflow_nifi_iot

Note: To run the DataFlow again, you will need to copy & paste the ExecuteProcess processor onto the graph, delete the old one, and connect the new one to the SplitText processor. You will need to repeat this process each time you want to run the DataFlow; this step ensures data flows through each processor. Currently, the ExecuteProcess processor is getting a patch to fix this problem.

3. To quickly see what the processors are doing and the information on their faces, right click on the graph, click the refresh status button refresh_nifi_iot

Note: On each processor face, the In, Read/Write and Out all have data increasing.

5.1 Check Data Stored In Correct Directory

To check that the log and truck event data were written to the correct directories, wait 20 seconds, then open your terminal and navigate to the appropriate directories. Make sure to SSH into your sandbox first.

5.2 Verify Logs Stored In log_data Directory

1. Navigate to the directory /root/nifi_output/log_data, view the files, and open two random files to verify that only log data is being sent to this directory.

cd /root/nifi_output/log_data
ls
cat 28863080789498

Once the file is opened, you should see output similar to the following:

logs_stream_simulator_nifi_output

5.3 Verify Events Stored In truck_events Directory

1. Navigate to the truck events directory /root/nifi_output/truck_events and view the files. Open two random files to verify that only event data is being sent to this directory.

cd /root/nifi_output/truck_events
ls
cat 28918091050702

Once the file is opened, you should see output similar to the following:

truck_events_file_nifi_iot

Summary

Congratulations! You made it to the end of the tutorial and built a NiFi DataFlow that reads in a simulated stream of data, filters truck events from log data, stores the truck event data into a specific directory and writes that data to a file. You also learned how to use Apache Maven to compile and package code in order to run the simulator through the shell or from NiFi. You even explored troubleshooting common issues that occur when creating a flow of data. If you are interested in learning to integrate NiFi with other technologies, such as Kafka, continue on to the next lab in the IoT tutorial series.

Further Reading

Tutorial 2: Capture Real Time Event Stream with Apache Kafka

Introduction

Apache Kafka can be used on the Hortonworks Data Platform to capture real-time events. We will begin by showing you how to configure Apache Kafka and ZooKeeper. Next we will show you how to capture truck event data from Apache NiFi using Kafka.

Pre-Requisites

  • Tutorial 0: Set Up Simulator, Apache Services and IDE Environment
  • Tutorial 1: Ingest, Route and Land Real Time Events with Apache NiFi
  • Downloaded and Installed latest Hortonworks Sandbox
  • You will need admin privileges for this tutorial; refer to Learning the Ropes of the Hortonworks Sandbox to set up your Ambari admin password
  • At least 8GB of RAM (and preferably 4 processor cores) is required, otherwise errors may occur in the third tutorial

Outline

Apache Kafka

Apache Kafka is an open source messaging system designed for:

  • Persistent messaging
  • High throughput
  • Distributed operation
  • Multi-client support
  • Real-time delivery

Kafka Producer-Broker-Consumer

Tutorial Overview

  1. Create Kafka topics for Truck events.
  2. Write Kafka Producers for Truck events.

Step 1: Create a Kafka Topic

1.1 SSH into the Sandbox

SSH into the Sandbox to define the Kafka topic. Type the following command:

ssh root@sandbox.hortonworks.com -p 2222

ssh_into_sandbox_shell_kafka_iot

NOTE: You can also SSH using a program like Putty for Windows or the Terminal application on a Mac.

1.2 Create a new Topic

Use the kafka-topics.sh script (which should be in your PATH) to create a new topic named truck_events:

[root@sandbox ~]# kafka-topics.sh --create --zookeeper sandbox.hortonworks.com:2181 --replication-factor 1 --partitions 2 --topic truck_events

If the kafka-topics.sh script is not in your PATH and you get a command not found error, then change directories to where the Kafka scripts are installed:

[root@sandbox ~]# cd /usr/hdp/current/kafka-broker/bin/

You will need to add a dot and a slash (./) to the beginning of the commands:

[root@sandbox bin]# ./kafka-topics.sh --create --zookeeper sandbox.hortonworks.com:2181 --replication-factor 1 --partitions 2 --topic truck_events

Also note that sometimes ZooKeeper does not listen on localhost, so you may need to use sandbox.hortonworks.com or the Sandbox’s IP address instead.

The output should show your topic was created:

created_kafka_topic_iot

1.3 Verify the topic was created successfully

Check if topic truck_events was created successfully with the following command:

[root@sandbox bin]# ./kafka-topics.sh --list --zookeeper sandbox.hortonworks.com:2181

You should see truck_events in the list of topics (and probably your only topic):

Step 2: Create NiFi PutKafka Processor

In the previous tutorial, we stored the truck event data into a file. Now that the Kafka service is running and we have access to the "Known Broker", "Topic Name" and "Client Name", we can use the PutKafka processor to send the truck event contents of a FlowFile to Kafka as messages. In other words, NiFi acts as a Kafka producer: it creates messages and publishes them to the Kafka broker for further consumption.
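
For reference, PutKafka fills the same role as a standalone Kafka producer. The sketch below (Java) is purely illustrative and is not how NiFi works internally: it publishes each line of a file as a separate message to the truck_events topic, mirroring the newline Message Delimiter we configure below. It assumes the kafka-clients producer API is on the classpath, and the input file path is hypothetical.

import java.nio.file.Files;
import java.nio.file.Paths;
import java.util.Properties;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;

public class TruckEventProducerSketch {
    public static void main(String[] args) throws Exception {
        Properties props = new Properties();
        // Mirrors PutKafka's "Known Brokers" property.
        props.put("bootstrap.servers", "sandbox.hortonworks.com:6667");
        props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        // Mirrors PutKafka's "Client Name" property.
        props.put("client.id", "truck_events_client");

        try (KafkaProducer<String, String> producer = new KafkaProducer<>(props)) {
            // Hypothetical input file; like the newline Message Delimiter,
            // each line is sent as its own message to the truck_events topic.
            for (String line : Files.readAllLines(Paths.get("/root/nifi_output/truck_events/sample_events.txt"))) {
                producer.send(new ProducerRecord<>("truck_events", line));
            }
        }
    }
}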

1. If it is not already open, navigate to the NiFi Web Interface at http://sandbox.hortonworks.com:9090/nifi/. For VMware and Azure, the port may be different.

2. If your data flow is still running, click on the stop button stop_symbol_nifi_iot in the actions toolbar to stop the flow.

2.1 Add PutKafka Processor

1. Drag the processor icon onto your graph. Add the PutKafka processor.

2. Right-click on the PutKafka processor and click Configure.

3. The warning message tells us that we need to add "Known Broker", "Topic Name" and "Client Name" values to our Properties section. Enter the following information into the Properties section of the Configure Processor window:

# Property = Value
Known Brokers = sandbox.hortonworks.com:6667
Topic Name = truck_events
Message Delimiter = press "Shift+enter"
Client Name = truck_events_client

putkafka_processor_config_nifi_iot

Note: Every property above is required except Message Delimiter, but that property is helpful for splitting apart the contents of the FlowFile.

Known Brokers can be found in the Kafka configs under listeners.

Topic Name is the name of the Kafka topic you created earlier. Type the following command to list your topics: ./kafka-topics.sh --list --zookeeper sandbox.hortonworks.com:2181.

Message Delimiter set to a newline ("Shift+Enter" in the value field) makes each line of the incoming FlowFile a separate message, so Kafka does not receive the entire FlowFile as one enormous message.

Client Name can be anything you like; it is the name used when communicating with Kafka.

4. Open the Configure Processor window again and navigate to the Settings tab. Set the auto-terminate relationships to success and failure. Click Apply.

5. Connect the MergeContent(truck_events) processor to PutKafka. A Create Connection window will appear; set the For Relationships checkbox to merged.

You should obtain a similar dataflow as the following:

dataflow_final_withkafka_iot

Note: If there is a warning symbol after updating PutKafka, verify that the property values are correct. Refer back to step 3 if you need to review the values.

6. Let’s start our Hortonworks DataFlow to see a live stream of truck event data being read by NiFi and written to the Kafka cluster. In the actions toolbar, hit the start button.

dataflow_withKafka_running_iot

The dataflow generates data, filters the truck events out of the stream and sends those events to Kafka.

2.2 Verify PutKafka Published Message to Kafka

After a few seconds, stop the NiFi DataFlow using the stop button in the actions toolbar.
To verify that the PutKafka processor successfully published messages to the Kafka cluster, execute the following command to start a consumer to see the produced events:

[root@sandbox ~]# cd /usr/hdp/current/kafka-broker/bin/
[root@sandbox bin]# ./kafka-console-consumer.sh --zookeeper sandbox.hortonworks.com:2181 --topic truck_events --from-beginning

Your terminal should show that the messages were successfully published to Kafka:

messages_published_toKafka
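
If you prefer to verify from code rather than the console script, a minimal Java consumer along the following lines would print the same messages. This is only a sketch: it assumes a Kafka version that ships the newer Java consumer client (0.9+) with the kafka-clients jar on the classpath, whereas the console consumer above still goes through ZooKeeper; the group id is hypothetical, and auto.offset.reset is only roughly equivalent to --from-beginning (it applies to a new consumer group).

import java.util.Collections;
import java.util.Properties;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;

public class TruckEventConsumerSketch {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "sandbox.hortonworks.com:6667");
        props.put("group.id", "truck_events_verifier");   // hypothetical consumer group
        props.put("auto.offset.reset", "earliest");       // read from the beginning for a new group
        props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");

        try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props)) {
            consumer.subscribe(Collections.singletonList("truck_events"));
            while (true) {
                ConsumerRecords<String, String> records = consumer.poll(1000);
                for (ConsumerRecord<String, String> record : records) {
                    System.out.println(record.value());   // print each published truck event
                }
            }
        }
    }
}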

Summary

This tutorial gave you a brief glimpse of how to use Apache Kafka to capture real-time event data as messages and how to verify that Kafka successfully received those messages. In our next tutorial, you will create HBase and Hive tables to ingest real-time streaming data using Storm.

Further Reading

Tutorial 3: Real Time Data Ingestion in HBase and Hive using Storm

Introduction

The trucking business is a high-risk business in which truck drivers venture into remote areas, often in harsh weather conditions and chaotic traffic, on a daily basis. Using this solution, which illustrates a Modern Data Architecture with the Hortonworks Data Platform, we have developed a centralized management system that can help reduce risk and lower the total cost of operations.

This system can take into consideration adverse weather conditions, the driver’s driving patterns, current traffic conditions and other criteria to alert and inform the management staff and the drivers themselves when risk factors run high.

In the previous tutorials, we explored generating and capturing streaming data with Apache NiFi and Apache Kafka.

In this tutorial, we will build a solution to ingest real-time streaming data into HBase using Storm. Storm has a spout that reads truck_events data from Kafka and passes it to bolts, which process and persist the data into Hive and HBase tables.

Pre-Requisites

Outline

HBase

HBase provides near real-time, random read and write access to tables (or to be more accurate ‘maps’) storing billions of rows and millions of columns.

In our case, once we store this rapidly and continuously growing Internet of Things (IoT) dataset, we will be able to perform swift lookups for analytics regardless of the data size.
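
To make the "random read and write" point concrete, the sketch below shows what a single-row write and point lookup look like with the standard HBase Java client. It is illustrative only and assumes the HBase 1.x client API; the table and column names anticipate the driver_dangerous_events_count table created later in this tutorial, and the row key value is hypothetical.

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.ConnectionFactory;
import org.apache.hadoop.hbase.client.Get;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.Table;
import org.apache.hadoop.hbase.util.Bytes;

public class HBaseLookupSketch {
    public static void main(String[] args) throws Exception {
        Configuration conf = HBaseConfiguration.create();  // reads hbase-site.xml from the classpath
        try (Connection connection = ConnectionFactory.createConnection(conf);
             Table table = connection.getTable(TableName.valueOf("driver_dangerous_events_count"))) {

            // Write: one row per driver, keyed by driverId (hypothetical key "11").
            Put put = new Put(Bytes.toBytes("11"));
            put.addColumn(Bytes.toBytes("counters"), Bytes.toBytes("incidentTotalCount"), Bytes.toBytes(5L));
            table.put(put);

            // Read: a point lookup by row key stays fast regardless of table size.
            Result result = table.get(new Get(Bytes.toBytes("11")));
            long count = Bytes.toLong(result.getValue(Bytes.toBytes("counters"), Bytes.toBytes("incidentTotalCount")));
            System.out.println("Incidents for driver 11: " + count);
        }
    }
}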

Apache Storm

Apache Storm is an open source distributed, reliable, fault-tolerant system for real-time processing of large volumes of data.
It's used for:

  • Real-time analytics
  • Scoring machine learning models
  • Continuous statistics computations
  • Operational analytics
  • Enforcing Extract, Transform and Load (ETL) paradigms

A Storm Topology is a network of Spouts and Bolts. The Spouts generate streams, which contain sequences of tuples (data), while the Bolts process input streams and produce output streams. Hence, a Storm Topology can talk to databases, run functions, and filter, merge or join data. We will be using Storm to parse data, perform complex computations on truck events and send the data to HBase tables; a minimal bolt sketch follows below for orientation.

  • Spout: Acts as the source of data streams. In the “Truck Events” use case, the Spout reads data from Kafka topics.
  • Bolt: The Spout passes streams of data to a Bolt, which processes and persists the data to a data store or sends it downstream to another Bolt. We have a RouteBolt that transforms each tuple and passes the data on to the other bolts for further processing, and 3 HBase bolts that write to 3 tables.

Learn more about Apache Storm at the Storm Documentation page.
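
For orientation, a minimal bolt looks roughly like the sketch below. This is a generic, hypothetical bolt, not the RouteBolt from the project source; the package names assume Storm 1.x (org.apache.storm), so on older Storm releases substitute backtype.storm, and the filter condition is made up for illustration.

import org.apache.storm.topology.BasicOutputCollector;
import org.apache.storm.topology.OutputFieldsDeclarer;
import org.apache.storm.topology.base.BaseBasicBolt;
import org.apache.storm.tuple.Fields;
import org.apache.storm.tuple.Tuple;
import org.apache.storm.tuple.Values;

// A generic bolt: reads one tuple at a time, keeps only "dangerous" events,
// and emits them downstream for another bolt (for example an HBase bolt) to persist.
public class DangerousEventFilterBolt extends BaseBasicBolt {

    @Override
    public void execute(Tuple input, BasicOutputCollector collector) {
        String event = input.getString(0);       // raw truck event line received from the spout
        if (!event.contains("Normal")) {         // hypothetical filter condition
            collector.emit(new Values(event));   // pass non-normal (dangerous) events downstream
        }
    }

    @Override
    public void declareOutputFields(OutputFieldsDeclarer declarer) {
        declarer.declare(new Fields("event"));   // one output field named "event"
    }
}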

Tutorial Overview

  • Create HBase Tables
  • Deploy Storm Topology
  • Analyze a Storm Spout and several Bolts.
  • Persist data into HBase.
  • Verify Data Stored in HBase.

Step 1: Create tables in HBase

  • Create HBase tables

We will work with 3 HBase tables in this tutorial.

The first table stores all generated events, the second stores only
dangerous events and the third stores the number of incidents per driverId.

su hbase

hbase shell

create 'driver_events', 'allevents'    
create 'driver_dangerous_events', 'events'
create 'driver_dangerous_events_count', 'counters'
list    
exit

The exit at the end of the script above leaves the hbase shell; now exit from the hbase user as well by typing exit again.

  • driver_events can be thought of as All Events table

  • driver_dangerous_events can be thought of as Dangerous Events Table

  • driver_dangerous_events_count can be thought of as
    Incidents Per Driver Table

Note: ‘driver_events’ is the table name and ‘allevents’ is its column family.
In the script above, each table has a single column family, but we could define
multiple column families simply by passing more arguments; for example,
create 'driver_events', 'allevents', 'metadata' would add a second (hypothetical)
column family named ‘metadata’.

Screen Shot 2015-06-04 at 7.03.00 PM.png

Step 2: Launch Storm Topology

Recall that the source code is under directory path
iot-truck-streaming/storm-streaming/src/.

The pre-compiled jars are under the directory path
iot-truck-streaming/storm-streaming/target/.

Note: Back in Tutorial 0, in which we set up the Trucking Demo, we used Maven to create the target folder.

2.1 Verify Kafka is Running & Deploy Topology

1. Verify that the Kafka service is running using the Ambari dashboard. If it is not, start the Kafka service as shown in the earlier setup tutorial.

2. Deploy Storm Topology

We now have the Storm ‘supervisor’ daemon and the Kafka processes running.
To do real-time computation on Storm, you create what are called “topologies”. A topology is a Directed Acyclic Graph (DAG) of spouts and bolts with streams of tuples representing the edges. Each node in a topology contains processing logic, and links between nodes indicate how data should be passed around between nodes.

Running a topology is straightforward. First, you package all your code and dependencies into a single jar. In Tutorial 0, when we set up our IDE environment, we ran mvn clean package once we were satisfied with the state of our topology code; this packaged our Storm project into a storm…SNAPSHOT.jar. The command below will deploy a new Storm Topology for Truck Events.

[root@sandbox ~]# cd ~/iot-truck-streaming
[root@sandbox iot-truck-streaming]# storm jar storm-streaming/target/storm-streaming-1.0-SNAPSHOT.jar com.hortonworks.streaming.impl.topologies.TruckEventKafkaExperimTopology /etc/storm_demo/config.properties

You should see that the topology deployed successfully:

Screen Shot 2015-06-04 at 7.55.23 PM.png

This runs the class TruckEventKafkaExperimTopology. The main function of the class defines the topology and submits it to Nimbus. The storm jar part takes care of connecting to Nimbus and uploading the jar.
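
In outline, such a main method wires a spout and bolts into a TopologyBuilder and hands the result to StormSubmitter. The sketch below is illustrative only, not the project's actual TruckEventKafkaExperimTopology class: it assumes the storm-kafka module for the KafkaSpout, reuses the hypothetical filter bolt sketched earlier, and the component ids and ZooKeeper root are made up. On older Storm releases the packages are backtype.storm and storm.kafka instead of org.apache.storm.

import org.apache.storm.Config;
import org.apache.storm.StormSubmitter;
import org.apache.storm.kafka.KafkaSpout;
import org.apache.storm.kafka.SpoutConfig;
import org.apache.storm.kafka.StringScheme;
import org.apache.storm.kafka.ZkHosts;
import org.apache.storm.spout.SchemeAsMultiScheme;
import org.apache.storm.topology.TopologyBuilder;

public class TruckEventTopologySketch {
    public static void main(String[] args) throws Exception {
        // Spout: read the truck_events topic, locating brokers via ZooKeeper (storm-kafka module).
        SpoutConfig spoutConfig = new SpoutConfig(
                new ZkHosts("sandbox.hortonworks.com:2181"),
                "truck_events",          // Kafka topic created in the previous tutorial
                "/truck_events",         // hypothetical ZooKeeper root for consumer offsets
                "truck-event-spout");    // hypothetical consumer id
        spoutConfig.scheme = new SchemeAsMultiScheme(new StringScheme());

        // Wire spout -> bolt, mirroring the KafkaSpout -> RouteBolt edge in the visualization.
        TopologyBuilder builder = new TopologyBuilder();
        builder.setSpout("kafkaSpout", new KafkaSpout(spoutConfig), 1);
        builder.setBolt("routeBolt", new DangerousEventFilterBolt(), 1).shuffleGrouping("kafkaSpout");

        Config config = new Config();
        config.setNumWorkers(1);

        // Package with mvn clean package, then deploy with the storm jar command shown above.
        StormSubmitter.submitTopology("truck-event-processor-sketch", config, builder.createTopology());
    }
}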

Open your Ambari Dashboard. Click the Storm View located in the Ambari User Views list.

storm_view_iot

You should see the new Topology truck-event-processor.

storm_view_topology_listing

Run the NiFi DataFlow to generate events.
Return to the Storm View and click on truck-event-processor topology in the list of topologies to drill into it.

As you scroll down the page, let’s analyze a Visualization of our truck-event-processor topology:

storm_topology_new_stormAPI_iot

2.2 Analysis of Topology Visualization:

  • RouteBolt processes the data received by KafkaSpout
  • CountBolt takes the data from RouteBolt and counts the incidents per driver
  • 1 HBase Bolt performs complex transformations on the data received from CountBolt
    to write to the Incidents Per Driver Table
  • 2 HBase Bolts perform complex transformations on the data from RouteBolt to
    write to the All Events and Dangerous Events Tables

2.3 Overview of the Storm View

After 6-10 minutes, you should see that the number of emitted and transferred tuples for each node (Spout or Bolt) in the topology is increasing, which shows that the messages are being processed in real time by the Spout and Bolts. If we hover over one of the spouts or bolts, we can see how much data it has processed and its latency.

Here is an example of the data that goes through the kafkaSpout and RouteBolt in the topology:

analysis_of_dive_into_storm_view

Overview of truck-event-processor in Storm View

  • Topology Summary
  • Topology Stats
  • truck-event-processor Visualization
  • Spout
  • Bolts
  • Topology Configuration

2.4 Overview of Spout Statistics:

To see statistics of a particular component or node in the storm topology, click on that component located in the Spouts or Bolts section. For instance, let’s dive into the KafkaSpout’s statistics.

Overview of Spout Statistics

  • Component Summary
  • Spout Stats
  • Output Stats ( All time )
  • Executor Stats ( All time )
  • Error Stats ( All time )

spout_statistics_iot

2.5 Overview of Bolt Statistics:

Follow a process similar to the one in section 2.4 to see the statistics of a particular bolt in your topology. Let's dive into the RouteBolt statistics.

Overview of Bolt Statistics

  • Component Summary
  • Bolt Stats
  • Input Stats ( All time )
  • Output Stats ( All time )
  • Executor Stats ( All time )
  • Error Stats ( All time )

bolt_statistics_iot

What differences do you notice about the spout statistics compared to the bolt statistics?

Step 3: Verify Data in HBase

Let’s verify that Storm’s 3 HBase bolts successfully sent data to the 3 HBase Tables.

  • If you haven’t already done so, stop the NiFi DataFlow by pressing the stop symbol.
  • Verify that the data is in HBase by executing the following commands in HBase shell:
hbase shell

list
count 'driver_events'
count 'driver_dangerous_events'
count 'driver_dangerous_events_count'    
exit

The driver_dangerous_events table is updated upon every violation event.

verify_data_in_hbase_iot

3.1 Troubleshoot Unexpected Data in HBase Table

If the data in an HBase table is displayed as hexadecimal values, you can perform the following special operation on the table to display the data in a readable format. For instance, if your driver_dangerous_events_count table showed unexpected data, run the following HBase query:

scan 'driver_dangerous_events_count' , {COLUMNS => ['counters:driverId:toInt', 'counters:incidentTotalCount:toLong']}

Your data should look as follows:

hbase_dangerous_correct_format
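
The hexadecimal output appears because the counters are stored as serialized ints and longs rather than strings; the :toInt and :toLong hints above simply tell the HBase shell how to decode them. When reading the same cells from Java, the equivalent decoding uses the HBase Bytes utility, as in this small sketch (column names taken from the scan above, null handling added for safety):

import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.util.Bytes;

public class DecodeCountersSketch {
    // Decodes the raw cell bytes that the HBase shell would otherwise print as hex.
    public static int driverId(Result row) {
        byte[] raw = row.getValue(Bytes.toBytes("counters"), Bytes.toBytes("driverId"));
        return raw == null ? 0 : Bytes.toInt(raw);
    }

    public static long incidentTotalCount(Result row) {
        byte[] raw = row.getValue(Bytes.toBytes("counters"), Bytes.toBytes("incidentTotalCount"));
        return raw == null ? 0L : Bytes.toLong(raw);
    }
}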

  • Once done, stop the Storm topology

Open the terminal of your sandbox; we can then deactivate or kill the Storm topology from the Storm View or from the shell:

storm kill TruckEventKafkaExperimTopology

storm_topology_actions_iot

Conclusion

Congratulations, you built your first Hortonworks DataFlow Application.
When NiFi, Kafka and Storm are combined, they create the Hortonworks DataFlow.
You have used the power of NiFi to ingest, route and land real-time streaming
data. You learned to capture that data with Kafka and perform instant processing
with Storm. A common challenge with many use cases, which is also observed in
this tutorial series, is ingesting a live stream of random data and filtering
the junk from the data we actually care about. Through these tutorials, you
learned to manipulate, persist and perform many other operations on that data.
We have a working application that shows us a visualization of driver behavior,
normal and dangerous events per city. Can you brainstorm ways to further enhance
this application?

Further Reading