Understanding Apache Hadoop’s Capacity Scheduler

As organizations continue to ramp up the number of MapReduce jobs processed in their Hadoop clusters, we often get questions about how best to share clusters. I wanted to take this opportunity to explain the role of the Capacity Scheduler and to cover a few common use cases.

Let me start by stating the underlying challenge that led to the development of Capacity Scheduler and similar approaches.

As organizations become more savvy with Apache Hadoop MapReduce and as their deployments mature, there is a significant pull towards consolidating Hadoop clusters into a small number of decently sized, shared clusters. This is driven by the urge to consolidate data in HDFS, to enable ever-larger processing via MapReduce, and to reduce the operational cost and complexity of managing multiple small clusters. It is quite common today for multiple sub-organizations within a single parent organization to pool their Hadoop/IT budgets to deploy and manage shared Hadoop clusters.

Initially, Apache Hadoop MapReduce supported a simple first-in-first-out (FIFO) job scheduler that was insufficient to address the above use case.

Enter the Capacity Scheduler.

Overview of Capacity Scheduler

The Capacity Scheduler was designed to allow organizations to share Hadoop clusters in a predictable and simple manner – primarily using the very common notion of ‘job queues’. It provides capacity guarantees for each queue while also providing elasticity: capacity that one queue is not using can be harnessed by overloaded queues with temporarily high demand. This results in significantly higher cluster utilization while still providing predictability for Hadoop workloads.

For example, if you set up 5 queues and give each one 20% of the total capacity, each queue is guaranteed a fifth of the cluster for processing jobs. You then specify the queue to which each job is submitted.
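
For illustration, here is a minimal sketch of what such a setup might look like in Hadoop 1.x; the queue names and percentages are hypothetical. The queues are declared in mapred-site.xml:

  <property>
    <name>mapred.queue.names</name>
    <value>eng,marketing,finance,research,ops</value>
  </property>

and their guaranteed shares in capacity-scheduler.xml:

  <property>
    <name>mapred.capacity-scheduler.queue.eng.capacity</name>
    <value>20</value>
  </property>
  <!-- ...and likewise for the other four queues... -->

A job is then directed to a queue at submission time, for example with -Dmapred.job.queue.name=eng.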

To provide the necessary elasticity, the Capacity Scheduler allocates free resources to any queue beyond its guaranteed capacity. These excess resources are reclaimed as needed and assigned to other queues in order to meet their capacity guarantees.

The Capacity Scheduler also supports per-queue limits and per-user limits within each queue.

Admins can set a ‘maximum capacity’ for a queue, which provides an upper bound on the elasticity of the queue, i.e. the resources it can claim from other idle queues beyond its guaranteed capacity.
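
As a sketch (the queue name and values are illustrative), a queue guaranteed 20% of the cluster could be capped at 40% in capacity-scheduler.xml; a value of -1 disables the cap entirely:

  <property>
    <name>mapred.capacity-scheduler.queue.eng.maximum-capacity</name>
    <value>40</value>
  </property>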

Each queue also enforces a limit on the percentage of resources that any single user can consume when multiple users are active in the queue. These user limits are dynamic: the actual limit depends on the number of active users and their demand.
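
This floor on each user's share is controlled per queue via minimum-user-limit-percent. As an illustrative sketch, a value of 20 means a user's share is never squeezed below 20% of the queue's capacity, no matter how many users are active:

  <property>
    <name>mapred.capacity-scheduler.queue.eng.minimum-user-limit-percent</name>
    <value>20</value>
  </property>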

Furthermore, to aid the stability and reliability of Hadoop MapReduce clusters, the Capacity Scheduler enforces other limits, such as the number of accepted/active jobs per queue, the number of pending tasks per queue, the number of accepted/active jobs per user, and the number of pending tasks per user.
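
As a sketch of what these limits look like in capacity-scheduler.xml (the values are illustrative; the same property names appear in the configuration discussed in the comments below):

  <property>
    <name>mapred.capacity-scheduler.maximum-system-jobs</name>
    <value>3000</value>
  </property>
  <property>
    <name>mapred.capacity-scheduler.queue.eng.maximum-initialized-active-tasks-per-user</name>
    <value>100000</value>
  </property>
  <property>
    <name>mapred.capacity-scheduler.queue.eng.init-accept-jobs-factor</name>
    <value>10</value>
  </property>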

All of these limits help ensure that no single job, user or queue can harm a MapReduce cluster and cause catastrophic failures.

Finally, the Capacity Scheduler also supports the notion of ‘high resource’ applications, whereby a MapReduce job can ask for multiple slots for every map or reduce task. This is a very useful feature for resource-heavy applications. The resources consumed by ‘high resource’ jobs are naturally accounted for against the queue's capacity. It's also useful to remember that the Capacity Scheduler has mechanisms to strictly enforce the resource limits given to every map or reduce task.

Understanding Capacity Scheduler’s User Limits

User limits are an important concept to understand when using the Capacity Scheduler. Here is a working example:

Let’s say that a given queue is configured to be shared amongst 5 users, with each user being given an equal 20% share.

Initially, if only a single job from a single user (user A) is active in the queue, the entire capacity of the queue will be consumed by user A’s job.

If another user (user B) submits a job, users A and B are each given 50% of the capacity of the queue. If one of the users doesn't require the full share to run their job(s), the Capacity Scheduler automatically assigns the spare capacity to the user that needs it.

If users A and B continue to submit jobs, this behavior doesn't change.

If user C begins to submit jobs, each user's share becomes 33.3% of the queue capacity (again, assuming there is sufficient demand from each user's jobs). This continues until all 5 users have their 20% share each.

Now, if a 6th user enters the picture, things change a bit. Instead of dividing capacity further, the Capacity Scheduler attempts to complete jobs from the original 5 users first. This logic was added so that organizations can prevent a large number of users from dividing resources across too many concurrent jobs, which would spread resources (e.g. local storage) too thin to complete tasks in an acceptable time frame.

On the other hand, as users' applications wind down, the remaining users start to reclaim their share. For example, if users A, B and C are each using 33% and user B's jobs complete, both A and C can now get 50% of the queue capacity.

Furthermore, if one of the active users has applications that do not need additional resources in the cluster (for example, at the tail end of a job), the other users get a larger share of the queue.

Understanding the Resource Utilization of MapReduce Jobs & Its Impact on Sharing

One of the keys to achieving stability in MapReduce clusters is to understand that even when a job's tasks are not running, they can still consume resources in the cluster. A very common example is map outputs: even after a map task completes, it still consumes disk resources to store its outputs until the reduces have fetched them.

For example, if a large job produces tens or hundreds of terabytes of intermediate output and shares capacity with other jobs over a long period (e.g. 12 hours), the cluster could have a lot of disk space tied up in map outputs. This is one of the main reasons the Capacity Scheduler attempts to finish individual jobs with a limited amount of sharing, thereby maximizing throughput and improving cluster stability.

Note that this discussion of map outputs doesn't change whether we store them locally on individual disks (as current Hadoop MapReduce does) or on HDFS with a replication factor of 1, since the storage consumed is the same. Replicating map outputs more than once on HDFS is not advised, because it is more expensive in terms of both performance and storage. It is simply much better to re-run individual maps on failure than to pay that expense on each and every job.

Other Features of Capacity Scheduler

The Capacity Scheduler supports the notion of resource specifications for individual jobs – i.e. it is not limited to a one-slot-per-task paradigm. A user can request multiple slots for each task by asking for more resources per task (specifically, memory). This is important when you have several users each running resource-intensive jobs. With this feature, a job's usage is ‘charged’ against its queue in proportion to the resources each task actually occupies.
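
As a hedged sketch of how this looks in Hadoop 1.x (the values are illustrative): the admin defines a cluster-wide slot size, and a job requesting more memory per task is charged multiple slots accordingly.

  <!-- cluster-wide memory per map slot, set by the admin -->
  <property>
    <name>mapred.cluster.map.memory.mb</name>
    <value>2048</value>
  </property>
  <!-- per-job request: a 4096 MB map task occupies two 2048 MB slots -->
  <property>
    <name>mapred.job.map.memory.mb</name>
    <value>4096</value>
  </property>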

Another important feature is the extensive stabilization of the Capacity Scheduler over the past several years. Given our team's experience running the Capacity Scheduler on very large, shared clusters, we have continually enhanced it to ensure that a single job or user cannot overwhelm a cluster, either maliciously or through a bug. This is especially important because the JobTracker is a single point of control. For example, in days past, we had a user error in which a crontab submitted the same job every second instead of every day. This overwhelmed the cluster with tens of thousands of jobs that were initialized, although not necessarily run; still, initializing the jobs consumed a large amount of JobTracker memory. The Capacity Scheduler now has limits on initialized jobs/tasks, running jobs/tasks, etc. to prevent these and other potentially damaging scenarios.

If you find that you are continually growing your Hadoop cluster and adding new users, I strongly suggest that you consider using Capacity Scheduler.

~ Arun C. Murthy


Comments

Ashok Agarwal | August 13, 2014 at 10:09 pm

Hi Arun,

The blog is really helpful.

I read that the Capacity Scheduler is meant to share the same cluster between departments/business units of the same organization (specifically, it emphasizes sharing the data of one organization).

I was thinking of sharing the same cluster between 2 or more organizations in an isolated manner. Is this possible with the current Hadoop 2.4.1? Any pointers would be very helpful.

Thanks
Ashok

Sujitha | January 9, 2013 at 12:17 am

I am researching and working on Hadoop.

I tried to configure two queues in the Capacity Scheduler in Hadoop.

After a few seconds the jobtracker and tasktracker shut down.

Can you help?

Sujitha | January 3, 2013 at 12:33 am

Hi,
I tried to set up 2 queues in the Capacity Scheduler.
I added the names of these queues to mapred-site.xml:

  <property>
    <name>mapred.queue.names</name>
    <value>queue1,queue2</value>
  </property>

I configured CapacityScheduler.xml as shown below.

  <property>
    <name>mapred.capacity-scheduler.maximum-system-jobs</name>
    <value>3000</value>
    <description>Maximum number of jobs in the system which can be initialized, concurrently, by the CapacityScheduler.</description>
  </property>
  <property>
    <name>mapred.capacity-scheduler.queue.queue1.capacity</name>
    <value>100</value>
  </property>
  <property>
    <name>mapred.capacity-scheduler.queue.queue2.capacity</name>
    <value>100</value>
  </property>
  <property>
    <name>mapred.capacity-scheduler.queue.queue1.maximum-capacity</name>
    <value>-1</value>
  </property>
  <property>
    <name>mapred.capacity-scheduler.queue.queue2.maximum-capacity</name>
    <value>-1</value>
  </property>
  <property>
    <name>mapred.capacity-scheduler.queue.queue1.supports-priority</name>
    <value>false</value>
  </property>
  <property>
    <name>mapred.capacity-scheduler.queue.queue2.supports-priority</name>
    <value>false</value>
  </property>
  <property>
    <name>mapred.capacity-scheduler.queue.queue1.minimum-user-limit-percent</name>
    <value>100</value>
  </property>
  <property>
    <name>mapred.capacity-scheduler.queue.queue2.minimum-user-limit-percent</name>
    <value>100</value>
  </property>
  <property>
    <name>mapred.capacity-scheduler.queue.queue1.user-limit-factor</name>
    <value>1</value>
  </property>
  <property>
    <name>mapred.capacity-scheduler.queue.queue2.user-limit-factor</name>
    <value>1</value>
  </property>
  <property>
    <name>mapred.capacity-scheduler.queue.queue1.maximum-initialized-active-tasks</name>
    <value>200000</value>
  </property>
  <property>
    <name>mapred.capacity-scheduler.queue.queue2.maximum-initialized-active-tasks</name>
    <value>200000</value>
  </property>
  <property>
    <name>mapred.capacity-scheduler.queue.queue1.maximum-initialized-active-tasks-per-user</name>
    <value>100000</value>
  </property>
  <property>
    <name>mapred.capacity-scheduler.queue.queue2.maximum-initialized-active-tasks-per-user</name>
    <value>100000</value>
  </property>
  <property>
    <name>mapred.capacity-scheduler.queue.queue1.init-accept-jobs-factor</name>
    <value>10</value>
  </property>
  <property>
    <name>mapred.capacity-scheduler.queue.queue2.init-accept-jobs-factor</name>
    <value>10</value>
  </property>
  <property>
    <name>mapred.capacity-scheduler.default-supports-priority</name>
    <value>false</value>
  </property>
  <property>
    <name>mapred.capacity-scheduler.default-minimum-user-limit-percent</name>
    <value>100</value>
  </property>
  <property>
    <name>mapred.capacity-scheduler.default-user-limit-factor</name>
    <value>1</value>
  </property>
  <property>
    <name>mapred.capacity-scheduler.default-maximum-active-tasks-per-queue</name>
    <value>200000</value>
  </property>
  <property>
    <name>mapred.capacity-scheduler.default-maximum-active-tasks-per-user</name>
    <value>100000</value>
  </property>
  <property>
    <name>mapred.capacity-scheduler.default-init-accept-jobs-factor</name>
    <value>10</value>
  </property>
  <property>
    <name>mapred.capacity-scheduler.init-poll-interval</name>
    <value>5000</value>
  </property>
  <property>
    <name>mapred.capacity-scheduler.init-worker-threads</name>
    <value>5</value>
  </property>

Running bin/start-all.sh starts the following services:
17083 DataNode
17557 TaskTracker
17373 JobTracker
16902 NameNode
17279 SecondaryNameNode
17703 Jps

But after a few seconds the jobtracker and tasktracker shut down. What might be the solution?

July 30, 2012 at 1:02 am

Hi Arun, thanks for your informative blog post.

One question: Is there any plan to add preemption back to the CapacityScheduler? The original preemption support was removed in HADOOP-5726 [1] in 2009. I am asking because in the current implementation (Hadoop 1.x) a “broken” job — i.e. a job that does not actively hand back its resources — can still stall a cluster under certain (but realistic) circumstances. Hadoop as of 1.x cannot actively take back resources from such misbehaving jobs.

Or is a more practical solution to simply upgrade to Hadoop 2.x with YARN once 2.x is considered stable?

Best wishes,
Michael

[1] https://issues.apache.org/jira/browse/HADOOP-5726

    Arun C. Murthy | August 10, 2012 at 6:18 am

    Hey Michael – great to run into you here!

    Currently I don’t have plans to add preemption back to the CS in hadoop-1.x, but I definitely plan to add it to hadoop-2.x (YARN).

    OTOH, I’ll point out that the CS has a stringent set of limits per job/user/queue such that a ‘broken job’ can only cause minimal damage and, as a result, preemption isn’t very critical – speaking from experience running CS on the largest Hadoop clusters in the world! *smile*

    thanks,
    Arun

July 28, 2012 at 5:30 am

What is the definition of cluster capacity? How does the Capacity Scheduler calculate a cluster's capacity?
