Apache Spark & Hadoop

Spark Adds In-Memory Compute for ETL, Machine Learning and Data Science Workloads to Hadoop

Apache Spark brings fast, in-memory data processing to Hadoop. Elegant and expressive development APIs in Scala, Java, R, and Python allow data workers to efficiently execute streaming, machine learning or SQL workloads for fast iterative access to datasets.

What Apache Spark Does

Apache Spark is a fast, in-memory data processing engine with elegant and expressive development APIs to allow data workers to efficiently execute streaming, machine learning or SQL workloads that require fast iterative access to datasets. With Spark running on Apache Hadoop YARN, developers everywhere can now create applications to exploit Spark’s power, derive insights, and enrich their data science workloads within a single, shared dataset in Hadoop.

The Hadoop YARN-based architecture provides the foundation that enables Spark and other applications to share a common cluster and dataset while ensuring consistent levels of service and response. Spark is now one of many data access engines that work with YARN in HDP.

[Video: Arun Murthy, “Hadoop & Spark: Perfect Together,” Spark Summit 2015]

Apache Spark consists of Spark Core and a set of libraries. The core is the distributed execution engine and the Java, Scala, and Python APIs offer a platform for distributed ETL application development.

Additional libraries, built atop the core, allow diverse workloads for streaming, SQL, and machine learning.

Spark is designed for data science, and its abstractions make data science easier. Data scientists commonly use machine learning, a set of techniques and algorithms that learn from data. These algorithms are often iterative, and Spark’s ability to cache a dataset in memory greatly speeds up such iterative processing, making Spark an ideal engine for implementing them.
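To illustrate the benefit of caching for iterative work, here is a minimal sketch that could be run in the Spark shell; the input path is hypothetical, and sc is the SparkContext the shell provides:

    // Cache the parsed dataset so repeated passes read from memory instead of HDFS.
    // Hypothetical input: one comma-separated list of numbers per line.
    val points = sc.textFile("hdfs:///tmp/points.txt").map(_.split(",").map(_.toDouble)).cache()

    // The first pass materializes the cache; later passes, as in an iterative algorithm, reuse it.
    for (i <- 1 to 10) {
      val total = points.map(_.sum).reduce(_ + _)   // stands in for one iteration of a learning algorithm
      println(s"iteration $i: $total")
    }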

Spark also includes MLlib, a library that provides a growing set of machine learning algorithms for common data science techniques: Classification, Regression, Collaborative Filtering, Clustering and Dimensionality Reduction.
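For example, a simple clustering job with MLlib’s KMeans can be sketched in a few lines of Scala from the Spark shell. This is only an illustrative sketch; the input path is hypothetical and assumes one comma-separated feature vector per line:

    import org.apache.spark.mllib.clustering.KMeans
    import org.apache.spark.mllib.linalg.Vectors

    // Parse each line into a dense vector and cache it, since KMeans iterates over the data.
    val data = sc.textFile("hdfs:///tmp/kmeans_data.txt")
    val parsed = data.map(line => Vectors.dense(line.split(",").map(_.toDouble))).cache()

    // Cluster the data into two groups using at most 20 iterations.
    val model = KMeans.train(parsed, 2, 20)
    println("Within-set sum of squared errors: " + model.computeCost(parsed))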

Spark’s ML Pipeline API is a high-level abstraction that models an entire data science workflow. The ML pipeline package in Spark models a typical machine learning workflow and provides abstractions such as Transformer, Estimator, Pipeline and Parameters. This abstraction layer makes data scientists more productive.
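To make those abstractions concrete, here is a minimal, hypothetical sketch in Scala (the Spark shell provides sc and sqlContext; the tiny inline training set is invented for illustration):

    import org.apache.spark.ml.Pipeline
    import org.apache.spark.ml.classification.LogisticRegression
    import org.apache.spark.ml.feature.{HashingTF, Tokenizer}

    // Invented training data: (id, text, label) rows.
    val training = sqlContext.createDataFrame(Seq(
      (0L, "spark runs on yarn", 1.0),
      (1L, "mapreduce is batch only", 0.0)
    )).toDF("id", "text", "label")

    // Transformers turn one DataFrame into another; an Estimator is fit on a DataFrame to produce a model.
    val tokenizer = new Tokenizer().setInputCol("text").setOutputCol("words")
    val hashingTF = new HashingTF().setInputCol("words").setOutputCol("features")
    val lr = new LogisticRegression().setMaxIter(10).setRegParam(0.01)   // Parameters are set via setters

    // A Pipeline chains the stages together and is itself an Estimator.
    val pipeline = new Pipeline().setStages(Array(tokenizer, hashingTF, lr))
    val model = pipeline.fit(training)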

Spark Use Cases

As Apache Spark’s momentum continues to grow, we are seeing customers across all industries get real value from using it with the Hortonworks Data Platform (HDP). Customers use Spark to improve their businesses by detecting patterns and producing actionable insights that drive organizational change. The following table provides a few examples, from insurance to internet companies, of how Spark is being used:

Insurance     Optimize the claims reimbursement process by using Spark’s machine learning capabilities to process and analyze all claims.
Healthcare    Build a patient care system using Spark Core, Streaming, and SQL.
Retail        Use Spark to analyze point-of-sale data and coupon usage.
Internet      Use Spark’s ML capabilities to identify fake profiles and enhance the product matches shown to customers.

Many customers are using Cloudbreak and Ambari to spin up clusters in the cloud for ad-hoc, self-service data science.

Spark & HDP

Spark is certified as YARN Ready and is a part of HDP. Memory and CPU-intensive Spark-based applications can coexist with other workloads deployed in a YARN-enabled cluster. This approach avoids the need to create and manage dedicated Spark clusters and allows for more efficient resource use within a single cluster.

[Figure: Spark is integrated within the Hortonworks Data Platform]

HDP also provides consistent governance, security and management policies for Spark applications, just as it does for the other data processing engines within HDP.

Hortonworks Focus on Spark: Vertical and Horizontal Integration with HDP


Hortonworks approached Spark in the same way we approached other data access engines such as Storm, Hive, and HBase: we outlined a strategy, rallied the community, and contributed key features within the Apache Software Foundation’s process.

Below is a summary of the various integration points that make Spark and HDP enterprise-ready.

  • Support for the ORC File Format
    As part of the Stinger Initiative, the Hive community introduced the Optimized Row Columnar (ORC) file format. ORC is a columnar storage format that is tightly integrated with HDFS and provides optimizations for both read performance and data compression; it is rapidly becoming the de facto storage format for Hive. Hortonworks worked with the community to bring ORC support to Spark, contributing SPARK-2883 for basic ORC support, and this work is now GA with Spark 1.4.1 (SPARK-2883, SPARK-10623).
  • Security
    Many of our customers’ initial use cases for Spark run on Hadoop clusters that either do not contain sensitive data or are dedicated to a single application, and so are not subject to broad security requirements. But as users plan to deploy Spark-based applications alongside other applications in a single cluster, we worked to integrate Spark with the security constructs of the broader Hadoop platform. A common request is that Spark run effectively on a secure Hadoop cluster and leverage the authorization offered by HDFS. To improve security, we have also worked within the community to ensure that Spark runs on a Kerberos-enabled cluster, so that only authenticated users can submit Spark jobs.
  • Operations
    Hortonworks continues to focus on streamlining operations for Spark through the 100% open source Apache Ambari. Our customers use Ambari to provision, manage and monitor their HDP clusters, and many Hortonworks partners, such as Microsoft, Teradata, Pivotal and HP, have backed this foundational Hadoop project. Currently, our partners leverage Ambari Stacks to rapidly define new components and services and add them to a Hadoop cluster. With Stacks, Spark components and services can be managed by Ambari, so you can install, start, stop, and fine-tune a Spark deployment through the same interface used for every other engine in your Hadoop cluster. The Quick Links feature of Ambari lets the cluster operator access the native Spark user interface. To simplify the operational experience, HDP 2.2.4 also allows Spark to be installed and managed by Apache Ambari 2.0, which lets the cluster administrator manage Spark configuration and Spark daemon life cycles.
  • Seamless Data Access
    Most of our customers who use Spark also use Hive. Last year we worked with the community to upgrade the version of Hive in Spark to 0.13.1, and this year we have upgraded Spark to use Hive 1.2.1. We have also streamlined the integration between Hive and Spark. This integration enables customers who have embraced the data lake concept to access their data from Hive or from Spark without running into Hive version incompatibility issues. In addition, this work lays the foundation for Hive and Spark to evolve independently while still allowing Spark to leverage Hive.
  • Improved Reliability and Scale of Spark-on-YARN
    The Spark API allows developers to create both iterative and in-memory applications on Apache Hadoop YARN, and with the community interest behind it, Spark is making great strides toward efficient cluster resource usage. With dynamic executor allocation on YARN, Spark acquires and releases executors within configured bounds (see the configuration sketch after this list). We continue to believe Spark can use cluster resources even more efficiently and are working with the community to promote better resource usage.
  • YARN ATS Integration
    From an operations perspective, Hortonworks has integrated Spark with the YARN Application Timeline Server (ATS). ATS provides generic storage and retrieval of applications’ current and historic information. This permits a common integration point for certain classes of operational information and metrics. With this integration, the cluster operator can take advantage of information already available from YARN to gain additional visibility into the health and execution status of the Spark jobs.
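The executor bounds mentioned above for dynamic allocation are ordinary Spark configuration properties. Here is a minimal, illustrative sketch in Scala; the application name and executor counts are arbitrary, and dynamic allocation also requires the external shuffle service to be running on the YARN NodeManagers:

    import org.apache.spark.{SparkConf, SparkContext}

    // Enable dynamic executor allocation and bound how many executors Spark may hold at once.
    val conf = new SparkConf()
      .setAppName("dynamic-allocation-sketch")
      .set("spark.dynamicAllocation.enabled", "true")
      .set("spark.shuffle.service.enabled", "true")        // external shuffle service on the NodeManagers
      .set("spark.dynamicAllocation.minExecutors", "1")    // lower bound
      .set("spark.dynamicAllocation.maxExecutors", "10")   // upper bound
    val sc = new SparkContext(conf)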

Fundamentally, our strategy continues to focus on innovating at the core of Hadoop and we look forward to continuing to support our customers and partners by contributing to a vibrant Hadoop ecosystem that includes Apache Spark as yet another data access application running in YARN.


Recent Spark Releases

For additional details about this release, review the following resources:

Spark Version: 1.5.1 TP

Notable Enhancements:
  • Adds several new machine learning algorithms and utilities, and extends Spark’s new R API
  • Adds web visualization of SQL and DataFrame query plans
  • Delivers the first major pieces of Project Tungsten
  • Adds backpressure support to Spark Streaming

Spark 1.5.1 Technical Preview – with HDP

This technical preview allows you to evaluate Apache Spark 1.5.1 on YARN with HDP 2.3.

With YARN, Hadoop supports various types of workloads. Spark on YARN becomes yet another workload running against the same set of hardware resources.

This technical preview describes how to:

  • Run Spark on YARN and run the canonical Spark examples: SparkPi and Wordcount.
  • Run Spark 1.5.1 on HDP 2.3.
  • Use the Spark DataFrame API.
  • Read/write data from Hive.
  • Use SparkSQL thrift JDBC/ODBC Server.
  • Use ORC files with Spark, with examples.
  • Use SparkR.

HDP Cluster Requirements:

This technical preview can be installed on any HDP 2.3.x cluster, whether it is a multi-node cluster or a single-node HDP Sandbox.


The Spark 1.5.1 Technical Preview is provided in RPM and DEB package formats. The following instructions assume RPM packaging:

  1. Download the Spark 1.5.1 repository.
    For CentOS:

    wget -nv  http://private-repo-1.hortonworks.com/HDP/centos6/2.x/updates/  -O /etc/yum.repos.d/HDP-TP.repo

    For Ubuntu:

  2. Install the Spark Package:
    Download the Spark 1.5.1 RPM (and pySpark, if desired) and set it up on your HDP 2.3 cluster:

    yum install spark_2_3_2_1_12-master -y

    If you want to use pySpark, install it as follows and ensure Python is installed on all nodes.

    yum install spark_2_3_2_1_12-python  -y

    The RPM installer will also download core Hadoop dependencies.

  3. Set JAVA_HOME:
    Make sure that you set JAVA_HOME before you launch the Spark Shell or thrift server.

    export JAVA_HOME=<path to JDK 1.8>
  4. Create hive-site.xml in the Spark conf directory.
    As user root, create the file SPARK_HOME/conf/hive-site.xml. Edit the file to contain only the following configuration setting:

        <configuration>
          <property>
            <name>hive.metastore.uris</name>
            <!--Make sure that <value> points to the Hive Metastore URI in your cluster; the host below is a placeholder -->
            <value>thrift://your-metastore-host:9083</value>
            <description>URI for client to contact metastore server</description>
          </property>
        </configuration>

Run the Spark Pi Example

To test compute-intensive tasks in Spark, the Pi example estimates pi by “throwing darts” at a circle: it generates random points in the unit square ((0,0) to (1,1)) and counts how many of them fall within the unit circle. The fraction of points inside the circle approximates pi/4, which is then multiplied by 4 to estimate pi.
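The same estimate can also be sketched interactively in a few lines of Scala. This is a simplified, illustrative version of what the packaged SparkPi example does; sc is the SparkContext provided by the Spark shell introduced in the next section:

    // Sample n random points in the unit square and count how many land inside the unit circle.
    val n = 100000
    val inside = sc.parallelize(1 to n).map { _ =>
      val x = math.random
      val y = math.random
      if (x * x + y * y <= 1) 1 else 0
    }.reduce(_ + _)
    println("Pi is roughly " + 4.0 * inside / n)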

  1. Change to your Spark directory and switch to the spark OS user:
    cd /usr/hdp/
    su spark
  2. Run the Spark Pi example in yarn-client mode:
    ./bin/spark-submit --class org.apache.spark.examples.SparkPi --master yarn-client --num-executors 3 --driver-memory 512m --executor-memory 512m --executor-cores 1 lib/spark-examples*.jar 10

    Note: The Pi job should complete without any failure messages. It should produce output similar to the following. Note the value of pi near the end of the output.

    15/11/10 14:28:35 INFO scheduler.DAGScheduler: Job 0 finished: reduce at SparkPi.scala:36, took 1.721177 s
    Pi is roughly 3.141296
    15/11/10 14:28:35 INFO spark.ContextCleaner: Cleaned accumulator 1

Using WordCount with Spark

First, copy the input file for the Spark WordCount Example.

Upload the input file you want to use in WordCount to HDFS. You can use any text file as input; the following steps use log4j.properties.

As user spark:

hadoop fs -copyFromLocal /etc/hadoop/conf/log4j.properties /tmp/data

To run WordCount:

  1. Run the Spark shell:
    ./bin/spark-shell --master yarn-client --driver-memory 512m --executor-memory 512m

    Output similar to the following will be displayed, followed by the “scala>” REPL prompt:

    Welcome to
         ____              __
        / __/__  ___ _____/ /__
       _\ \/ _ \/ _ `/ __/  '_/
      /___/ .__/\_,_/_/ /_/\_\   version 1.5.1
    Using Scala version 2.10.4 (Java HotSpot(TM) 64-Bit Server VM, Java 1.8.0_40)
    Type in expressions to have them evaluated.
    Type :help for more information.
    15/11/10 14:33:49 INFO spark.SparkContext: Running Spark version 1.5.1
  2. At the Scala REPL prompt, enter:
    val file = sc.textFile("/tmp/data")
    val counts = file.flatMap(line => line.split(" ")).map(word => (word, 1)).reduceByKey(_ + _)
    counts.saveAsTextFile("/tmp/wordcount")   // write the results to /tmp/wordcount for the HDFS steps below
  3. Review output
    To view WordCount output in the scala shell:

    scala > counts.count()

    To print the full output of the WordCount job:

    scala > counts.toArray().foreach(println)

    To view WordCount output using HDFS:

    1. Exit the scala shell.
      scala > exit
    2. View WordCount Results:
      hadoop fs -ls /tmp/wordcount

      It should display output similar to:

    3. Use the HDFS cat command to see the WordCount output. For example:
      hadoop fs -cat /tmp/wordcount/part-00000

Using the Spark DataFrame API

The DataFrame API provides easier access to data because a DataFrame looks conceptually like a table; developers familiar with the Python pandas library or with R data frames will find it familiar.

  1. As user spark, upload the people.txt file to HDFS:
    cd /usr/hdp/
    su spark
    hdfs dfs -copyFromLocal examples/src/main/resources/people.txt people.txt
    hdfs dfs -copyFromLocal examples/src/main/resources/people.json people.json
  2. As user spark, launch the Spark Shell:
    cd /usr/hdp/
    su spark
    ./bin/spark-shell --num-executors 2 --executor-memory 512m --master yarn-client
  3. At the Spark Shell, type the following:
    scala>val df = sqlContext.jsonFile("people.json")
  4. Using df.show, display the contents of the DataFrame:
    scala>df.show()

    15/11/10 13:24:10 INFO YarnScheduler: Removed TaskSet 2.0, whose tasks have all completed, from pool
    +----+-------+
    | age|   name|
    +----+-------+
    |null|Michael|
    |  30|   Andy|
    |  19| Justin|
    +----+-------+

Additional DataFrame API examples

scala>import org.apache.spark.sql.functions._
// Select everybody, but increment the age by 1
scala>df.select(df("name"), df("age") + 1).show()
// Select people older than 21
scala>df.filter(df("age") > 21).show()
// Count people by age
scala>df.groupBy("age").count().show()

Programmatically Specifying Schema

import org.apache.spark.sql._
import org.apache.spark.sql.types.{StructType, StructField, StringType}

val sqlContext = new org.apache.spark.sql.SQLContext(sc)
val people = sc.textFile("people.txt")

// Build a schema from a string of column names, treating every column as a string
val schemaString = "name age"
val schema = StructType(schemaString.split(" ").map(fieldName => StructField(fieldName, StringType, true)))

// Convert each line into a Row and pair the rows with the schema to create a DataFrame
val rowRDD = people.map(_.split(",")).map(p => Row(p(0), p(1).trim))
val peopleDataFrame = sqlContext.createDataFrame(rowRDD, schema)

// Register the DataFrame as a table so it can be queried with SQL
peopleDataFrame.registerTempTable("people")
val results = sqlContext.sql("SELECT name FROM people")
results.map(t => "Name: " + t(0)).collect().foreach(println)

This will produce output similar to the following:

15/11/10 14:36:49 INFO cluster.YarnScheduler: Removed TaskSet 13.0, whose tasks have all completed, from pool 
15/11/10 14:36:49 INFO scheduler.DAGScheduler: ResultStage 13 (collect at :33) finished in 0.129 s
15/11/10 14:36:49 INFO scheduler.DAGScheduler: Job 10 finished: collect at :33, took 0.162827 s
Name: Michael
Name: Andy
Name: Justin

Running Hive Examples

The following example reads from and writes to HDFS under the Hive directories. In a production environment you need appropriate HDFS permissions, but for evaluation purposes you can run this example as the hdfs user.

  1. Launch the Spark Shell on the YARN cluster:
    su hdfs
    ./bin/spark-shell --num-executors 2 --executor-memory 512m --master yarn-client
  2. Create Hive Context:
    scala> val hiveContext = new org.apache.spark.sql.hive.HiveContext(sc)

    You should see output similar to the following:

    hiveContext: org.apache.spark.sql.hive.HiveContext = org.apache.spark.sql.hive.HiveContext@7d9b2e8d
  3. Create a Hive Table:
    scala> hiveContext.sql("CREATE TABLE IF NOT EXISTS TestTable (key INT, value STRING)")

    You should see output similar to the following:

    15/11/10 14:40:02 INFO log.PerfLogger: </PERFLOG method=Driver.run start=1447184401403 end=1447184402898 duration=1495 from=org.apache.hadoop.hive.ql.Driver>
    res8: org.apache.spark.sql.DataFrame = [result: string]
  4. Load the example key-value data into the table:
    scala> hiveContext.sql("LOAD DATA LOCAL INPATH 'examples/src/main/resources/kv1.txt' INTO TABLE TestTable")
  5. Invoke Hive collect_list UDF:
    scala> hiveContext.sql("from TestTable SELECT key, collect_list(value) group by key order by key").collect.foreach(println)

Reading & Writing ORC Files

Hortonworks worked in the community to bring full ORC support to Spark, and we recently blogged about using ORC with Spark. See the blog post for the complete set of ORC examples, including advances such as partition pruning and predicate pushdown; a minimal read/write sketch follows below.
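As a minimal illustration, assuming the spark-shell session from the Hive examples above (with hiveContext defined) and the people.json file uploaded earlier, ORC data can be written and read back roughly like this; the output path people_orc is hypothetical:

    // Write a DataFrame out in ORC format, then read it back and query it.
    val people = hiveContext.read.format("json").load("people.json")
    people.write.format("orc").save("people_orc")
    val orcPeople = hiveContext.read.format("orc").load("people_orc")
    orcPeople.filter(orcPeople("age") > 20).show()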

Accessing the SparkSQL Thrift Server for JDBC/ODBC

With this Tech Preview, SparkSQL’s thrift server provides JDBC access to SparkSQL.
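Beyond beeline (used in the steps below), any JDBC client can connect once the server is running. Here is a rough sketch in Scala, assuming the Hive JDBC driver and its dependencies are on the classpath and the server from step 1 is listening on port 10001:

    import java.sql.DriverManager

    // Register the Hive JDBC driver and open a connection to the SparkSQL thrift server.
    Class.forName("org.apache.hive.jdbc.HiveDriver")
    val conn = DriverManager.getConnection("jdbc:hive2://localhost:10001", "spark", "")
    val stmt = conn.createStatement()
    val rs = stmt.executeQuery("show tables")
    while (rs.next()) println(rs.getString(1))
    rs.close(); stmt.close(); conn.close()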

  1. Start the Thrift Server
    First, as the root user, create a logs directory under SPARK_HOME and make the spark user its owner:

    su root
    mkdir logs
    chown spark:hadoop logs

    Then, as the spark user, start the SparkSQL thrift server from SPARK_HOME, specifying the port for the thrift JDBC server:

    su spark
    ./sbin/start-thriftserver.sh --master yarn-client --executor-memory 512m --hiveconf hive.server2.thrift.port=10001
  2. Connect to the Thrift Server over beeline
    Launch beeline from SPARK_HOME:

    su spark
    cd /usr/hdp/
    ./bin/beeline
  3. Connect to the Thrift Server and Issue SQL commands
    On the beeline prompt:

    beeline>!connect jdbc:hive2://localhost:10001

    – This example does not have security enabled, so any username/password combination should work.
    – The connection might take a few seconds to be available in a Sandbox environment. Try the show tables command after waiting 10-15 seconds.

    0: jdbc:hive2://localhost:10001> show tables;
    +------------+--------------+
    | tableName  | isTemporary  |
    +------------+--------------+
    | orc_table  | false        |
    | testtable  | false        |
    +------------+--------------+
    2 rows selected (1.275 seconds)
    0: jdbc:hive2://localhost:10001> exit
  4. Stop the Thrift Server:

    ./sbin/stop-thriftserver.sh

Running SparkR

Before you run SparkR, make sure R is installed on all nodes. For information about how to install R on CentOS, see http://www.jason-french.com/blog/2013/03/11/installing-r-in-linux/. Be sure to set JAVA_HOME.

  1. Launch SparkR:
    su spark
    cd /usr/hdp/
    ./bin/sparkR

    This will show output similar to the following:

    Welcome to
        ____              __ 
       / __/__  ___ _____/ /__ 
      _\ \/ _ \/ _ `/ __/  '_/ 
     /___/ .__/\_,_/_/ /_/\_\   version  1.5.1 
    Spark context is available as sc, SQL context is available as sqlContext
  2. At the R prompt, create a DataFrame from the built-in faithful dataset and list its first few rows:
    sqlContext <- sparkRSQL.init(sc)
    df <- createDataFrame(sqlContext, faithful)
    head(df)

    This will show output similar to the following:

     eruptions waiting
    1     3.600      79
    2     1.800      54
    3     3.333      74
    4     2.283      62
    5     4.533      85
    6     2.883      55
  3. Read the “people” DataFrame from the people.json file uploaded earlier and list its first few rows:
    people <- read.df(sqlContext, "people.json", "json")
    head(people)

    This will produce output similar to the following:

     age    name
    1  NA Michael
    2  30    Andy
    3  19  Justin

For additional SparkR examples, see https://spark.apache.org/docs/latest/sparkr.html.

Running the Machine Learning Spark Application

When running the Machine Learning application, make sure all of your NodeManager nodes have the gfortran runtime library. If they do not, install it on all of your NodeManager nodes:

sudo yum install gcc-gfortran

Note: It is usually available in the update repo for CentOS. For example:

sudo yum install gcc-gfortran --enablerepo=update

MLlib throws a linking error if it cannot detect these libraries automatically. For example, if you try to do Collaborative Filtering without the gfortran runtime library installed, you will see a linking error like the following:

java.lang.UnsatisfiedLinkError: org.jblas.NativeBlas.dposv(CII[DII[DII)I
    at org.jblas.NativeBlas.dposv(Native Method)
    at org.jblas.SimpleBlas.posv(SimpleBlas.java:369)
    at org.jblas.Solve.solvePositive(Solve.java:68)

Visit http://spark.apache.org/docs/latest/mllib-guide.html for Spark ML examples.

For the latest discussions about Spark, visit the Hortonworks forum.


Hortonworks Focus for Spark

Hortonworks continues to invest in Spark for Open Enterprise Hadoop so users can deploy Spark-based applications alongside other Hadoop workloads in a consistent, predictable and robust way. At Hortonworks we believe that Spark and Hadoop are perfect together, and our focus is on:

  • Data Science Acceleration
    • Improve data science productivity by improving Apache Zeppelin and by contributing additional Spark algorithms and packages to ease the development of key solutions.
  • Seamless Data Access
    • There are additional opportunities for Hortonworks to contribute to and maximize the value of technologies that interact with Spark.
    • We are focused on improving Spark’s integration with YARN, HDFS, Hive, HBase and ORC.
    •  Specifically, we believe that we can further optimize data access via the new DataSources API. For example, this should allow SparkSQL users to take full advantage of the following capabilities:
      • ORC File instantiation as a table
      • Column pruning
      • Language integrated queries
      • Predicate pushdown
  • Innovate at the Core
    • Contribute additional machine learning algorithms and enhance Spark’s enterprise security, governance, operations, and readiness.

You can read further details of our focus from the Spark and Hadoop Perfect Together Blog.

Getting Started with Spark

For developers new to Spark, our conversations typically revolve around two stages in their journey building Spark-based applications:

Stage 1 – Explore and Develop in Spark Local Mode

The first stage starts with a local mode of Spark, where Spark runs on a single node. The developer uses this system to learn Spark and to build a prototype of their application with the Spark API. Using the Spark shells (the Scala REPL and PySpark), a developer rapidly prototypes and then packages a Spark application with tools such as Maven or SBT. Even though the dataset is typically small (so that it fits on a developer machine), the developer can easily debug the application on a single node.
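As one hypothetical example of the packaging step, a minimal SBT build definition for a Spark prototype might look like the following; the project name is invented, and the versions are assumptions chosen to match the Scala 2.10.4 / Spark 1.5.1 combination shown elsewhere on this page:

    // build.sbt -- minimal build definition for packaging a Spark application with SBT
    name := "spark-app-prototype"
    version := "0.1.0"
    scalaVersion := "2.10.4"
    // "provided" keeps Spark itself out of the application jar, since the cluster supplies it at runtime
    libraryDependencies += "org.apache.spark" %% "spark-core" % "1.5.1" % "provided"

A jar built this way is what gets handed to spark-submit in the second stage described below.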

Stage 2 – Deploy Production Spark Applications

The second stage involves running the prototype application against a much larger dataset to fine tune it and get it ready for a production deployment. Typically, this means running Spark on YARN as another workload in the enterprise data lake and allowing it to read data from HDFS. The developer takes the custom application created against a local mode of Spark and submits the application as a Spark job to a staging or production cluster.

Data Science with Spark

For data scientists, Spark is a highly effective data processing tool. It offers first-class support for machine learning algorithms and provides an expressive, higher-level API abstraction for transforming and iterating over datasets. Put simply, Apache Spark makes it easier to build machine learning pipelines than other approaches.

Data scientists often use tools such as notebooks (e.g. iPython) to quickly create prototypes and share their work. Many data scientists love R, and the Spark community has worked hard to deliver R integration through SparkR. We are excited about this emerging capability.

For ease of use, Apache Zeppelin is an emerging tool that provides Notebook features for Spark. We have been exploring Zeppelin and discovered that it makes Spark more accessible and useful.

Here is a screenshot that provides a view into the compelling user interface that Zeppelin can provide for Spark.

[Figure: the Zeppelin notebook interface for Spark]

The release of Spark 1.3 brought new features such as the DataFrames API and direct Kafka support in Spark Streaming, and Spark 1.4 added R support through SparkR. Given the pace at which these capabilities continue to appear, we plan to keep providing updates via tech previews between our major releases so that customers can keep up with the speed of innovation in Spark.

Learn, Try, and Do

The next step is to traverse the tutorials tabs and try some tutorials that demonstrate how to load data into HDFS, create Hive tables, process data in memory, and query data, all using Spark APIs and Spark shell.

To get started, download Spark and Zeppelin in the Hortonworks Sandbox, and then take a look at the following Spark tutorials:

Spark Tutorials

