August 18, 2015

Fault tolerant Nimbus in Apache Storm

Every day, more and more new devices (smartphones, sensors, wearables, tablets, home appliances) join the Internet of Things. Cisco predicts that by 2020, 50 billion devices will be connected to the Internet of Things. Naturally, all of them will emit streams of data at short intervals, and these data streams will have to be stored, processed, and analyzed in real time.

Apache Storm is a scalable, fault-tolerant, real-time distributed processing engine that allows you to handle massive streams of data in parallel and at scale. Handling massive data streams at scale is one thing; handling them in a fault-tolerant way is another.

In this blog, I examine how Storm’s master component, Nimbus, handles failures through its fault-tolerant architecture.

Problem Statement

Currently, only a single instance of Nimbus runs in a Storm cluster. Nimbus is stateless, so in most cases a Nimbus failure is transient. If your deployment has Nimbus running under supervision (e.g. using a watchdog process like supervisord), Nimbus can simply be restarted without major consequences. However, in certain scenarios, such as disk failures, restarting Nimbus is not an option and Nimbus becomes unavailable. While Nimbus is unavailable, the following problems arise:

  • Existing topologies keep running unaffected, because supervisors only talk to Nimbus when new tasks are assigned and worker processes never talk to Nimbus.
  • No new topologies can be submitted. Existing topologies cannot be killed, deactivated, activated, or rebalanced.
  • If a supervisor node fails, task reassignment is not performed, resulting in performance degradation or topology failures.
  • When Nimbus is restarted, it will delete all existing topologies because it no longer has the topology code available locally. Users are required to resubmit their topologies, which results in downtime.

To solve these problems, you can now run Nimbus in primary and standby mode, ensuring that if the primary fails, one of the standby nodes takes over. This failover-capable, fault-tolerant Nimbus architecture has obvious advantages:

  • It increases the overall availability of Nimbus.
  • It allows Nimbus hosts to leave and join the cluster transparently. After joining, a new Nimbus instance will go through the sequence of steps necessary to join the list of nodes that are eligible to become leaders. During failover, no topology resubmissions are required.
  • It prevents loss of active topologies.

Fault Tolerant Nimbus Architecture

The architecture has three main parts: leader election, distributed state storage, and leader discovery.

Leader Election

In order to elect a primary Nimbus, we have a ZooKeeper-based leader election mechanism. In particular, we use Curator’s LeaderLatch recipe to perform the leader election. This scheme takes care of keeping the leader status in memory and re-queuing the node for the leader lock in case the ZooKeeper connection is intermittently lost. Only an elected leader Nimbus can activate, deactivate, rebalance, or kill topologies, or perform reassignments for existing topologies. Only Nimbus nodes that have code for all the active topologies will contend for the leader lock, and if a node that does not have code for all active topologies is chosen as leader, it will relinquish leadership.
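As a rough illustration of this mechanism, the sketch below shows how Curator’s LeaderLatch recipe is typically wired up. It is a minimal example, not Storm’s actual Nimbus code; the ZooKeeper connection string, latch path, and participant id are made up for the illustration.

```java
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.CuratorFrameworkFactory;
import org.apache.curator.framework.recipes.leader.LeaderLatch;
import org.apache.curator.framework.recipes.leader.LeaderLatchListener;
import org.apache.curator.retry.ExponentialBackoffRetry;

public class NimbusLeaderElectionSketch {
    public static void main(String[] args) throws Exception {
        // Connect to the ZooKeeper ensemble Storm already uses (connection string is illustrative).
        CuratorFramework client = CuratorFrameworkFactory.newClient(
                "zk1:2181,zk2:2181,zk3:2181", new ExponentialBackoffRetry(1000, 3));
        client.start();

        // Every Nimbus host contends for the same latch path; Curator keeps the leadership
        // status in memory and re-queues the participant if the ZooKeeper connection drops.
        LeaderLatch latch = new LeaderLatch(client, "/storm/nimbus-leader-lock", "nimbus-host-1");
        latch.addListener(new LeaderLatchListener() {
            @Override
            public void isLeader() {
                // Only here may this Nimbus activate, kill, rebalance, or reassign topologies.
                // Storm's Nimbus additionally verifies it has code for all active topologies
                // and gives the lock back otherwise (see the failover steps below).
                System.out.println("Gained leadership");
            }

            @Override
            public void notLeader() {
                System.out.println("Lost leadership; acting as standby");
            }
        });
        latch.start();

        Thread.sleep(Long.MAX_VALUE); // keep the process alive for this illustration
    }
}
```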

To illustrate how a leader is elected during a failure, let’s assume that: a) four topologies are running, b) there are three Nimbus nodes, c) the code-replication factor is two, d) the leader Nimbus has the code for all the topologies locally, and e) each of the two non-leader nodes (nonLeader_1, nonLeader_2) has code for a subset of two topologies.

With these assumptions, we can describe the Nimbus failover in the following steps:

  1. The leader has a catastrophic failure and becomes unavailable.
  2. nonLeader_1 receives a notification from ZooKeeper indicating that it is now the new leader. Before accepting the leadership role, it first checks whether it has the code available for all 4 active topologies. It realizes that it only has code for 2 topologies, so it relinquishes the lock and looks up in ZooKeeper where it can download the code for the missing topologies. This lookup returns entries for the leader Nimbus and nonLeader_2, and it will try downloading from both as part of its retry mechanism.
  3. nonLeader_2’s code sync thread also realizes that it is missing code for 2 topologies and follows the same discovery process described above.

When this process is complete, at least one of the Nimbus nodes will have all the code locally and will become the leader. The following sequence diagram illustrates how leader election and failover work across the components involved.
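To make the “only lead if you have all the code” rule concrete, here is a hypothetical sketch of the check a Nimbus node might perform when it is notified of leadership. The abstract helper methods are illustrative stand-ins, not Storm’s actual internals.

```java
import java.util.HashSet;
import java.util.List;
import java.util.Set;

// Hypothetical sketch of the "only lead if you have all the code" rule described above.
// The abstract helpers are illustrative stand-ins, not Storm's actual internals.
abstract class LeadershipCheckSketch {
    abstract Set<String> activeTopologyIdsInZk();             // active topology ids recorded in ZooKeeper
    abstract Set<String> localTopologyIds();                  // topology ids whose code is on local disk
    abstract List<String> replicaHostsFor(String topologyId); // Nimbus hosts that hold this topology's code
    abstract void downloadFrom(String host, String topologyId) throws Exception;
    abstract void acceptLeadership();
    abstract void relinquishLeaderLock();
    abstract void rejoinLeaderElection();

    void onLeadershipNotification() {
        Set<String> missing = new HashSet<>(activeTopologyIdsInZk());
        missing.removeAll(localTopologyIds());

        if (missing.isEmpty()) {
            acceptLeadership();                 // safe to act as leader
            return;
        }

        relinquishLeaderLock();                 // never lead without code for every active topology
        for (String topologyId : missing) {
            for (String host : replicaHostsFor(topologyId)) {
                try {
                    downloadFrom(host, topologyId);   // try each replica until one succeeds
                    break;
                } catch (Exception e) {
                    // this replica failed; fall through and try the next one
                }
            }
        }
        rejoinLeaderElection();                 // contend for the leader lock again with full code
    }
}
```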

[Figure: sequence diagram of Nimbus leader election and failover]

Distributed State Storage

Storm preserves most of its state in ZooKeeper. When a user submits a topology jar, however, Storm stores it on Nimbus’ local disk rather than in ZooKeeper, because the jar may be large. This means that, when running Nimbus in primary/standby mode, there is no way for a standby node to become primary unless all topology jars are kept in replicated storage.

A possible solution to this problem is to use replicated storage. To avoid requiring that users have replicated storage in their cluster setup, Storm includes a pluggable storage interface. Out of the box, Storm supports two implementations of replicated storage: one uses each Nimbus host’s local disk, and the other uses HDFS.
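As a rough illustration of what such a pluggable contract looks like, here is a sketch loosely modeled on backtype.storm.codedistributor.ICodeDistributor. The method names and signatures below are approximations meant only to convey the shape of the abstraction; consult the Storm source for the exact interface.

```java
import java.io.File;
import java.util.List;
import java.util.Map;

// Illustrative sketch of a pluggable code-distributor contract. The real interface is
// backtype.storm.codedistributor.ICodeDistributor; these method names and signatures
// are approximations of it, shown only to convey the shape of the abstraction.
interface CodeDistributorSketch {
    /** Called once with the Storm configuration before any other method. */
    void prepare(Map conf) throws Exception;

    /** Uploads the topology code/config from a local directory and returns the metafile
     *  that other Nimbus hosts will use to locate and download it. */
    File upload(String localDir, String topologyId) throws Exception;

    /** Downloads the code/config described by the metafile onto the local disk. */
    List<File> download(String topologyId, File metafile) throws Exception;

    /** Number of Nimbus hosts that currently hold a replica of this topology's code. */
    int getReplicationCount(String topologyId) throws Exception;

    /** Removes local artifacts for a topology that has been killed. */
    void cleanup(String topologyId) throws Exception;
}
```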

The following steps describe topology code replication among Nimbus hosts:

  • When a client submits a topology, the leader Nimbus calls the code distributor’s upload function, which creates a metafile locally on the leader Nimbus. The leader Nimbus then writes new entries in ZooKeeper to notify all non-leader Nimbus nodes that they should download the newly submitted topology code.
  • The leader Nimbus waits until at least N non-leader Nimbus nodes have the code replicated, or a user-configurable timeout expires, where N is the user-configurable replication factor (this wait is sketched below).
  • When a non-leader Nimbus receives the notification that a new topology has been submitted, it downloads the metafile from the leader Nimbus and then downloads the code binaries by calling the code distributor’s download function with the metafile as input.
  • Once a non-leader node finishes downloading the code, it writes an entry in ZooKeeper to indicate that it is now one of the hosts from which the code for this topology can be downloaded, in case the leader Nimbus dies.

The leader Nimbus can then proceed with the usual tasks that are part of the submit topology action, e.g. assigning nodes for the newly activated topology and marking the topology as active.
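The replication wait mentioned above can be pictured with a small, hypothetical sketch: the leader polls the replication count until it reaches the configured minimum or the configured wait time expires. The settings correspond to min.replication.count and max.replication.wait.time.sec described in the Quick Start Guide below; the class and method names are illustrative, not Storm’s actual code.

```java
import java.util.concurrent.TimeUnit;
import java.util.function.IntSupplier;

// Hypothetical sketch of the replication wait performed by the leader Nimbus after a
// topology is submitted. replicationCount stands in for the code distributor's
// "how many hosts have this topology's code?" query; names are illustrative.
final class ReplicationWaitSketch {
    static void waitForReplication(IntSupplier replicationCount,
                                   int minReplicationCount,
                                   long maxWaitSeconds) throws InterruptedException {
        long deadline = System.nanoTime() + TimeUnit.SECONDS.toNanos(maxWaitSeconds);
        // A negative wait time means "wait forever", which HA topologies should use.
        while (maxWaitSeconds < 0 || System.nanoTime() < deadline) {
            if (replicationCount.getAsInt() >= minReplicationCount) {
                return;                     // enough replicas: safe to activate the topology
            }
            TimeUnit.SECONDS.sleep(1);      // poll until replicas appear or we time out
        }
        // Timed out: the leader proceeds with activation anyway, as described above.
    }
}
```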

[Figure: topology code replication among Nimbus hosts]

Leader Discovery

We originally exposed a library that read leader information from ZooKeeper, and all clients used this library for Nimbus discovery. However, ZooKeeper is one of the main bottlenecks when scaling Storm to large clusters with many topologies, so our goal was to reduce the write load on ZooKeeper as much as possible. Robert Joseph Evans (thank you!) from Yahoo! pointed out that in our approach each client connection would count as an extra write to ZooKeeper, which is undesirable. To overcome this, the Nimbus summary section was added to the existing cluster summary API. Any client can call this API on any Nimbus host to get the list of current Nimbus hosts and the leader among them. Nimbus hosts still read this information from ZooKeeper, but they can now cache the value for a reasonable amount of time, reducing the load on ZooKeeper.
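The caching idea can be sketched as follows. This is a hypothetical illustration of caching the current leader for a short TTL, not Storm’s client code; the Supplier stands in for whatever lookup is actually used (ZooKeeper or the cluster summary API).

```java
import java.util.concurrent.TimeUnit;
import java.util.function.Supplier;

// Hypothetical sketch of the caching described above: look the leader up once, then
// reuse the cached value for a short TTL instead of hitting ZooKeeper (or the summary
// API) on every request. The Supplier stands in for the actual lookup call.
final class CachedLeaderLookup {
    private final Supplier<String> fetchLeaderHost; // e.g. a call to the cluster summary API
    private final long ttlNanos;
    private volatile String cachedLeader;
    private volatile long fetchedAtNanos;

    CachedLeaderLookup(Supplier<String> fetchLeaderHost, long ttlSeconds) {
        this.fetchLeaderHost = fetchLeaderHost;
        this.ttlNanos = TimeUnit.SECONDS.toNanos(ttlSeconds);
    }

    String leaderHost() {
        long now = System.nanoTime();
        if (cachedLeader == null || now - fetchedAtNanos > ttlNanos) {
            cachedLeader = fetchLeaderHost.get();   // refresh only when the cache is stale
            fetchedAtNanos = now;
        }
        return cachedLeader;
    }
}
```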

Quick Start Guide

Set the values for the following configuration properties according to your needs, and start Nimbus processes on multiple hosts.

  • codedistributor.class : the fully qualified class name of a class that implements “backtype.storm.codedistributor.ICodeDistributor”. The default is “backtype.storm.codedistributor.LocalFileSystemCodeDistributor”, which uses the local file system to store both metafiles and code/configs.
  • min.replication.count : the minimum number of Nimbus hosts on which the topology code must be replicated before the leader Nimbus can mark the topology as active and create assignments. The default is 1. For topologies that need high availability, we recommend setting this value to floor(number_of_nimbus_hosts/2 + 1).
  • max.replication.wait.time.sec : the maximum time to wait for replication to reach min.replication.count. Once this time has elapsed, Nimbus goes ahead and performs the topology activation tasks even if min.replication.count has not been reached. The default is 60 seconds; -1 indicates that Nimbus must wait forever, and this is the value that topologies requiring high availability should use.
  • code.sync.freq.secs : how often the Nimbus background thread responsible for syncing code for locally missing topologies runs. The default is 5 minutes.
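As a minimal sketch, the snippet below collects the settings above into a configuration map. In a real deployment these go into storm.yaml on every Nimbus host, and the exact key names (e.g. whether they carry a “nimbus.” prefix) may vary with the Storm version, so treat the keys below as the ones listed in this post rather than as authoritative.

```java
import java.util.HashMap;
import java.util.Map;

// Minimal sketch collecting the HA-related settings described above into a config map.
// Key names follow the property list in this post; exact names may differ by Storm version.
public class NimbusHaConfigSketch {
    public static void main(String[] args) {
        Map<String, Object> conf = new HashMap<>();
        conf.put("codedistributor.class",
                 "backtype.storm.codedistributor.LocalFileSystemCodeDistributor");
        conf.put("min.replication.count", 2);          // floor(3/2 + 1) = 2 for a 3-Nimbus cluster
        conf.put("max.replication.wait.time.sec", -1); // wait forever: recommended for HA topologies
        conf.put("code.sync.freq.secs", 300);          // sync missing topology code every 5 minutes
        conf.forEach((key, value) -> System.out.println(key + ": " + value));
    }
}
```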


Acknowledgements:

Special thanks to Robert Joseph Evans, Taylor Goetz, Sriharsha, and the entire Storm community, who helped with reviewing the design and code and with testing the final feature.


Comments

  • Maybe a bit unrelated to the core of this post, but can you please provide some inputs on the potential issues one could face if Nimbus is not running under supervision?

    “If your deployment has Nimbus running under supervision (e.g. using a watchdog process like supervisord), Nimbus can simply be restarted without major consequences.”

    • If Nimbus is not running under supervision, then once Nimbus crashes:
      * The Storm UI will start returning 500 errors.
      * None of the Storm commands (kill, activate, deactivate, rebalance) will work, and no new topology can be submitted.
      * If a supervisor node crashes, its work will not be reassigned. Depending on your topology, this can mean downtime or performance degradation.
