Every day, more and more new devices—smartphones, sensors, wearables, tablets, home appliances—join the “Internet of Things.” Cisco predicts that by 2020 there will be 50 billion devices connected to the Internet of Things. Naturally, all of them will emit streams of data at short intervals, and these data streams will have to be stored, processed, and analyzed in real time.
Apache Storm is a scalable, fault-tolerant, distributed real-time processing engine that allows you to handle massive streams of data in parallel and at scale. Handling massive data streams at scale is one thing; handling them in a fault-tolerant way is another.
In this blog, I look at how Storm’s master component, Nimbus, handles failures, by walking through its fault-tolerant architecture.
Currently, only a single instance of Nimbus runs in a Storm cluster. Nimbus is stateless, and therefore, in most cases, a Nimbus failure is transient. If your deployment runs Nimbus under supervision (e.g., using a watchdog process such as supervisord), Nimbus can simply be restarted without major consequences. However, in certain scenarios, such as a disk failure, restarting Nimbus is not an option and it remains unavailable. While Nimbus is unavailable, the following problems arise:
To solve these problems, you can now run Nimbus in primary and standby mode, ensuring that if the primary fails, one of the standby nodes takes over. This failover-capable, fault-tolerant Nimbus architecture has obvious advantages:
The architecture has three main parts: leader election, distributed state storage, and leader discovery.
In order to elect a primary Nimbus, we have a ZooKeeper-based leader election mechanism. In particular, we use Curator’s LeaderLatch recipe to perform the leader election. This scheme takes care of keeping the leader status in memory and re-queuing the node for the leader lock in case the ZooKeeper connection is intermittently lost. Only the elected leader Nimbus can activate, deactivate, rebalance, or kill topologies, or perform reassignments for existing topologies. Only Nimbus nodes that have the code for all active topologies will contend for the leader lock, and if a node that does not have the code for all active topologies is chosen as leader, it will relinquish leadership.
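As a minimal sketch of this eligibility rule (in Python, with hypothetical names; Storm’s actual implementation is Java built on Curator’s LeaderLatch), a node accepts leadership only when it holds the code for every active topology, and relinquishes it otherwise:

```python
def is_leader_eligible(local_topology_codes, active_topologies):
    """A node may hold leadership only if it has the code for
    every active topology available locally."""
    return set(active_topologies) <= set(local_topology_codes)


class NimbusNode:
    """Toy stand-in for a Nimbus host contending for the leader lock."""

    def __init__(self, name, local_topology_codes):
        self.name = name
        self.local_codes = set(local_topology_codes)
        self.is_leader = False

    def on_leadership_granted(self, active_topologies):
        # Accept leadership only when all active topology code is
        # present locally; otherwise give the leader lock back.
        self.is_leader = is_leader_eligible(self.local_codes, active_topologies)
        return self.is_leader
```

Under this rule, a node holding code for all four topologies from the example below would keep leadership, while a node holding only a subset would relinquish it.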
To illustrate how a leader is elected during a failure, let’s assume: a) four topologies are running; b) there are three Nimbus nodes; c) the code-replication factor is two; d) the leader Nimbus has the code for all the topologies locally; and e) each of the two non-leader nodes (nonLeader_1, nonLeader_2) has code for a subset of two topologies.
With the above assumptions, we can describe the Nimbus failover in the following steps:
When this process is complete, at least one of the Nimbus nodes will have all the code locally and will become the leader. The following sequence diagram illustrates how leader election and failover work across multiple components.
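The failover walk-through can be simulated in a few lines of Python (a toy model with hypothetical names; the real election order depends on ZooKeeper’s lock queue): each surviving node downloads the topology code it is missing from replicated storage, and the first node holding everything takes leadership.

```python
def elect_after_failover(survivors, active, store):
    """Pick a new leader after the primary dies.

    survivors: dict mapping node name -> set of topology codes held locally
    active:    set of active topology names
    store:     set of topology codes available in replicated storage
    Returns the name of the new leader, or None if no node can qualify.
    """
    for name, codes in survivors.items():
        missing = active - codes
        codes |= (missing & store)  # sync whatever the replicated store has
        if active <= codes:
            return name             # this node now qualifies for leadership
    return None                     # nobody holds a complete set of code yet
```

With the example’s assumptions (four active topologies, each survivor holding two), whichever survivor finishes syncing the missing code first wins the leader election.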
Storm preserves most of its state in ZooKeeper. When a user submits a topology jar, however, Storm stores it on Nimbus’ local disk, not in ZooKeeper, because the jar may be large. This means that when running Nimbus in primary/standby mode, unless we use replicated storage for all topology jars, there is no way for a standby node to become the primary.
A possible solution to this problem is to use replicated storage. To avoid requiring that users have replicated storage in their cluster setup, Storm includes a pluggable storage interface. Out of the box, Storm supports two implementations of replicated storage: one uses each Nimbus host’s local disk, and the other uses HDFS.
The following steps describe topology code replication among nimbus hosts:
The leader Nimbus can then proceed with the usual tasks that are part of the submit-topology action, e.g., assigning nodes to the newly activated topology and marking the topology as active.
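A hedged Python sketch of the leader-side submit flow implied above (all names hypothetical): replicate the jar to other Nimbus hosts until a minimum replication count is met, and only then mark the topology active.

```python
def submit_topology(topology_id, jar_bytes, replicas, min_replication):
    """Sketch of the leader's submit flow: replicate the jar, then activate.

    replicas: list of dicts, each standing in for another Nimbus host's disk
    min_replication: how many hosts must hold the code before activation
    """
    copies = 1                          # the leader's own local copy
    for host_disk in replicas:
        host_disk[topology_id] = jar_bytes  # push code to another host
        copies += 1
        if copies >= min_replication:
            break                       # replication requirement satisfied
    if copies < min_replication:
        raise RuntimeError("could not meet the required replication count")
    return {"topology": topology_id, "state": "active", "copies": copies}
```

The point of gating activation on the replication count is that a later failover (as described earlier) always finds at least one other host with the topology’s code.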
We originally exposed a library that read leader information from ZooKeeper, and all clients used this library for Nimbus discovery. However, ZooKeeper is one of the main bottlenecks when scaling Storm to large clusters with many topologies, so our goal was to reduce the write load on ZooKeeper as much as possible. Robert Joseph Evans (thank you!) from Yahoo! pointed out that in this approach each client connection counts as an extra write to ZooKeeper, which is undesirable. To overcome this, we added a Nimbus summary section to the existing cluster-summary API. Any client can call this API on any Nimbus host to get the list of current Nimbus hosts and the leader among them. Nimbus hosts still read this information from ZooKeeper, but they can now cache the value for a reasonable amount of time, reducing the load on ZooKeeper.
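The caching idea can be sketched as follows (a hypothetical Python helper; in Storm the cache lives inside the Nimbus cluster-summary handling): repeated client calls are served from a short-lived cache instead of each triggering a ZooKeeper read.

```python
import time


class CachedLeaderDiscovery:
    """Cache the Nimbus summary read from ZooKeeper for `ttl` seconds,
    so repeated client calls do not each hit ZooKeeper."""

    def __init__(self, fetch_from_zookeeper, ttl=10.0, clock=time.monotonic):
        self._fetch = fetch_from_zookeeper   # the expensive ZooKeeper read
        self._ttl = ttl
        self._clock = clock                  # injectable for testing
        self._cached = None
        self._fetched_at = -float("inf")     # force a fetch on first call

    def nimbus_summary(self):
        now = self._clock()
        if now - self._fetched_at >= self._ttl:
            self._cached = self._fetch()     # refresh from ZooKeeper
            self._fetched_at = now
        return self._cached                  # otherwise serve from cache
```

With a modest TTL, the number of ZooKeeper reads is bounded by the number of Nimbus hosts rather than by the number of client connections, which is the load reduction described above.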
Set the following configuration properties according to your needs, and start Nimbus processes on multiple hosts.
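As an illustration only (property names as documented for Storm’s Nimbus HA; verify them against your Storm version’s configuration reference), a storm.yaml fragment for an HA setup might look like this:

```yaml
# Illustrative storm.yaml fragment -- confirm names and defaults for your version.
nimbus.seeds: ["nimbus-host-1", "nimbus-host-2", "nimbus-host-3"]
# How many Nimbus hosts must hold a topology's code before submission succeeds.
topology.min.replication.count: 2
# How long to wait for that replication count to be met during submission.
topology.max.replication.wait.time.sec: 60
# How often non-leader Nimbus hosts sync missing topology code.
nimbus.code.sync.freq.secs: 300
```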