As organizations continue to ramp up the number of MapReduce jobs processed in their Hadoop clusters, we often get questions about how best to share clusters. I wanted to take this opportunity to explain the role of Capacity Scheduler and to cover a few common use cases.
Let me start by stating the underlying challenge that led to the development of Capacity Scheduler and similar approaches.
As organizations become more savvy with Apache Hadoop MapReduce and as their deployments mature, there is a significant pull towards consolidating Hadoop clusters into a small number of decently sized, shared clusters. This is driven by the desire to consolidate data in HDFS, to enable ever-larger processing via MapReduce, and to reduce the operational cost and complexity of managing multiple small clusters. It is quite common today for multiple sub-organizations within a single parent organization to pool their Hadoop/IT budgets to deploy and manage shared Hadoop clusters.
Initially, Apache Hadoop MapReduce supported a simple first-in-first-out (FIFO) job scheduler that was insufficient to address the above use case.
Enter the Capacity Scheduler.
The Capacity Scheduler was designed to allow organizations to share Hadoop clusters in a predictable and simple manner, primarily using the very common notion of ‘job queues’. It provides capacity guarantees for queues while keeping each queue’s cluster utilization elastic: unused capacity in one queue can be harnessed by overloaded queues that are experiencing a temporary spike in demand. This results in significantly higher cluster utilization while still providing predictability for Hadoop workloads.
For example, if you set up 5 queues with equal shares, each queue is guaranteed 20% of the cluster’s total capacity for processing jobs. You specify the queue to which each job is submitted.
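To make the 5-queue example concrete, here is a minimal Python sketch that generates equal per-queue capacity settings. It assumes the Hadoop 1.x property naming `mapred.capacity-scheduler.queue.<queue-name>.capacity` used in `capacity-scheduler.xml` (verify against the documentation for your release); the queue names are hypothetical. The queues would also need to be declared in `mapred.queue.names`, and a job chooses its queue through the `mapred.job.queue.name` property.

```python
# Minimal sketch: build capacity-scheduler.xml properties giving N queues an
# equal share of cluster capacity. Property names assume Hadoop 1.x naming;
# check them against your release before use.
from xml.sax.saxutils import escape


def equal_share_properties(queue_names):
    """Return {property_name: value} giving each queue an equal capacity share."""
    share = 100.0 / len(queue_names)  # percent of cluster slots per queue
    return {
        f"mapred.capacity-scheduler.queue.{name}.capacity": f"{share:g}"
        for name in queue_names
    }


def to_xml(props):
    """Render properties in Hadoop's <configuration> XML format."""
    lines = ["<configuration>"]
    for key, value in props.items():
        lines.append("  <property>")
        lines.append(f"    <name>{escape(key)}</name>")
        lines.append(f"    <value>{escape(value)}</value>")
        lines.append("  </property>")
    lines.append("</configuration>")
    return "\n".join(lines)


queues = ["etl", "reporting", "adhoc", "research", "ml"]  # hypothetical queue names
props = equal_share_properties(queues)
print(to_xml(props))  # each queue gets <value>20</value>
```

Unequal shares work the same way; the per-queue capacities just need to add up to at most 100.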
To provide the necessary elasticity, the Capacity Scheduler allocates free resources to a queue beyond its guaranteed capacity. These excess resources can be reclaimed when necessary and assigned to other queues in order to meet their capacity guarantees.
Capacity Scheduler also supports limits at the queue level, as well as per-user limits within each queue.
Admins can set a ‘maximum capacity’ for a queue, which places an upper bound on the queue’s elasticity, i.e., on the resources it can claim from other, idle queues beyond its guaranteed capacity.
Each queue enforces a limit on the percentage of resources that a given user can access if multiple users are accessing the queue at the same time. The user-limits are dynamic, and the actual limits depend upon the active users and their demand.
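The queue-level behavior described above (a guaranteed capacity, lending idle capacity to busy queues, and a maximum-capacity cap) can be approximated with a simple model. This is a sketch, not Capacity Scheduler’s actual code: it assumes a fixed pool of identical slots and a known slot demand per queue, and it hands out slots one at a time to the eligible queue that is furthest below its guarantee (lowest used/guaranteed ratio). The `Queue` class and `allocate` function are hypothetical names.

```python
import math
from dataclasses import dataclass


@dataclass
class Queue:
    name: str
    capacity_pct: float      # guaranteed share of cluster slots (assumed > 0)
    max_capacity_pct: float  # upper bound on how far the queue can stretch
    demand: int              # slots the queue's pending tasks could use right now


def allocate(queues, total_slots):
    """Simplified model: give each free slot to the eligible queue with the
    lowest used/guaranteed ratio, never exceeding demand or maximum capacity."""
    used = {q.name: 0 for q in queues}
    for _ in range(total_slots):
        eligible = [
            q for q in queues
            if used[q.name] < q.demand
            and used[q.name] < math.floor(total_slots * q.max_capacity_pct / 100)
        ]
        if not eligible:
            break  # remaining slots stay idle
        target = min(
            eligible,
            key=lambda q: used[q.name] / (total_slots * q.capacity_pct / 100),
        )
        used[target.name] += 1
    return used


a = Queue("A", capacity_pct=50, max_capacity_pct=80, demand=100)
print(allocate([a, Queue("B", 50, 100, demand=10)], total_slots=100))  # {'A': 80, 'B': 10}
print(allocate([a, Queue("B", 50, 100, demand=60)], total_slots=100))  # {'A': 50, 'B': 50}
```

In the first call, queue B needs only 10 slots, so A borrows B’s idle capacity but stops at its 80% maximum, leaving 10 slots idle. When B’s demand rises to 60, recomputing the allocation returns both queues to their 50-slot guarantees; in this model that is the target a running cluster moves toward as slots are freed and reassigned.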
To further aid the stability and reliability of Hadoop MapReduce clusters, the Capacity Scheduler enforces other limits, such as those on the number of accepted/active jobs per queue, the number of pending tasks per queue, the number of accepted/active jobs per user, and the number of pending tasks per user.
Together, these limits help ensure that a single job, user, or queue cannot overwhelm a MapReduce cluster and cause catastrophic failures.
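A minimal sketch of the kind of admission check these limits imply. It assumes two hypothetical caps, accepted jobs per queue and accepted jobs per user within a queue; the real scheduler tracks more limits (pending tasks, initialized jobs, and so on) and its configuration names differ. The `AdmissionControl` class is illustrative only.

```python
from collections import Counter


class AdmissionControl:
    """Hypothetical per-queue and per-user caps on accepted jobs."""

    def __init__(self, max_jobs_per_queue, max_jobs_per_user):
        self.max_jobs_per_queue = max_jobs_per_queue
        self.max_jobs_per_user = max_jobs_per_user
        self.per_queue = Counter()  # queue -> accepted jobs
        self.per_user = Counter()   # (queue, user) -> accepted jobs

    def submit(self, queue, user):
        """Accept the job only if neither the queue nor the user is at its cap."""
        if self.per_queue[queue] >= self.max_jobs_per_queue:
            return False
        if self.per_user[(queue, user)] >= self.max_jobs_per_user:
            return False
        self.per_queue[queue] += 1
        self.per_user[(queue, user)] += 1
        return True

    def complete(self, queue, user):
        """Release the job's entries in both counters when it finishes."""
        self.per_queue[queue] -= 1
        self.per_user[(queue, user)] -= 1


ac = AdmissionControl(max_jobs_per_queue=1000, max_jobs_per_user=100)
# A runaway script submits 10,000 jobs as one user; only 100 are accepted.
accepted = sum(ac.submit("adhoc", "alice") for _ in range(10_000))
print(accepted)                    # 100
print(ac.submit("adhoc", "bob"))   # True: other users are unaffected
```

The point of the per-user cap is that one misbehaving submitter exhausts only its own allowance, not the queue’s or the JobTracker’s.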
Finally, the Capacity Scheduler supports ‘high resource’ applications: a MapReduce job can ask for multiple slots for every map or reduce task. This is very useful for resource-heavy applications. The resources consumed by ‘high resource’ jobs are naturally accounted for against the queue capacity. It is also worth remembering that the Capacity Scheduler has mechanisms to strictly enforce the resource limits given to every map or reduce task.
User limits are an important concept to understand when using Capacity Scheduler. Here is a worked example:
Let’s say that a given queue is configured to be shared amongst 5 users, with each user being given an equal 20% share.
Initially, if only a single job from a single user (user A) is active in the queue, the entire capacity of the queue will be consumed by user A’s job.
If another user (user B) submits a job, user A and user B each get 50% of the queue’s capacity. If one of the users doesn’t need their full share to run their job(s), Capacity Scheduler automatically assigns the surplus to the other user.
If users A and B continue to submit jobs, the behavior doesn’t change.
If user C begins to submit jobs, each user’s share becomes 33.3% of the queue capacity (again, assuming there is sufficient demand from the users’ jobs). This continues until all 5 users get their 20% share each.
Now, if a 6th user enters the picture, things change a bit. Instead of dividing capacity further, Capacity Scheduler attempts to complete jobs from the original 5 users first. This logic was added so that a large number of users cannot divide resources across too many jobs, which would spread resources (e.g., local storage) too thin to complete tasks in an acceptable time frame.
On the other hand, as users’ applications wind down, the remaining users reclaim the freed share. For example, if 3 users A, B, and C are each using 33% and user B’s jobs complete, both A and C can now get 50% of the queue capacity.
Likewise, if one of the existing active users has applications that do not need additional resources in the cluster (e.g., at the tail end of a job), the other users get a larger share of the queue.
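The walkthrough above can be condensed into a simplified model. This sketch assumes a queue of identical slots, a fixed demand per user, and a hypothetical `min_user_limit_pct` parameter playing the role of the 20% minimum share (analogous to the queue’s `minimum-user-limit-percent` setting in Hadoop 1.x). It serves at most 100 / `min_user_limit_pct` users in arrival order, splits the queue evenly among them, and hands any share a user cannot use to the others. It approximates the behavior described here rather than reproducing the scheduler’s code.

```python
def user_shares(demands, queue_slots, min_user_limit_pct=20):
    """Split a queue's slots among users (simplified model).

    demands: list of (user, slots_wanted) in arrival order; users are unique.
    Returns {user: slots_allocated}.
    """
    max_users = int(100 // min_user_limit_pct)  # 20% -> at most 5 users at once
    # Only users with outstanding demand count; arrivals beyond the cap wait.
    served = [(u, d) for u, d in demands if d > 0][:max_users]
    want = dict(served)
    alloc = {u: 0 for u, _ in demands}
    hungry = [u for u, _ in served]
    remaining = queue_slots
    while remaining > 0 and hungry:
        share = remaining // len(hungry)
        if share == 0:
            # Fewer slots than hungry users: one extra slot each, in arrival order.
            for u in hungry[:remaining]:
                alloc[u] += 1
            break
        for u in list(hungry):
            give = min(share, want[u] - alloc[u])
            alloc[u] += give
            remaining -= give
            if alloc[u] == want[u]:
                hungry.remove(u)  # satisfied; its unused share returns to the pool
    return alloc


print(user_shares([("A", 100), ("B", 100)], 100))  # {'A': 50, 'B': 50}
print(user_shares([("A", 100), ("B", 30)], 100))   # {'A': 70, 'B': 30}
print(user_shares([(u, 100) for u in "ABCDEF"], 100))
# {'A': 20, 'B': 20, 'C': 20, 'D': 20, 'E': 20, 'F': 0}
```

Setting a user’s demand to 0 models their jobs completing (the A, B, C example), and a small demand models a job at its tail end; in both cases the leftover share flows to the other active users.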
One of the keys to achieving stability in MapReduce clusters is understanding that even when a job’s tasks are not running, they can still consume resources in the cluster. A very common example is map-task output: even after a map task completes, its output continues to occupy disk space.
For example, if a large job produces tens or hundreds of TB of map output and shares capacity with other jobs for a long period of time (e.g., 12 hours), a lot of the cluster’s disk space could be tied up in map outputs. This is one of the main reasons Capacity Scheduler attempts to finish individual jobs with a limited amount of sharing, thereby maximizing throughput and improving cluster stability.
Note that this discussion of map outputs doesn’t change whether we store them locally on individual disks (as current Hadoop MapReduce does) or on HDFS with a replication factor of 1, since the storage consumed is the same. Also, storing map outputs on HDFS with a replication factor greater than 1 is not advised, because it is more expensive in terms of both performance and storage. It is much better to re-run individual maps on failure than to pay that expense on each and every job.
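The storage argument reduces to simple arithmetic: the disk held by a job’s intermediate output is its size times its replication factor, and the cost grows with how long the job holds it. This sketch assumes the outputs stay on disk for the job’s whole lifetime; the 100 TB job and the durations are hypothetical numbers chosen only for illustration.

```python
def map_output_footprint_tb(map_output_tb, replication=1):
    """Disk space held by a job's intermediate map outputs.

    replication=1 models both local-disk storage and HDFS with one replica.
    """
    return map_output_tb * replication


def disk_tb_hours(map_output_tb, hours_held, replication=1):
    """Space x time: disk tied up over the job's lifetime (assumed held throughout)."""
    return map_output_footprint_tb(map_output_tb, replication) * hours_held


# Hypothetical job with 100 TB of map output.
print(disk_tb_hours(100, hours_held=2))             # 200: job finishes quickly
print(disk_tb_hours(100, hours_held=12))            # 1200: same job spread thin
print(map_output_footprint_tb(100, replication=3))  # 300: HDFS with 3 replicas
```

Stretching the same job from 2 to 12 hours multiplies the disk it ties up sixfold, and a replication factor of 3 triples the footprint on every job, which is the cost that re-running a failed map avoids.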
Capacity Scheduler supports the notion of resource specifications for individual jobs, i.e., it is not limited to a one-slot-per-task paradigm. A user can request multiple slots for each task by asking for more resources (specifically, memory) per task. This is important when you have several users, each running resource-intensive jobs. With this feature, the job is “charged” against the queue for the additional resources each task uses.
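A sketch of the slot accounting, assuming each slot corresponds to a fixed amount of memory and a task’s request is rounded up to whole slots. In Hadoop 1.x the relevant knobs include properties such as `mapred.cluster.map.memory.mb` (memory per slot) and `mapred.job.map.memory.mb` (memory per map task); verify the names against your release’s documentation. The 2048 MB slot size and task sizes below are hypothetical.

```python
import math


def slots_per_task(task_memory_mb, slot_memory_mb):
    """Slots one task occupies: its memory request rounded up to whole slots."""
    return max(1, math.ceil(task_memory_mb / slot_memory_mb))


def queue_charge(running_tasks, task_memory_mb, slot_memory_mb):
    """Slots charged against the queue's capacity for a job's running tasks."""
    return running_tasks * slots_per_task(task_memory_mb, slot_memory_mb)


SLOT_MB = 2048  # hypothetical memory size of one slot
print(slots_per_task(1024, SLOT_MB))    # 1: small tasks still occupy a whole slot
print(slots_per_task(5120, SLOT_MB))    # 3: 5120 / 2048 = 2.5, rounded up
print(queue_charge(10, 5120, SLOT_MB))  # 30 slots counted against the queue
```

In this model, rounding up means a ‘high resource’ job pays for every slot it blocks, so its queue cannot quietly consume more of the cluster than its capacity accounts for.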
Another important feature is the extensive stabilization of Capacity Scheduler over the past several years. Drawing on our team’s experience running Capacity Scheduler on very large, shared clusters, we have continually enhanced it to ensure that single jobs or single users cannot overwhelm a cluster, either maliciously or because of a user bug. This is especially important because the JobTracker is a single point of control. For example, in days past, a user error caused a crontab to submit the same job every second instead of every day. This flooded the cluster with tens of thousands of jobs that were initialized, although not necessarily run, and initializing them consumed a large amount of JobTracker memory. Capacity Scheduler now has limits on initialized jobs/tasks, running jobs/tasks, etc. to prevent these and other potentially damaging scenarios.
If you find that you are continually growing your Hadoop cluster and adding new users, I strongly suggest that you consider using Capacity Scheduler.
~ Arun C. Murthy