
Analyze Traffic Patterns with Apache NiFi

Introduction

Apache NiFi is the first integrated platform that solves the real-time challenges of collecting and transporting data from a multitude of sources and provides interactive command and control of live flows with full and automated data provenance. NiFi provides the data acquisition, simple event processing, transport and delivery mechanism designed to accommodate the diverse dataflows generated by a world of connected people, systems, and things.

For the purposes of this tutorial, assume that a city planning board is evaluating the need for a new highway. This decision is dependent on current traffic patterns, particularly as other roadwork initiatives are under way. Integrating live data poses a problem because traffic analysis has traditionally been done using historical, aggregated traffic counts. To improve traffic analysis, the city planner wants to leverage real-time data to get a deeper understanding of traffic patterns. NiFi was selected for this real-time data integration.

Goals and Objectives

The goal of this tutorial is to provide you with an opportunity to interact with Apache NiFi features while building a dataflow. You do not need programming experience or knowledge of flow-based programming syntax and features to successfully complete this tutorial.

The learning objectives of this tutorial are to:

  • Understand Apache NiFi fundamentals
  • Introduce NiFi’s HTML user interface
  • Introduce NiFi processor configuration, relationships, data provenance, and documentation
  • Create dataflows
  • Incorporate APIs into a NiFi dataflow
  • Learn about NiFi templates
  • Create Process Groups

Prerequisites

  • Downloaded and installed HDF Sandbox for VMware, VirtualBox or Native Docker
  • For Windows 10 users, use Ubuntu bash shell

Outline

In this tutorial, we work with San Francisco MUNI Transit agency data gathered from the NextBus XML Live Feed, which includes vehicle locations, speeds, and other variables.

The tutorial consists of four sections:

  1. Tutorial 0 – Launch your NiFi HTML User Interface (UI). Get NiFi up and running on Hortonworks DataFlow Sandbox.
  2. Tutorial 1 – Open NiFi UI and explore its features. Create a dataflow by adding and configuring eleven processors. Ingest data from a transit location XML simulator, extract transit location detail attributes from flowfiles, and route attributes to a converted JSON file. Run the dataflow and verify the results in a terminal.
  3. Tutorial 2 – Add geographic location enrichment to the dataflow; incorporate Google Places Nearby API into the dataflow to retrieve places near the vehicle’s location.
  4. Tutorial 3 – Ingest NextBus’s live stream data for San Francisco MUNI agency.

Each tutorial provides step-by-step instructions so that you can complete the learning objectives and tasks associated with it. You are also provided with a dataflow template for each tutorial that you can use for verification. Each tutorial builds on the previous one.


NiFi DataFlow Automation Concepts

Introduction

The concepts section is tailored toward enriching your hands-on experience in the tutorials. By the end of this section, you will be able to define NiFi, know how to create dataflows for specific use cases, understand how to build a NiFi DataFlow, and become familiar with the core concepts of NiFi. The goal of this section is to help NiFi practitioners learn to use the NiFi documentation to their advantage.

Outline

What is Apache NiFi?

Apache NiFi is an open source tool for automating and managing the flow of data between systems (Databases, Sensors, Data Lakes, Data Platforms). In the tutorial, we will use NiFi to process the flow of data between sensors, web services (NextBus and Google Places API), various locations and our local file system.

Who Uses NiFi, and for what?

NiFi is used for data ingestion: pulling data into NiFi from numerous different data sources and creating FlowFiles. For the tutorial, GetFile, GetHTTP and InvokeHTTP are processors you will use to stream data into NiFi from the local file system and ingest data from the internet. Once the data is ingested, you, as the DataFlow Manager (DFM), will create two process groups, or sections of the dataflow, that each handle a particular purpose in data preprocessing. A process group is a complex processor composed of multiple processors.

You will create a Data Management process group to monitor and obtain feedback about the current status of the NiFi DataFlow. You will use bulletins to troubleshoot problems in the dataflow. Bulletins are located on the processor and the management toolbar, and they provide a tool-tip with the time, severity and message of the alert.

While the data is managed, you will create a Data Enrichment process group to enhance, refine and improve the quality of the data to make it meaningful and valuable for users. NiFi enables users to filter out unnecessary information from data to make it easier to understand. You will use NiFi to geographically enrich real-time data, showing the neighborhoods near vehicle locations as those locations change.

Understand NiFi DataFlow Build Process

Explore NiFi HTML User Interface

When NiFi is accessed at sandbox.hortonworks.com:19090/nifi by users who run NiFi from HDF sandbox or sandbox.hortonworks.com:8080/nifi by users who run NiFi from their local machine, NiFi’s User Interface (UI) appears on the screen. The UI is where dataflows will be developed. It includes a canvas and mechanisms to build, visualize, monitor, edit, and administer our dataflows in the tutorials. The components toolbar contains all tools for building the dataflow. The actions toolbar consists of buttons that manipulate the components on the canvas. The management toolbar has buttons for the DFM to manage the flow and a NiFi administrator to manage user access & system properties. The search toolbar enables users to search for any component in the dataflow. The image below shows a visualization of where each mechanism is located.

nifi_dataflow_html_interface

Find and Add Processor Overview

Every dataflow requires a set of processors. In the tutorials, you will use the processor icon processor_nifi_iot to add processors to the dataflow. Let’s view the add processor window. There are 3 options to find our desired processor. The processor list contains almost 190 items with descriptions for each processor. The tag cloud reduces the list by category, so if you know what particular use case your desired processor is associated with, select the tag and find the appropriate processor faster. The filter bar searches for the processor based on the keyword entered. The image below illustrates where each option is located on the add processor window.

add_processor_window

Configure Processor Dialog

As we add each processor to our dataflow, we must make sure they are properly configured. DataFlow Managers navigate around the 4 configuration tabs to control the processor’s specific behavior and instruct the processor on how to process the data that is flowing. Let’s explore these tabs briefly. The Settings tab allows users to change the processor’s name, define relationships & includes many different parameters. The Scheduling tab affects how the processor is scheduled to run. The Properties tab affects the processor’s specific behavior. The Comments tab provides a place for DFMs to include useful information about the processor’s use-case. For the tutorial series, you will spend most of time modifying properties.

putfile_logs_nifi_iot

Configure Processor Properties Tab

Let’s further explore the properties tab, so we can be familiar with this tab in advance for the tutorials. If you want to know more about what a particular property does, hover over the help symbol question_mark_symbol_properties_config_iot located next to the property name to find additional details about that property, its value and history. Some processors enable the DFM to add new properties into the property table. For the tutorials, you will add user-defined properties into processors, such as UpdateAttribute. The custom user-defined property you create will assign unique filenames to each FlowFile that transfer through this processor. View the processor properties tab below:

updateAttribute_config_property_tab_window

Connections & relationships

As each processor's configuration is completed, we must connect it to another component. A connection is a linkage between processors (or components) that contains at least one relationship. The user selects one or more relationships for each connection, and the processing outcome determines where the data is routed. Processors can also have zero or more auto-terminated relationships. If a FlowFile's processing outcome matches a relationship the processor auto-terminates, the FlowFile is removed from the flow. For instance, if EvaluateXPath auto-terminates its unmatched relationship, any FlowFile whose content fails to match is removed from the flow, while matched FlowFiles are routed to the next processor. View the visual to see the objects that define connections and relationships.
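The routing behavior described above can be modeled with a short Python sketch. This is a simplified illustration, not NiFi's actual API; the function and parameter names are assumptions made for the example:

```python
def route_flowfile(outcome, connections, auto_terminated):
    """Sketch of NiFi relationship routing: a FlowFile follows the
    connection matching its processing outcome; if that relationship
    is auto-terminated, the FlowFile is removed from the flow."""
    if outcome in auto_terminated:
        return None                  # FlowFile dropped from the flow
    return connections[outcome]      # queue feeding the next component

# EvaluateXPath example from the text: unmatched is auto-terminated,
# matched routes on to the next processor.
next_stop = route_flowfile("matched", {"matched": "SplitXML"}, {"unmatched"})
```

Here `next_stop` is the downstream component, while an "unmatched" outcome would return None, i.e. the FlowFile leaves the flow.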

completed-data-flow-lab1-connection_relationship_concepts

Running the NiFi DataFlow

Once we finish connecting and configuring the components in our dataflow, there are at least 3 conditions we should check to ensure our dataflow successfully runs. We must verify that all relationships are established, the components are valid, stopped, enabled and have no active tasks. After you complete the verification process, you can select the processors, click the play symbol start_button_nifi_iot in the actions toolbar to run the dataflow. View the image of a dataflow that is active.

run_dataflow_lab1_nifi_learn_ropes_concepts_section

The Core Concepts of NiFi

When we learned the process of building a dataflow, we crossed paths with many of the core concepts of NiFi. You may be wondering what the meaning is behind a FlowFile, processor, connection, and other terms. Let's briefly go over these terms because they will appear throughout the tutorial series. We want you to have the best experience in the tutorial. Table 1 summarizes each term.

Table 1: NiFi Core Concepts

FlowFile: Data brought into NiFi that moves through the system. This data holds attributes and can contain content.
Processor: Tool that pulls data from external sources, performs actions on the attributes and content of FlowFiles, and publishes data to an external source.
Connection: Linkage between processors that contains a queue and relationship(s) that affect where data is routed.
Flow Controller: Acts as a broker to facilitate the exchange of FlowFiles between processors.
Process Group: Enables the creation of new components based on the composition of processors, funnels, etc.

NiFi Architecture

Let’s dive deeper into the infrastructure that enables NiFi to perform so well when it comes to building DataFlows that address various use cases.

nifi_architecture_diagram

NiFi executes within the Java Virtual Machine (JVM) located in the Host Operating System (OS/Host).

Inside the JVM,

  • The Web Server enables us to access NiFi’s User Interface from the web browser.
  • The Flow Controller acts as the brains of the operation: it gives threads to extensions (custom processors) and keeps track of all the operations being executed by extensions.
  • The FlowFile Repository is the area where NiFi keeps track of all the status updates regarding FlowFiles as they travel through the DataFlow.
  • The Content Repository is the location where the content bytes of FlowFiles reside.
  • The Provenance Repository consists of all the provenance event data.

A Brief History of NiFi

Apache NiFi originated from the NSA Technology Transfer Program in the autumn of 2014 and became an official Apache project in July of 2015. NiFi has been in development for 8 years. It was built with the idea of making it easier for people to automate and manage data-in-motion without having to write numerous lines of code. Therefore, the user interface comes with a palette of dataflow components that can be dropped onto the graph and connected together. NiFi was also created to solve many challenges of data-in-motion, such as multi-way dataflows, data ingestion from any data source, and data distribution with the required security and governance. NiFi can be used by a wide variety of users who come from a variety of backgrounds (development, business) and want to tackle the challenges stated above.

Further Reading

The topics covered in the concepts section were brief and tailored toward the tutorial series.


Tutorial 0: Launch NiFi HTML UI

Introduction

With the HDF Sandbox, Apache NiFi comes preinstalled in the Ambari stack and preconfigured out of the box to utilize many of its features. This tutorial shows you how to access the NiFi HTML UI in one of two ways: use an auto-script to access the NiFi UI, or manually access it via Ambari.

Prerequisites

  • Completed Analyze Traffic Patterns with Apache NiFi Introduction
  • Read NiFi DataFlow Automation Concepts
  • Downloaded and installed HDF Sandbox for VMware, VirtualBox or Native Docker

Outline

First complete Step 1. Then refer to Approach 1 to quickly launch the NiFi HTML UI via script, or go to Approach 2 to launch the NiFi HTML UI from Ambari.

Step 1: Download HDF NiFi Shell Script

Download the auto-scripts and change permissions for the scripts:

Download auto_scripts.zip

cd ~/Downloads
unzip auto_scripts.zip
chmod -R 755 auto_scripts

Note: You will utilize these scripts throughout either section.

Approach 1: Launch NiFi HTML UI via Shell Script

1. Execute the script:

./auto_scripts/launch_nifi_ui.sh

The NiFi HTML UI will open in your Chrome browser as below:

open_nifi_html_interface.png

Approach 2: Access NiFi HTML UI via Ambari

1. Start HDF Sandbox via Script:

./auto_scripts/launch_ambari_ui.sh

2. Login to Ambari UI with credentials (admin/admin).

The Ambari Login UI will look as below:

login_ambari_ui.png

3. Verify the NiFi Service is running, it should have a green check mark:

verify_nifi_running.png

4. Select the NiFi Service, click on the Quick Links dropdown and click NiFi UI:

open-nifi-ui-via-ambari.png

NiFi HTML UI will open as below:

open_nifi_html_interface.png

Summary

Congratulations! As a review, HDF Sandbox comes preinstalled and preconfigured with NiFi. Therefore, you launched the NiFi HTML UI through Ambari or via shell script within a few minutes. Now that you have NiFi running, let’s head to the next tutorial to begin building our simple dataflow.


Tutorial 1: Build A Simple NiFi DataFlow

Introduction

In this tutorial, we will build a NiFi DataFlow to fetch vehicle location, speed, and other sensor data from a San Francisco Muni traffic simulator, look for observations of a specific few vehicles, and store the selected observations into a new file. Even though this aspect of the tutorial is not streaming data, you will see the importance of file I/O in NiFi dataflow application development and how it can be used to simulate streaming data.

In this tutorial, you will build the following dataflow:

completed-data-flow-lab1

Figure 1: The completed dataflow contains three sections: ingest data from the vehicle location XML simulator, extract vehicle location detail attributes from FlowFiles, and route these detail attributes to a JSON file as long as they are not empty strings. You will learn more in depth about each processor’s particular responsibility in each section of the dataflow.

Prerequisites

  • Completed Analyze Traffic Patterns with Apache NiFi Introduction
  • Completed Tutorial 0: Launch NiFi HTML UI

Outline

If you want to see the NiFi flow in action within minutes, refer to Approach 1. Else if you prefer to build the dataflow manually step-by-step, continue on to Approach 2.

Approach 1: Import Simple NiFi Flow

1. Download the Tutorial-1-simple-nifi-flow.xml template file.

2. Click on the template icon nifi_template_icon located in the actions toolbar (also known as Operate Panel).

3. Click Browse, find the template file, click Open and hit Import.

4. Hover over to the top left of the NiFi HTML interface, drag the template icon nifi_template_icon onto the graph and select the tutorial-1-simple-nifi-flow.xml template file.

5. Run the following shell script, so NiFi has Vehicle Location input source data:

./auto_scripts/init_nifi_in_src_data.sh

6. Hit the start button start_button_nifi_iot to activate the dataflow.

run_dataflow_lab1_nifi_learn_ropes

Note: We highly recommend you read through the tutorial, so you become familiar with the process of building a dataflow.

Approach 2: Manually Build Simple NiFi Flow

Step 1: Create a NiFi DataFlow

The building blocks of every dataflow are processors. These tools perform actions on data to ingest, route, extract, split, aggregate or store it. Our dataflow will contain the following processors; each is listed with a high-level description of its role in the tutorial:

  • GetFile reads vehicle location data from traffic stream zip file
  • UnpackContent decompresses the zip file
  • ControlRate controls the rate at which FlowFiles move through the flow
  • EvaluateXPath(x2) extracts nodes (elements, attributes, etc.) from the XML file
  • SplitXml splits the XML file into separate FlowFiles, each comprised of children of the parent element
  • UpdateAttribute assigns each FlowFile a unique name
  • RouteOnAttribute makes the filtering decisions on the vehicle location data
  • AttributesToJSON represents the filtered attributes in JSON format
  • MergeContent merges the FlowFiles into one FlowFile by concatenating their JSON content together
  • PutFile writes filtered vehicle location data content to a directory on the local file system

Refer to NiFi’s Documentation to learn more about each processor described above.

1.1 Learning Objectives: Overview of DataFlow Build Process

  • Build a NiFi DataFlow to ingest, filter, convert and store moving data
  • Establish connections and relationships between processors
  • Troubleshoot problems that may occur
  • Run the NiFi DataFlow

Your dataflow will extract the following XML Attributes from the transit data listed in Table 1. We will learn how to do this extraction with the EvaluateXPath processor when we build our dataflow.

Table 1: Extracted XML Attributes From Transit Data

Attribute Name    Type       Comment
id                string     Vehicle ID
time              int64      Observation timestamp
lat               float64    Latitude (degrees)
lon               float64    Longitude (degrees)
speedKmHr         float64    Vehicle speed (km/h)
dirTag            string     Direction of travel
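To make the extraction concrete, here is a minimal Python sketch of what EvaluateXPath will do with a vehicle record. The sample XML below is a hypothetical record modeled on the general shape of the NextBus feed, not actual feed output:

```python
import xml.etree.ElementTree as ET

# Hypothetical vehicle-location record in the NextBus feed's general shape.
SAMPLE = """<body>
  <lastTime time="1472248800000"/>
  <vehicle id="1040" dirTag="M____OB1" lat="37.72444"
           lon="-122.44692" speedKmHr="23"/>
</body>"""

def extract_attributes(xml_text):
    """Pull the Table 1 attributes out of each vehicle element."""
    root = ET.fromstring(xml_text)
    last_time = int(root.find("lastTime").get("time"))
    return [{
        "id": v.get("id"),
        "time": last_time,
        "lat": float(v.get("lat")),
        "lon": float(v.get("lon")),
        "speedKmHr": float(v.get("speedKmHr")),
        "dirTag": v.get("dirTag"),
    } for v in root.findall("vehicle")]
```

In the dataflow itself, NiFi performs this extraction declaratively through the XPath expressions you configure on the EvaluateXPath processors.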

After extracting, filtering and converting the data, your new file, which contains transit location data, will be stored in the Output Directory listed in Table 2. We will learn how to satisfy the conditions in Table 2 with the RouteOnAttribute, AttributesToJSON and PutFile processors.

Table 2: Other DataFlow Requirements

Parameter Value
Input Directory /tmp/nifi/input
Output Directory /tmp/nifi/output/filtered_transitLoc_data
File Format JSON
Filter For id, time, lat, lon, speedKmHr, dirTag

Let’s build our dataflow to fetch, filter, convert and store transit sensor data from San Francisco Muni, M-Ocean View route. Here is a visualization, courtesy of NextBus and Google, of the data NiFi generates using our Traffic XML Simulator:

sf_ocean_view_route_nifi_streaming

1.2 Add processors

1. Go to the components toolbar, drag and drop the processor icon processor_nifi_iot onto the graph.

An Add Processor window will appear with 3 ways to find our desired processor: processor list, tag cloud, or filter bar

  • processor list: contains almost 180 processors
  • tag cloud: reduces list by category
  • filter bar: search for desired processor

2. Select the GetFile processor and a short description of the processor’s function will appear.

Creates FlowFiles from files in a directory. NiFi will ignore files for which it doesn’t have at least read permissions.

Click the Add button to add the processor to the graph.

add_processor_getfile_nifi-learn-ropes

3. Add the UnpackContent, ControlRate, EvaluateXPath, SplitXML, UpdateAttribute, EvaluateXPath, RouteOnAttribute, AttributesToJSON, MergeContent and PutFile processors using the processor icon.

Overview of Each Processor’s Role in our DataFlow:

  • GetFile fetches the vehicle location simulator data for files in a directory.
  • UnpackContent decompresses the contents of FlowFiles from the traffic simulator zip file.
  • ControlRate controls the rate at which FlowFiles are transferred to follow-on processors enabling traffic simulation.
  • EvaluateXPath extracts the timestamp of the last update for vehicle location data returned from each FlowFile.
  • SplitXML splits the parent’s child elements into separate FlowFiles. Since vehicle is a child element in our xml file, each new vehicle element is stored separately.
  • EvaluateXPath extracts attributes: vehicle ID, direction, latitude, longitude and speed from the vehicle element in each FlowFile.
  • RouteOnAttribute transfers FlowFiles to follow-on processors only if vehicle ID, speed, latitude, longitude, timestamp and direction match the filter conditions.
  • AttributesToJSON generates a JSON representation of the attributes extracted from the FlowFiles, converting the XML data to a JSON format with fewer attributes.
  • MergeContent merges a group of JSON FlowFiles together based on a number of FlowFiles and packages them into a single FlowFile.
  • UpdateAttribute updates the attribute name for each FlowFile.
  • PutFile writes the contents of the FlowFile to a desired directory on the local filesystem.

Follow the step above to add these processors. You should obtain the image below:

added_processors_nifi_part1

Note: To find more information on a processor, right click on it (for instance, GetFile) and click usage. An in-app window will appear with that processor’s documentation. Also, if you want to create color-coded labels that act as a background for a process group, refer to Appendix B at the bottom of the page.

1.3 Troubleshoot Common Processor Issues

Notice the eleven processors in the image above have warning symbols warning_symbol_nifi_iot in the upper left corner of the processor face. These warning symbols indicate the processors are invalid.

1. To troubleshoot, hover over one of the processors, for instance the GetFile processor, and a warning symbol will appear. This message informs us of the requirements needed, so we can run this processor.

error_getFile_processor_nifi_lab1

The warning message indicates that we need to specify a directory path to tell the processor where to pull data from, and a connection so the processor can establish a relationship.
Each processor will have its own alert message. Let’s configure and connect each processor to remove all the warning messages, so we can have a live dataflow.

1.4 Configure & Connect processors

Now that we added some processors, we will configure our processors in the Configure Processor window, which contains 4 tabs: Settings, Scheduling, Properties and Comments. We will spend most of our time in the properties tab since it is the main place to configure specific information that the processor needs to run properly. The properties that are in bold are required for the processor to be valid. If you want more information on a particular property, hover over the help icon question_mark_symbol_properties_config_iot.png located next to the Property Name with the mouse to read a description of the property. While we configure each processor, we will also connect each processor together and establish a relationship for them to make the dataflow complete.

If you would like to read more about configuring and connecting processors, refer to Hortonworks Apache NiFi User Guide, Building a DataFlow: section 6.2 and 6.5.

Step 2: Build XML Simulator DataFlow Section

2.1 GetFile

1. Right click on the GetFile processor and click configure from dropdown menu

configure_processor_nifi_iot

2. Click on the Properties tab. Add the properties listed in Table 3 to the processor’s appropriate properties and if their original properties already have values, update them. Click the OK button after changing a property.

Table 3: Update GetFile Property Values

Property Value
Input Directory /tmp/nifi/input
Keep Source File true
  • Input Directory: the location at which data is ingested into the dataflow

  • Keep Source File: source files in the directory remain after all data is ingested

getFile_config_property_tab_window

Figure 3: GetFile Configuration Property Tab Window

3. Now that each property is updated, navigate to the Scheduling tab and change the Run Schedule from 0 sec to 1 sec, so that the processor executes a task every second. This prevents overuse of system resources.

4. Now that each required item is updated, click Apply. Connect GetFile to UnpackContent processor by dragging the arrow icon from the first processor to the next component. When the Create Connection window appears, verify success checkbox is checked, if not check it. Click Add.

2.2 UnpackContent

1. Open the processor configuration properties tab. Add the properties listed in Table 4 and if their original properties already have values, update them.

Table 4: Update UnpackContent Property Value

Property Value
Packaging Format zip

Packaging Format tells the processor which packaging format was used to create the file

unpackContent_config_property_tab_window

Figure 4: UnpackContent Configuration Property Tab Window

2. Open the processor config Settings tab, under Auto terminate relationships, check the failure and original checkboxes. Click Apply.

3. Connect UnpackContent to ControlRate processor. When the Create Connection window appears, verify success checkbox is checked, if not check it. Click Add.

2.3 ControlRate

1. Open the processor configuration properties tab. Add the properties listed in Table 5 and if their original properties already have values, update them.

Table 5: Update ControlRate Property Values

Property Value
Rate Control Criteria flowfile count
Maximum Rate 1
Time Duration 6 second
  • Rate Control Criteria instructs the processor to count the number of FlowFiles before a transfer takes place

  • Maximum Rate instructs the processor to transfer 1 FlowFile at a time

  • Time Duration ensures that only 1 FlowFile transfers through this processor every 6 seconds.
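As a rough mental model (not NiFi's actual implementation), the combined effect of these three properties behaves like the sketch below; the class and method names are illustrative:

```python
class ControlRateSketch:
    """Toy model of ControlRate with Rate Control Criteria = flowfile
    count: at most max_rate FlowFiles pass per period seconds."""

    def __init__(self, max_rate=1, period=6.0):
        self.max_rate = max_rate
        self.period = period
        self.window_start = None
        self.passed = 0

    def try_transfer(self, now):
        # Open a fresh window if none exists or the current one expired.
        if self.window_start is None or now - self.window_start >= self.period:
            self.window_start = now
            self.passed = 0
        if self.passed < self.max_rate:
            self.passed += 1
            return True    # FlowFile transfers downstream
        return False       # FlowFile stays queued until the next window
```

With max_rate=1 and period=6.0, a FlowFile arriving one second after a transfer waits in the queue, which is what simulates a live traffic stream from the unpacked zip file.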

controlRate_config_property_tab_window

Figure 5: ControlRate Configuration Property Tab Window

2. Open the processor config Settings tab, under Auto terminate relationships, check the failure checkbox. Click Apply.

3. Connect ControlRate to EvaluateXPath processor. When the Create Connection window appears, verify success checkbox is checked, if not check it. Click Add.

Step 3: Build Key Attribute Extraction DataFlow Section

3.1 EvaluateXPath

1. Open the processor configuration properties tab. Add the properties listed in Table 6 and if the original properties already have values, update them. For the second property in Table 6, add a new dynamic property for the XPath expression by selecting the New property button. Insert the following property name and value into your properties tab as shown in the table below:

Table 6: Update EvaluateXPath Property Values

Property Value
Destination flowfile-attribute
Last_Time //body/lastTime/@time
  • Destination: the result of the XPath evaluation is stored in a FlowFile attribute

  • Last_Time: a FlowFile attribute whose XPath expression retrieves the value of the time node in the XML file

evaluateXPath_config_property_tab_window

Figure 6: EvaluateXPath Configuration Property Tab Window

2. Open the processor config Settings tab, under Auto terminate relationships, check the failure and unmatched checkboxes. Click Apply.

3. Connect EvaluateXPath to SplitXML processor. When the Create Connection window appears, verify matched checkbox is checked, if not check it. Click Add.

3.2 SplitXML

1. Keep SplitXML configuration properties as default.

2. Open the processor config Settings tab, under Auto terminate relationships, check the failure and original checkboxes. Click Apply.

3. Connect SplitXML to EvaluateXPath processor. When the Create Connection window appears, verify split checkbox is checked, if not check it. Click Add.
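Conceptually, SplitXML at its default depth of 1 turns each child of the root element into its own FlowFile. A rough Python sketch of that behavior (illustrative only):

```python
import xml.etree.ElementTree as ET

def split_xml(xml_text):
    """Emit each child of the root element as a standalone XML string,
    the way SplitXML produces one FlowFile per child element."""
    root = ET.fromstring(xml_text)
    return [ET.tostring(child, encoding="unicode").strip() for child in root]

doc = "<body><lastTime time='1'/><vehicle id='a'/><vehicle id='b'/></body>"
splits = split_xml(doc)   # three splits: lastTime plus two vehicle elements
```

Since vehicle is a child element in our XML file, each vehicle element ends up in its own FlowFile, ready for the second EvaluateXPath processor.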

3.3 EvaluateXPath

1. Open the processor configuration properties tab. Add the properties listed in Table 7 and if their original properties already have values, update them. For the remaining properties in Table 7, add new dynamic properties for the XPath expressions by clicking the New property button. Insert the following property names and values into your properties tab as shown in the table below:

Table 7: Update EvaluateXPath Property Values

Property Value
Destination flowfile-attribute
Direction_of_Travel //vehicle/@dirTag
Latitude //vehicle/@lat
Longitude //vehicle/@lon
Vehicle_ID //vehicle/@id
Vehicle_Speed //vehicle/@speedKmHr
  • Destination is set to flowfile-attribute because the values resulting from the XPath expressions need to be stored in FlowFile attributes.

  • The 5 user-defined attributes each hold a value that is used in the NiFi Expression Language filtering condition in the next processor.

evaluateXPath_extract_splitFlowFiles_config_property_tab

Figure 7: EvaluateXPath Configuration Property Tab Window

2. Open the processor config Settings tab, under Auto terminate relationships, check the failure and unmatched checkboxes. Click Apply.

3. Connect EvaluateXPath to RouteOnAttribute processor. When the Create Connection window appears, verify matched checkbox is checked, if not check it. Click Add.

Step 4: Build Filter and JSON Conversion DataFlow Section

4.1 RouteOnAttribute

1. Open the processor configuration properties tab. Add a new dynamic property for the NiFi expression by selecting the New property button. Insert the following property name and value into your properties tab as shown in the table below:

Table 8: Add RouteOnAttribute Property Value

Property Value
Filter_Attributes ${Direction_of_Travel:isEmpty():not():and(${Last_Time:isEmpty():not()}):and(${Latitude:isEmpty():not()}):and(${Longitude:isEmpty():not()}):and(${Vehicle_ID:isEmpty():not()}):and(${Vehicle_Speed:equals('0'):not()})}

Filter_Attributes uses the FlowFile attribute values obtained from the XPath expressions to filter out any FlowFile that has at least one empty attribute value or a speed attribute value equal to 0. Otherwise, the FlowFiles are passed to the remaining processors.
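The Filter_Attributes expression is equivalent to the following Python predicate. This is a sketch of the NiFi Expression Language logic, not how NiFi evaluates it internally:

```python
def filter_attributes(attrs):
    """True when every listed attribute is non-empty and the vehicle
    speed is not '0', mirroring the Filter_Attributes expression."""
    required = ["Direction_of_Travel", "Last_Time", "Latitude",
                "Longitude", "Vehicle_ID"]
    if any(not attrs.get(name) for name in required):
        return False                      # routed to unmatched, dropped
    return attrs.get("Vehicle_Speed") != "0"
```

A FlowFile for which this predicate is True follows the Filter_Attributes relationship; all others take the unmatched relationship, which we auto-terminate.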

routeOnAttribute_config_property_tab_window

Figure 9: RouteOnAttribute Configuration Property Tab Window

2. Open the processor config Settings tab, under Auto terminate relationships, check the unmatched checkbox. Click Apply.

3. Connect RouteOnAttribute to AttributesToJSON processor. When the Create Connection window appears, verify Filter_Attributes checkbox is checked, if not check it. Click Add.

4.2 AttributesToJSON

1. Open the processor configuration properties tab. Add the properties listed in Table 9 and if their original properties already have values, update them.

Table 9: Update AttributesToJSON Property Values

Property Value
Attributes List Vehicle_ID, Direction_of_Travel, Latitude, Longitude, Vehicle_Speed, Last_Time
Destination flowfile-content
  • Attributes List takes FlowFile attribute parameters and presents them in JSON format
  • Destination stores the output as content in the FlowFile
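In effect, the processor takes the FlowFile's attribute map, keeps only the listed attributes, and writes them as JSON into the FlowFile's content. A minimal sketch of that behavior (illustrative names, not NiFi's internals):

```python
import json

def attributes_to_json(flowfile_attrs, attributes_list):
    """Mimic AttributesToJSON with Destination = flowfile-content:
    select the listed attributes and serialize them as JSON content."""
    selected = {name: flowfile_attrs.get(name, "") for name in attributes_list}
    return json.dumps(selected)
```

After this step each FlowFile's content is a single JSON object holding the six transit attributes, which MergeContent will later combine.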

attributesToJSON_config_property_tab_window

Figure 10: AttributesToJSON Configuration Property Tab Window

2. Open the processor config Settings tab, under Auto terminate relationships, check the failure checkbox. Click Apply.

3. Connect AttributesToJSON to MergeContent processor. When the Create Connection window appears, verify success checkbox is checked, if not check it. Click Add.

4.3 MergeContent

1. Open the processor configuration properties tab. Add the properties listed in Table 10; if a property already has a value, update it.

Table 10: Update MergeContent Property Values

Property Value
Minimum Number of Entries 10
Maximum Number of Entries 15
Delimiter Strategy Text
Header [
Footer ]
Demarcator , {press-shift+enter}
  • Minimum Number of Entries takes in at least the specified number of FlowFiles, then merges them into one FlowFile
  • Maximum Number of Entries takes in no more than the specified number of FlowFiles before merging them
  • Delimiter Strategy specifies that Header, Footer and Demarcator set the formatting conditions for text in the file
  • Header inserts the specified value at the beginning of the file
  • Footer inserts the specified value at the end of the file
  • Demarcator inserts the specified value(s) between the content of each merged FlowFile
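As a quick sanity check, the chosen Header, Footer and Demarcator turn a batch of JSON objects into one valid JSON array. A minimal sketch (illustrative, with sample FlowFile contents; not NiFi source):

```python
import json

header, footer = "[", "]"
demarcator = ",\n"  # the shift+enter keystroke adds the newline after the comma
entries = ['{"Vehicle_ID": "5403"}', '{"Vehicle_ID": "5404"}']  # sample FlowFiles
merged = header + demarcator.join(entries) + footer
print(json.loads(merged))  # parses cleanly as a JSON array of the merged objects
```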

2. Open the processor config Settings tab, under Auto terminate relationships, check the failure and original checkboxes. Click Apply.

3. Connect MergeContent to UpdateAttribute processor. When the Create Connection window appears, verify merged checkbox is checked, if not check it. Click Add.

4.4 UpdateAttribute

1. To add a new dynamic property for a NiFi expression, click on the New property button. Insert the following property name and value into your properties tab as shown in the table below:

Table 11: Add UpdateAttribute Property Value

Property Value
filename ${UUID()}
  • filename updates each FlowFile with a unique identifier
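The ${UUID()} expression behaves much like generating a random UUID per FlowFile. For intuition only (a Python sketch, not how NiFi implements it):

```python
import uuid

# Each FlowFile gets a collision-free filename, matching the UUID-style
# names seen later in /tmp/nifi/output/filtered_transitLoc_data.
filename = str(uuid.uuid4())
print(filename)
```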

updateAttribute_config_property_tab_window

Figure 11: UpdateAttribute Configuration Property Tab Window

2. Connect UpdateAttribute to PutFile processor. When the Create Connection window appears, verify success checkbox is checked, if not check it. Click Add.

4.5 PutFile

1. Open the processor configuration properties tab. Add the property listed in Table 12; if the property already has a value, update it.

Table 12: Update PutFile Property Value

Property Value
Directory /tmp/nifi/output/filtered_transitLoc_data

putFile_config_property_tab_window

Figure 12: PutFile Configuration Property Tab Window

2. Open the processor config Settings tab, under Auto terminate relationships, check the failure and success checkboxes. Click Apply.

Step 5: Run the NiFi DataFlow

1. The processors are now valid since the warning symbols have disappeared. Notice that the processors have a red stop symbol stop_symbol_nifi_iot in the upper left corner and are ready to run. To select all processors, hold down the shift key and drag your mouse across the entire dataflow.

2. Now that all processors are selected, go to the actions toolbar in the left window labeled Operate and click the start button start_button_nifi_iot. Your screen should look like the following:

run_dataflow_lab1_nifi_learn_ropes

3. To quickly see what the processors are doing and view the statistics displayed on their faces, right click on the graph, then click the refresh status button refresh_nifi_iot.

There are two options for checking that the data stored in the destination is correct. Option 1 is to navigate by terminal to the folder where NiFi stores the data. Option 2 is to use Data Provenance to verify the data is correct.

Check Data By Terminal (Option 1)

4. To check that the data was written to a file, open your terminal. Make sure to SSH into your sandbox. Navigate to the directory you specified in the PutFile processor, list the files and open one of the newly created files to view the filtered transit output. In this tutorial the directory path is: /tmp/nifi/output/filtered_transitLoc_data.

cd /tmp/nifi/output/filtered_transitLoc_data
ls
vi 19bad9c1-ae98-439f-8a8b-543fb3ab0ab0

commands_enter_sandbox_shell_lab1

filtered_vehicle_locations_data_nifi_learn_ropes

Note: to exit the vi editor, press esc and then type :q.

Check Data By NiFi’s Data Provenance (Option 2)

Data Provenance is a unique feature in NiFi that enables the user to check the data from any processor or component while the FlowFiles move throughout the dataflow. Data Provenance is a great tool for troubleshooting issues that may occur in the dataflow. In this section, we will check the Data Provenance for the PutFile processor.

1. Right click on the PutFile processor. Select Data Provenance. It is the 4th item in the dropdown menu.

2. NiFi will search for provenance events and load them into the window; select any event. An event represents a FlowFile passing through a processor and the data viewable at that particular time. For the tutorial, let’s select the first event by pressing the view provenance event symbol i_symbol_nifi_lab1.

Provenance Event Window:

provenance_event_window_lab1

3. Once you select the event, a Provenance Event Dialog Window will appear. It contains Details, Attributes and Content regarding the particular event. Take a few minutes to view each tab. Let’s navigate to the Content tab to view the data generated from the FlowFile. NiFi gives the user the option to download or view the content of the event. Click on the View button.

provenance_content_tab_lab1

4. NiFi gives the user the option to view the data in multiple formats. We will view it in the original format.

event_content_view_window_lab1

Did you receive the data you expected?

Summary

Congratulations! You made it to the end of the tutorial and built a NiFi DataFlow that reads in a live stream simulation from NextBus.com, splits the parent element’s children from the XML file into separate FlowFiles, extracts nodes from the XML file, makes a filtering decision based on the attributes and stores the newly modified data in a file. You also learned how to use NiFi’s Data Provenance feature to view data from a FlowFile as it flows through a processor, a powerful feature that enables users to troubleshoot issues in real time. Feel free to use this feature in the other tutorials. If you are interested in learning more about NiFi, view the further reading section below.

Further Reading

Appendix A: Review of the NiFi DataFlow

If you want a more in-depth review of the dataflow we just built, read the information below; otherwise continue on to the next tutorial.

Vehicle Location XML Simulation Section:
Streams the contents of a file from local disk into NiFi, unpacks the zip file and transfers each unpacked file as a single FlowFile, and controls the rate at which the data flows to the remaining part of the flow.

Extract Attributes From FlowFiles Section:
Splits the XML message into many FlowFiles, updates each FlowFile’s filename attribute with a unique name, and evaluates user-defined XPath expressions against the XML content to extract values into user-named attributes.

Filter Key Attributes to JSON File Section:
Routes FlowFiles based on whether they contain all the XPath expression results (attributes) from the evaluation, writes a JSON representation of the input attributes to each FlowFile as content, merges the FlowFiles by concatenating their content into a single FlowFile, and writes the contents of that FlowFile to a directory on the local filesystem.

Appendix B: Create Labels for Processor Groups

NiFi DataFlows can expand into enormous data-processing pipelines. In a large dataflow, it can be difficult to understand what actions particular sections of the dataflow perform on the data. Labels can therefore be used to make it easier for users to understand the different phases of the dataflow.

Let’s create a label to signify the action that happens in the first phase of our dataflow.

1. Drag the Label icon label_icon_lab1 located in the components toolbar onto the canvas. To expand the label, hover toward the right corner; an arrow will appear. Click on that edge and drag the label to surround the group of processors.

2. Right click on the label, press configure. A Configure Label window will appear. Click in the Label Value field, type the name Generate Vehicle Location XML Simulator for the first phase of our dataflow. Choose 18px for Font Size.

label_first_phase_dataflow_lab1

Reference the picture of the dataflow in Step 5 if you would like to assign labels similar to those used in the dataflow for this tutorial. Feel free to create your own labels for each phase of the dataflow too.


Tutorial 2: Enrich Simple DataFlow via Places API

Introduction

In this section, you will build a geographic location enrichment for the vehicle filtering dataflow. You will gain a deep understanding of the automated and managed flow of information between multiple systems and the facilities in NiFi for monitoring and examining the dataflow. To add this enhancement, we will incorporate the Google Places Nearby Search API into NiFi, which will show the neighborhoods near the vehicle’s location as it moves. Incorporating external APIs like this is a practical design pattern because NiFi makes it easy to bring in other technologies to process the data.

In this tutorial, you will build the Geo Location Enrichment section of the dataflow:

complete_dataflow_lab2_geoEnrich

Prerequisites

  • Completed Tutorial 0: Launch NiFi HTML UI
  • Completed Tutorial 1: Build A Simple NiFi DataFlow

Outline

If you want to see the NiFi flow in action within minutes, refer to Approach 1. Else if you prefer to build the dataflow manually step-by-step, continue on to Approach 2.

Approach 1: Import Enriched NiFi Flow Via Places API

1. Download the Tutorial-2-enrich-flow-via-places-api.xml template file. Then import the template file into NiFi.

2. Refer to Step 1 in Approach 2 to obtain the Google API key and set up Google Places API: HTTP URL.

3. Replace the InvokeHTTP processor’s Remote URL property value with the new Google Places API: HTTP URL value.

4. Hit the start button start_button_nifi_iot to activate the dataflow.

complete_dataflow_lab2_geoEnrich

Approach 2: Manually Build Enriched NiFi Flow Via Places API

Google Places API

Google Places API Web Service returns information about places: establishments, geographic locations and prominent points of interest using HTTP requests. The Places API includes six place requests: Place Searches, Place Details, Place Add, Place Photos, Place Autocomplete and Query Autocomplete. Read more about these place requests in Introducing the API.

All requests are accessed through an HTTP request and return either JSON or XML response.

What are the necessary components to use the Places API?

  • https:// protocol
  • API Key

Step 1: Obtain API Key for NiFi to Build HTTP URL

For our dataflow, our task is to enrich the data by searching for neighborhoods within proximity of a vehicle’s changing location. We will retrieve two values from this data: the names of the nearby neighborhoods and the city (vicinity). So, we will integrate the Nearby Search HTTP request with NiFi.

The Nearby Search request is an HTTP URL of the following definition, which we will need for NiFi:

https://maps.googleapis.com/maps/api/place/nearbysearch/output?parameters

The output can come in two formats: json or xml. We will use json for this tutorial.

Let’s obtain the required parameters to initiate a Nearby Search request.

1. We will need to obtain an API key so Google can identify our application for quota management, and so that places added from the application are instantly available to our app (NiFi).

2. We will use a standard Google Places API. Click on the blue Get A Key button to activate the API Web Service.

3. A window will appear that says Enable Google Places API Web Service. Select Yes. Then Create And Enable API. Wait a few seconds for the new window to load.

4. Now a screen with your unique API key will appear similar to the screen below:

api_key

Now we have the key parameter for our HTTP request. We also have the other required parameters: location, thanks to tutorial 1 in which we extracted the Longitude and Latitude attributes, and radius, which can be any distance that does not exceed 50,000 meters. We will use one optional parameter, type, to signify what type of place we are interested in searching for.

5. Let’s build our HTTP URL with the parameters below, so we can insert the URL as a property value into InvokeHTTP later in the tutorial.

  • API Key = AIzaSyDY3asGAq-ArtPl6J2v7kcO_YSRYrjTFug
  • Latitude = ${Latitude}
  • Longitude = ${Longitude}
  • radius = 500
  • type = neighborhood
https://maps.googleapis.com/maps/api/place/nearbysearch/json?location=${Latitude},${Longitude}&radius=500&type=neighborhood&key=AIzaSyDY3asGAq-ArtPl6J2v7kcO_YSRYrjTFug

Note: Your API Key will be different than the one in the URL above.
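For reference, the URL above can also be assembled programmatically. The sketch below uses a placeholder key (YOUR_API_KEY, an assumption — substitute your own) and leaves the ${Latitude}/${Longitude} expressions intact for NiFi to fill in per FlowFile at runtime:

```python
from urllib.parse import urlencode

base = "https://maps.googleapis.com/maps/api/place/nearbysearch/json"
params = {
    "location": "${Latitude},${Longitude}",  # NiFi expressions, resolved per FlowFile
    "radius": 500,                           # meters; must not exceed 50,000
    "type": "neighborhood",
    "key": "YOUR_API_KEY",                   # placeholder -- use your own key
}
# safe= keeps the NiFi expression characters from being percent-encoded
url = base + "?" + urlencode(params, safe="${},")
print(url)
```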

Step 2: Build Geo Location Enrichment DataFlow Section

Six processors are needed to add geographic location enrichment to your dataflow. Each processor holds a critical role in transporting the enriched data to a destination:

  • InvokeHTTP performs an HTTP request to access data from Google Places API about places nearby a vehicle’s location
  • EvaluateJsonPath extracts the neighborhoods_nearby and city data elements out of the JSON structure
  • RouteOnAttribute routes FlowFiles as long as their neighborhoods_nearby and city attributes do not contain empty strings
  • AttributesToJSON represents the enriched attributes in JSON structure
  • MergeContent merges FlowFiles together by concatenating their JSON content together
  • PutFile writes the enriched geographic location contents of the FlowFile to the local file system

2.1 Learning Objectives: DataFlow Geo Enrichment Addition

  • Add/Configure/Connect processors to ingest, filter and store geo enriched API data
  • Troubleshoot problems that may occur
  • Run the dataflow

InvokeHTTP

1. Add the InvokeHTTP processor onto the NiFi graph. Connect RouteOnAttribute from tutorial 1 to InvokeHTTP processor. When the Create Connection window appears, verify Filter Attributes checkbox is checked, if not check it. Click Add.

2. Open InvokeHTTP configure properties tab and add the property listed in Table 1.

Table 1: Update InvokeHTTP Property Value(s)

Property Value
Remote URL https://maps.googleapis.com/maps/api/place/nearbysearch/json?location=${Latitude},${Longitude}&radius=500&type=neighborhood&key=AIzaSyDY3asGAq-ArtPl6J2v7kcO_YSRYrjTFug

Remote URL connects to the HTTP URL we created using the Google Places API and feeds that data into the dataflow. Notice that we use two NiFi expressions for the location parameter, because those two values change as new FlowFiles pass through this processor.

invokeHTTP_config_property_tab_window

3. Navigate to the Settings tab, change the name from InvokeHTTP to GoogleNearbySearchAPI. Under Auto terminate relationships check the Failure, No Retry, Original and Retry checkboxes. Click Apply button.

EvaluateJsonPath

1. Add the EvaluateJsonPath processor onto the NiFi graph. Connect InvokeHTTP to EvaluateJsonPath processor. When the Create Connection window appears, select Response checkbox. Click Add.

2. Open EvaluateJsonPath configure properties tab and update the original properties with the properties listed in Table 2. Note: add city and neighborhoods_nearby property by clicking the New property button, then insert their values into the properties tab.

Table 2: Update and Add New EvaluateJsonPath Property Values

Property Value
Destination flowfile-attribute
Return Type json
city $.results[0].vicinity
neighborhoods_nearby $.results[*].name
  • Destination: the result from the JSON Path evaluation is stored in FlowFile attributes.
  • The 2 user-defined attributes each hold a value that is used in the NiFi Expression Language filtering condition in the next processor.
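To see what those two JSON Path expressions pull out, here is a plain-Python equivalent applied to a trimmed, hypothetical Nearby Search response (field names match the real API; the place names are made up):

```python
import json

response = json.loads("""
{"results": [
   {"name": "Oceanview", "vicinity": "San Francisco"},
   {"name": "Ingleside", "vicinity": "San Francisco"}
]}
""")
city = response["results"][0]["vicinity"]                 # $.results[0].vicinity
neighborhoods = [r["name"] for r in response["results"]]  # $.results[*].name
print(city, neighborhoods)
```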

evaluateJsonPath_config_property_tab_window

3. Navigate to the Settings tab. Under Auto terminate relationships check the unmatched and failure checkboxes. Click Apply button.

RouteOnAttribute

1. Add the RouteOnAttribute processor onto the NiFi graph. Connect EvaluateJsonPath to RouteOnAttribute processor. When the Create Connection window appears, select matched checkbox. Click Add.

2. Open RouteOnAttribute configure properties tab and click on New property button to add RouteNearbyNeighborhoods to property name and insert its NiFi expression value listed in Table 3.

Table 3: Add New RouteOnAttribute Property Value

Property Value
RouteNearbyNeighborhoods ${city:isEmpty():not():and(${neighborhoods_nearby:isEmpty():not()})}

RouteNearbyNeighborhoods uses the FlowFile attribute values obtained from the JSON Path expressions to filter out any FlowFile that has at least one empty attribute value. All other FlowFiles are passed to the remaining processors.

routeOnAttribute_geoEnrich_config_property_tab_window

3. Navigate to the Settings tab, change the name from RouteOnAttribute to RouteNearbyNeighborhoods. Under Auto terminate relationships check the unmatched checkbox. Click Apply button.

AttributesToJSON

1. Add the AttributesToJSON processor onto the NiFi graph. Connect RouteOnAttribute to AttributesToJSON processor. When the Create Connection window appears, select RouteNearbyNeighborhoods checkbox. Click Add.

2. Open AttributesToJSON configure properties tab and update the properties with the information listed in Table 4.

Table 4: Update AttributesToJSON Property Values

Property Value
Attributes List Vehicle_ID, city, Latitude, Longitude, neighborhoods_nearby, Last_Time
Destination flowfile-content

attributesToJSON_geoEnrich_config_property_tab_window

3. Navigate to the Settings tab, under Auto terminate relationships check the failure checkbox. Click Apply button.

MergeContent

1. Add the MergeContent processor onto the NiFi graph. Connect AttributesToJSON to MergeContent processor. When the Create Connection window appears, select success checkbox. Click Add.

2. Open MergeContent configure properties tab and update the properties with the information listed in Table 5. For the Demarcator property, type , then press shift+enter.

Table 5: Update MergeContent Property Values

Property Value
Minimum Number of Entries 10
Maximum Number of Entries 15
Delimiter Strategy Text
Header [
Footer ]
Demarcator , {press-shift+enter}

mergeContent_geoEnrich_config_property_tab_window

3. Navigate to the Settings tab, under Auto terminate relationships check the failure and original checkboxes. Click Apply button.

PutFile

1. Add the PutFile processor onto the NiFi graph. Connect MergeContent to PutFile processor. When the Create Connection window appears, select merged checkbox. Click Add.

2. Open PutFile configure properties tab and update the property with the information listed in Table 6.

Table 6: Update PutFile Property Values

Property Value
Directory /tmp/nifi/output/nearby_neighborhoods_search

putFile_geoEnrich_config_property_tab_window

3. Navigate to the Settings tab, under Auto terminate relationships check the success checkbox. Click Apply button. Connect the processor to itself and when the Create Connection window appears, select failure checkbox.

Step 3: Run NiFi DataFlow

Now that we have added the geographic location enrichment section to our previous dataflow, let’s run the dataflow and verify that we receive the expected results in our output directory.

1. Go to the actions toolbar and click the start button start_button_nifi_iot. Your screen should look like the following:

complete_dataflow_lab2_geoEnrich

2. Let’s check that the data was written to our expected directory. Open your terminal. Make sure to SSH into your sandbox if you are running one; otherwise navigate to the output directory on your local machine. Navigate to the directory you specified in the PutFile processor, list the files and open one of the newly created files to view the geo-enriched nearby-neighborhoods output. In this tutorial the directory path is: /tmp/nifi/output/nearby_neighborhoods_search.

cd /tmp/nifi/output/nearby_neighborhoods_search
ls
vi 2ae30f7d-5ffe-4e29-92f0-8f0e7c9224b6

output_geoEnrich_nearby_neighborhoods_data

Summary

Congratulations! For the Geo Enrichment section of the dataflow, you learned to use InvokeHTTP to access the geographic locations of nearby places with the Google Places Search API. You learned to add NiFi expression variables to the InvokeHTTP Remote URL property, so that the values for latitude and longitude change in the URL as new FlowFiles pass through this processor. You learned to use EvaluateJsonPath similarly to EvaluateXPath, except that JSON Path expressions are used to extract JSON elements (neighborhoods_nearby & city) from a JSON structure. Now you know how to incorporate external APIs into NiFi to further enhance the dataflow.

Further Reading


Tutorial 3: Ingest Live Vehicle Routes via NextBus API

Introduction

In this tutorial, you will replace the section of our dataflow that generates the simulation of vehicle location XML data with a new section that ingests a live stream of data from NextBus San Francisco Muni Agency on route OceanView into our NiFi DataFlow.

In this tutorial, you will build the Ingest NextBus SF Muni Live Stream section of the dataflow:

complete_dataflow_lab3_live_stream_ingestion

Prerequisites

  • Completed Tutorial 0: Launch NiFi HTML UI
  • Completed Tutorial 1: Build A Simple NiFi DataFlow
  • Completed Tutorial 2: Enrich Simple DataFlow via Places API

Outline

If you want to see the NiFi flow in action within minutes, refer to Approach 1. Else if you prefer to build the dataflow manually step-by-step, continue on to Approach 2.

Approach 1: Import Live Vehicle Routes NiFi Flow

1. Download the Tutorial-3-ingest-live-vehicle-routes-nextbus-api.xml template file. Then import the template file into NiFi.

2. Hit the start button start_button_nifi_iot to activate the dataflow.

complete_dataflow_lab3_live_stream_ingestion

Approach 2: Manually Build Live Vehicle Routes NiFi Flow

NextBus Live Feed

NextBus Live Feed provides the public with live passenger information, such as vehicle locations, prediction times for transit vehicles, vehicle routes and different agencies (San Francisco Muni, Unitrans City of Davis, etc.). We will learn to use NextBus’s API to access the XML Live Feed data and create a URL in which we specify parameters in a query string. The parameters for the tutorial will include the vehicle location, agency, route and time.

After viewing the Live Feed Documentation, we created the following URL for the GetHTTP processor:

http://webservices.nextbus.com/service/publicXMLFeed?command=vehicleLocations&a=sf-muni&r=M&t=0

Let’s break apart the parameters, so we can better understand how to create custom URLs. There are 4 parameters:

  • command: command=vehicleLocations
  • agency: a=sf-muni
  • route: r=M
  • time: t=0

Refer to NextBus’s Live Feed Documentation to learn more about each parameter.
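You can preview the feed outside NiFi before wiring it in. A quick shell sketch that rebuilds the URL from its parameters (the curl line is optional and requires internet access):

```shell
# Build the GetHTTP URL from the four query parameters described above
BASE="http://webservices.nextbus.com/service/publicXMLFeed"
URL="${BASE}?command=vehicleLocations&a=sf-muni&r=M&t=0"
echo "$URL"
# curl -s "$URL" | head   # uncomment to preview the live XML feed
```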

Step 1: Attach NextBus Live Stream to the DataFlow

GetHTTP

  1. Delete GetFile, UnpackContent and ControlRate processors. We will replace them with the GetHTTP processor.
  2. Add the GetHTTP processor and drag it to the place where the previous three processors were located. Connect GetHTTP to EvaluateXPath processor located above SplitXML. When the Create Connection window appears, select success checkbox. Click Add.
  3. Open the GetHTTP Config Property Tab window. We will need to copy and paste the NextBus XML Live Feed URL into the URL property value. Add the properties listed in Table 1.

Table 1: Update GetHTTP Properties Tab

Property Value
URL http://webservices.nextbus.com/service/publicXMLFeed?command=vehicleLocations&a=sf-muni&r=M&t=0
Filename vehicleLoc_SF_OceanView_${now():format("HHmmssSSS")}.xml

getHTTP_liveStream_config_property_tab_window

4. Now that each property is updated, navigate to the Scheduling tab and change the Run Schedule from 0 sec to 4 sec, so that the processor executes a task every 4 seconds. This prevents overuse of system resources.

5. Open the processor config Settings tab, change the processor’s Name from GetHTTP to NextBusXMLFeed. Click Apply button.

Modify PutFile in Geo Enrich Section

1. Open PutFile Configure Properties Tab. Change the Directory property value from the previous value to the value shown in Table 2:

Table 2: Update PutFile Properties Tab

Property Value
Directory /tmp/nifi/output/nearby_neighborhoods_liveStream

Directory is changed to a new location for the real-time data coming in from NextBus live stream.

modify_putFile_in_geo_enrich_section

2. Click Apply.

Step 2: Run the NiFi DataFlow

Now that we have added NextBus San Francisco Muni live stream ingestion to our dataflow, let’s run the dataflow and verify that we receive the expected results in our output directory.

1. Go to the actions toolbar and click the start button start_button_nifi_iot. Your screen should look like the following:

complete_dataflow_lab3_live_stream_ingestion

2. Let’s verify the data in the output directory is correct. Navigate to the following directory and open a random file to check the data.

cd /tmp/nifi/output/nearby_neighborhoods_liveStream
ls
vi 002a7d47-b6bb-41e1-b930-0e172a54638b

Did you receive neighborhoods similar to the image below?

nextbus_liveStream_output_lab3

Summary

Congratulations! You learned how to use NextBus’s API to connect to their XML Live Feed for vehicle location data. You also learned how to use the GetHTTP processor to ingest a live stream from NextBus San Francisco Muni into NiFi!

Further Reading