Category Archives: MapReduce


The Fastest Path to Innovation: Community Driven Open Source

 

blogpicLast week, we outlined our approach for delivering an enterprise viable Apache Hadoop distribution in the open.  Simply put: we believe the fastest way to innovate is to do our work within the open source community, introduce enterprise feature requirements into that public domain, and to work diligently to progress existing open source projects and incubate new projects to meet those needs.

In support of our approach, this week we’ve announced the submission of two new incubation projects to the Apache Software foundation together with the launch of the “Stinger Initiative”, all aimed at enhancing the security and performance of Hadoop applications.  These efforts focus on enterprise requirements that are essential to enable broad adoption across the Hadoop ecosystem.

  • The Stinger initiative aims to dramatically speed up Apache Hive in support of interactive query use cases.
  • The Knox Gateway addresses the need for a single point of authentication and secure access for Apache Hadoop services in a cluster.
  • The Tez framework provides an alternative next-generation runtime built on Hadoop YARN that significantly improves latency and throughput of Hadoop applications.

We feel these efforts are strong examples of our commitment to driving innovation from within the open source community, and as stated in our approach blog, we do this by::

  • identifying and articulating the enterprise requirements within the community,
  • taking an active role in addressing those requirements within the community, and
  • applying enterprise rigor to the build, test and release process to ensure that the open source projects as well as the larger product distribution we provide is enterprise grade and interoperable with other elements in the enterprise.

Since it takes a community to build enterprise-class platforms like Hadoop, if you have interest in helping with Knox, Tez, or Stinger, we encourage you to work with us and the others in the Apache community!

Introducing… Tez: Accelerating processing of data stored in HDFS

 

MapReduce has served us well.  For years it has been THE processing engine for Hadoop and has been the backbone upon which a huge amount of value has been created.  While it is here to stay, new paradigms are also needed in order to enable Hadoop to serve an even greater number of usage patterns.  A key and emerging example is the need for interactive query, which today is challenged by the batch-oriented nature of MapReduce.  A key step to enabling this new world was Apache YARN and today the community proposes the next step… Tez

What is Tez?

Tez – Hindi for “speed” – (currently under incubation vote within Apache) provides a general-purpose, highly customizable framework that creates simplifies data-processing tasks across both small scale (low-latency) and large-scale (high throughput) workloads in Hadoop. It generalizes the MapReduce paradigm to a more powerful framework by providing the ability to execute a complex DAG (directed acyclic graph) of tasks for a single job so that projects in the Apache Hadoop ecosystem such as Apache Hive, Apache Pig and Cascading can meet requirements for human-interactive response times and extreme throughput at petabyte scale (clearly MapReduce has been a key driver in achieving this).

With the emergence of Apache Hadoop YARN as the basis of next generation data-processing architectures, there is a strong need for an application which can execute a complex DAG of tasks which can then be shared by Apache Pig, Apache Hive, Cascading and others.  The constrained DAG expressible in MapReduce (one set of maps followed by one set of reduces) often results in multiple MapReduce jobs which harm latency for short queries (overhead of launching multiple jobs) and throughput for large-scale queries (too much overhead for materializing intermediate job outputs to the filesystem). With Tez, we introduce a more expressive DAG of tasks, within a single application or job, that is better aligned with the required processing task – thus, for e.g., any given SQL query can be expressed as a single job using Tez.

The below graphic illustrates the advantages provided by Tez for complex SQL queries in Apache Hive or complex Apache Pig scripts.

pighivetez

Tez is critical to the Stinger Initiative and goes a long way in helping Hive support both interactive queries and batch queries. Tez provides a single underlying framework to support both latency and throughput sensitive applications, there-by obviating the need for multiple frameworks and systems to be installed, maintained and supported, a key advantage to enterprises looking to rationalize their data architectures. .

Essentially, Tez is the logical next step for Apache Hadoop after Apache Hadoop YARN. With YARN the community generalized Hadoop MapReduce to provide a general-purpose resource management framework (YARN) where-in MapReduce became merely one of the applications that could process data in your Hadoop cluster. With Tez, we build on YARN and our experience with the MapReduce to provide a more general data-processing application to the benefit of the entire ecosystem i.e. Apache Hive, Apache Pig etc.

What has been completed? Where can Tez go?

An early version of the project has been donated to the ASF as part of the initial code grant to establish the Incubation project.   Through the work done in the Stinger initiative, it is already clear that Tez enables and order of magnitude increase in the performance of Apache Hive.

The community is also designing a re-usable set of libraries of data-processing primitives such as sorting, merging, data-shuffling, intermediate data management etc. which are necessary for Tez and may be used directly by other projects.  This is just the beginning.  It is an extensible architecture that will undoubtedly be contributed to widely.

For the community, by the community

At Hortonworks we believe that innovation happens fastest by working with a community of like-minded individuals to address the requirements for Hadoop without being bounded by artificial boundaries such as employment. As such, even though the Hortonworks MapReduce/Hive/Pig team seeded the project, we’ve had the benefit of both positive feedback and constructive criticism from several leading contributors and committers across the Apache Hadoop MapReduce, Apache Hive & Apache Pig projects.  These inventors and peers are employed at Hortonworks, Yahoo, Facebook, Microsoft, Twitter and many others.  The initial committer list has 22 participants with deep domain expertise in these unique challenges and comprises a who’s who in the Hadoop world.  Of course, now that we are nearly in a position where we can co-develop via the Apache Software Foundation where we have proposed Tez as an Incubator project, we expect a very quick acceleration of project development.

When will it be available?

We plan to donate the code from our internal repository to the ASF as part of the Incubator proposal.  Also, Hortonworks will ship Tez in the next alpha release of Hortonworks Data Platform 2 (HDP2), based on Apache Hadoop 2.0, very soon to showcase some of the very exciting advances we have made for Apache Hive via Project Stinger.

We are very excited by the reception Tez has received so far, and we do hope you can join us in this initiative via the Apache Software Foundation project to make Hadoop better!

Proper Care and Feeding of Drives in a Hadoop Cluster: A Conversation with StackIQ’s Dr. Bruno

In a recent blog post, Hortonworks’ Steve Loughran discussed Apache Hadoop’s preference for JBOD-configured storage vs. the allure of RAID-0. As more enterprises are beginning to move beyond the science experiment stage and begin deploying Hadoop into their production environments, they are learning that Hadoop is quite different than other services in their data centers, such as web, mail, and database servers.They are learning that to achieve optimal performance, you need to pay particular attention to configuring the underlying hardware.

To find out more, we had a chat with Dr. Greg Bruno, VP of Engineering, and co-founder of StackIQ, a Hortonworks partner, about the real life implications of managing hard drives (HDDs) in a modern Hadoop cluster.

Q. Why isn’t it considered good practice to configure drives in Hadoop clusters as RAID-0 disk arrays?

A. Hadoop prefers a set of separate disks to the same set managed as a RAID-0 disk array. Read speeds are particularly important to the performance of a Hadoop cluster, and in his post, Steve makes the point that since drive speeds vary, and RAID-0 reads occur at the speed of the slowest disk in the array, a RAID-0 configuration may well be slower than a non-RAID configuration. The bigger issue, in my opinion, is reliability. If a set of disks is configured as a RAID-0 array, then one disk failure in that array will take that entire volume down, and if all the disks in a node are configured as a single RAID-0 array, then a single disk failure will take all the node’s data down. By configuring multiple disks in a RAID-0 array, you magnify the probability of that volume going offline due to a single disk failure and you maximize the amount of data that goes offline when that single failure occurs.

Q: Modern servers have a lot of disks. What’s the impact of losing a single disk when you have 12 3TB drive in each node?

A:  When a single drive fails when Hadoop is configured in its default state, the ENTIRE NODE gets taken offline. Back when servers typically had 6 x 1.5TB drives in them, losing a single disk would cause the loss of 0.02% of total storage in a typical 10PB, three-replica setup. With today’s hardware — typically 12 x 3TB drives per node, losing a single disk results in the loss of five times as much data.

Q: Aren’t today’s HDDs much more reliable than they used to be? Is it worth the extra work to handle the rare cases when a drive fails?

A: While drives are much more reliable than they used to be, they are still the cause of the lion’s share of support tickets in a Hadoop cluster. In fact, according to Bharath Mundlapudi, a Core Hadoop Engineer while working at Yahoo, disk drive failures account for fully 50% of siteops trouble tickets. That’s more than three times the next highest source of tickets.

Q: What does that represent in real terms?

A: It represents a lot of work for systems administrators. How much depends on the size and age of the cluster in question. For example, Facebook, which has some very large clusters, reports that their failure detection and automated repair system is doing the work of approximately 200 full time system administrators.

Q: OK, but not many organizations have clusters that large. What about a typical enterprise setup?

A: Our experience indicates that a 1,000 node cluster containing 12,000 drives for a total raw storage capacity of 48 peta-bytes can expect about 3 drive failures a day in its third year of operation. Drive failure rates rise as the devices age. For a 500 node cluster, you’re looking at a drive failure every 17 hours or so.

Q: Doesn’t this make it hard for the cluster operator to manage? How do they keep up?

A: Without the right tools and methodology, it is very difficult for cluster operators to manage clusters at scale. They typically have to write scripts to scan the cluster, detect disk failures, and report them. Then, once the offending drive has been replaced, commands must be run for the controller to recognize the new drive, OS commands need to be executed to format the drive, and then some Hadoop commands are required to add the disk back to the configuration.

Q: Presumably it’s not quite as challenging for StackIQ customers?

A: StackIQ’s mission is to make cluster operation as painless as possible, which is why we have developed tools to manage the entire lifecycle of the disk. While we haven’t figured out how to get our software to physically pull a bad drive and replace it with a new one, we automate the rest of it — from the initial deployment of the drive, detecting and reporting the error, and re-integrating the replacement drive into the configuration.

One of the features we’ve developed in StackIQ’s management software automatically configures chassis with LSI MegaRaid controllers into “JBODs”, that is, every disk in the chassis will be configured as an individual device.

In addition, a user can specify which disk they want in the chassis to be the boot disk via an attribute (e.g., “bootdisk0″) and if an optional secondary boot disk attribute is specified (“bootdisk1″), then our code will configure both those disks as a “mirror” (RAID1) while still making all the other non-boot disks available to Hadoop as individual disks.  A recent StackIQ customer made their purchasing decision on this feature alone, as they recently went through the painful exercise of changing a mid-size cluster’s RAID configuration by booting each server, one-by-one, catching a key press at the controller prompt, and fixing the configuration by-hand.  Not a fun exercise when you are under the gun by management to get production cluster online.

Q: With that many drive failures, clusters will be chewing through disks at a brisk rate. That could get expensive. That works out to something like 1000 drives/year X $100/drive = $100k per year just for replacement drives.

A: True, which speaks to the need for software which will make the most efficient use of your resources –  intelligent, automated cluster management software can find faulty drives automatically, and bring up a replacement drive quickly.

Q: Doesn’t automation take control out of the hands of the skilled cluster operators?

A: We believe it should be up to the cluster operator to set policies on how much automation to incorporate into their workflows. Our software reflects that philosophy, letting operators choose from a range of policies that go all the way from having the operator run all the commands manually, all the way to a fully automated repair where all the operator needs to do is push in the new drive and let StackIQ’s software do the rest.

Q: Can’t this be done with a simple command script that runs on all nodes?

A: That might be workable in a homogeneous environment, where all the nodes are the same. But in the real world, different nodes require different configurations. Even the disks are likely configured differently in nodes within the clusters. Handling those variables in a static script would be very difficult to accomplish. For example, if your cluster expands over time, you may be adding chassis with different drive configurations. Static scripts wouldn’t be able to deal with this situation. The StackIQ management software has intimate knowledge of the hardware and software in the cluster, so it knows exactly how to handle each drive in each node across the entire cluster, even in a heterogeneous environment.

Conclusion

So there you have it. The folks behind StackIQ cluster management software agree with Steve Loughran’s recommendation to forego RAID-0 for Hadoop clusters. In fact, they provide the management tools to make it easier to do. So take the advice of our experts, and configure your cluster servers as “Just a Bunch of Disks.”

For more information on StackIQ, please visit their website or follow their Twitter handle (@StackIQ). You can also follow Dr. Greg Bruno directly on his Twitter handle (@itsDrBruno).

~ Lisa Sensmeier

Big Graph Data on Hortonworks Data Platform

hortonworks-aurelius-header

HDP Monitor The Hortonworks Data Platform (HDP) conveniently integrates numerous Big Data tools in the Hadoop ecosystem. As such, it provides cluster-oriented storage, processing, monitoring, and data integration services. HDP simplifies the deployment and management of a production Hadoop-based system.

In Hadoop, data is represented as key/value pairs. In HBase, data is represented as a collection of wide rows. These atomic structures makes global data processing (via MapReduce) and row-specific reading/writing (via HBase) simple. However, writing queries is nontrivial if the data has a complex, interconnected structure that needs to be analyzed (see Hadoop joins and HBase joins). Without an appropriate abstraction layer, processing highly structured data is cumbersome. Indeed, choosing the right data representation and associated tools opens up otherwise unimaginable possibilities. One such data representation that naturally captures complex relationships is a graph (or network). This post presents Aurelius‘ Big Graph Data technology suite in concert with Hortonworks Data Platform. Moreover, for a real-world grounding, a GitHub clone is described in this context to help the reader understand how to use these technologies for building scalable, distributed, graph-based systems.

Aurelius Graph Cluster and Hortonworks Data Platform Integration

Aurelius Graph Cluster The Aurelius Graph Cluster can be used in concert with Hortonworks Data Platform to provide users a distributed graph storage and processing system with the management and integration benefits provided by HDP. Aurelius’ graph technologies include Titan, a highly-scalable graph database optimized for serving real-time results to thousands of concurrent users and Faunus, a distributed graph analytics engine that is optimized for batch processing graphs represented across a multi-machine cluster.

In an online social system, for example, there typically exists a user base that is creating things and various relationships amongst these things (e.g. likes, authored, references, stream). Moreover, they are creating relationships amongst themselves (e.g. friend, group member). To capture and process this structure, a graph database is useful. When the graph is large and it is under heavy transactional load, then a distributed graph database such as Titan/HBase can be used to provide real-time services such as searches, recommendations, rankings, scorings, etc. Next, periodic offline global graph statistics can be leveraged. Examples include identifying the most connected users, or tracking the relative importance of particular trends. Faunus/Hadoop serves this requirement. Graph queries/traversals in Titan and Faunus are simple, one-line commands that are optimized both semantically and computationally for graph processing. They are expressed using the Gremlin graph traversal language. The roles that Titan, Faunus, and Gremlin play in HDP are diagrammed below. Aurelius and HDP Integration

A Graph Representation of GitHub

Octocat socialite GitHub is an online source code service where over 2 million people collaborate on over 4 million projects. However, GitHub provides more than just revision control. In the last 4 years, GitHub has become a massive online community for software collaboration. Some of the biggest software projects in the world use GitHub (e.g. the Linux kernel).

GitHub is growing rapidly — 10,000 to 30,000 events occur each hour (e.g. a user contributing code to a repository). Hortonworks Data Platform is suited to storing, analyzing, and monitoring the state of GitHub. However, it lacks specific tools for processing this data from a relationship-centric perspective. Representing GitHub as a graph is natural because GitHub connects people, source code, contributions, projects, and organizations in diverse ways. Thinking purely in terms of key/value pairs and wide rows obfuscates the underlying relational structure which can be leveraged for more complex real-time and batch analytic algorithms.

GitHub Octocat

GitHub provides 18 event types, which range from new commits and fork events, to opening new tickets, commenting, and adding members to a project. The activity is aggregated in hourly archives, [each of which] contains a stream of JSON encoded GitHub events. (via githubarchive.org)

The aforementioned events can be represented according to the popular property graph data model. A graph schema describing the types of “things” and relationships between them is diagrammed below. A parse of the raw data according to this schema yields a graph instance. GitHub Schema

Deploying a Graph-Based GitHub

Amazon EC2 To integrate the Aurelius Graph Cluster with HDP, Whirr is used to launch a 4 m1.xlarge machine cluster on Amazon EC2. Detailed instructions for this process are provided on the Aurelius Blog, with the exception that a modified Whirr properties file must be used for HDP. A complete HDP Whirr solution is currently in development. To add Aurelius technologies to an existing HDP cluster, simply download Titan and Faunus, which interface with installed components such as Hadoop and HBase without further configuration.

5830 hourly GitHub Archive files between mid-March 2012 and mid-November 2012 contain 31 million GitHub events. The archive files are parsed to generate a graph. For example, when a GitHub push event is parsed, vertices with the types user, commit, and repository are generated. An edge with label pushed links the user to the commit and an edge with label to links the commit to the repository. The user vertex has properties such as user name and email address, the commit vertex has properties such as the unique sha sum identifier for the commit and its timestamp, and the repository vertex has properties like its URL and the programming language used. In this way, the 31 million events give rise to 27 million vertices and 79 million edges (a relatively small graph). Complete instructions for parsing the data are in the githubarchive-parser documentation. Once the configuration options are reviewed, launching the automated parallel parser is simple.

$ export LC_ALL="C"
$ export JAVA_OPTIONS="-Xmx1G"
$ python AutomatedParallelParser.py batch

The generated vertex and edge data is imported into the Titan/HBase cluster using the BatchGraph wrapper of the Blueprints graph API (a simple, single threaded insertion tool).

$ export JAVA_OPTIONS="-Xmx12G"
$ gremlin -e ImportGitHubArchive.groovy vertices.txt edges.txt

Titan: Distributed Graph Database

Titan: A Distributed Graph Database Titan is a distributed graph database that leverages existing storage systems for its persistence. Currently, Titan provides out-of-the-box support for Apache HBase and Cassandra (see documentation). Graph storage and processing in a clustered environment is made possible because of numerous techniques to both efficiently represent a graph within a BigTable-style data system and to efficiently process that graph using linked-list walking and vertex-centric indices. Moreover, for the developer, Titan provides native support for the Gremin graph traversal language. This section will demonstrate various Gremlin traversals over the parsed GitHub data.

The following Gremlin snippet determines which repositories Marko Rodriguez (okram) has committed to the most. The query first locates the vertex with name okram and then takes outgoing pushed-edges to his commits. For each of those commits, the outgoing to-edges are traversed to the repository that commit was pushed to. Next, the name of the repository is retrieved and those names are grouped and counted. The side-effect count map is outputted, sorted in decreasing order, and displayed. A graphical example demonstrating gremlins walking is diagrammed below.

gremlin> g = TitanFactory.open('bin/hbase.local')                
==>titangraph[hbase:127.0.0.1]
gremlin> g.V('name','okram').out('pushed').out('to').github_name.groupCount.cap.next().sort{-it.value}
==>blueprints=413
==>gremlin=69
==>titan=49
==>pipes=49
==>rexster=40
==>frames=26
==>faunus=23
==>furnace=9
==>tinkubator=5
==>homepage=1

Github Gremlin Traversal

The above query can be taken 2-steps further to determine Marko’s collaborators. If two people have pushed commits to the same repository, then they are collaborators. Given that the number of people committing to a repository could be many and typically, a collaborator has pushed numerous commits, a max of 2500 such collaborator paths are searched. One of the most important aspects of graph traversing is understanding the combinatorial path explosions that can occur when traversing multiple hops through a graph (see Loopy Lattices).

gremlin> g.V('name','okram').out('pushed').out('to').in('to').in('pushed').hasNot('name','okram')[0..2500]
   .name.groupCount.cap.next().sort{-it.value}[0..4]
==>lvca=877
==>spmallette=504
==>sgomezvillamor=424
==>mbroecheler=356
==>joshsh=137

Complex traversals are easy to formulate with the data in this representation. For example, Titan can be used to generate followship recommendations. There are numerous ways to express a recommendation (with varying semantics). A simple one is: “Recommend me people to follow based on people who watch the same repositories as me. The more repositories I watch in common with someone, the higher they should be ranked.” The traversal below starts at Marko, then traverses to all the repositories that Marko watches. Then to who else (not Marko) looks at those repositories and finally counts those people and returns the top 5 names of the sorted result set. In fact, Marko and Stephen (spmallette) are long time collaborators and thus, have similar tastes in software.

gremlin> g.V('name','okram').out('watched').in('watched').hasNot('name','okram').name.groupCount
   .cap.next().sort{-it.value}[0..4]
==>spmallette=3
==>alex-wajam=3
==>crimeminister=2
==>redgetan=2
==>snicaise=2
gremlin> g.V('name','okram').out('created').has('type','Comment').count()
==>159
gremlin> g.V('name','okram').out('created').has('type','Issue').count()  
==>176
gremlin> g.V('name','okram').out('edited').count()                     
==>85

A few self-describing traversals are presented above that are rooted at okram. Finally, note that Titan is optimized for local/ego-centric traversals. That is, from a particular source vertex (or small set of vertices), use some path description to yield a computation based on the explicit paths walked. For doing global graph analyses (where the source vertex set is the entire graph), a batch processing framework such as Faunus is used.

Faunus: Graph Analytics Engine

Faunus: Graph Computing with HadoopEvery Titan traversal begins at a small set of vertices (or edges). Titan is not designed for global analyses which involve processing the entire graph structure. The Hadoop component of Hortonworks Data Platform provides a reliable backend for global queries via Faunus. Gremlin traversals in Faunus are compiled down to MapReduce jobs, where the first job’s InputFormat is Titan/HBase. In order to not interfere with the production Titan/HBase instance, a snapshot of the live graph is typically generated and stored in Hadoop’s distributed file system HDFS as a SequenceFile available for repeated analysis. The most general SequenceFile (with all vertices, edges, and properties) is created below (i.e. a full graph dump).

faunus$ cat bin/titan-seq.properties 
faunus.graph.input.format=com.thinkaurelius.faunus.formats.titan.hbase.TitanHBaseInputFormat
hbase.zookeeper.quorum=10.68.65.161
hbase.mapreduce.inputtable=titan
hbase.mapreduce.scan.cachedrows=75
faunus.graph.output.format=org.apache.hadoop.mapreduce.lib.output.SequenceFileOutputFormat
faunus.sideeffect.output.format=org.apache.hadoop.mapreduce.lib.output.TextOutputFormat
faunus.output.location=full-seq
faunus.output.location.overwrite=true

faunus$ bin/gremlin.sh

         \,,,/
         (o o)
-----oOOo-(_)-oOOo-----
gremlin> g = FaunusFactory.open('bin/titan-seq.properties')
==>faunusgraph[titanhbaseinputformat]
gremlin> g._().toString()
==>[IdentityMap]
gremlin> g._()
12/12/13 09:19:53 INFO mapreduce.FaunusCompiler: Compiled to 1 MapReduce job(s)
12/12/13 09:19:55 INFO mapred.JobClient:  map 0% reduce 0%
12/12/13 09:21:26 INFO mapred.JobClient:  map 1% reduce 0%
12/12/13 09:21:36 INFO mapred.JobClient:  map 2% reduce 0%
12/12/13 09:21:43 INFO mapred.JobClient:  map 3% reduce 0%
...
gremlin> hdfs.ls()
==>rwx------ ubuntu supergroup 0 (D) .staging
==>rwxr-xr-x ubuntu supergroup 0 (D) full-seq
gremlin> hdfs.ls('full-seq/job-0')
==>rw-r--r-- ubuntu supergroup 0 _SUCCESS
==>rwxr-xr-x ubuntu supergroup 0 (D) _logs
==>rw-r--r-- ubuntu supergroup 243768636 part-m-00000
==>rw-r--r-- ubuntu supergroup 125250887 part-m-00001
==>rw-r--r-- ubuntu supergroup 331912876 part-m-00002
==>rw-r--r-- ubuntu supergroup 431617929 part-m-00003
...

Given the generated SequenceFile, the vertices and edges are counted by type and label, which is by definition a global operation.

gremlin> g.V.type.groupCount
==>Gist         780626
==>Issue        1298935
==>Organization 36281
==>Comment      2823507
==>Commit       20338926
==>Repository   2075934
==>User         983384
==>WikiPage     252915
gremlin> g.E.label.groupCount                                           
==>deleted        170139
==>on             7014052
==>owns           180092
==>pullRequested  930796
==>pushed         27538088
==>to             27719774
==>added          181609
==>created        10063346
==>downloaded     122157
==>edited         276609
==>forked         1015435
==>of             536816
==>appliedForkTo  1791
==>followed       753451
==>madePublic     26602
==>watched        2784640

Since GitHub is collaborative in a way similar to Wikipedia, there are a few users who contribute a lot, and many users who contribute little or none at all. To determine the distribution of contributions, Faunus can be used to compute the out degree distribution of pushed-edges, which correspond to users pushing commits to repositories. This is equivalent to Gremlin visiting each user vertex, counting all of the outgoing pushed-edges, and returning the distribution of counts.

gremlin> g.V.sideEffect('{it.degree = it.outE("pushed").count()}').degree.groupCount
==>1	57423
==>10	8856
==>100	527
==>1000	9
==>1004	5
==>1008	6
==>1011	6
==>1015	6
==>1019	3
==>1022	9
==>1026	2
==>1033	6
==>1037	4
==>104	462
==>1040	3
==>...

When the degree distribution is plotted using log-scaled axes, the results are similar to the Wikipedia contribution distribution, as expected. This is a common theme in most natural graphs — real-world graphs are not random structures and are composed of few “hubs” and numerous “satellites.”
github-pushed-out-degree-distribution

Hortonworks with Gremlin More sophisticated queries can be performed by first extracting a slice of the original graph that only contains relevant information. These slices can be saved to HDFS for subsequent traversals. For example, to calculate the most central co-watched project on GitHub, the primary graph is stripped down to only watched-edges between users and repositories. The final traversal below, walks the “co-watched” graph 2 times and counts the number of paths that have gone through each repository. The repositories are sorted by their path counts in order to express which repositories are most central/important/respected according to the watches subgraph.

gremlin> g.E.has('label','watched').keep.V.has('type','Repository','User').keep
...
12/12/13 11:08:13 INFO mapred.JobClient:   com.thinkaurelius.faunus.mapreduce.sideeffect.CommitVerticesMapReduce$Counters
12/12/13 11:08:13 INFO mapred.JobClient:     VERTICES_DROPPED=19377850
12/12/13 11:08:13 INFO mapred.JobClient:     VERTICES_KEPT=2074099
12/12/13 11:08:13 INFO mapred.JobClient:   com.thinkaurelius.faunus.mapreduce.sideeffect.CommitEdgesMap$Counters
12/12/13 11:08:13 INFO mapred.JobClient:     OUT_EDGES_DROPPED=55971128
12/12/13 11:08:13 INFO mapred.JobClient:     OUT_EDGES_KEPT=1934706
...
gremlin> g = g.getNextGraph()
gremlin> g.V.in('watched').out('watched').in('watched').out('watched').property('_count',Long.class)
   .order(F.decr,'github_name')
==>backbone	4173578345
==>html5-boilerplate	4146508400
==>normalize.css	3255207281
==>django	3168825839
==>three.js	3078851951
==>Modernizr	2971383230
==>rails	2819031209
==>httpie	2697798869
==>phantomjs	2589138977
==>homebrew	2528483507
...

Conclusion

Aurelius This post discussed the use of Hortonworks Data Platform in concert with the Aurelius Graph Cluster to store and process the graph data generated by the online social coding system GitHub. The example data set used throughout was provided by GitHub Archive, an ongoing record of events in GitHub. While the dataset currently afforded by GitHub Archive is relatively small, it continues to grow each day. The Aurelius Graph Cluster has been demonstrated in practice to support graphs with hundreds of billions of edges. As more organizations realize the graph structure within their Big Data, the Aurelius Graph Cluster is there to provide both real-time and batch graph analytics.

Acknowledgments

The authors wish to thank Steve Loughran for his help with Whirr and HDP. Moreover, Russell Jurney requested this post and, in a steadfast manner, ensured it was delivered.

Related Material

Hawkins, P., Aiken, A., Fisher, K., Rinard, M., Sagiv, M., “Data Representation Synthesis,” PLDI’11, June 2011.

Pham, R., Singer, L., Liskin, O., Filho, F. F., Schneider, K., “Creating a Shared Understanding of
Testing Culture on a Social Coding Site
.” Leibniz Universität Hannover, Software Engineering Group: Technical Report, Septeber 2012.

Alder, B. T., de Alfaro, L., Pye, I., Raman V., “Measuring Author Contributions to the Wikipedia,” WikiSym ’08 Proceedings of the 4th International Symposium on Wikis, Article No. 15, September 2008.

Rodriguez, M.A., Mallette, S.P., Gintautas, V., Broecheler, M., “Faunus Provides Big Graph Data Analytics,” Aurelius Blog, November 2012.

Rodriguez, M.A., LaRocque, D., “Deploying the Aurelius Graph Cluster,” Aurelius Blog, October 2012.

Ho, R., “Graph Processing in Map Reduce,” Pragmatic Programming Techniques Blog, July 2010.

Authors


Vadas Gintautas Marko A. Rodriguez

Apache Hadoop 2.0.2-alpha Released!

It gives me great pleasure to announce that the Apache Hadoop community has voted to release Apache Hadoop 2.0.2-alpha.

This is the second (alpha) release of the next generation release of Apache Hadoop 2.x and comes with significant enhancements to both the major components of Hadoop:

  • HDFS HA has undergone significant enhancements since the previous release for NameNode High Availability
  • YARN has undergone significant testing and stabilization and validation as is been heavily battle-tested since the previous release.

These are exciting times indeed for the Apache Hadoop community – personally, this is very reminiscent of the period in 2009 when we finally saw the light at the end of the tunnel during the stabilization of Apache Hadoop 1.x (then called Apache Hadoop 0.20.x). A déjà vu, if you will – albeit of the pleasant kind! Yes, we have a few miles to clock, but it feels like the hardest part is already behind us. At the time of release, YARN has already been deployed on super-sized clusters with 2,000 nodes and 3,600 nodes (totaling to nearly 6,000 nodes) at Yahoo alone*.

Going forward, I have no doubt that we are well of our way to sign-off on hadoop-2.x early next year and we are now heads down wrapping up the last of feature work since we have a reasonably stable base, such as:

  • HDFS HA without need for shared storage (already merged into Hadoop trunk sans a couple of design enhancements).
  • YARN ResourceManager availability.
  • YARN scheduling enhancements such as multi-resource scheduling (nearly complete, should be committed soon) and preemption.

Having said that, it’s critical for the developer community to get feedback on hadoop-2.x from the user community to ensure we continue to deliver great software – so, please, do go ahead, download the bits from the Apache Hadoop releases page, try the release and give us your valuable feedback – it’s a personal request! Of course, if you prefer a fully packaged and integrated stack you can browse to the Hortonworks Downloads page to try Hortonworks Data Platform 2.0 Alpha which integrates Hadoop 2.0.2-alpha with other important components such as Apache HBase, Apache Pig, Apache Hive, Apache HCatalog, Apache ZooKeeper and Apache Oozie

For more information about the HDP 2.0 alpha, you can check out our blog post from yesterday.

Acknowledgements
I’d like to thank everyone who has or continues to contribute to Apache Hadoop – everyone in the community. A special mention for Todd Lipcon for his contributions to HDFS HA and the Yahoo Hadoop team (Robert Evans, Thomas Graves, Daryn Sharp, Jason Lowe and everyone else) for their help in getting YARN to stability and large-scale deployments on their clusters.

*Yahoo is currently running hadoop-0.23.4 release which essentially is hadoop-2.0.2-alpha without HDFS high availability.

Apache Hadoop YARN – ResourceManager

Other posts in this series:
Introducing Apache Hadoop YARN
Apache Hadoop YARN – Background and an Overview
Apache Hadoop YARN – Concepts and Applications
Apache Hadoop YARN – ResourceManager
Apache Hadoop YARN – NodeManager

Apache Hadoop YARN – ResourceManager

As previously described, ResourceManager (RM) is the master that arbitrates all the available cluster resources and thus helps manage the distributed applications running on the YARN system. It works together with the per-node NodeManagers (NMs) and the per-application ApplicationMasters (AMs).

  1. NodeManagers take instructions from the ResourceManager and manage resources available on a single node.
  2. ApplicationMasters are responsible for negotiating resources with the ResourceManager and for working with the NodeManagers to start the containers.

Diagram of resource manager components

ResourceManager Components

The ResourceManager has the following components (see the figure above):

  1. Components interfacing RM to the clients:
    • ClientService: The client interface to the Resource Manager. This component handles all the RPC interfaces to the RM from the clients including operations like application submission, application termination, obtaining queue information, cluster statistics etc.
    • AdminService: To make sure that admin requests don’t get starved due to the normal users’ requests and to give the operators’ commands the higher priority, all the admin operations like refreshing node-list, the queues’ configuration etc. are served via this separate interface.
  2. Components connecting RM to the nodes:
    • ResourceTrackerService: This is the component that responds to RPCs from all the nodes. It is responsible for registration of new nodes, rejecting requests from any invalid/decommissioned nodes, obtain node-heartbeats and forward them over to the YarnScheduler. It works closely with NMLivelinessMonitor and NodesListManager described below.
    • NMLivelinessMonitor: To keep track of live nodes and specifically note down the dead nodes, this component keeps track of each node’s its last heartbeat time. Any node that doesn’t heartbeat within a configured interval of time, by default 10 minutes, is deemed dead and is expired by the RM. All the containers currently running on an expired node are marked as dead and no new containers are scheduling on such node.
    • NodesListManager: A collection of valid and excluded nodes. Responsible for reading the host configuration files specified via yarn.resourcemanager.nodes.include-path and yarn.resourcemanager.nodes.exclude-path and seeding the initial list of nodes based on those files. Also keeps track of nodes that are decommissioned as time progresses.
  3. Components interacting with the per-application AMs:
    • ApplicationMasterService: This is the component that responds to RPCs from all the AMs. It is responsible for registration of new AMs, termination/unregister-requests from any finishing AMs, obtaining container-allocation & deallocation requests from all running AMs and forward them over to the YarnScheduler. This works closely with AMLivelinessMonitor described below.
    • AMLivelinessMonitor: To help manage the list of live AMs and dead/non-responding AMs, this component keeps track of each AM and its last heartbeat time. Any AM that doesn’t heartbeat within a configured interval of time, by default 10 minutes, is deemed dead and is expired by the RM. All the containers currently running/allocated to an AM that gets expired are marked as dead. RM schedules the same AM to run on a new container, allowing up to a maximum of 4 such attempts by default.
  4. The core of the ResourceManager – the scheduler and related components:
    • ApplicationsManager: Responsible for maintaining a collection of submitted applications. Also keeps a cache of completed applications so as to serve users’ requests via web UI or command line long after the applications in question finished.
    • ApplicationACLsManager: RM needs to gate the user facing APIs like the client and admin requests to be accessible only to authorized users. This component maintains the ACLs lists per application and enforces them whenever an request like killing an application, viewing an application status is received.
    • ApplicationMasterLauncher: Maintains a thread-pool to launch AMs of newly submitted applications as well as applications whose previous AM attempts exited due to some reason. Also responsible for cleaning up the AM when an application has finished normally or forcefully terminated.
    • YarnScheduler: The Scheduler is responsible for allocating resources to the various running applications subject to constraints of capacities, queues etc. It performs its scheduling function based on the resource requirements of the applications such as memory, CPU, disk, network etc. Currently, only memory is supported and support for CPU is close to completion.
    • ContainerAllocationExpirer: This component is in charge of ensuring that all allocated containers are used by AMs and subsequently launched on the correspond NMs. AMs run as untrusted user code and can potentially hold on to allocations without using them, and as such can cause cluster under-utilization. To address this, ContainerAllocationExpirer maintains the list of allocated containers that are still not used on the corresponding NMs. For any container, if the corresponding NM doesn’t report to the RM that the container has started running within a configured interval of time, by default 10 minutes, the container is deemed as dead and is expired by the RM.
  5. TokenSecretManagers (for security):ResourceManager has a collection of SecretManagers which are charged with managing tokens, secret-keys that are used to authenticate/authorize requests on various RPC interfaces. A future post on YARN security will cover a more detailed descriptions of the tokens, secret-keys and the secret-managers but a brief summary follows:
    • ApplicationTokenSecretManager: To avoid arbitrary processes from sending RM scheduling requests, RM uses the per-application tokens called ApplicationTokens. This component saves each token locally in memory till application finishes and uses it to authenticate any request coming from a valid AM process.
    • ContainerTokenSecretManager: SecretManager for ContainerTokens that are special tokens issued by RM to an AM for a container on a specific node. ContainerTokens are used by AMs to create a connection to the corresponding NM where the container is allocated. This component is RM-specific, keeps track of the underlying master and secret-keys and rolls the keys every so often.
    • RMDelegationTokenSecretManager: A ResourceManager specific delegation-token secret-manager. It is responsible for generating delegation tokens to clients which can be passed on to unauthenticated processes that wish to be able to talk to RM.
  6. DelegationTokenRenewer: In secure mode, RM is Kerberos authenticated and so provides the service of renewing file-system tokens on behalf of the applications. This component renews tokens of submitted applications as long as the application runs and till the tokens can no longer be renewed.

Conclusion

In YARN, the ResourceManager is primarily limited to scheduling i.e. only arbitrating available resources in the system among the competing applications and not concerning itself with per-application state management. Because of this clear separation of responsibilities coupled with the modularity described above, and with the powerful scheduler API discussed in the previous post, RM is able to address the most important design requirements – scalability, support for alternate programming paradigms.

To allow for different policy constraints, the scheduler described above in the RM is pluggable and allows for different algorithms. In a future post of this series, we will dig deeper into various features of CapacityScheduler that schedules containers based on capacity guarantees and queues.

The next post will dive into details of the NodeManager, the component responsible for managing the containers’ life cycle and much more.

Other posts in this series:
Introducing Apache Hadoop YARN
Apache Hadoop YARN – Background and an Overview
Apache Hadoop YARN – Concepts and Applications
Apache Hadoop YARN – ResourceManager
Apache Hadoop YARN – NodeManager

Apache Hadoop YARN – Background and an Overview

Other posts in this series:
Introducing Apache Hadoop YARN
Apache Hadoop YARN – Background and an Overview
Apache Hadoop YARN – Concepts and Applications
Apache Hadoop YARN – ResourceManager
Apache Hadoop YARN – NodeManager

Apache Hadoop YARN – Background & Overview

Celebrating the significant milestone that was Apache Hadoop YARN being promoted to a full-fledged sub-project of Apache Hadoop in the ASF we present the first blog in a multi-part series on Apache Hadoop YARN – a general-purpose, distributed, application management framework that supersedes the classic Apache Hadoop MapReduce framework for processing data in Hadoop clusters.

MapReduce – The Paradigm

Essentially, the MapReduce model consists of a first, embarrassingly parallel, map phase where input data is split into discreet chunks to be processed. It is followed by the second and final reduce phase where the output of the map phase is aggregated to produce the desired result. The simple, and fairly restricted, nature of the programming model lends itself to very efficient and extremely large-scale implementations across thousands of cheap, commodity nodes.

Apache Hadoop MapReduce is the most popular open-source implementation of the MapReduce model.

In particular, when MapReduce is paired with a distributed file-system such as Apache Hadoop HDFS, which can provide very high aggregate I/O bandwidth across a large cluster, the economics of the system are extremely compelling – a key factor in the popularity of Hadoop.

One of the keys to this is the lack of data motion i.e. move compute to data and do not move data to the compute node via the network. Specifically, the MapReduce tasks can be scheduled on the same physical nodes on which data is resident in HDFS, which exposes the underlying storage layout across the cluster. This significantly reduces the network I/O patterns and keeps most of the I/O on the local disk or within the same rack – a core advantage.

Apache Hadoop MapReduce, circa 2011 – A Recap

Apache Hadoop MapReduce is an open-source, Apache Software Foundation project, which is an implementation of the MapReduce programming paradigm described above. Now, as someone who has spent over six years working full-time on Apache Hadoop, I normally like to point out that the Apache Hadoop MapReduce project itself can be broken down into the following major facets:

  • The end-user MapReduce API for programming the desired MapReduce application.
  • The MapReduce framework, which is the runtime implementation of various phases such as the map phase, the sort/shuffle/merge aggregation and the reduce phase.
  • The MapReduce system, which is the backend infrastructure required to run the user’s MapReduce application, manage cluster resources, schedule thousands of concurrent jobs etc.

This separation of concerns has significant benefits, particularly for the end-users – they can completely focus on the application via the API and allow the combination of the MapReduce Framework and the MapReduce System to deal with the ugly details such as resource management, fault-tolerance, scheduling etc.

The current Apache Hadoop MapReduce System is composed of the JobTracker, which is the master, and the per-node slaves called TaskTrackers.

The JobTracker is responsible for resource management (managing the worker nodes i.e. TaskTrackers), tracking resource consumption/availability and also job life-cycle management (scheduling individual tasks of the job, tracking progress, providing fault-tolerance for tasks etc).

The TaskTracker has simple responsibilities – launch/teardown tasks on orders from the JobTracker and provide task-status information to the JobTracker periodically.

For a while, we have understood that the Apache Hadoop MapReduce framework needed an overhaul. In particular, with regards to the JobTracker, we needed to address several aspects regarding scalability, cluster utilization, ability for customers to control upgrades to the stack i.e. customer agility and equally importantly, supporting workloads other than MapReduce itself.

We’ve done running repairs over time, including recent support for JobTracker availability and resiliency to HDFS issues (both of which are available in Hortonworks Data Platform v1 i.e. HDP1) but lately they’ve come at an ever-increasing maintenance cost and yet, did not address core issues such as support for non-MapReduce and customer agility.

Why support non-MapReduce workloads?

MapReduce is great for many applications, but not everything; other programming models better serve requirements such as graph processing (Google Pregel / Apache Giraph) and iterative modeling (MPI). When all the data in the enterprise is already available in Hadoop HDFS having multiple paths for processing is critical.

Furthermore, since MapReduce is essentially batch-oriented, support for real-time and near real-time processing such as stream processing and CEPFresil are emerging requirements from our customer base.

Providing these within Hadoop enables organizations to see an increased return on the Hadoop investments by lowering operational costs for administrators, reducing the need to move data between Hadoop HDFS and other storage systems etc.

Why improve scalability?

Moore’s Law… Essentially, at the same price-point, the processing power available in data-centers continues to increase rapidly. As an example, consider the following definitions of commodity servers:

  • 2009 – 8 cores, 16GB of RAM, 4x1TB disk
  • 2012 – 16+ cores, 48-96GB of RAM, 12x2TB or 12x3TB of disk.

Generally, at the same price-point, servers are twice as capable today as they were 2-3 years ago – on every single dimension.  Apache Hadoop MapReduce is known to scale to production deployments of ~5000 nodes of hardware of 2009 vintage. Thus, ongoing scalability needs are ever present given the above hardware trends.

What are the common scenarios for low cluster utilization?

In the current system, JobTracker views the cluster as composed of nodes (managed by individual TaskTrackers) with distinct map slots and reduce slots, which are not fungible.  Utilization issues occur because maps slots might be ‘full’ while reduce slots are empty (and vice-versa).  Fixing this was necessary to ensure the entire system could be used to its maximum capacity for high utilization.

What is the notion of customer agility?

In real-world deployments, Hadoop is very commonly deployed as a shared, multi-tenant system. As a result, changes to the Hadoop software stack affect a large cross-section if not the entire enterprise. Against that backdrop, customers are very keen on controlling upgrades to the software stack as it has a direct impact on their applications. Thus, allowing multiple, if limited, versions of the MapReduce framework is critical for Hadoop.

Enter Apache Hadoop YARN

The fundamental idea of YARN is to split up the two major responsibilities of the JobTracker i.e. resource management and job scheduling/monitoring, into separate daemons: a global ResourceManager and per-application ApplicationMaster (AM).

The ResourceManager and per-node slave, the NodeManager (NM), form the new, and generic, system for managing applications in a distributed manner.

The ResourceManager is the ultimate authority that arbitrates resources among all the applications in the system. The per-application ApplicationMaster is, in effect, a framework specific entity and is tasked with negotiating resources from the ResourceManager and working with the NodeManager(s) to execute and monitor the component tasks.

The ResourceManager has a pluggable Scheduler, which is responsible for allocating resources to the various running applications subject to familiar constraints of capacities, queues etc. The Scheduler is a pure scheduler in the sense that it performs no monitoring or tracking of status for the application, offering no guarantees on restarting failed tasks either due to application failure or hardware failures. The Scheduler performs its scheduling function based on the resource requirements of the applications; it does so based on the abstract notion of a Resource Container which incorporates resource elements such as memory, cpu, disk, network etc.

The NodeManager is the per-machine slave, which is responsible for launching the applications’ containers, monitoring their resource usage (cpu, memory, disk, network) and reporting the same to the ResourceManager.

The per-application ApplicationMaster has the responsibility of negotiating appropriate resource containers from the Scheduler, tracking their status and monitoring for progress. From the system perspective, the ApplicationMaster itself runs as a normal container.

Here is an architectural view of YARN:

One of the crucial implementation details for MapReduce within the new YARN system that I’d like to point out is that we have reused the existing MapReduce framework without any major surgery. This was very important to ensure compatibility for existing MapReduce applications and users. More on this later.

The next post will dive further into the intricacies of the architecture and its benefits such as significantly better scaling, support for multiple data processing frameworks (MapReduce, MPI etc.) and cluster utilization.

 

Other posts in this series:
Introducing Apache Hadoop YARN
Apache Hadoop YARN – Background and an Overview
Apache Hadoop YARN – Concepts and Applications
Apache Hadoop YARN – ResourceManager
Apache Hadoop YARN – NodeManager

Understanding Apache Hadoop’s Capacity Scheduler

As organizations continue to ramp the number of MapReduce jobs processed in their Hadoop clusters, we often get questions about how best to share clusters. I wanted to take the opportunity to explain the role of Capacity Scheduler, including covering a few common use cases.

Let me start by stating the underlying challenge that led to the development of Capacity Scheduler and similar approaches.

As organizations become more savvy with Apache Hadoop MapReduce and as their deployments mature, there is a significant pull towards consolidation of Hadoop clusters into a small number of decently sized, shared clusters. This is driven by the urge to consolidate data in HDFS, allow ever-larger processing via MapReduce and reduce operational costs & complexity of managing multiple small clusters. It is quite common today for multiple sub-organizations within a single parent organization to pool together Hadoop/IT budgets to deploy and manage shared Hadoop clusters.

Initially, Apache Hadoop MapReduce supported a simple first-in-first-out (FIFO) job scheduler that was insufficient to address the above use case.

Enter the Capacity Scheduler.

Overview of Capacity Scheduler

The Capacity Scheduler was designed to allow organizations to share Hadoop clusters in a predictable and simple manner – primarily using the very common notion of ‘job queues’. It provides capacity guarantees for queues while providing elasticity for queues’ cluster utilization in the sense that unused capacity of a queue can be harnessed by overloaded queues that have a lot of temporal demand. This results in significantly higher cluster utilization while still providing predictability for Hadoop workloads.

For example, if you set up 5 queues, each queue would then have 20% of the total capacity for processing jobs. You can define the queue for which each job is assigned.

For providing the necessary elasticity, the Capacity Scheduler allocates free resources to any queue beyond its guaranteed capacity. These excess resources can be reclaimed as necessary and assigned to other queues in order to meet capacity guarantees.

Also, Capacity Scheduler supports the notion of queue and per-user limits within the queue.

Admins can set up ‘maximum capacity’ for a queue, which provides an upper bound on the elasticity of the queue i.e. the resources it can claim from other idle queues beyond it’s guaranteed capacity.

Each queue enforces a limit on the percentage of resources that a given user can access if multiple users are accessing the queue at the same time. The user-limits are dynamic, and the actual limits depend upon the active users and their demand.

Furthermore, to further aid stability and reliability of Hadoop MapReduce clusters, the Capacity Scheduler has other limits such as those on the number of accepted/active jobs per queue, the number of pending tasks per queue, the number of accepted/active jobs per user, the number of pending tasks per user, etc.

All of these limits help ensure that a single job, user or queue cannot adversely harm a MapReduce cluster resulting in catastrophic failures.

Finally, the Capacity Scheduler also supports the notion of ‘high resource’ applications such that MapReduce job can ask for multiple-slots for every map or reduce task. This is a very useful feature for resource-heavy applications. The resources consumed by the ‘high resource’ jobs are naturally accounted for against the queue capacity. Also, it’s useful to remember that the Capacity Scheduler has mechanisms to strictly enforce the resource limits given to every map or reduce task.

Understanding Capacity Scheduler’s User Limits

Setting up user limits is an important concept to understand when using Capacity Scheduler. Here is a working example:

Let’s say that a given queue is configured to be shared amongst 5 users, with each user being given an equal 20% share.

Initially, if only a single job from a single user (user A) is active in the queue, the entire capacity of the queue will be consumed by user A’s job.

If another user (user B) creates a job, user A and user B are both given 50% of the capacity of the queue. If one of the user’s doesn’t require the full capacity to run their job(s), Capacity Scheduler automatically assigns more capacity to the other user that requires the additional capacity.

If users A and B continue to submit jobs, the behaviors don’t change.

If user C begins to submit jobs, each user’s share becomes 33.3% of the queue capacity (again, assuming there is sufficient demand from the user’s jobs). This goes on until all 5 users get their 20% share each.

Now, if a 6th user enters the picture, things change a bit. Instead of further dividing capacity, Capacity Scheduler instead attempts to complete jobs from the original 5 users. This logic was added so that organizations can prevent large volumes of users from essentially dividing up resources across too many jobs, which would spread resources too thin (e.g. local storage) to complete tasks in an acceptable time frame.

On the other hand, as the users’ applications wind down, the other users start to reclaim the share. For e.g. if 3 users A, B, C are using 33% and user B’s jobs complete, both A and C can now get 50% of the queue capacity.

Furthermore, if one of the existing active users have applications which do not need additional resources in the cluster, for e.g. at the tail end of the job, the other users get larger share of the queue.

Understanding the Resource Utilization of MapReduce Jobs & Its Impact on Sharing

One of the keys to achieving stability in MapReduce clusters is to understand that even though job tasks may not be running, they can still consume resources in the cluster. A very common use case is map-tasks outputs – i.e. even after a map task completes, it is still consuming resources to store its map outputs.

For example, if a large job consumes tens or hundreds of TB and is sharing capacity with other jobs for a long period of time (e.g. 12 hours), the cluster could have a lot of disk space tied up to map outputs. This is one of the main reasons that Capacity Scheduler attempts to finish individual jobs with a limited amount of sharing, thereby maximizing throughput and improving cluster stability.

Note that the discussion of map outputs doesn’t change if we store them locally on individual disks (current Hadoop MapReduce) or on HDFS with replica of 1, since storage consumed is still the same. Also, having an HDFS replica of map outputs >1 is not advised because it is more expensive both in terms of performance and storage. It is simply much better to re-run individual maps on failure rather than having to pay the expense on each and every job.

Other Features of Capacity Scheduler

Capacity Scheduler supports that notion of resource specifications for individual jobs – i.e. Capacity Scheduler is not limited to a one-slot-per-task paradigm. A user can request for multiple slots for each task by asking for more resources per task (specifically, memory). This is important when you have several users each running resource-intensive jobs. With this feature, the job and queue get “charged” against the queue for using more resources per task.

Another important feature is the extensive stabilization of Capacity Scheduler over the past several years. Given our team’s experience running Capacity Scheduler on very large, shared clusters, we have continually enhanced it to ensure that single jobs or single users cannot overwhelm a cluster either maliciously or because of a user bug. This is especially important because JobTracker is a single point of control. For example, in days past, we had a user error in which a crontab submitted the same job every second instead of every day. This overwhelmed the cluster with tens of thousands of jobs that were initialized, although not necessarily run. Still, initializing the jobs was consuming a large amount of JobTracker memory. Capacity Scheduler now has limits for initialized jobs/tasks, running jobs/tasks, etc. in order to prevent these and other potentially damaging scenarios.

If you find that you are continually growing your Hadoop cluster and adding new users, I strongly suggest that you consider using Capacity Scheduler.

~ Arun C. Murthy

High Availability and Hadoop 1.0 – Perfect Together

In Shaun Connolly’s post about balancing community innovation and enterprise stability, he discussed the importance of an open source project forging ahead with big improvements that are expected to be initially buggy and incomplete functionally but then stabilize over time. In the case of Apache Hadoop 2.0, currently in community Alpha release, the big improvements have been underway for the past 3 years and include such things as:

  1. Next-gen MapReduce (aka YARN) that opens up Hadoop’s job processing architecture to other application workloads beyond MapReduce,
  2. New HDFS pipe-line to support append and flush,
  3. HDFS federation and performance improvements that enable Hadoop to scale to 1000’s more nodes in a cluster, and
  4. High availability improvements that address some of the single point of failure issues that are often used as examples of how Hadoop may not be as enterprise-ready as it could be.

In the case of high availability (HA), it can take many months or years to get these types of solutions rock solid. While Hadoop 2.0 contains important HA-related features such as HDFS hot standby, we want to make sure we give it time to complete its community release process and allow extra time after that for bugs to be found and fixed to harden it for broad enterprise production use.

Read More

Hortonworks Data Platform v1.0 Download Now Available

If you haven’t yet noticed, we have made Hortonworks Data Platform v1.0 available for download from our website. Previously, Hortonworks Data Platform was only available for evaluation for members of the Technology Preview Program or via our Virtual Sandbox (hosted on Amazon Web Services). Moving forward and effective immediately, Hortonworks Data Platform is available to the general public.

Hortonworks Data Platform is a 100% open source data management platform, built on Apache Hadoop. As we have stated on many occasions, we are absolutely committed to the Apache Hadoop community and the Apache development process. As such, all code developed by Hortonworks has been contributed back to the respective Apache projects.

Version 1.0 of Hortonworks Data Platform includes Apache Hadoop-1.0.3, the latest stable line of Hadoop as defined by the Apache Hadoop community. In addition to the core Hadoop components (including MapReduce and HDFS), we have included the latest stable releases of essential projects including HBase 0.92.1, Hive 0.9.0, Pig 0.9.2, Sqoop 1.4.1, Oozie 3.1.3 and Zookeeper 3.3.4. All of the components have been tested and certified to work together. We have also added tools that simplify the installation and configuration steps in order to improve the experience of getting started with Apache Hadoop.

Read More

Introducing Hortonworks Data Platform v1.0

I wanted to take this opportunity to share some important news. Today, Hortonworks announced version 1.0 of the Hortonworks Data Platform, a 100% open source data management platform based on Apache Hadoop. We believe strongly that Apache Hadoop, and therefore, Hortonworks Data Platform, will become the foundation for the next generation enterprise data architecture, helping companies to load, store, process, manage and ultimately benefit from the growing volume and variety of data entering into, and flowing throughout their organizations. The imminent release of Hortonworks Data Platform v1.0 represents a major step forward for achieving this vision.

You can read the full press release here. You can also read what many of our partners have to say about this announcement here. We were extremely pleased that industry leaders such as Attunity, Dataguise, Datameer, Karmasphere, Kognitio, MarkLogic, Microsoft, NetApp, StackIQ, Syncsort, Talend, 10gen, Teradata and VMware all expressed their support and excitement for Hortonworks Data Platform.

Those who have followed Hortonworks since our initial launch already know that we are absolutely committed to open source and the Apache Software Foundation. You will be glad to know that our commitment remains the same today. We don’t hold anything back. No proprietary code is being developed at Hortonworks.

Read More

Apache Hadoop 2.0 (Alpha) Released

As the release manager for the Apache Hadoop 2.0 release, it gives me great pleasure to share that the Apache Hadoop community has just released Apache Hadoop 2.0.0 (alpha)! While only an alpha release (read: not ready to run in production), it is still an important step forward as it represents the very first release that delivers new and important capabilities, including:

Read More

Executive Video Series: Apache Hadoop and Next Generation MapReduce

The third installment of the Hortonworks executive video series features Arun C. Murthy, co-founder of Hortonworks and VP of Apache Hadoop for the Apache Software Foundation. In this video, Arun shares his view of the power of Apache Hadoop and provides some insight into the future direction of MapReduce, including the ability to support alternate programming paradigms.

Read More

Apache Hadoop 0.23.1 is Released!

A very short while ago, Vinod blogged about some of the significant improvements in Hadoop.Next (a.k.a hadoop-0.23.1).

To recap, the Hortonworks and Yahoo! teams have done a huge amount of work to test, validate and benchmark Hadoop.Next, the next generation of Apache Hadoop that includes HDFS Federation, NextGen MapReduce (a.k.a. YARN) and many other significant features and performance improvements.

Today, I’m very excited to announce that the Apache Hadoop community voted to release hadoop-0.23.1 and it’s now available for all to use!

Please head over to the Apache Hadoop Releases page to download and play with it. Happy Hadoop-ing!

Of course, many thanks to everyone in the community who contributed!

~Arun

Go to page:12