April 26, 2017

Livy: A REST Interface for Apache Spark

Apache Spark is a powerful framework for data processing and analysis. Spark provides two modes for data exploration:

  • Interactive: provided by spark-shell, pySpark, and SparkR REPLs
  • Batch: using spark-submit to submit a Spark application to the cluster, with no interaction during run-time

While these two modes look different on the surface, they are unified underneath. In both cases the user executes a shell command that launches a client, which submits the Spark application; once the application is launched, the client either keeps running as a REPL (interactive mode) or exits silently (batch mode).

Regardless of the mode, the user has two choices for where to launch a Spark application: log into a cluster gateway machine, or launch from a local development machine. Both choices have several flaws:

  1. They concentrate resource overhead and points of failure on the gateway machines.
  2. It is difficult to introduce a sophisticated access-control mechanism, or to integrate with existing security services such as Knox and Ranger.
  3. They unnecessarily expose deployment details to users.

To overcome these shortcomings of executing Spark applications, and to introduce additional features, we introduce Livy, a REST-based Spark interface for running statements, jobs, and applications.

What is Livy?

Livy is an open source REST interface for interacting with Apache Spark from anywhere. It supports executing snippets of code or programs in a Spark context that runs locally or in Apache Hadoop YARN. Livy provides the following features:

  • Interactive Scala, Python, and R shells
  • Batch submissions in Scala, Java, Python
  • Multiple users can share the same server (impersonation support)
  • Can be used for submitting jobs from anywhere with REST
  • Does not require any code change to your programs
  • Supports Spark1 and Spark2, and Scala 2.10 and 2.11, within one build

Core Functionalities

Livy offers three modes to run Spark jobs:

  1. Using programmatic API
  2. Running interactive statements through REST API
  3. Submitting batch applications with REST API

In the following sections, I will describe these three modes in detail.

Using Programmatic API

Using the programmatic API to run Spark jobs is very similar to writing an original Spark application. Let's look at a very simple example of calculating Pi. The first snippet below is written using Spark's API, and the second using the Livy API.

 

Spark API:

def sample(p):
    x = random() * 2 - 1
    y = random() * 2 - 1
    return 1 if x ** 2 + y ** 2 <= 1 else 0

count = sc.parallelize(xrange(0, samples)).map(sample) \
            .reduce(lambda a, b: a + b)

return 4.0 * count / samples

Livy API:

def sample(_):
    x = random() * 2 - 1
    y = random() * 2 - 1
    return 1 if x ** 2 + y ** 2 <= 1 else 0

def pi_job(context):
    count = context.sc.parallelize(range(0, samples), slices).map(sample).reduce(add)
    return 4.0 * count / samples

As you can see, apart from how the entry point (SparkContext) is obtained, the main logic of the two snippets is the same, so existing code can be transplanted to Livy seamlessly.

There are two major differences between these two APIs:

  1. With the Spark API, the entry point (SparkContext) is created by the user's own code; with the Livy API, the SparkContext is offered by the framework, so the user does not need to create it.
  2. The code is submitted from the client to the Livy Server through the REST API, and the Livy Server then sends the code on to a specific Spark cluster to run.

Here is a diagram to show the architecture of Livy:

During initialization, the client creates a remote Spark cluster and then submits jobs through the REST API. The Livy Server unwraps and rewraps each job and sends it over RPC to the remote SparkContext, where the code is executed. Meanwhile, the client blocks, waiting for the result of the job to come back along the same path.

The full code of the Spark Pi job using the Livy API can be found here. Livy also offers Java and Python APIs for writing and submitting jobs; you can explore the Livy examples for details.
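For instance, here is a minimal sketch of submitting the pi_job defined above with Livy's Python client. This assumes the livy-python-api package from the Livy repository is installed and a Livy server is running at localhost:8998; the exact client API may vary across Livy versions.

from livy.client import HttpClient

samples = 100000
slices = 2

# Connect to a running Livy server; the URL is an assumption for this sketch.
client = HttpClient("http://localhost:8998/")

# submit() ships pi_job (and the sample() function it closes over) to the
# remote SparkContext; result() blocks until the job finishes.
pi = client.submit(pi_job).result()
print(pi)

# Stop the client and shut down the remote Spark context.
client.stop(True)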

Running with Interactive Session

Running an interactive session is similar to using spark-shell or PySpark. The difference is that the shell does not run locally; instead, it runs in the cluster and transfers data back and forth over the network.

Livy offers REST APIs for working with interactive sessions. Let's walk through some of them to get an overview.

Create an interactive session

POST /sessions

import json
import requests

host = 'http://localhost:8998'
data = {'kind': 'spark'}
headers = {'Content-Type': 'application/json'}
r = requests.post(host + '/sessions', data=json.dumps(data), headers=headers)
r.json()

{u'state': u'starting', u'id': 0, u'kind': u'spark'}

This will create an interactive Spark session, and the returned id can be used for further queries. Under the covers, this POST request brings up a new Spark cluster with a remote Spark interpreter, which receives and executes code snippets and returns the results.
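Because the session starts asynchronously, a client usually polls until the session is ready before submitting statements. Here is a minimal sketch, reusing host and headers from the snippet above; the helper itself is not part of Livy, and the state names follow Livy's session lifecycle:

import time

# Poll GET /sessions/{id} until the session leaves 'starting' and becomes
# 'idle', i.e. ready to accept statements.
def wait_for_session(session_id, interval=2):
    session_url = '{}/sessions/{}'.format(host, session_id)
    while True:
        state = requests.get(session_url, headers=headers).json()['state']
        if state == 'idle':
            return
        if state in ('shutting_down', 'error', 'dead'):
            raise RuntimeError('Session failed to start: ' + state)
        time.sleep(interval)

wait_for_session(r.json()['id'])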

Submit a statement

POST /sessions/{sessionId}/statements

data = {'code': 'sc.parallelize(1 to 10).count()'}
statements_url = host + '/sessions/0/statements'  # using the session id from above
r = requests.post(statements_url, data=json.dumps(data), headers=headers)
r.json()

{u'output': None, u'state': u'running', u'id': 0}

This request will submit a code snippet to be executed remotely and return a statement id, which can be used to query the result after execution has finished.

Get the result of a statement

GET /sessions/{sessionId}/statements/{statementId}

import pprint

statement_url = host + r.headers['location']

r = requests.get(statement_url, headers=headers)
pprint.pprint(r.json())

{u'id': 0,
 u'output': {u'data': {u'text/plain': u'res0: Long = 10'},
             u'execution_count': 0,
             u'status': u'ok'},
 u'state': u'available'}

The result is returned as a JSON structure, which the user can parse to extract the actual value.
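Putting the three calls together, a small illustrative helper (not part of Livy itself) can submit a snippet and block until its result is available:

# Submit one code snippet to an existing session, poll the statement until
# it reaches the 'available' state, and return its output.
def run_statement(session_id, code, interval=1):
    statements_url = '{}/sessions/{}/statements'.format(host, session_id)
    r = requests.post(statements_url, data=json.dumps({'code': code}),
                      headers=headers)
    statement_url = host + r.headers['location']
    while True:
        statement = requests.get(statement_url, headers=headers).json()
        if statement['state'] == 'available':
            return statement['output']
        time.sleep(interval)

output = run_statement(0, 'sc.parallelize(1 to 10).count()')
print(output['data']['text/plain'])  # e.g. res0: Long = 10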

A more powerful way to use interactive sessions is to integrate with notebook tools such as Apache Zeppelin, Hue, or Jupyter. The user can then write code in a notebook, which offloads execution to Livy. Here is the architecture diagram:

HDP 2.5 already supports Zeppelin + Livy + Spark. HDP 2.6 introduces additional improvements and bug fixes for Zeppelin + Livy, making the combination more usable and stable.

Submitting Spark Application

Spark provides the spark-submit command for submitting a Spark application. Livy provides an equivalent way to submit applications through its REST API. Let's see how to transplant a command line into a POST request.

Here is a command to submit the SparkPi application with spark-submit:

./bin/spark-submit \
  --class org.apache.spark.examples.SparkPi \
  --master yarn \
  --deploy-mode cluster \
  --executor-memory 20G \
  /path/to/examples.jar \
  1000

So in Livy, we form a JSON structure carrying the same information as the spark-submit command. Here is the JSON structure:

{
  "className": "org.apache.spark.examples.SparkPi",
  "executorMemory": "20g",
  "args": ["1000"],
  "file": "/path/to/examples.jar"
}

With a POST request to /batches, the user can submit the SparkPi application to the Livy server, which then launches the application in the cluster.

You might notice that we didn't specify master or deploy mode in the JSON structure; these are configured in livy.conf. Also note that this POST request does not upload local jars to the cluster, so you should put the dependencies into HDFS in advance. This is the major difference from spark-submit.
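For illustration, the same submission can be made from Python with the requests library, reusing host and headers from the interactive-session examples (the jar path is a placeholder):

# 'file' must already be reachable by the cluster (e.g. on HDFS), since
# this request does not upload local jars.
batch = {
    'className': 'org.apache.spark.examples.SparkPi',
    'executorMemory': '20g',
    'args': ['1000'],
    'file': '/path/to/examples.jar',
}
r = requests.post(host + '/batches', data=json.dumps(batch), headers=headers)
batch_id = r.json()['id']

# Check progress with GET /batches/{batchId}/state.
state = requests.get('{}/batches/{}/state'.format(host, batch_id),
                     headers=headers).json()['state']
print(state)  # e.g. 'starting', 'running', 'success'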

Spark2 Support

From Livy 0.3.0 onwards, Spark1 and Spark2 are supported out of the box in a single build. The user simply needs to point SPARK_HOME at a Spark1 or Spark2 installation; Livy will internally detect which one it is. Here is the configuration in livy-env.sh:

export SPARK_HOME=<path-to>/spark-2.1.0-bin-hadoop2.6

Livy also supports different Scala versions, such as Scala 2.10 and 2.11. If the user chooses a Spark distribution with the default Scala build (such as Spark 1.6 + Scala 2.10, or Spark 2.0 + Scala 2.11), Livy automatically detects the right Scala version and picks the correct jars. If the user requires a different build of Spark, such as Spark 2.0 + Scala 2.10, they can configure "livy.spark.scalaVersion" to let Livy pick the right jars.
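For example, in livy.conf (the value shown is illustrative):

# Pin the Scala version when SPARK_HOME points at a non-default build,
# e.g. Spark 2.0 compiled against Scala 2.10
livy.spark.scalaVersion = 2.10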

At the API level, Livy 0.3.0 also supports the full functionality of Spark2, including SparkSession, SparkSession with Hive enabled, and so on.

Security

Security is always an important topic, especially in a multi-user environment. Livy supports Kerberos authentication and wire encryption.

  1. REST APIs are backed by SPNEGO authentication, which requires the requesting user to be authenticated by Kerberos first.
  2. RPCs between the Livy Server and the remote SparkContext are encrypted with SASL.
  3. The Livy Server uses a keytab to authenticate itself to Kerberos.

The Livy Server also supports user impersonation, which means it can submit applications on behalf of the user who submitted the request. This is very important in a multi-user environment, as it avoids unnecessary permission escalation.
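As a sketch, with impersonation enabled on the server (livy.impersonation.enabled = true in livy.conf), a session-creation request can carry a proxyUser field, and the resulting session runs as that user ('alice' is a placeholder):

# The session will run on the cluster as 'alice', not as the Livy server user.
data = {'kind': 'spark', 'proxyUser': 'alice'}
r = requests.post(host + '/sessions', data=json.dumps(data), headers=headers)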

High Availability

As can be seen from the diagram above, when the Livy Server fails, all the connected Spark clusters are terminated with it, which means all jobs and data disappear immediately. In HDP 2.6 (Livy 0.3.0) we introduced a session recovery mechanism that keeps the Spark clusters alive while the server is unavailable; after a restart, the server can connect back to the existing sessions and roll back to the state before the failure. For the design details and configuration, please check here.

To enable this feature, the user should add the following three configurations to livy.conf:

livy.server.recovery.mode

Recovery mode of Livy. Possible values:

off: Default. Turn off recovery. Every time Livy shuts down, it stops and forgets all sessions.

recovery: Livy persists session info to the state store. When Livy restarts, it recovers previous sessions from the state store.

livy.server.recovery.state-store

Where Livy should store state for recovery. Possible values:

<empty>: Default. State store disabled.

filesystem: Store state on a file system.

zookeeper: Store state in a ZooKeeper instance.

livy.server.recovery.state-store.url

For the filesystem state store, the path of the state store directory. Please do not use a filesystem that does not support atomic rename (e.g. S3), e.g. file:///tmp/livy or hdfs:///.

For zookeeper, the address of the ZooKeeper servers, e.g. host1:port1,host2:port2
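For example, a minimal recovery setup in livy.conf backed by ZooKeeper (host names and ports are placeholders):

livy.server.recovery.mode = recovery
livy.server.recovery.state-store = zookeeper
livy.server.recovery.state-store.url = host1:2181,host2:2181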

Conclusion

With HDP 2.6, Livy has become more stable and feature-rich. The combination of Zeppelin + Livy + Spark has improved a great deal, not only in the features it supports but also in stability and scalability. Compared to the traditional ways of running Spark jobs, Livy offers a more scalable, secure, and integrated way to run Spark. We highly recommend you give it a try.
