Introduction to Spark Streaming

Introduction

In this tutorial, we will introduce the core concepts of Apache Spark Streaming and run a Word Count demo that counts incoming words in two-second batches.

Prerequisites

This tutorial is part of a series of hands-on tutorials to get you started with HDP using the Hortonworks Sandbox. Please ensure you complete the prerequisites before proceeding with this tutorial.

Outline

  - Concepts
  - Setup
  - Further Reading

Concepts

Spark Streaming is an extension of the core Spark API that enables scalable, high-throughput, fault-tolerant stream processing of live data streams. Data can be ingested from many sources like Kafka, Flume, Twitter, ZeroMQ, Kinesis, or TCP sockets, and can be processed using complex algorithms expressed with high-level functions like map, reduce, join and window. Finally, processed data can be pushed out to filesystems, databases, and live dashboards.

(Figure: Spark Streaming data flow)

Internally, it works as follows. Spark Streaming receives live input data streams and divides the data into batches, which are then processed by the Spark engine to generate the final stream of results in batches.

(Figure: Spark Streaming batch processing)

DStream

Discretized Stream, or DStream, is the basic abstraction provided by Spark Streaming. It represents a continuous stream of data, either the input data stream received from a source or the processed data stream generated by transforming the input stream. Internally, a DStream is represented by a continuous series of RDDs, Spark's abstraction of an immutable, distributed dataset (see the Spark Programming Guide for more details).
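
To make this concrete, here is a minimal sketch (separate from the demo used later in this tutorial) that chains DStream transformations into a word count over a sliding 10-second window, recomputed every 2 seconds. The host, port, and window sizes are illustrative choices:

from pyspark import SparkContext
from pyspark.streaming import StreamingContext

sc = SparkContext("local[2]", "WindowedWordCount")
ssc = StreamingContext(sc, 2)                          # 2-second batch interval

lines = ssc.socketTextStream("localhost", 3333)        # input DStream from a TCP source
words = lines.flatMap(lambda line: line.split(" "))    # each transformation returns a new DStream
pairs = words.map(lambda word: (word, 1))

# Count the words seen in the last 10 seconds, recomputed every 2 seconds
windowedCounts = pairs.reduceByKeyAndWindow(lambda a, b: a + b, None, 10, 2)
windowedCounts.pprint()

ssc.start()
ssc.awaitTermination()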

Setup

1. Start your Sandbox

First, start your Sandbox Virtual Machine (VM) in either a VirtualBox or VMware environment and note your VM IP address.

We will refer to your VM IP address as <HOST IP> throughout this tutorial.

If you need help finding your <HOST IP>, check out Learning the Ropes.

2. Launch a “Shell in a Box”

Now that your Sandbox is running, open a web browser and go to: http://<HOST IP>:4200

Where <HOST IP> is the IP address of your Sandbox machine.

For example, the default address for VirtualBox is http://127.0.0.1:4200

Next, log into the “Shell in a Box” using the following default credentials:

User: root
Pass: hadoop

If you’re logging in for the first time, you will be required to change your password.

3. Download a Spark Streaming Demo to Sandbox

Now let’s download the Spark Streaming demo code to your Sandbox from GitHub.

In your “Shell in a Box” execute the following two commands:

cd /tmp

and

wget https://raw.githubusercontent.com/roberthryniewicz/sample-code/master/spark-streaming-demo.py

Note: the wget command downloads Spark Streaming code that computes a simple word count. Words (i.e. strings) will come in via a network socket connection from the Netcat tool introduced later.

Several things worth pointing out in the demo code you’ve just downloaded:

  1. We’ve set a 2-second batch interval to make it easier to inspect the results of each processed batch.
  2. We perform a simple word count for each batch and print the results to the terminal with the pprint() function.

from pyspark import SparkContext
from pyspark.streaming import StreamingContext

# Create a local StreamingContext with two working threads and a batch interval of 2 seconds
sc = SparkContext("local[2]", "NetworkWordCount")
ssc = StreamingContext(sc, 2)

# Create a DStream that will connect to localhost:3333
lines = ssc.socketTextStream("localhost", 3333)

# Split each line into words
words = lines.flatMap(lambda line: line.split(" "))

# Count each word in each batch
pairs = words.map(lambda word: (word, 1))
wordCounts = pairs.reduceByKey(lambda x, y: x + y)

# Print each batch
wordCounts.pprint()

ssc.start()             # Start the computation
ssc.awaitTermination()  # Wait for the computation to terminate

4. Submit a Spark Streaming Job

Now you’re ready to submit the Spark job. In your terminal window, copy and paste the following command and hit Enter:

/usr/hdp/current/spark-client/bin/spark-submit /tmp/spark-streaming-demo.py

You should see a lot of INFO log output, interspersed with a timestamp for each batch, updated every 2 seconds:

===========================================
Time: 2016-03-16 01:26:22
===========================================

5. Run Netcat

Netcat (often abbreviated to nc) is a computer networking utility for reading from and writing to network connections using TCP or UDP.

In your browser, open a second tab or window, and open another “Shell in a Box” by going to http://<Host IP>:4200.

For example, http://127.0.0.1:4200 if you’re running VirtualBox.

Log in to your shell and run the following command to launch Netcat:

nc -l localhost 3333

At this point you should be connected and you may start typing or pasting any text.

For example, if you type hello hello world in the Netcat window, you should see the following output in the tab or window where the Spark Streaming job is running:

===========================================
Time: 2016-03-16 01:26:24
===========================================
(hello, 2)
(world, 1)
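
Note that each 2-second batch is counted independently, so the totals do not accumulate across batches. For reference, a minimal sketch of a stateful variant (not part of the downloaded demo) could use updateStateByKey to keep running totals; it assumes a writable checkpoint directory such as /tmp/spark-streaming-checkpoint:

from pyspark import SparkContext
from pyspark.streaming import StreamingContext

sc = SparkContext("local[2]", "StatefulNetworkWordCount")
ssc = StreamingContext(sc, 2)
ssc.checkpoint("/tmp/spark-streaming-checkpoint")   # required for stateful operations

lines = ssc.socketTextStream("localhost", 3333)
pairs = lines.flatMap(lambda line: line.split(" ")).map(lambda word: (word, 1))

# Add the counts from the current batch to the running total for each word
def updateTotal(newValues, runningTotal):
    return sum(newValues) + (runningTotal or 0)

runningCounts = pairs.updateStateByKey(updateTotal)
runningCounts.pprint()

ssc.start()
ssc.awaitTermination()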

6. Stopping Spark Streaming and Netcat

When you’re done experimenting, press Ctrl + C in your shell tab or window to stop your Spark job and/or the Netcat process.

7. Suppressing INFO Messages (Optional)

If you want to suppress the INFO messages in the Spark Streaming terminal window, do the following:

Open conf/log4j.properties, for example:

vi /usr/hdp/current/spark-client/conf/log4j.properties

and edit it. The default settings look like this:

# Set everything to be logged to the console
log4j.rootCategory=INFO, console
log4j.appender.console=org.apache.log4j.ConsoleAppender
log4j.appender.console.target=System.err
log4j.appender.console.layout=org.apache.log4j.PatternLayout
log4j.appender.console.layout.ConversionPattern=%d{yy/MM/dd HH:mm:ss} %p %c{1}: %m%n

# Settings to quiet third party logs that are too verbose
log4j.logger.org.eclipse.jetty=WARN
log4j.logger.org.eclipse.jetty.util.component.AbstractLifeCycle=ERROR
log4j.logger.org.apache.spark.repl.SparkIMain$exprTyper=INFO
log4j.logger.org.apache.spark.repl.SparkILoop$SparkILoopInterpreter=INFO

Replace the first line:

log4j.rootCategory=INFO, console

with

log4j.rootCategory=WARN, console

Save log4j.properties and restart your spark-submit job. Now you should see only WARN messages.
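
Alternatively, you can lower the log level from inside the demo script itself instead of editing log4j.properties. A minimal sketch, assuming a Spark version (1.4 or later) where SparkContext.setLogLevel is available:

from pyspark import SparkContext
from pyspark.streaming import StreamingContext

sc = SparkContext("local[2]", "NetworkWordCount")
sc.setLogLevel("WARN")   # suppress INFO for this application (messages logged before this call still appear)
ssc = StreamingContext(sc, 2)
# ... rest of the demo unchanged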

Further Reading

Once you’ve completed this tutorial, check out the other Spark tutorials.