Apache Hadoop YARN: Resilience of YARN applications across ResourceManager Restart – Phase 1
This is the second post in our series on the motivations and architecture for improvements to the restart resiliency of the Apache Hadoop YARN ResourceManager. Other posts in the series are:
Introduction: Phase I – Preserve Application-queues
In the introductory post, we previewed what RM Restart Phase I entails. In essence, the RM saves the application-queue state to a persistent store and rereads it upon restart, so users no longer need to resubmit their applications; the RM simply restarts them from scratch. Restarting YARN applications seamlessly across an RM restart is a necessity in any production environment.
As you may know, YARN in Hadoop 2 is an architectural upgrade of the MapReduce-centric compute platform in Hadoop 1.x. The JobTracker of Hadoop 1.x is a single point of failure for the whole cluster: any time the JobTracker is restarted, it effectively brings down the whole cluster. Even though clients persist their job specifications to HDFS, the states of their running jobs are completely lost.
A feature to address this limitation was implemented via HADOOP-3245 to let jobs survive JobTracker restarts, but it never reached production use because of its instability. The complexity of that feature arose partly from the fact that the JobTracker had to track and store too much information, both about the cluster state and about the scheduling status within each job.
The state of the art of JobTracker resiliency is thus shown below
YARN's distributed architecture resolves this separation-of-concerns problem. The ResourceManager is now responsible only for recovering application-queues and cluster status, while each application is responsible for persisting and recovering its own application-level state. The separation of concerns is evident in YARN's control flow below:
YARN’s distributed architecture
As stated before, RM restart Phase I lets YARN continue functioning across RM restarts. The RM may need to restart for various reasons: unexpected situations such as bugs or hardware failures, or deliberate, planned downtime during a cluster upgrade. RM restart now happens transparently, so users no longer need to monitor for such events and manually resubmit their jobs.
Several components and entities work together to make RM resilience and restart seamless.
The RM persists the application-queue state, together with application-attempt state information, in a persistent store called RMStateStore. When the RM restarts, it loads this state back into memory. The RM also stores the relevant security tokens and keys, so that each application's previous security state is restored.
Today, the available state-store implementations are:
- ZKRMStateStore: A ZooKeeper-based state-store implementation
- FileSystemRMStateStore: A state-store built on top of any FileSystem that implements the Hadoop FileSystem interface, such as HDFS.
Below we describe the steps the RM follows to persist and restore application state using a pluggable state-store. During the life-cycle of an application, the RM does the following:
- When an application is submitted or an application-attempt is started, the RM first writes the application/attempt state into the RMStateStore.
- Similarly, when an application/attempt finishes, the RM records the final state of the application/attempt in the RMStateStore, akin to write-ahead logging.
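The bookkeeping above can be illustrated with a minimal, in-memory sketch. The class and method names (`StateStoreSketch`, `onSubmit`, `onAttemptStart`, `onFinish`, `unfinishedApps`) are hypothetical and do not reflect the real RMStateStore API, and a `HashMap` stands in for ZooKeeper or HDFS. What it shows is why the ordering matters: because every submission and attempt start is recorded up front, and every completion is recorded as a final state, a restarted RM can reload exactly those applications that never reached a final state.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/** Hypothetical, in-memory stand-in for a persistent RM state-store (not the real RMStateStore). */
public class StateStoreSketch {

    /** What the sketch persists per application. */
    public static final class AppRecord {
        final String appId;
        final String submissionContext; // stands in for the user's ApplicationSubmissionContext
        int attempts = 0;
        String finalState = null;       // null means "no final state recorded yet"

        AppRecord(String appId, String submissionContext) {
            this.appId = appId;
            this.submissionContext = submissionContext;
        }
    }

    private final Map<String, AppRecord> store = new HashMap<>();

    /** On submission: persist the application before the RM acts on it. */
    public void onSubmit(String appId, String submissionContext) {
        store.put(appId, new AppRecord(appId, submissionContext));
    }

    /** On attempt start: record the new attempt before its ApplicationMaster is launched. */
    public void onAttemptStart(String appId) {
        store.get(appId).attempts++;
    }

    /** On completion: record the final state of the application. */
    public void onFinish(String appId, String finalState) {
        store.get(appId).finalState = finalState;
    }

    /** What a restarted RM would need to recover: applications without a final state. */
    public List<String> unfinishedApps() {
        List<String> result = new ArrayList<>();
        for (AppRecord r : store.values()) {
            if (r.finalState == null) {
                result.add(r.appId);
            }
        }
        return result;
    }
}
```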
Then, specifically in RM restart Phase I, the RM does the following upon restart:
- Any previously running applications are instructed to shut down or are killed.
- The RM starts a new attempt of each such application by taking the user's original ApplicationSubmissionContext persisted in the state-store and running a new instance of the ApplicationMaster.
- Each restart therefore creates a new application-attempt. Consequently, users need to configure the AM-Max-Retry count (the number of times the RM can potentially relaunch the ApplicationMaster for any given application) properly in their ApplicationSubmissionContext, so that a new attempt can still be created after an RM restart.
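A rough sketch of the Phase I recovery loop follows, under the same caveat: all names here (`PhaseOneRecovery`, `RecoveredApp`, `recover`) are hypothetical rather than the RM's actual classes. Each unfinished application gets a fresh attempt instead of resuming the old one, and the attempt limit is checked first, which is why that limit must leave room for RM restarts.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical sketch of Phase I recovery: every unfinished app gets a brand-new attempt. */
public class PhaseOneRecovery {

    /** An unfinished application as reloaded from the state-store. */
    public static final class RecoveredApp {
        public final String appId;
        public final int attemptsSoFar; // attempts already recorded in the store
        public final int maxAttempts;   // limit taken from the submission context
        public String outcome;          // set by recover()

        public RecoveredApp(String appId, int attemptsSoFar, int maxAttempts) {
            this.appId = appId;
            this.attemptsSoFar = attemptsSoFar;
            this.maxAttempts = maxAttempts;
        }
    }

    /** Decides, for each app, whether a new attempt may be launched; returns a log of decisions. */
    public static List<String> recover(List<RecoveredApp> apps) {
        List<String> log = new ArrayList<>();
        for (RecoveredApp app : apps) {
            // Phase I never resumes the old attempt; recovery always needs one more attempt.
            int nextAttempt = app.attemptsSoFar + 1;
            if (nextAttempt > app.maxAttempts) {
                app.outcome = "FAILED";
                log.add(app.appId + ": max attempts (" + app.maxAttempts + ") exhausted");
            } else {
                app.outcome = "ATTEMPT_" + nextAttempt;
                log.add(app.appId + ": relaunching ApplicationMaster as attempt " + nextAttempt);
            }
        }
        return log;
    }
}
```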
In the future, with RM restart Phase II, no new application-attempt will be created; the previously running ApplicationMaster will simply resync with the newly started RM and resume its work.
An RM restart also affects the RM's clients. In the following sections, we explain how each kind of client handles it.
RMProxy – a library to connect to a ResourceManager
During RM downtime, all the entities that communicate with the RM, including ApplicationMasters, NodeManagers and users' clients, should be able to wait and retry until the RM comes back up. We have written a library, RMProxy, that wraps such a retry implementation. The retry behavior is configurable, but fundamentally there is a wait interval between retries, and retries continue up to a limit on the maximum wait-time. User applications can also use this proxy to communicate with the RM instead of writing their own retry implementation.
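The retry behavior described above can be sketched as a small wrapper. This is not the real RMProxy API: `RetryingRmCaller` is a hypothetical class, and it assumes that connection failures surface as `IOException`s (for example `java.net.ConnectException`). Its two constructor parameters play roughly the role of YARN's `yarn.resourcemanager.connect.retry-interval.ms` and `yarn.resourcemanager.connect.max-wait.ms` settings.

```java
import java.io.IOException;
import java.util.concurrent.Callable;

/**
 * Hypothetical retry wrapper illustrating the RMProxy idea; NOT the real RMProxy API.
 * Connection failures (IOExceptions) are retried at a fixed interval until the maximum
 * total wait-time would be exceeded; any other exception propagates immediately.
 */
public class RetryingRmCaller {
    private final long retryIntervalMs;
    private final long maxWaitMs;

    public RetryingRmCaller(long retryIntervalMs, long maxWaitMs) {
        this.retryIntervalMs = retryIntervalMs;
        this.maxWaitMs = maxWaitMs;
    }

    /** Invokes the RPC, retrying while the RM appears to be down. */
    public <T> T call(Callable<T> rpc) throws Exception {
        long deadline = System.currentTimeMillis() + maxWaitMs;
        while (true) {
            try {
                return rpc.call();
            } catch (IOException e) {
                // Give up if waiting one more interval would pass the maximum wait-time.
                if (System.currentTimeMillis() + retryIntervalMs > deadline) {
                    throw e;
                }
                Thread.sleep(retryIntervalMs);
            }
        }
    }
}
```

A fixed interval keeps the sketch short. Non-connection exceptions are deliberately not retried, since retrying them would only delay reporting a genuine error.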
The Java client libraries YarnClient and AMRMClient have already been modified to use the same proxy utility, so applications using these client libraries get the above retry functionality without any code changes!
When the RM is down, all the NodeManagers (NMs) in the cluster wait until it comes back up. After the RM restarts, it notifies the NMs to reinitialize by sending them a resync command. In Phase I, NMs handle this command by killing all running containers (no work needs to be preserved) and re-registering with the RM. From the perspective of the newly started RM, these re-registered NMs are treated just like newly joining NMs.
This is the chief reason why, in Phase I, the running work of all existing applications is lost after an RM restart. In Phase II, NMs will instead keep their running containers and simply report the current container statuses when re-registering with the RM. This avoids losing work and the overhead of launching new containers, reducing the potential performance impact on a large cluster running many containers.
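The difference between the two phases on the NodeManager side can be summarized in a hypothetical sketch; `NodeManagerResync` and its methods are illustrative names, not NodeManager code. Clearing the list stands in for killing containers, and Phase II is modeled only as described above: keep the containers and report them on re-registration.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical sketch of how a NodeManager answers the RM's resync command. */
public class NodeManagerResync {

    public enum Phase { PHASE_ONE, PHASE_TWO }

    private final List<String> runningContainers = new ArrayList<>();

    public NodeManagerResync(List<String> containers) {
        runningContainers.addAll(containers);
    }

    /**
     * Handles a resync and returns the container statuses sent when re-registering.
     * Phase I: kill everything and re-register like a newly joining node.
     * Phase II: keep containers running and report them to the new RM.
     */
    public List<String> onResync(Phase phase) {
        if (phase == Phase.PHASE_ONE) {
            runningContainers.clear();   // stands in for killing every running container
            return new ArrayList<>();    // nothing left to report
        }
        return new ArrayList<>(runningContainers);
    }

    public int runningCount() {
        return runningContainers.size();
    }
}
```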
Similarly to NMs, during RM downtime, ApplicationMasters (AMs) using the client libraries or the RMProxy described above will spin and retry until the RM comes up. Once the RM is running again, all existing AMs receive a resync command. Today, AMs are expected to shut down immediately when they receive a resync command.
Since an AM itself runs in a container managed by an NM, it is also possible that the AM is forcefully killed by the NM while the NM is resyncing with the RM, before the AM has had a chance to receive the resync command. In Phase I, users should therefore be aware that an AM cannot rely solely on the resync command to trigger its RM-restart handling logic: an RM restart may cause the previously running AM to be forcefully killed.
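On the ApplicationMaster side, the Phase I contract can be sketched as a heartbeat loop that exits as soon as it sees a resync. `AmHeartbeatLoop` and `RmCommand` are hypothetical stand-ins for an AM's heartbeat cycle with the RM. Note the comment in the resync branch: because the NM may kill the AM's container first, that branch is not guaranteed to run.

```java
import java.util.Iterator;

/** Hypothetical ApplicationMaster heartbeat loop reacting to an RM resync command (Phase I). */
public class AmHeartbeatLoop {

    /** Simplified command carried in a heartbeat response from the RM. */
    public enum RmCommand { NONE, RESYNC }

    /** Processes heartbeat responses; returns how many were handled before the AM exited. */
    public static int run(Iterator<RmCommand> heartbeatResponses) {
        int processed = 0;
        while (heartbeatResponses.hasNext()) {
            RmCommand cmd = heartbeatResponses.next();
            processed++;
            if (cmd == RmCommand.RESYNC) {
                // Phase I contract: shut down immediately; the RM will start a new attempt.
                // This branch may never execute if the NM kills the container first,
                // so it should not be the only place where important cleanup happens.
                break;
            }
            // Normal work would go here: request containers, track progress, and so on.
        }
        return processed;
    }
}
```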
To enable Phase I RM restart, make the following configuration changes:
- yarn.resourcemanager.recovery.enabled: The flag to enable or disable this feature. Defaults to false. If this property is set to true, the RM enables the RM-restart functionality.
- yarn.resourcemanager.store.class: Specifies the underlying state-store implementation used to store application and application-attempt state, along with the credential information needed to enable restart in a secure environment. The available state-store implementations are:
- org.apache.hadoop.yarn.server.resourcemanager.recovery.ZKRMStateStore: the ZooKeeper-based state-store implementation described above, and
- org.apache.hadoop.yarn.server.resourcemanager.recovery.FileSystemRMStateStore: a state-store implementation that persists state to any FileSystem, such as HDFS.
- yarn.resourcemanager.am.max-attempts: This configuration has an impact beyond the RM-restart functionality. It specifies the global upper limit on the number of application-attempts any application may have. Each application can set its own limit, a value greater than or equal to one, via the submission API ApplicationSubmissionContext.setMaxAppAttempts(int maxAppAttempts), but this individual limit cannot exceed the global upper bound defined by this property.
This property has specific implications for RM restart. As described earlier, in the Phase I implementation, every time the RM restarts it kills the previously running application's attempt (more specifically, its AM) and then creates a new attempt to re-kick the previously running application. As such, every RM restart increases the attempt count of a running application by 1.
For MapReduce applications, users can set ‘mapreduce.am.max-attempts’ to be greater than 1 in their configuration.
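The attempt arithmetic is easy to get wrong, so here is a small hypothetical helper (`AttemptBudget` is not a YARN class). It assumes that a per-application value outside the range [1, global limit] falls back to the global yarn.resourcemanager.am.max-attempts value; the post itself only states that the per-application value cannot exceed the global bound.

```java
/** Hypothetical helper for reasoning about attempt limits under Phase I RM restart. */
public class AttemptBudget {

    /**
     * Effective attempt limit for one application.
     * Assumption: a per-app value outside [1, globalMax] falls back to globalMax.
     */
    public static int effectiveMaxAttempts(int perAppMax, int globalMax) {
        if (perAppMax < 1 || perAppMax > globalMax) {
            return globalMax;
        }
        return perAppMax;
    }

    /**
     * In Phase I, each RM restart consumes one attempt, as does each AM failure.
     * The first attempt is the original launch, so the app can survive
     * (limit - 1 - amFailures) RM restarts, never fewer than zero.
     */
    public static int survivableRmRestarts(int effectiveMax, int amFailures) {
        return Math.max(0, effectiveMax - 1 - amFailures);
    }
}
```

For example, an application limited to 2 attempts survives one RM restart, but only if its first AM has not already failed.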
Configurations for ZKRMStateStore only
- yarn.resourcemanager.zk.state-store.address: A comma-separated list of Host:Port pairs, each corresponding to a server in the ZooKeeper cluster where RM state will be stored. This must be supplied when using org.apache.hadoop.yarn.server.resourcemanager.recovery.ZKRMStateStore as the value for yarn.resourcemanager.store.class. For example: 127.0.0.1:2181
- yarn.resourcemanager.zk.state-store.parent-path: Full path of the ZooKeeper znode where RM state will be stored.
- yarn.resourcemanager.zk.state-store.timeout.ms: Session timeout in milliseconds for the ZooKeeper client running inside the RM while connecting to ZooKeeper. The ZooKeeper server uses this value to determine when a client session expires: expiration happens when the server does not hear from the client (i.e. no heartbeat) within this session timeout period. Default value is 10 seconds.
- yarn.resourcemanager.zk.state-store.num-retries: Number of times the ZooKeeper client running inside ZKRMStateStore retries connecting to ZooKeeper in case of connection timeouts. Default value is 500.
- yarn.resourcemanager.zk-retry-interval-ms: The interval in milliseconds between each retry of the ZooKeeper client running inside the ZKRMStateStore when connecting to a ZooKeeper server. Default value is 2 seconds.
- yarn.resourcemanager.zk.state-store.acl: ACLs to be used for the ZooKeeper znodes, so that only the RM can read and write the corresponding znode structure.
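Taken together, num-retries and the retry interval bound how long ZKRMStateStore keeps trying before it gives up. The hypothetical `ZkRetryBudget` class below does the back-of-the-envelope multiplication with the defaults listed above. It assumes one retry-interval sleep per retry and ignores the time each connection attempt itself takes, so treat the result as an approximation.

```java
/** Back-of-the-envelope arithmetic for the ZKRMStateStore retry settings (hypothetical helper). */
public class ZkRetryBudget {

    /**
     * Approximate total time spent sleeping between retries.
     * Assumption: one retry-interval sleep per retry; connection time is ignored.
     */
    public static long approxRetrySleepMs(int numRetries, long retryIntervalMs) {
        return numRetries * retryIntervalMs;
    }

    public static void main(String[] args) {
        // Defaults from the list above: 500 retries, 2-second interval.
        long ms = approxRetrySleepMs(500, 2_000);
        System.out.println(ms / 1000 + " s (about " + Math.round(ms / 60_000.0) + " min)");
    }
}
```

With the defaults, it prints `1000 s (about 17 min)`, i.e. the RM could spend on the order of a quarter of an hour retrying ZooKeeper before failing.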
Configurations for FileSystemRMStateStore only
- yarn.resourcemanager.fs.state-store.uri: URI pointing to the location of the FileSystem path where RM state should be stored. This must be set on the RM when FileSystemRMStateStore is configured as the state-store. An example value for HDFS is hdfs://localhost:9000/rmstore, a path that should be readable and writable only by the RM.
If you point this to the local file system, for example file:///tmp/yarn/rmstate, the RM uses the local file system as the underlying persistent store. Note that, as discussed before, this is not a useful setup for high availability, since the stored state is reachable only from that one machine.
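The configuration rules stated in this post (recovery must be enabled, each state-store needs its own location setting, and a local file:// URI is unsuitable for high availability) can be collected into a hypothetical pre-flight check. `RmRestartConfigCheck` is illustrative only: it reads a plain `Map` rather than Hadoop's `Configuration` class, does not know about any default values YARN may apply, and leaves custom state-store classes unchecked.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;

/** Hypothetical pre-flight check of the RM-restart settings described in this post. */
public class RmRestartConfigCheck {

    static final String PKG = "org.apache.hadoop.yarn.server.resourcemanager.recovery.";

    /** Returns a list of problems; an empty list means no rule from this post is violated. */
    public static List<String> check(Map<String, String> conf) {
        List<String> problems = new ArrayList<>();
        if (!"true".equalsIgnoreCase(conf.get("yarn.resourcemanager.recovery.enabled"))) {
            problems.add("recovery disabled: set yarn.resourcemanager.recovery.enabled to true");
            return problems;
        }
        String store = conf.getOrDefault("yarn.resourcemanager.store.class", "");
        if (store.equals(PKG + "ZKRMStateStore")) {
            // The ZooKeeper store needs the ensemble address.
            if (conf.getOrDefault("yarn.resourcemanager.zk.state-store.address", "").isEmpty()) {
                problems.add("ZKRMStateStore requires yarn.resourcemanager.zk.state-store.address");
            }
        } else if (store.equals(PKG + "FileSystemRMStateStore")) {
            // The FileSystem store needs a URI, and a local one defeats high availability.
            String uri = conf.getOrDefault("yarn.resourcemanager.fs.state-store.uri", "");
            if (uri.isEmpty()) {
                problems.add("FileSystemRMStateStore requires yarn.resourcemanager.fs.state-store.uri");
            } else if (uri.startsWith("file:")) {
                problems.add("warning: a local file system store is not useful for high availability");
            }
        }
        return problems;
    }
}
```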
Conclusion and future-work
In this post, we have given an overview of the RM restart mechanism, how it preserves application queues as part of the Phase I implementation, how user-land applications and frameworks are affected, and the related configurations. RM restart is a critical feature that lets YARN keep functioning across RM restarts, so that the RM is no longer a choke point whose failure forces users to resubmit their work.
The Hadoop community has also validated the integration of ecosystem projects beyond MapReduce, such as Apache Hive, Apache Pig and Apache Oozie, so that user queries, scripts and workflows continue to run without interruption in the event of an RM restart.
RM restart Phase I lays the groundwork for RM restart Phase II (work-preserving restart) and RM High Availability, which we'll explore in upcoming posts.