July 26, 2012

Understanding Apache Hadoop’s Capacity Scheduler

As organizations continue to ramp the number of MapReduce jobs processed in their Hadoop clusters, we often get questions about how best to share clusters. I wanted to take the opportunity to explain the role of Capacity Scheduler, including covering a few common use cases.

Let me start by stating the underlying challenge that led to the development of Capacity Scheduler and similar approaches.

As organizations become more savvy with Apache Hadoop MapReduce and as their deployments mature, there is a significant pull towards consolidating Hadoop clusters into a small number of decently sized, shared clusters. This is driven by the urge to consolidate data in HDFS, allow ever-larger processing via MapReduce and reduce the operational cost and complexity of managing multiple small clusters. It is quite common today for multiple sub-organizations within a single parent organization to pool together Hadoop/IT budgets to deploy and manage shared Hadoop clusters.

Initially, Apache Hadoop MapReduce supported a simple first-in-first-out (FIFO) job scheduler that was insufficient to address the above use case.

Enter the Capacity Scheduler.

Overview of Capacity Scheduler

The Capacity Scheduler was designed to allow organizations to share Hadoop clusters in a predictable and simple manner – primarily using the very common notion of ‘job queues’. It provides capacity guarantees for queues while providing elasticity for queues’ cluster utilization in the sense that unused capacity of a queue can be harnessed by overloaded queues that have a lot of temporal demand. This results in significantly higher cluster utilization while still providing predictability for Hadoop workloads.

For example, if you set up 5 queues with equal capacity, each queue gets 20% of the total cluster capacity for processing jobs. You define the queue to which each job is submitted.
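As a sketch, such a setup might look like the following (the queue names are hypothetical, and the property names are those of the Hadoop 1.x Capacity Scheduler — verify them against your release's documentation):

```xml
<!-- mapred-site.xml: enable the Capacity Scheduler and declare the queues -->
<property>
  <name>mapred.jobtracker.taskScheduler</name>
  <value>org.apache.hadoop.mapred.CapacityTaskScheduler</value>
</property>
<property>
  <name>mapred.queue.names</name>
  <value>marketing,sales,finance,research,ops</value>
</property>

<!-- capacity-scheduler.xml: give each queue an equal 20% of the cluster -->
<property>
  <name>mapred.capacity-scheduler.queue.marketing.capacity</name>
  <value>20</value>
</property>
<!-- ...and likewise for sales, finance, research and ops -->
```

A job is then directed to a queue at submission time, e.g. by setting mapred.job.queue.name on the command line (`-Dmapred.job.queue.name=marketing`).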

To provide the necessary elasticity, the Capacity Scheduler allocates free resources to any queue beyond its guaranteed capacity. These excess resources can be reclaimed as necessary and assigned to other queues in order to meet their capacity guarantees.

Also, the Capacity Scheduler supports per-queue limits and per-user limits within each queue.

Admins can set up a ‘maximum capacity’ for a queue, which provides an upper bound on the elasticity of the queue, i.e. the resources it can claim from other idle queues beyond its guaranteed capacity.
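As a hedged sketch (Hadoop 1.x property names; the queue name is a placeholder), a queue guaranteed 20% of the cluster could be capped at 40%:

```xml
<!-- capacity-scheduler.xml -->
<property>
  <name>mapred.capacity-scheduler.queue.research.capacity</name>
  <value>20</value>
</property>
<property>
  <name>mapred.capacity-scheduler.queue.research.maximum-capacity</name>
  <!-- may grow to at most 40% of the cluster; -1 means no cap -->
  <value>40</value>
</property>
```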

Each queue enforces a limit on the percentage of resources that a given user can access if multiple users are accessing the queue at the same time. The user-limits are dynamic, and the actual limits depend upon the active users and their demand.
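The per-user guarantee within a queue is configured via a minimum user limit percentage. A sketch, again using Hadoop 1.x property names and a hypothetical queue name:

```xml
<!-- capacity-scheduler.xml -->
<property>
  <name>mapred.capacity-scheduler.queue.research.minimum-user-limit-percent</name>
  <!-- each active user is guaranteed at least 20% of the queue's capacity,
       so at most 5 users divide the queue at any one time -->
  <value>20</value>
</property>
```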

Furthermore, to aid the stability and reliability of Hadoop MapReduce clusters, the Capacity Scheduler enforces other limits, such as limits on the number of accepted/active jobs per queue, the number of pending tasks per queue, the number of accepted/active jobs per user and the number of pending tasks per user.
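A few of these safety limits, sketched with Hadoop 1.x property names and illustrative values (the queue name is a placeholder; check the defaults shipped with your version):

```xml
<!-- capacity-scheduler.xml -->
<property>
  <name>mapred.capacity-scheduler.maximum-system-jobs</name>
  <!-- jobs that may be initialized concurrently across the whole scheduler -->
  <value>3000</value>
</property>
<property>
  <name>mapred.capacity-scheduler.queue.research.maximum-initialized-active-tasks</name>
  <!-- pending/active tasks allowed across all initialized jobs in this queue -->
  <value>200000</value>
</property>
<property>
  <name>mapred.capacity-scheduler.queue.research.maximum-initialized-active-tasks-per-user</name>
  <value>100000</value>
</property>
```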

All of these limits help ensure that a single job, user or queue cannot adversely affect a MapReduce cluster and cause catastrophic failures.

Finally, the Capacity Scheduler also supports the notion of ‘high resource’ applications, whereby a MapReduce job can ask for multiple slots for every map or reduce task. This is a very useful feature for resource-heavy applications. The resources consumed by the ‘high resource’ jobs are naturally accounted for against the queue capacity. Also, it’s useful to remember that the Capacity Scheduler has mechanisms to strictly enforce the resource limits given to every map or reduce task.
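In Hadoop 1.x the multi-slot request is expressed through memory settings: the cluster defines the memory size of one slot, and a job that asks for more memory per task is charged the corresponding number of slots. A sketch with illustrative values:

```xml
<!-- mapred-site.xml (cluster side): memory size of a single map slot -->
<property>
  <name>mapred.cluster.map.memory.mb</name>
  <value>2048</value>
</property>

<!-- per-job setting (e.g. passed at submission time): request more memory,
     so each map task is charged 4096 / 2048 = 2 slots against the queue -->
<property>
  <name>mapred.job.map.memory.mb</name>
  <value>4096</value>
</property>
```

The analogous reduce-side properties (mapred.cluster.reduce.memory.mb and mapred.job.reduce.memory.mb) work the same way.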

Understanding Capacity Scheduler’s User Limits

Setting up user limits is an important concept to understand when using Capacity Scheduler. Here is a working example:

Let’s say that a given queue is configured to be shared amongst 5 users, with each user being given an equal 20% share.

Initially, if only a single job from a single user (user A) is active in the queue, the entire capacity of the queue will be consumed by user A’s job.

If another user (user B) submits a job, user A and user B are each given 50% of the capacity of the queue. If one of the users doesn’t require the full share to run their job(s), the Capacity Scheduler automatically assigns the spare capacity to the other user, who requires the additional capacity.

If users A and B continue to submit jobs, the behaviors don’t change.

If user C begins to submit jobs, each user’s share becomes 33.3% of the queue capacity (again, assuming there is sufficient demand from the users’ jobs). This continues until all 5 users have their 20% share each.

Now, if a 6th user enters the picture, things change a bit. Instead of further dividing capacity, Capacity Scheduler instead attempts to complete jobs from the original 5 users. This logic was added so that organizations can prevent large volumes of users from essentially dividing up resources across too many jobs, which would spread resources too thin (e.g. local storage) to complete tasks in an acceptable time frame.

On the other hand, as users’ applications wind down, the remaining users start to reclaim the share. For example, if three users A, B and C are each using 33% and user B’s jobs complete, both A and C can now get 50% of the queue capacity.

Furthermore, if one of the active users has applications which do not need additional resources in the cluster, for example at the tail end of a job, the other users get a larger share of the queue.

Understanding the Resource Utilization of MapReduce Jobs & Its Impact on Sharing

One of the keys to achieving stability in MapReduce clusters is to understand that even though a job’s tasks may not be running, they can still consume resources in the cluster. A very common case is map-task outputs: even after a map task completes, it still consumes disk resources to store its outputs.

For example, if a large job consumes tens or hundreds of TB and shares capacity with other jobs for a long period of time (e.g. 12 hours), the cluster could have a lot of disk space tied up by map outputs. This is one of the main reasons that the Capacity Scheduler attempts to finish individual jobs with a limited amount of sharing, thereby maximizing throughput and improving cluster stability.

Note that the discussion of map outputs doesn’t change whether we store them locally on individual disks (as current Hadoop MapReduce does) or on HDFS with a replication factor of 1, since the storage consumed is the same. Also, an HDFS replication factor greater than 1 for map outputs is not advised because it is more expensive in terms of both performance and storage. It is simply much better to re-run individual maps on failure than to pay that expense on each and every job.

Other Features of Capacity Scheduler

The Capacity Scheduler supports the notion of resource specifications for individual jobs – i.e. it is not limited to a one-slot-per-task paradigm. A user can request multiple slots for each task by asking for more resources per task (specifically, memory). This is important when you have several users each running resource-intensive jobs. With this feature, the job is “charged” against the queue for using more resources per task.

Another important feature is the extensive stabilization of Capacity Scheduler over the past several years. Given our team’s experience running Capacity Scheduler on very large, shared clusters, we have continually enhanced it to ensure that single jobs or single users cannot overwhelm a cluster either maliciously or because of a user bug. This is especially important because JobTracker is a single point of control. For example, in days past, we had a user error in which a crontab submitted the same job every second instead of every day. This overwhelmed the cluster with tens of thousands of jobs that were initialized, although not necessarily run. Still, initializing the jobs was consuming a large amount of JobTracker memory. Capacity Scheduler now has limits for initialized jobs/tasks, running jobs/tasks, etc. in order to prevent these and other potentially damaging scenarios.

If you find that you are continually growing your Hadoop cluster and adding new users, I strongly suggest that you consider using Capacity Scheduler.

~ Arun C. Murthy



Asaf says:

What is the definition of cluster capacity? How does the Capacity Scheduler calculate cluster capacity?

Michael Noll says:

Hi Arun, thanks for your informative blog post.

One question: Is there any plan to add a preemption feature back to the CapacityScheduler? The original preemption support was removed in HADOOP-5726 [1] in 2009. I am asking because in the current implementation (Hadoop 1.x) a “broken” job — i.e. a job that does not actively hand back its resources — can still stall a cluster under certain (but realistic) circumstances. Hadoop as of 1.x cannot actively take back resources from such misbehaving jobs.

Or is a more practical solution to simply upgrade to Hadoop 2.x with YARN once 2.x is considered stable?

Best wishes,


Arun C. Murthy says:

Hey Michael – great to run into you here!

Currently I don’t have plans to add preemption back to CS in hadoop-1.x, but we definitely plan to add it to hadoop-2.x (YARN).

OTOH, I’ll point out that the CS has a stringent set of limits per job/user/queue such that a ‘broken job’ can only cause minimal damage and, as a result, preemption isn’t very critical – speaking from experience running CS on the largest Hadoop clusters in the world! *smile*


Sujitha says:

I tried to set up 2 queues in capacity scheduler.
I added the names of these queues to the mapred-site.xml


I configured CapacityScheduler.xml as shown below, including the maximum number of jobs in the system which can be initialized, concurrently, by the CapacityScheduler. [The pasted XML configuration did not render.]
Running the bin/ script starts the following services:
17083 DataNode
17557 TaskTracker
17373 JobTracker
16902 NameNode
17279 SecondaryNameNode
17703 Jps

But after a few seconds the JobTracker and TaskTracker shut down. What might be the solution?

venkata sridhar says:

The sum of the capacities of the two queues should not exceed 100; the total capacity you assign across the queues should sum to 100.
In your configuration, each queue is configured with 100%.
This might be the problem.
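[Editor’s note: a two-queue configuration whose capacities sum to 100, as suggested above, might look like this — queue names are placeholders, property names as in the Hadoop 1.x Capacity Scheduler:]

```xml
<!-- capacity-scheduler.xml -->
<property>
  <name>mapred.capacity-scheduler.queue.queue1.capacity</name>
  <value>50</value>
</property>
<property>
  <name>mapred.capacity-scheduler.queue.queue2.capacity</name>
  <value>50</value>
</property>
```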

Sujitha says:

I am researching and working on Hadoop.

I tried to configure two queues in Capacity Scheduler [Hadoop].

After a few seconds the jobtracker and tasktracker shuts down.

Can you Help?

Stephen Campbell says:

Thanks for the article; very informative. The Capacity Scheduler seems to focus on memory and (soon) CPU. My question relates to throughput: if you have a 100-node cluster and a hierarchy whereby 20% of the cluster capacity is shared between (say) 5 users and they all submit jobs, how does the scheduler arbitrate disk I/O across these 5 users? Does it have any visibility into how much I/O specific users or applications may be consuming from time to time?

Stephen Campbell says:

One more question. If you want to use HDFS Federation to increase the level of isolation (say, by having a specific namespace for Dev and a separate namespace for Prod), can a single set of CS settings be applied across the entire cluster? I.e. could you allocate 20% of cluster resources to the Dev namespace?



Ashok Agarwal says:

Hi Arun,

The blog is really helpful.

I read that it says it will share the same cluster between departments/business units of the same organization (specifically, it emphasizes sharing data within one organization).

I was thinking of sharing the same cluster between 2 or more organizations in an isolated manner. Is this possible with the current Hadoop 2.4.1? Any pointers would be very helpful.


Dipak Surya says:

I want the steps to configure the Capacity Scheduler, or any other scheduler.

Vignesh says:

Sujitha, since you have created two queues, I think the total resource capacity will also be divided, i.e. 50 for queue 1 and 50 for queue 2. Then, since your user limit percent is 100, only one job can run, and it will use the entire capacity of the queue, i.e. 50, which means 100% of the queue’s resources are used. Maybe you can try changing the capacities of the two queues to 50-50 or 30-20.

Arshadullah Khan says:

Great blog post.
