

Hadoop SDK and Tutorials for Microsoft .NET Developers

Microsoft has begun to treat its developer community to a number of Hadoop-y releases related to its HDInsight (Hadoop in the cloud) service, and it’s worth rounding up the material. It’s all Alpha and Preview so YMMV but looks like fun:

  • Microsoft .NET SDK for Hadoop. This kit provides .NET API access to aspects of HDInsight including HDFS, HCatalog, Oozie and Ambari, and also some PowerShell scripts for cluster management. There are also libraries for MapReduce and LINQ to Hive. The latter is really interesting, as it takes the technology .NET developers already use to access most data sources and applies it to Hive, the de facto standard for Hadoop data query.
  • HDInsight Labs Preview. Up on Github, there is a series of 5 labs covering C#, JavaScript and F# coding for MapReduce jobs, using Hive, and then bringing that data into Excel. It also covers some Mahout use to build a recommendation engine.
  • Microsoft Hive ODBC Driver. The examples above use this preview driver to enable the connection from Hive to Excel.

If all of the above excites you, our Hadoop on Windows for Developers training course also covers similar content in a lot of depth.

You can read more about the partnership between Hortonworks and Microsoft here, and you can download a preview of HDP for Windows here, or sign up for HDInsight over here. And if you’re hungry for more Hadoop tutorials, grab our own Hortonworks Sandbox here.

Simplifying data management: NFS access to HDFS

We are excited that another critical Enterprise Hadoop integration requirement – NFS Gateway access to HDFS – is making progress through the main Apache Hadoop trunk.  This effort is architected and designed by Brandon Li and Suresh Srinivas, and is being delivered by the community. You can track progress in Apache JIRA HDFS-4750.

With NFS access to HDFS, you can mount the HDFS cluster as a volume on client machines and use the native command line, scripts or a file explorer UI to view HDFS files and load data into HDFS.  NFS thus enables file-based applications to perform file read and write operations directly against Hadoop. This greatly simplifies data management in Hadoop and expands the integration of Hadoop into existing toolsets.
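As a rough illustration of what "file-based applications" means here, the sketch below loads a file into HDFS and lists a directory using nothing but ordinary file I/O. It assumes the cluster has already been mounted through the NFS Gateway; the mount point and directory names are hypothetical, and the helper names are ours, not part of any Hadoop API.

```python
import shutil
from pathlib import Path


def load_into_hdfs(local_file, mount_root, hdfs_dir):
    """Copy a local file into a directory under an NFS-mounted HDFS volume.

    mount_root is wherever the operator mounted the gateway (an assumption,
    for example /mnt/hdfs); no Hadoop client library is involved.
    """
    target_dir = Path(mount_root) / hdfs_dir.lstrip("/")
    target_dir.mkdir(parents=True, exist_ok=True)
    target = target_dir / Path(local_file).name
    shutil.copyfile(local_file, target)  # plain sequential write
    return target


def list_hdfs(mount_root, hdfs_dir):
    """Return the sorted entry names of a directory on the mounted volume."""
    directory = Path(mount_root) / hdfs_dir.lstrip("/")
    return sorted(entry.name for entry in directory.iterdir())
```

With a mount at a path such as /mnt/hdfs, calling load_into_hdfs("events.json", "/mnt/hdfs", "/data/raw") would be all a script needs to do; the same functions work against any local directory, which makes them easy to try without a cluster.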

NFS and HDFS

Network File System (NFS) is a distributed file system protocol that allows access to files on a remote computer in a manner similar to how a local file system is accessed.  With an NFS gateway for Hadoop, files can now be browsed, downloaded from, and written to HDFS as if it were a local file system. These are critical enterprise requirements.

Bringing the full capability of NFS to HDFS is an important strategic initiative for us. In the first phase, we have enabled NFSv3 interface access to HDFS. This is done using the NFS Gateway, a stateless daemon that translates the NFS protocol to HDFS access protocols, as shown in the following diagram. Many instances of this daemon can be run to provide high-throughput read/write access to HDFS from multiple clients. As part of this work, HDFS gained significant new functionality to support inode IDs (file handles), which was done in Apache JIRA HDFS-4489.

[Diagram: the NFS Gateway translating the NFS protocol to HDFS access protocols]

We are excited to work with the community to enable a robust roadmap for NFS functionality, focussing on the following capabilities:

  • NFSv4 and other protocols for access to HDFS
  • Highly Available NFS Gateway
  • Secure Hadoop (Kerberos) integration

The first phase of development is complete and is undergoing rigorous testing and stabilization. This set of functionality is being run through our integrated HDP stack test suite to ensure enterprise readiness.

The NFS Gateway functionality is being made available in the community and can be tracked in JIRA HDFS-4750.

Understanding Hadoop 2.0

In this post, we’ll explain the difference between Hadoop 1.0 and 2.0. After all, what is Hadoop 2.0? What is YARN?

For starters – what is Hadoop and what is 1.0? The Apache Hadoop project is the core of an entire ecosystem of projects. It consists of four modules (see here):

  • Hadoop Common: The common utilities that support the other Hadoop modules.
  • Hadoop Distributed File System (HDFS™): A distributed file system that provides high-throughput access to application data.
  • Hadoop YARN: A framework for job scheduling and cluster resource management.
  • Hadoop MapReduce: A YARN-based system for parallel processing of large data sets.

Hadoop 1.0 is based on the Hadoop 0.20.205 branch (it went 0.18 -> 0.19 -> 0.20 -> 0.20.2 -> 0.20.205 -> 1.0). Hard to follow? Check out this chart. Not hard for an open source developer, but obscure for an enterprise product, so everyone agreed to call 0.20.205 ‘1.0’, the project having matured to that point.

Hadoop 2.0 is from the Hadoop 0.23 branch, with major components re-written to enable support for features like High Availability, and MapReduce 2.0 (YARN), and to enable Hadoop to scale out past 4,000 machines per cluster. Specifically, Hadoop 2.0 adds (see here):

  • HDFS Federation – multiple independent namenodes, each managing its own portion of the namespace
  • MapReduce NextGen aka YARN aka MRv2 – which transforms Hadoop into a full blown platform as a service. See here.

Hadoop 1.0 is rock-solid; Hadoop 2.0 is still in active development and is considered Alpha. Work continues to stabilize Hadoop 2.0, so stay tuned!

Proper Care and Feeding of Drives in a Hadoop Cluster: A Conversation with StackIQ’s Dr. Bruno

In a recent blog post, Hortonworks’ Steve Loughran discussed Apache Hadoop’s preference for JBOD-configured storage vs. the allure of RAID-0. As more enterprises move beyond the science experiment stage and begin deploying Hadoop into their production environments, they are learning that Hadoop is quite different from other services in their data centers, such as web, mail, and database servers. They are learning that to achieve optimal performance, you need to pay particular attention to configuring the underlying hardware.

To find out more, we had a chat with Dr. Greg Bruno, VP of Engineering, and co-founder of StackIQ, a Hortonworks partner, about the real life implications of managing hard drives (HDDs) in a modern Hadoop cluster.

Q. Why isn’t it considered good practice to configure drives in Hadoop clusters as RAID-0 disk arrays?

A. Hadoop prefers a set of separate disks to the same set managed as a RAID-0 disk array. Read speeds are particularly important to the performance of a Hadoop cluster, and in his post, Steve makes the point that since drive speeds vary, and RAID-0 reads occur at the speed of the slowest disk in the array, a RAID-0 configuration may well be slower than a non-RAID configuration. The bigger issue, in my opinion, is reliability. If a set of disks is configured as a RAID-0 array, then one disk failure in that array will take that entire volume down, and if all the disks in a node are configured as a single RAID-0 array, then a single disk failure will take all the node’s data down. By configuring multiple disks in a RAID-0 array, you magnify the probability of that volume going offline due to a single disk failure and you maximize the amount of data that goes offline when that single failure occurs.
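To put a number on "magnify the probability", here is a small sketch. It assumes disks fail independently and uses an illustrative per-disk failure probability; neither the value nor the independence assumption comes from Dr. Bruno.

```python
def volume_failure_probability(disk_failure_prob, disks_in_array):
    """Chance that a RAID-0 volume goes offline: at least one of n disks fails.

    Assumes independent failures, so P = 1 - (1 - p)^n.
    """
    return 1.0 - (1.0 - disk_failure_prob) ** disks_in_array


p = 0.05  # assumed per-disk failure probability over some period, for illustration only
for n in (1, 4, 8, 12):
    print(f"{n:2d} disks in the array: {volume_failure_probability(p, n):.1%}")
```

With separate disks, each failure still happens with probability p, but it takes out only that one disk's data rather than the whole volume.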

Q: Modern servers have a lot of disks. What’s the impact of losing a single disk when you have twelve 3TB drives in each node?

A: When a single drive fails and Hadoop is configured in its default state, the ENTIRE NODE gets taken offline. Back when servers typically had 6 x 1.5TB drives in them, losing a single disk would cause the loss of 0.02% of total storage in a typical 10PB, three-replica setup. With today’s hardware, typically 12 x 3TB drives per node, losing a single disk results in the loss of five times as much data.

Q: Aren’t today’s HDDs much more reliable than they used to be? Is it worth the extra work to handle the rare cases when a drive fails?

A: While drives are much more reliable than they used to be, they are still the cause of the lion’s share of support tickets in a Hadoop cluster. In fact, according to Bharath Mundlapudi, a Core Hadoop Engineer while working at Yahoo, disk drive failures account for fully 50% of siteops trouble tickets. That’s more than three times the next highest source of tickets.

Q: What does that represent in real terms?

A: It represents a lot of work for systems administrators. How much depends on the size and age of the cluster in question. For example, Facebook, which has some very large clusters, reports that their failure detection and automated repair system is doing the work of approximately 200 full time system administrators.

Q: OK, but not many organizations have clusters that large. What about a typical enterprise setup?

A: Our experience indicates that a 1,000-node cluster containing 12,000 drives, for a total raw storage capacity of 48 petabytes, can expect about 3 drive failures a day in its third year of operation. Drive failure rates rise as the devices age. For a 500-node cluster, you’re looking at a drive failure every 17 hours or so.
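These figures are easy to sanity-check. The sketch below is our own back-of-the-envelope arithmetic, not StackIQ's model: it assumes an 8.6% annual failure rate, the figure for three-year-old disks from the Google study quoted in the RAID-0 post further down this page, since the interview itself does not state a rate.

```python
HOURS_PER_YEAR = 365 * 24


def expected_failures_per_year(drive_count, annual_failure_rate):
    """Expected number of drive failures in one year of operation."""
    return drive_count * annual_failure_rate


def hours_between_failures(drive_count, annual_failure_rate):
    """Average time between drive failures, in hours."""
    return HOURS_PER_YEAR / expected_failures_per_year(drive_count, annual_failure_rate)


AFR = 0.086  # assumption: rate for three-year-old disks, not a StackIQ number

per_day_1000_nodes = expected_failures_per_year(12_000, AFR) / 365
print(f"1,000 nodes: {per_day_1000_nodes:.1f} failures per day")
print(f"500 nodes: one failure every {hours_between_failures(6_000, AFR):.0f} hours")
```

Under that assumption the 1,000-node cluster sees roughly 2.8 failures a day and the 500-node cluster one failure about every 17 hours, in line with the numbers quoted above.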

Q: Doesn’t this make it hard for the cluster operator to manage? How do they keep up?

A: Without the right tools and methodology, it is very difficult for cluster operators to manage clusters at scale. They typically have to write scripts to scan the cluster, detect disk failures, and report them. Then, once the offending drive has been replaced, commands must be run for the controller to recognize the new drive, OS commands need to be executed to format the drive, and then some Hadoop commands are required to add the disk back to the configuration.

Q: Presumably it’s not quite as challenging for StackIQ customers?

A: StackIQ’s mission is to make cluster operation as painless as possible, which is why we have developed tools to manage the entire lifecycle of the disk. While we haven’t figured out how to get our software to physically pull a bad drive and replace it with a new one, we automate the rest of it: the initial deployment of the drive, detecting and reporting the error, and re-integrating the replacement drive into the configuration.

One of the features we’ve developed in StackIQ’s management software automatically configures chassis with LSI MegaRaid controllers into “JBODs”, that is, every disk in the chassis will be configured as an individual device.

In addition, a user can specify which disk in the chassis should be the boot disk via an attribute (e.g., “bootdisk0”), and if an optional secondary boot disk attribute is specified (“bootdisk1”), then our code will configure both those disks as a “mirror” (RAID1) while still making all the other non-boot disks available to Hadoop as individual disks.  A recent StackIQ customer made their purchasing decision on this feature alone, as they had recently gone through the painful exercise of changing a mid-size cluster’s RAID configuration by booting each server, one by one, catching a key press at the controller prompt, and fixing the configuration by hand.  Not a fun exercise when you are under the gun from management to get a production cluster online.

Q: With that many drive failures, clusters will be chewing through disks at a brisk rate. That could get expensive. That works out to something like 1000 drives/year X $100/drive = $100k per year just for replacement drives.

A: True, which speaks to the need for software which will make the most efficient use of your resources –  intelligent, automated cluster management software can find faulty drives automatically, and bring up a replacement drive quickly.

Q: Doesn’t automation take control out of the hands of the skilled cluster operators?

A: We believe it should be up to the cluster operator to set policies on how much automation to incorporate into their workflows. Our software reflects that philosophy, letting operators choose from a range of policies, from having the operator run all the commands manually to a fully automated repair where all the operator needs to do is push in the new drive and let StackIQ’s software do the rest.

Q: Can’t this be done with a simple command script that runs on all nodes?

A: That might be workable in a homogeneous environment, where all the nodes are the same. But in the real world, different nodes require different configurations. Even the disks are likely configured differently in nodes within the clusters. Handling those variables in a static script would be very difficult to accomplish. For example, if your cluster expands over time, you may be adding chassis with different drive configurations. Static scripts wouldn’t be able to deal with this situation. The StackIQ management software has intimate knowledge of the hardware and software in the cluster, so it knows exactly how to handle each drive in each node across the entire cluster, even in a heterogeneous environment.

Conclusion

So there you have it. The folks behind StackIQ cluster management software agree with Steve Loughran’s recommendation to forgo RAID-0 for Hadoop clusters. In fact, they provide the management tools to make it easier to do. So take the advice of our experts, and configure your cluster servers as “Just a Bunch of Disks.”

For more information on StackIQ, please visit their website or follow their Twitter handle (@StackIQ). You can also follow Dr. Greg Bruno directly on his Twitter handle (@itsDrBruno).

~ Lisa Sensmeier

Big Graph Data on Hortonworks Data Platform


The Hortonworks Data Platform (HDP) conveniently integrates numerous Big Data tools in the Hadoop ecosystem. As such, it provides cluster-oriented storage, processing, monitoring, and data integration services. HDP simplifies the deployment and management of a production Hadoop-based system.

In Hadoop, data is represented as key/value pairs. In HBase, data is represented as a collection of wide rows. These atomic structures make global data processing (via MapReduce) and row-specific reading/writing (via HBase) simple. However, writing queries is nontrivial if the data has a complex, interconnected structure that needs to be analyzed (see Hadoop joins and HBase joins). Without an appropriate abstraction layer, processing highly structured data is cumbersome. Indeed, choosing the right data representation and associated tools opens up otherwise unimaginable possibilities. One such data representation that naturally captures complex relationships is a graph (or network). This post presents Aurelius’ Big Graph Data technology suite in concert with Hortonworks Data Platform. Moreover, for a real-world grounding, a GitHub clone is described in this context to help the reader understand how to use these technologies for building scalable, distributed, graph-based systems.

Aurelius Graph Cluster and Hortonworks Data Platform Integration

The Aurelius Graph Cluster can be used in concert with Hortonworks Data Platform to provide users a distributed graph storage and processing system with the management and integration benefits provided by HDP. Aurelius’ graph technologies include Titan, a highly-scalable graph database optimized for serving real-time results to thousands of concurrent users, and Faunus, a distributed graph analytics engine that is optimized for batch processing graphs represented across a multi-machine cluster.

In an online social system, for example, there typically exists a user base that is creating things and various relationships amongst these things (e.g. likes, authored, references, stream). Moreover, they are creating relationships amongst themselves (e.g. friend, group member). To capture and process this structure, a graph database is useful. When the graph is large and it is under heavy transactional load, then a distributed graph database such as Titan/HBase can be used to provide real-time services such as searches, recommendations, rankings, scorings, etc. Next, periodic offline global graph statistics can be leveraged. Examples include identifying the most connected users, or tracking the relative importance of particular trends. Faunus/Hadoop serves this requirement. Graph queries/traversals in Titan and Faunus are simple, one-line commands that are optimized both semantically and computationally for graph processing. They are expressed using the Gremlin graph traversal language. The roles that Titan, Faunus, and Gremlin play in HDP are diagrammed below.

[Figure: Aurelius and HDP integration]

A Graph Representation of GitHub

GitHub is an online source code service where over 2 million people collaborate on over 4 million projects. However, GitHub provides more than just revision control. In the last 4 years, GitHub has become a massive online community for software collaboration. Some of the biggest software projects in the world use GitHub (e.g. the Linux kernel).

GitHub is growing rapidly — 10,000 to 30,000 events occur each hour (e.g. a user contributing code to a repository). Hortonworks Data Platform is suited to storing, analyzing, and monitoring the state of GitHub. However, it lacks specific tools for processing this data from a relationship-centric perspective. Representing GitHub as a graph is natural because GitHub connects people, source code, contributions, projects, and organizations in diverse ways. Thinking purely in terms of key/value pairs and wide rows obfuscates the underlying relational structure which can be leveraged for more complex real-time and batch analytic algorithms.


GitHub provides 18 event types, which range from new commits and fork events, to opening new tickets, commenting, and adding members to a project. The activity is aggregated in hourly archives, [each of which] contains a stream of JSON encoded GitHub events. (via githubarchive.org)

The aforementioned events can be represented according to the popular property graph data model. A graph schema describing the types of “things” and relationships between them is diagrammed below. A parse of the raw data according to this schema yields a graph instance.

[Figure: GitHub schema]

Deploying a Graph-Based GitHub

To integrate the Aurelius Graph Cluster with HDP, Whirr is used to launch a cluster of four m1.xlarge machines on Amazon EC2. Detailed instructions for this process are provided on the Aurelius Blog, with the exception that a modified Whirr properties file must be used for HDP. A complete HDP Whirr solution is currently in development. To add Aurelius technologies to an existing HDP cluster, simply download Titan and Faunus, which interface with installed components such as Hadoop and HBase without further configuration.

5830 hourly GitHub Archive files between mid-March 2012 and mid-November 2012 contain 31 million GitHub events. The archive files are parsed to generate a graph. For example, when a GitHub push event is parsed, vertices with the types user, commit, and repository are generated. An edge with label pushed links the user to the commit and an edge with label to links the commit to the repository. The user vertex has properties such as user name and email address, the commit vertex has properties such as the unique sha sum identifier for the commit and its timestamp, and the repository vertex has properties like its URL and the programming language used. In this way, the 31 million events give rise to 27 million vertices and 79 million edges (a relatively small graph). Complete instructions for parsing the data are in the githubarchive-parser documentation. Once the configuration options are reviewed, launching the automated parallel parser is simple.

$ export LC_ALL="C"
$ export JAVA_OPTIONS="-Xmx1G"
$ python AutomatedParallelParser.py batch
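
The mapping from a push event to graph elements described above can be sketched in a few lines. This is not the githubarchive-parser code: the event layout and field names below (actor, repo_url, commits, and so on) are hypothetical stand-ins chosen for illustration, and only the vertex types, edge labels, and properties follow the description in the text.

```python
def push_event_to_graph(event):
    """Turn one push event into (vertices, edges).

    Vertices are keyed by (type, id); edges are (source, label, target).
    The input field names are assumptions, not the GitHub Archive schema.
    """
    user = ("user", event["actor"])
    repo = ("repository", event["repo_url"])
    vertices = {
        user: {"name": event["actor"], "email": event.get("email")},
        repo: {"url": event["repo_url"], "language": event.get("language")},
    }
    edges = []
    for commit in event["commits"]:
        commit_vertex = ("commit", commit["sha"])
        vertices[commit_vertex] = {"sha": commit["sha"], "timestamp": commit["timestamp"]}
        edges.append((user, "pushed", commit_vertex))  # user -pushed-> commit
        edges.append((commit_vertex, "to", repo))      # commit -to-> repository
    return vertices, edges
```

A push with two commits therefore yields four vertices and four edges, which gives a feel for how 31 million events grow into tens of millions of vertices and edges.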

The generated vertex and edge data is imported into the Titan/HBase cluster using the BatchGraph wrapper of the Blueprints graph API (a simple, single threaded insertion tool).

$ export JAVA_OPTIONS="-Xmx12G"
$ gremlin -e ImportGitHubArchive.groovy vertices.txt edges.txt

Titan: Distributed Graph Database

Titan is a distributed graph database that leverages existing storage systems for its persistence. Currently, Titan provides out-of-the-box support for Apache HBase and Cassandra (see documentation). Graph storage and processing in a clustered environment is made possible by numerous techniques to both efficiently represent a graph within a BigTable-style data system and to efficiently process that graph using linked-list walking and vertex-centric indices. Moreover, for the developer, Titan provides native support for the Gremlin graph traversal language. This section will demonstrate various Gremlin traversals over the parsed GitHub data.

The following Gremlin snippet determines which repositories Marko Rodriguez (okram) has committed to the most. The query first locates the vertex with name okram and then takes outgoing pushed-edges to his commits. For each of those commits, the outgoing to-edges are traversed to the repository that commit was pushed to. Next, the name of the repository is retrieved and those names are grouped and counted. The side-effect count map is outputted, sorted in decreasing order, and displayed. A graphical example demonstrating gremlins walking is diagrammed below.

gremlin> g = TitanFactory.open('bin/hbase.local')                
==>titangraph[hbase:127.0.0.1]
gremlin> g.V('name','okram').out('pushed').out('to').github_name.groupCount.cap.next().sort{-it.value}
==>blueprints=413
==>gremlin=69
==>titan=49
==>pipes=49
==>rexster=40
==>frames=26
==>faunus=23
==>furnace=9
==>tinkubator=5
==>homepage=1

[Figure: GitHub Gremlin traversal]
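
For readers new to Gremlin, the same walk can be mimicked on a toy in-memory edge list. This is only a sketch of the semantics (follow pushed edges, then to edges, then group and count); it says nothing about how Titan executes the traversal, and the three-commit data set is made up.

```python
from collections import Counter


def out(edges, sources, label):
    """Follow outgoing edges with the given label from every source vertex.

    Duplicates are kept on purpose: each entry in the result is one path.
    """
    return [dst for src in sources for (s, l, dst) in edges if s == src and l == label]


# Made-up data: (source, label, target)
edges = [
    ("okram", "pushed", "c1"), ("okram", "pushed", "c2"), ("okram", "pushed", "c3"),
    ("c1", "to", "blueprints"), ("c2", "to", "blueprints"), ("c3", "to", "gremlin"),
]

commits = out(edges, ["okram"], "pushed")  # .out('pushed')
repos = out(edges, commits, "to")          # .out('to')
ranking = Counter(repos).most_common()     # .groupCount, sorted in decreasing order
print(ranking)  # [('blueprints', 2), ('gremlin', 1)]
```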

The above query can be taken two steps further to determine Marko’s collaborators. If two people have pushed commits to the same repository, then they are collaborators. Because many people may commit to a repository, and a collaborator has typically pushed numerous commits, a maximum of 2500 such collaborator paths are searched. One of the most important aspects of graph traversing is understanding the combinatorial path explosions that can occur when traversing multiple hops through a graph (see Loopy Lattices).

gremlin> g.V('name','okram').out('pushed').out('to').in('to').in('pushed').hasNot('name','okram')[0..2500]
   .name.groupCount.cap.next().sort{-it.value}[0..4]
==>lvca=877
==>spmallette=504
==>sgomezvillamor=424
==>mbroecheler=356
==>joshsh=137

Complex traversals are easy to formulate with the data in this representation. For example, Titan can be used to generate followship recommendations. There are numerous ways to express a recommendation (with varying semantics). A simple one is: “Recommend me people to follow based on people who watch the same repositories as me. The more repositories I watch in common with someone, the higher they should be ranked.” The traversal below starts at Marko, then traverses to all the repositories that Marko watches. It then moves to whoever else (not Marko) watches those repositories, counts those people, and returns the top 5 names of the sorted result set. In fact, Marko and Stephen (spmallette) are long-time collaborators and thus have similar tastes in software.

gremlin> g.V('name','okram').out('watched').in('watched').hasNot('name','okram').name.groupCount
   .cap.next().sort{-it.value}[0..4]
==>spmallette=3
==>alex-wajam=3
==>crimeminister=2
==>redgetan=2
==>snicaise=2
gremlin> g.V('name','okram').out('created').has('type','Comment').count()
==>159
gremlin> g.V('name','okram').out('created').has('type','Issue').count()  
==>176
gremlin> g.V('name','okram').out('edited').count()                     
==>85

A few self-describing traversals are presented above that are rooted at okram. Finally, note that Titan is optimized for local/ego-centric traversals. That is, from a particular source vertex (or small set of vertices), use some path description to yield a computation based on the explicit paths walked. For doing global graph analyses (where the source vertex set is the entire graph), a batch processing framework such as Faunus is used.
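
The co-watch recommendation from earlier in this section reduces to counting shared repositories, which the following sketch does on plain Python sets. The user and repository names are invented, and the function is our own illustration rather than anything Titan provides; it matches the traversal's counts only if each user has at most one watched edge per repository.

```python
from collections import Counter


def recommend(watches, me, top=5):
    """Rank other users by the number of repositories they watch in common with `me`.

    watches maps a user name to the set of repositories that user watches.
    """
    mine = watches.get(me, set())
    scores = Counter()
    for user, repos in watches.items():
        if user == me:
            continue  # the hasNot('name', me) filter
        shared = len(mine & repos)
        if shared:
            scores[user] = shared
    return scores.most_common(top)


# Invented data for illustration
watches = {
    "alice": {"titan", "faunus", "gremlin"},
    "bob": {"titan", "faunus"},
    "carol": {"gremlin"},
    "dave": {"rails"},
}
print(recommend(watches, "alice"))  # [('bob', 2), ('carol', 1)]
```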

Faunus: Graph Analytics Engine

Every Titan traversal begins at a small set of vertices (or edges). Titan is not designed for global analyses which involve processing the entire graph structure. The Hadoop component of Hortonworks Data Platform provides a reliable backend for global queries via Faunus. Gremlin traversals in Faunus are compiled down to MapReduce jobs, where the first job’s InputFormat is Titan/HBase. In order to not interfere with the production Titan/HBase instance, a snapshot of the live graph is typically generated and stored in Hadoop’s distributed file system HDFS as a SequenceFile available for repeated analysis. The most general SequenceFile (with all vertices, edges, and properties) is created below (i.e. a full graph dump).

faunus$ cat bin/titan-seq.properties 
faunus.graph.input.format=com.thinkaurelius.faunus.formats.titan.hbase.TitanHBaseInputFormat
hbase.zookeeper.quorum=10.68.65.161
hbase.mapreduce.inputtable=titan
hbase.mapreduce.scan.cachedrows=75
faunus.graph.output.format=org.apache.hadoop.mapreduce.lib.output.SequenceFileOutputFormat
faunus.sideeffect.output.format=org.apache.hadoop.mapreduce.lib.output.TextOutputFormat
faunus.output.location=full-seq
faunus.output.location.overwrite=true

faunus$ bin/gremlin.sh

         \,,,/
         (o o)
-----oOOo-(_)-oOOo-----
gremlin> g = FaunusFactory.open('bin/titan-seq.properties')
==>faunusgraph[titanhbaseinputformat]
gremlin> g._().toString()
==>[IdentityMap]
gremlin> g._()
12/12/13 09:19:53 INFO mapreduce.FaunusCompiler: Compiled to 1 MapReduce job(s)
12/12/13 09:19:55 INFO mapred.JobClient:  map 0% reduce 0%
12/12/13 09:21:26 INFO mapred.JobClient:  map 1% reduce 0%
12/12/13 09:21:36 INFO mapred.JobClient:  map 2% reduce 0%
12/12/13 09:21:43 INFO mapred.JobClient:  map 3% reduce 0%
...
gremlin> hdfs.ls()
==>rwx------ ubuntu supergroup 0 (D) .staging
==>rwxr-xr-x ubuntu supergroup 0 (D) full-seq
gremlin> hdfs.ls('full-seq/job-0')
==>rw-r--r-- ubuntu supergroup 0 _SUCCESS
==>rwxr-xr-x ubuntu supergroup 0 (D) _logs
==>rw-r--r-- ubuntu supergroup 243768636 part-m-00000
==>rw-r--r-- ubuntu supergroup 125250887 part-m-00001
==>rw-r--r-- ubuntu supergroup 331912876 part-m-00002
==>rw-r--r-- ubuntu supergroup 431617929 part-m-00003
...

Given the generated SequenceFile, the vertices and edges are counted by type and label, which is by definition a global operation.

gremlin> g.V.type.groupCount
==>Gist         780626
==>Issue        1298935
==>Organization 36281
==>Comment      2823507
==>Commit       20338926
==>Repository   2075934
==>User         983384
==>WikiPage     252915
gremlin> g.E.label.groupCount                                           
==>deleted        170139
==>on             7014052
==>owns           180092
==>pullRequested  930796
==>pushed         27538088
==>to             27719774
==>added          181609
==>created        10063346
==>downloaded     122157
==>edited         276609
==>forked         1015435
==>of             536816
==>appliedForkTo  1791
==>followed       753451
==>madePublic     26602
==>watched        2784640

Since GitHub is collaborative in a way similar to Wikipedia, there are a few users who contribute a lot, and many users who contribute little or nothing at all. To determine the distribution of contributions, Faunus can be used to compute the out-degree distribution of pushed-edges, which correspond to users pushing commits to repositories. This is equivalent to Gremlin visiting each user vertex, counting all of the outgoing pushed-edges, and returning the distribution of counts.

gremlin> g.V.sideEffect('{it.degree = it.outE("pushed").count()}').degree.groupCount
==>1	57423
==>10	8856
==>100	527
==>1000	9
==>1004	5
==>1008	6
==>1011	6
==>1015	6
==>1019	3
==>1022	9
==>1026	2
==>1033	6
==>1037	4
==>104	462
==>1040	3
==>...
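
On a single machine the same statistic is a two-level count, sketched below over an in-memory edge list. It is an illustration of what is being computed, not of the MapReduce jobs Faunus generates, and unlike a traversal over every vertex it leaves out vertices with no pushed edges at all.

```python
from collections import Counter


def out_degree_distribution(edges, label):
    """Map each out-degree to the number of vertices that have that out-degree.

    edges holds (source, label, target) tuples; only edges with the given label count.
    """
    degree = Counter(src for (src, edge_label, _dst) in edges if edge_label == label)
    return Counter(degree.values())


# Made-up data: u1 pushed twice, u2 and u3 once each
edges = [
    ("u1", "pushed", "c1"), ("u1", "pushed", "c2"),
    ("u2", "pushed", "c3"), ("u3", "pushed", "c4"),
    ("u1", "watched", "r1"),
]
distribution = out_degree_distribution(edges, "pushed")
for degree, vertex_count in sorted(distribution.items()):
    print(degree, vertex_count)
```

Note that the console output above lists the degrees in string order (1, 10, 100, 1000, ...), so the keys need a numeric sort, as in the loop here, before plotting.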

When the degree distribution is plotted using log-scaled axes, the results are similar to the Wikipedia contribution distribution, as expected. This is a common theme in most natural graphs: real-world graphs are not random structures and are composed of a few “hubs” and numerous “satellites.”

[Figure: out-degree distribution of pushed-edges, log-scaled axes]

More sophisticated queries can be performed by first extracting a slice of the original graph that only contains relevant information. These slices can be saved to HDFS for subsequent traversals. For example, to calculate the most central co-watched project on GitHub, the primary graph is stripped down to only watched-edges between users and repositories. The final traversal below walks the “co-watched” graph twice and counts the number of paths that have gone through each repository. The repositories are sorted by their path counts in order to express which repositories are most central/important/respected according to the watches subgraph.

gremlin> g.E.has('label','watched').keep.V.has('type','Repository','User').keep
...
12/12/13 11:08:13 INFO mapred.JobClient:   com.thinkaurelius.faunus.mapreduce.sideeffect.CommitVerticesMapReduce$Counters
12/12/13 11:08:13 INFO mapred.JobClient:     VERTICES_DROPPED=19377850
12/12/13 11:08:13 INFO mapred.JobClient:     VERTICES_KEPT=2074099
12/12/13 11:08:13 INFO mapred.JobClient:   com.thinkaurelius.faunus.mapreduce.sideeffect.CommitEdgesMap$Counters
12/12/13 11:08:13 INFO mapred.JobClient:     OUT_EDGES_DROPPED=55971128
12/12/13 11:08:13 INFO mapred.JobClient:     OUT_EDGES_KEPT=1934706
...
gremlin> g = g.getNextGraph()
gremlin> g.V.in('watched').out('watched').in('watched').out('watched').property('_count',Long.class)
   .order(F.decr,'github_name')
==>backbone	4173578345
==>html5-boilerplate	4146508400
==>normalize.css	3255207281
==>django	3168825839
==>three.js	3078851951
==>Modernizr	2971383230
==>rails	2819031209
==>httpie	2697798869
==>phantomjs	2589138977
==>homebrew	2528483507
...
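
Path counts this large are not found by listing paths one by one. One way to get them, sketched below, is to carry a counter per vertex and push it along the edges hop by hop. This is an assumption-laden illustration of the idea on a toy graph; it is not a description of how Faunus implements the traversal.

```python
from collections import Counter


def step(counts, edges, direction):
    """Move path counts one hop along (user, repo) watched-edges.

    direction "in" moves counts from repositories to their watchers,
    "out" moves counts from users to the repositories they watch.
    """
    moved = Counter()
    for user, repo in edges:
        src, dst = (repo, user) if direction == "in" else (user, repo)
        if counts[src]:
            moved[dst] += counts[src]
    return moved


def co_watch_path_counts(edges):
    """Count the in/out/in/out paths that end at each repository."""
    vertices = {vertex for edge in edges for vertex in edge}
    counts = Counter({vertex: 1 for vertex in vertices})  # one path starts at every vertex
    for direction in ("in", "out", "in", "out"):
        counts = step(counts, edges, direction)
    return counts


# Toy graph: u1 watches a and b, u2 watches a
watched = [("u1", "a"), ("u1", "b"), ("u2", "a")]
print(co_watch_path_counts(watched).most_common())  # [('a', 8), ('b', 5)]
```

Each hop costs one pass over the edges regardless of how many paths exist, which is why counts in the billions remain tractable.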

Conclusion

This post discussed the use of Hortonworks Data Platform in concert with the Aurelius Graph Cluster to store and process the graph data generated by the online social coding system GitHub. The example data set used throughout was provided by GitHub Archive, an ongoing record of events in GitHub. While the dataset currently afforded by GitHub Archive is relatively small, it continues to grow each day. The Aurelius Graph Cluster has been demonstrated in practice to support graphs with hundreds of billions of edges. As more organizations realize the graph structure within their Big Data, the Aurelius Graph Cluster is there to provide both real-time and batch graph analytics.

Acknowledgments

The authors wish to thank Steve Loughran for his help with Whirr and HDP. Moreover, Russell Jurney requested this post and, in a steadfast manner, ensured it was delivered.

Related Material

Hawkins, P., Aiken, A., Fisher, K., Rinard, M., Sagiv, M., “Data Representation Synthesis,” PLDI’11, June 2011.

Pham, R., Singer, L., Liskin, O., Filho, F. F., Schneider, K., “Creating a Shared Understanding of Testing Culture on a Social Coding Site,” Leibniz Universität Hannover, Software Engineering Group: Technical Report, September 2012.

Adler, B. T., de Alfaro, L., Pye, I., Raman V., “Measuring Author Contributions to the Wikipedia,” WikiSym ’08 Proceedings of the 4th International Symposium on Wikis, Article No. 15, September 2008.

Rodriguez, M.A., Mallette, S.P., Gintautas, V., Broecheler, M., “Faunus Provides Big Graph Data Analytics,” Aurelius Blog, November 2012.

Rodriguez, M.A., LaRocque, D., “Deploying the Aurelius Graph Cluster,” Aurelius Blog, October 2012.

Ho, R., “Graph Processing in Map Reduce,” Pragmatic Programming Techniques Blog, July 2010.

Authors


Vadas Gintautas, Marko A. Rodriguez

Why not RAID-0? It’s about Time and Snowflakes

A recurrent question on the various Hadoop mailing lists is “why does Hadoop prefer a set of separate disks to the same set managed as a RAID-0 disk array?”

It’s about time and snowflakes.

JBOD and the Allure of RAID-0

In Hadoop clusters, we recommend treating each disk separately, in a configuration that is known, somewhat disparagingly, as “JBOD”: Just a Bunch of Disks.

In comparison, RAID-0 (a bit of a misnomer, there being no redundancy) stripes data across all the disks in the array. This promises some advantages:

  • Higher IO rates on small accesses
  • Higher bandwidth on larger accesses, especially write operations
  • Eliminates the hot-spot of a single disk being overloaded when its data is more in demand

In RAID-0, data is striped across disks. When data needs to be written, it is divided up into small blocks (64KB or more), and one of these blocks is written to each disk simultaneously. When the data is read back, the blocks can again be read from all disks simultaneously. The result is that your disk bandwidth increases with the size of the array. If you had eight disks mounted as RAID-0, the theoretical maximum write and read bandwidth would be eight times that of a single disk.
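To make the striping concrete, here is a minimal Python sketch of how a byte offset in a striped volume could map onto the individual disks. The function names and the round-robin layout are illustrative assumptions, not the behaviour of any particular controller; the 64KB stripe unit is simply the lower bound mentioned above.

```python
STRIPE_SIZE = 64 * 1024  # 64KB stripe unit (assumed; controllers may use larger units)


def locate(logical_offset, num_disks, stripe_size=STRIPE_SIZE):
    """Map a byte offset in the striped volume to (disk index, offset on that disk)."""
    stripe_index = logical_offset // stripe_size   # which stripe unit, counted across the volume
    disk = stripe_index % num_disks                # units rotate round-robin over the disks
    row = stripe_index // num_disks                # full rows of units that precede this one
    offset_on_disk = row * stripe_size + logical_offset % stripe_size
    return disk, offset_on_disk


def disks_touched(start, length, num_disks, stripe_size=STRIPE_SIZE):
    """Return the set of disks used by a contiguous access of `length` bytes."""
    first = start // stripe_size
    last = (start + length - 1) // stripe_size
    return {unit % num_disks for unit in range(first, last + 1)}


if __name__ == "__main__":
    print(locate(0, 8))
    print(locate(64 * 1024, 8))
    print(sorted(disks_touched(0, 512 * 1024, 8)))
```

In this model a 512KB access on an eight-disk array touches every disk once, which is where the theoretical eight-fold bandwidth comes from, while a 4KB access still lands on a single disk.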

With the disk controllers built into modern servers, RAID-0 is an option that can be turned on: so why not?

Reliability

Reliability is one issue.

Disks can get slower as they age, as they start to get read errors and have to retry reading bits of the disk platter. A slow disk is a warning sign that maybe you should think about replacing that disk. With Hadoop in a JBOD setting, you can unmount the disk; the Datanode will notice it is missing and report to the Namenode that all the data on it needs re-replication. If you have a RAID-0 disk, everything across all disks is missing – you need to add a new disk to bring the array back up to size, reformat all the disks, and bring up the Datanode without any storage. Over time it will pick up more data, from rebalancing and jobs run on it.

You have to do that whenever any of the disks fails – the more disks you have, the more common it is.

Before panicking – disk failures are rare. Google’s 2007 paper, Failure Trends in a Large Disk Drive Population, reported that in their datacenters, 1.7% of disks failed in the first year of their life, while three-year-old disks were failing at a rate of 8.6%. About 9% isn’t a good number. Returning to the hypothetical eight-disk server, the probability of each disk lasting the year would be:

1 – 0.086 = 0.914

The probability that all disks make it to their next birthday becomes:

0.914^8 = 0.487

If those Google numbers matched those of the disks in your servers – and weren’t due to a really bad batch of disks – then during that third year, about half the datanodes would lose all their data and need to be rebuilt. If you have one of the latest twelve-disk servers, things get even worse.
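The arithmetic above is easy to replay for other array sizes. This is a small sketch that assumes disk failures are independent and that every disk shares the same annual failure rate; `array_survival` is a name made up for this example.

```python
def array_survival(annual_failure_rate, num_disks):
    """Probability that every disk in a RAID-0 array survives the year.

    Assumes independent failures with an identical rate per disk.
    """
    return (1.0 - annual_failure_rate) ** num_disks


if __name__ == "__main__":
    # 0.086 is the year-three failure rate quoted from the Google paper.
    for n in (1, 2, 8, 12):
        p = array_survival(0.086, n)
        print(f"{n:2d} disks: array survives {p:.3f}, array lost {1 - p:.3f}")
```

Under these assumptions a twelve-disk array has only about a one-in-three chance of getting through that third year intact.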

Hadoop copes with reliability by duplicating data across servers: if one copy of an HDFS data block (64MB or greater) is lost or corrupt, there are usually two copies elsewhere to recover from.

With RAID-0, however, a single disk failure loses all the data on the server. The amount of data to re-replicate after a disk failure therefore increases linearly with the number of disks in each server, while the probability of the server failing also increases. With JBOD, by contrast, each of those year-three disks would have a probability of failing of 8.6%, and the amount of data lost would be the size of one disk: 1-3 TB.

For that eight-disk server, the cluster would have to transmit 8-24 TB of data, and do it eight times as frequently. That’s going to be slightly more noticeable.

If you do want to use RAID-0 storage, configuring an eight-disk server as four pairs of RAID-0 storage is much less risky. The IO performance could be double that of a single drive, while the risk of either disk in a pair failing would be lower than for the full array, and the cost of recovering the data would be very much reduced.
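A rough way to compare the three layouts (JBOD, four RAID-0 pairs, one eight-disk array) is to estimate how much data a server would need re-replicated in a year. The sketch below is a simplified model: it assumes independent failures, full 2 TB disks (an assumption within the 1-3 TB range above), and at most one failure per RAID group per year. The function name is hypothetical.

```python
AFR = 0.086      # year-three annual failure rate quoted from the Google paper
DISK_TB = 2.0    # assumed disk size, within the 1-3 TB range in the text


def expected_tb_lost(num_disks, group_size, afr=AFR, disk_tb=DISK_TB):
    """Expected TB to re-replicate per server per year.

    group_size=1 models JBOD; group_size=num_disks models one big RAID-0 array.
    Losing any disk in a group loses the whole group's data.
    """
    if num_disks % group_size:
        raise ValueError("num_disks must be a multiple of group_size")
    groups = num_disks // group_size
    p_group_fails = 1.0 - (1.0 - afr) ** group_size
    return groups * p_group_fails * group_size * disk_tb


if __name__ == "__main__":
    for label, size in (("JBOD", 1), ("RAID-0 pairs", 2), ("RAID-0 x8", 8)):
        print(f"{label:13s} {expected_tb_lost(8, size):.2f} TB/year")
```

Under these assumptions the pairs sit much closer to JBOD than to the single large array, which is the point of the recommendation above.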

Disk failures, then, are the first reason you don’t want to use RAID-0 storage – now to the second.

Every Disk is a Unique Snowflake

Hadoop job performance depends on disk bandwidth, especially the read bandwidth.

On RAID-0 storage, disk accesses go at the rate of the slowest disk. It has always been believed that the disks would all start out at the same speed, and only degrade over time. Recent research shows things are worse than this: the current generations of hard disks vary in performance from day one.

The 2011 paper, Disks Are Like Snowflakes: No Two Are Alike, measured the performance of modern disk drives, and discovered that they can vary in data IO rates by 20%, even when they are all writing to the same part of the hard disk. This is because the latest manufacturing processes produce disks with different surface characteristics – altering the density at which the disks can store data. Rather than set the disk electronics up to only support the lowest measured performance, or to discard the slowest disks, manufacturers now use a technique called Adaptive Zoning. The newly manufactured disks are calibrated to their performance, and the controllers configured to drive each zone in the disk at the highest rate that zone supports.

This is profound – and it’s not something that the disk manufacturers have been publicising. Modern CPUs are “binned” into parts that support different rates – but those rates are published and you get to pay more for the faster parts. Here the speed of the disk varies, and you just have to hope your parts are the fast ones. In the experiments the authors of the paper conducted, some disks could deliver 105 Megabytes of data a second, with the actual range being 90-111 MB/s.

If you have eight disks, some will be faster than the others, right from day one. And your RAID-0 storage will deliver the performance of the slowest disk right from the day you unpack it from its box and switch it on.
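The effect can be sketched with a deliberately simple model: a striped access finishes only when the slowest disk has delivered its share, so the array runs at the disk count times the slowest rate, whereas independent JBOD disks can each stream at their own rate. The uniform spread of rates and the function names are assumptions for illustration; the 90-111 MB/s range is the one reported above.

```python
import random


def raid0_bandwidth(disk_rates):
    """Aggregate MB/s when every stripe waits for the slowest disk."""
    return len(disk_rates) * min(disk_rates)


def jbod_bandwidth(disk_rates):
    """Aggregate MB/s when each disk streams independently at its own rate."""
    return sum(disk_rates)


if __name__ == "__main__":
    random.seed(0)  # fixed seed so the sketch is repeatable
    rates = [random.uniform(90, 111) for _ in range(8)]  # assumed uniform spread
    print(f"RAID-0: {raid0_bandwidth(rates):.0f} MB/s")
    print(f"JBOD:   {jbod_bandwidth(rates):.0f} MB/s")
```

In this model the RAID-0 figure can never exceed the JBOD figure, and the gap widens as the spread between the fastest and slowest disk grows.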

That is the other reason why we don’t recommend configuring your servers’ storage as one large RAID-0 array.

Summary

To summarize: RAID-0 storage appears to increase disk I/O rates, but it will deliver data at the rate of the slowest disk in the array – an array whose disk speeds can vary by up to 20%. When a single disk eventually fails, all the data on the server is lost, forcing you to reformat all the disks and wait for HDFS to repopulate the server with new data.

  1. We use JBOD storage for all our worker nodes – and recommend that our customers do too.
  2. If anyone does insist on RAID-0 storage, restrict it to pairs of disks, keeping the risk and cost of failures down.

Update: Single Drive RAID-0

Some people asked us about RAID-0 and single drives, so we’d like to clarify: Hadoop works perfectly well if you configure each drive as a single RAID-0 volume.

This configuration comes about with disk controllers that expect everything to be RAIDed, or with controllers that support either JBOD or RAID mode when you declare one pair of disks as RAID-1. Why would anyone want to declare two disks as RAID-1 (mirrored disks)? If you put the OS on that RAID volume, then the failure of either of those disks will not stop the server. In the very large 500+ node clusters people don’t do this: they have enough spare servers around, and would rather have the extra storage and bandwidth. On smaller clusters, having the OS on a mirrored pair of disks downgrades a server failure from a serious problem (re-replication of all the server’s data, new disk needed in a few hours) to a task (get a replacement disk and swap it into the server when you get a chance).

The problems with RAID-0 (amplified data replication on a disk failure, performance of the slowest disk) increase with the number of disks. With a single disk, you get exactly the same numbers as you would if the disk controller considered it a JBOD drive.

HA Namenode for HDFS with Hadoop 1.0 – Part 1

Introduction

A Highly Available NameNode for HDFS has been in development since last year. That effort focused singularly on the automatic failover of the NameNode for Hadoop 2.0. During that time we have realized two things.

First, we realized we should use an outside-in approach to the HA problem: start by designing the availability of the Hadoop system as a whole and then focus on the high availability of individual components; that work led to the Full Stack HA Architecture.

Second, we realized that we can build an HA NameNode for Hadoop 1.0 using industry-proven solutions such as Linux HA and vSphere; this is important because HDFS in Hadoop 1 has been proven to be stable and reliable, while HDFS in Hadoop 2 is just beginning beta testing. This blog describes some technical details of HDFS NameNode HA in Hadoop 1. A future blog will give some more details on Full Stack HA.

The first and foremost question in people’s minds is: What is the difference between HA in Hadoop 1 and Hadoop 2? My colleague Suresh and I wrote the original design for Hadoop 2 (see HDFS-1623) and have worked closely with the community on the implementation. HA in Hadoop 1 is the direct result of our experiences during that work.

Hadoop 2 HA focused on three areas:

  • Hot failover: We have found that the difference in failover times between cold and hot failover is small for small to medium clusters; Hadoop 1 uses cold failover.
  • Automatic failover: For Hadoop 1 we have used an industry-proven HA framework rather than the Hadoop 2 Failover-Controller. The figure above illustrates NameNode HA in Hadoop 1 using Linux HA.
  • Remove dependency on shared storage: this is still work in progress for Journal Daemons in trunk [6]. Both Hadoop 2 and Hadoop 1 use shared storage.

To summarize, the difference in Hadoop 1 HA is cold failover and the use of industry-standard HA frameworks. Let’s look at the details and implications of these differences below.

Failover Times and Cold versus Hot Failover

The failover time of a highly available system with active-passive failover is the sum of (1) the time to detect that the active service has failed, (2) the time to elect a leader and/or for the leader to make a failover decision and communicate it to the other party, and (3) the time to transition the standby service to active.

The first and second items are the same for cold or hot failover: they both rely on heartbeat timeouts, monitoring probe timeouts, etc. We have observed the total combined time for failure detection and leader election to range from 30 seconds to 2.5 minutes depending on the kind of failure; the lowest times are typical when the active server’s host or host operating system fails; hung processes take longer due to the grace period needed to be confident that the process is not blocked during Garbage Collection.

For the third item, the time to transition the standby service to active, Hadoop 1 requires starting a second NameNode and for the NameNode to get out of safe mode. In our experiments we have observed the following times:

  • A 60 node cluster with 6 million blocks using 300TB raw storage, and 100K files: 30 seconds. Hence total failover time ranges from 1 to 3 minutes.
  • A 200 node cluster with 20 million blocks occupying 1PB raw storage and 1 million files: 110 seconds. Hence total failover time ranges from 2.5 to 4.5 minutes.
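The totals quoted above are just the detection-plus-election range added to the measured transition time. A minimal sketch of that sum, with a hypothetical helper name:

```python
# Failure detection plus leader election: 30 seconds to 2.5 minutes, as observed above.
DETECTION_RANGE_S = (30, 150)


def total_failover_range(transition_s, detection_range_s=DETECTION_RANGE_S):
    """Return (best case, worst case) total failover time in seconds."""
    low, high = detection_range_s
    return low + transition_s, high + transition_s


if __name__ == "__main__":
    for nodes, transition in ((60, 30), (200, 110)):
        low, high = total_failover_range(transition)
        print(f"{nodes} nodes: {low / 60:.1f} to {high / 60:.1f} minutes")
```

For the 200 node cluster this gives 140 to 260 seconds, in line with the roughly 2.5 to 4.5 minutes quoted above once rounded.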

Industry Standard HA Frameworks

As we stated in the HDFS NameNode HA design document [HDFS-1623], the notion of having a failover controller outside the NameNode was influenced by frameworks like Linux HA[1], Red Hat HA[2] and Veritas Cluster[4,5]. Part of the decision for the Hadoop community to build our own failover controller was made because we felt it was useful for Hadoop to provide an out-of-the-box solution. Linux HA, being GPL, did not allow that.

For Hadoop 1 we decided not to back-port the failover controller from trunk, but instead to use industry-standard HA frameworks. Why?

We wanted to add HA to the stable Hadoop line in as risk-free a way as possible, which led us to use proven and robust HA frameworks. Many customers already have experience using these HA frameworks. These frameworks deal with monitoring timeout, service startup timeout, shutdown timeout, and have a way to flag and deal with repeated failures. Further, the industry-proven frameworks offer several alternative fencing solutions including power-based fencing.

The same HA framework can be used to fail over other Hadoop services such as the JobTracker; we have already started the work on an HA JobTracker. These HA frameworks also provide the ability to share a common pool of server machines to host highly available NameNode, JobTracker and other Master daemons; the shared pools allow N-N, N-on-N and N+K failover. Finally, they offer a way to perform a manual switchover, a coordinated shutdown of both NameNodes, and the ability to run with one of the NameNodes down.

Using failover solutions which keep the NameNode IP address constant ensures that the web interfaces such as WebHDFS or the Hadoop consoles also fail over. With IP failover, URLs will follow the service, regardless of where it is running. The use of mature IP failover-based solutions reduced the complexity, making it possible to implement HA on the stable Hadoop 1.0 line with a few, low-risk changes to the Hadoop core.

Recently, Symantec independently described how to make the NameNode highly available using Veritas Cluster [5]. The figure above illustrates NameNode HA in Hadoop 1 using Linux HA, which will be available along with a similar solution using vSphere HA.

FAQ

You are using cold failover and hence it is not practical.
The HDFS 2.0 HA design was driven by the needs of the very large clusters at Yahoo and Facebook. For small to medium clusters cold failover is only 30 to 120 seconds slower, as described above.

If this was so easy to do with Linux HA or other tools why didn’t the HDFS community do this earlier?
This is partly because the original HDFS team focused on very large clusters where cold failover was not practical. We assumed that Hadoop needed to provide its own built-in solution. As we’ve developed this technology, we’ve heard directly from our customers that HA solutions are complex and that they prefer using their existing, well-understood solutions.

You have taken a different path from HA in Hadoop 2.
Not true – both are based on the same design principles. The difference is the focus on cold failover instead of hot failover. Further, the work is complementary. All the work in the Hadoop 1.0 line is also being added to the Hadoop 2.0 line.

I need to use Linux HA, or another framework – isn’t that a hindrance?
Many Hadoop users already use Linux HA, Red Hat HA, Veritas Cluster, or vSphere HA in their data centers. Linux HA is freely available and the cost of Red Hat’s HA is fairly low.

So is the Full Stack HA only a part of Hadoop 1?
No, Full Stack HA is orthogonal to the failover of a specific component such as the NameNode or the JobTracker (see this post). Making the rest of the stack robust against transient failures of the layers underneath improves the entire stack. We will cover Full Stack HA in more detail in an upcoming blog.

Does vSphere HA deal with service failures in addition to VM failures?
vSphere allows application level health checks and we have added an application-level monitor for the NameNode. A similar JobTracker-specific monitor will be available shortly as well.

When using the vSphere HA solution, what is the advantage of hosting the NameNode and JobTracker each in their own vSphere virtual machine?
Hosting the master services in isolated vSphere VMs is an effective design. As vSphere monitors and manages each VM independently, vSphere servers can host independent VMs containing the NameNode, JobTracker, and other master services. If one service fails, that VM is killed and restarted – while the other VMs continue uninterrupted. You also gain the ease of maintenance and rollback that VMs offer.

Status

All patches to core Hadoop have been committed to Apache Hadoop trunk and branch 1.1; these are also incorporated in Hortonworks HDP 1 and HDP 1.1 releases. We plan to commit these to Hadoop 2-alpha.

Our monitoring code is targeted for inclusion into the still-in-incubation Ambari project; it has already been submitted as a patch [8].

Future Outlook

Work is in progress to stabilize HDFS 2, which is currently entering beta testing, and also Hadoop 2’s HA (hot failover, failover controller, etc). We are also in the process of providing failover for other Hadoop components, such as the JobTracker, in Hadoop 1.

References

  1. Linux HA: http://www.linux-ha.org/wiki/Main_Page
  2. Red Hat High Availability Add-On: http://www.redhat.com/products/enterprise-linux-add-ons/high-availability/
  3. vSphere HA: http://www.vmware.com/products/high-availability/overview.html
  4. Symantec’s Veritas Cluster Framework: https://www.symantec.com/cluster-server
  5. NN HA using Symantec’s Veritas Cluster Framework: http://www.symantec.com/connect/blogs/symantec-high-availability-solution-hadoop-namenode
  6. Hadoop Journal Daemon: HDFS-3092 and HDFS-3077
  7. HDFS NameNode HA design (for Hadoop 2): HDFS-1623
  8. Monitoring Library: Ambari-504
  9. Full Stack HA: http://hortonworks.com/blog/high-availability-and-hadoop-1-0-perfect-together/

Apache Hadoop YARN – Background and an Overview

Other posts in this series:
Introducing Apache Hadoop YARN
Apache Hadoop YARN – Background and an Overview
Apache Hadoop YARN – Concepts and Applications
Apache Hadoop YARN – ResourceManager
Apache Hadoop YARN – NodeManager

Apache Hadoop YARN – Background & Overview

To celebrate the significant milestone of Apache Hadoop YARN being promoted to a full-fledged sub-project of Apache Hadoop in the ASF, we present the first blog in a multi-part series on Apache Hadoop YARN: a general-purpose, distributed, application management framework that supersedes the classic Apache Hadoop MapReduce framework for processing data in Hadoop clusters.

MapReduce – The Paradigm

Essentially, the MapReduce model consists of a first, embarrassingly parallel, map phase where input data is split into discrete chunks to be processed. It is followed by the second and final reduce phase where the output of the map phase is aggregated to produce the desired result. The simple, and fairly restricted, nature of the programming model lends itself to very efficient and extremely large-scale implementations across thousands of cheap, commodity nodes.
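As a toy illustration of the two phases, here is a single-process word count in Python. It is only a sketch of the programming model: the function names are made up for this example and it uses none of the Hadoop APIs, which distribute the same steps across many nodes.

```python
from collections import defaultdict


def map_phase(chunk):
    """Map: emit a (word, 1) pair for every word in one input chunk."""
    for word in chunk.split():
        yield word.lower(), 1


def shuffle(pairs):
    """Group the mapped values by key, as happens between map and reduce."""
    groups = defaultdict(list)
    for key, value in pairs:
        groups[key].append(value)
    return groups


def reduce_phase(key, values):
    """Reduce: aggregate all values seen for one key."""
    return key, sum(values)


def run_job(chunks):
    """Run map over every chunk independently, then reduce each key group."""
    pairs = [pair for chunk in chunks for pair in map_phase(chunk)]
    return dict(reduce_phase(key, values) for key, values in shuffle(pairs).items())


if __name__ == "__main__":
    print(run_job(["the quick brown fox", "the lazy dog", "The end"]))
```

Each call to `map_phase` depends only on its own chunk, which is what makes the map phase embarrassingly parallel.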

Apache Hadoop MapReduce is the most popular open-source implementation of the MapReduce model.

In particular, when MapReduce is paired with a distributed file-system such as Apache Hadoop HDFS, which can provide very high aggregate I/O bandwidth across a large cluster, the economics of the system are extremely compelling – a key factor in the popularity of Hadoop.

One of the keys to this is the lack of data motion, i.e. moving the compute to the data rather than moving data to the compute node via the network. Specifically, the MapReduce tasks can be scheduled on the same physical nodes on which data is resident in HDFS, which exposes the underlying storage layout across the cluster. This significantly reduces network I/O and keeps most of the I/O on the local disk or within the same rack – a core advantage.

Apache Hadoop MapReduce, circa 2011 – A Recap

Apache Hadoop MapReduce is an open-source, Apache Software Foundation project, which is an implementation of the MapReduce programming paradigm described above. Now, as someone who has spent over six years working full-time on Apache Hadoop, I normally like to point out that the Apache Hadoop MapReduce project itself can be broken down into the following major facets:

  • The end-user MapReduce API for programming the desired MapReduce application.
  • The MapReduce framework, which is the runtime implementation of various phases such as the map phase, the sort/shuffle/merge aggregation and the reduce phase.
  • The MapReduce system, which is the backend infrastructure required to run the user’s MapReduce application, manage cluster resources, schedule thousands of concurrent jobs etc.

This separation of concerns has significant benefits, particularly for the end-users – they can completely focus on the application via the API and allow the combination of the MapReduce Framework and the MapReduce System to deal with the ugly details such as resource management, fault-tolerance, scheduling etc.

The current Apache Hadoop MapReduce System is composed of the JobTracker, which is the master, and the per-node slaves called TaskTrackers.

The JobTracker is responsible for resource management (managing the worker nodes i.e. TaskTrackers), tracking resource consumption/availability and also job life-cycle management (scheduling individual tasks of the job, tracking progress, providing fault-tolerance for tasks etc).

The TaskTracker has simple responsibilities – launch/teardown tasks on orders from the JobTracker and provide task-status information to the JobTracker periodically.

For a while, we have understood that the Apache Hadoop MapReduce framework needed an overhaul. In particular, with regards to the JobTracker, we needed to address several aspects regarding scalability, cluster utilization, ability for customers to control upgrades to the stack i.e. customer agility and equally importantly, supporting workloads other than MapReduce itself.

We’ve done running repairs over time, including recent support for JobTracker availability and resiliency to HDFS issues (both of which are available in Hortonworks Data Platform v1 i.e. HDP1) but lately they’ve come at an ever-increasing maintenance cost and yet, did not address core issues such as support for non-MapReduce and customer agility.

Why support non-MapReduce workloads?

MapReduce is great for many applications, but not everything; other programming models better serve requirements such as graph processing (Google Pregel / Apache Giraph) and iterative modeling (MPI). When all the data in the enterprise is already available in Hadoop HDFS, having multiple paths for processing is critical.

Furthermore, since MapReduce is essentially batch-oriented, support for real-time and near real-time processing such as stream processing and CEP are emerging requirements from our customer base.

Providing these within Hadoop enables organizations to see an increased return on the Hadoop investments by lowering operational costs for administrators, reducing the need to move data between Hadoop HDFS and other storage systems etc.

Why improve scalability?

Moore’s Law… Essentially, at the same price-point, the processing power available in data-centers continues to increase rapidly. As an example, consider the following definitions of commodity servers:

  • 2009 – 8 cores, 16GB of RAM, 4x1TB disk
  • 2012 – 16+ cores, 48-96GB of RAM, 12x2TB or 12x3TB of disk.

Generally, at the same price-point, servers are twice as capable today as they were 2-3 years ago – on every single dimension.  Apache Hadoop MapReduce is known to scale to production deployments of ~5000 nodes of hardware of 2009 vintage. Thus, ongoing scalability needs are ever present given the above hardware trends.

What are the common scenarios for low cluster utilization?

In the current system, the JobTracker views the cluster as composed of nodes (managed by individual TaskTrackers) with distinct map slots and reduce slots, which are not fungible.  Utilization issues occur because map slots might be ‘full’ while reduce slots are empty (and vice-versa).  Fixing this was necessary to ensure the entire system could be used to its maximum capacity for high utilization.
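The cost of non-fungible slots can be shown with a little arithmetic. The sketch below compares a node with fixed map and reduce slots against a hypothetical node whose slots can run either kind of task; the function names and the slot counts are assumptions for illustration only.

```python
def slot_utilization(map_slots, reduce_slots, map_tasks, reduce_tasks):
    """Fraction of slots busy when map and reduce slots are not interchangeable."""
    running = min(map_slots, map_tasks) + min(reduce_slots, reduce_tasks)
    return running / (map_slots + reduce_slots)


def fungible_utilization(total_slots, map_tasks, reduce_tasks):
    """Fraction of slots busy when any slot can run any task."""
    return min(total_slots, map_tasks + reduce_tasks) / total_slots


if __name__ == "__main__":
    # A map-heavy moment: 20 map tasks waiting, no reduce tasks yet.
    print(f"fixed slots:    {slot_utilization(8, 4, 20, 0):.0%}")
    print(f"fungible slots: {fungible_utilization(12, 20, 0):.0%}")
```

With 8 map slots and 4 reduce slots, a map-only backlog leaves a third of the node idle in this model, whereas interchangeable slots would all be busy.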

What is the notion of customer agility?

In real-world deployments, Hadoop is very commonly deployed as a shared, multi-tenant system. As a result, changes to the Hadoop software stack affect a large cross-section if not the entire enterprise. Against that backdrop, customers are very keen on controlling upgrades to the software stack as it has a direct impact on their applications. Thus, allowing multiple, if limited, versions of the MapReduce framework is critical for Hadoop.

Enter Apache Hadoop YARN

The fundamental idea of YARN is to split up the two major responsibilities of the JobTracker, i.e. resource management and job scheduling/monitoring, into separate daemons: a global ResourceManager and a per-application ApplicationMaster (AM).

The ResourceManager and per-node slave, the NodeManager (NM), form the new, and generic, system for managing applications in a distributed manner.

The ResourceManager is the ultimate authority that arbitrates resources among all the applications in the system. The per-application ApplicationMaster is, in effect, a framework specific entity and is tasked with negotiating resources from the ResourceManager and working with the NodeManager(s) to execute and monitor the component tasks.

The ResourceManager has a pluggable Scheduler, which is responsible for allocating resources to the various running applications subject to familiar constraints of capacities, queues etc. The Scheduler is a pure scheduler in the sense that it performs no monitoring or tracking of status for the application, and offers no guarantees on restarting tasks that fail due to either application failure or hardware failure. The Scheduler performs its scheduling function based on the resource requirements of the applications; it does so based on the abstract notion of a Resource Container which incorporates resource elements such as memory, cpu, disk, network etc.

The NodeManager is the per-machine slave, which is responsible for launching the applications’ containers, monitoring their resource usage (cpu, memory, disk, network) and reporting the same to the ResourceManager.

The per-application ApplicationMaster has the responsibility of negotiating appropriate resource containers from the Scheduler, tracking their status and monitoring for progress. From the system perspective, the ApplicationMaster itself runs as a normal container.
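The division of labour can be sketched as a toy model in Python. To be clear, none of the classes below are YARN APIs: `ToyNode`, `ToyScheduler` and `toy_application_master` are hypothetical names, containers are reduced to a memory size, and the first-fit placement is an assumption chosen for brevity rather than how any real Scheduler behaves.

```python
from dataclasses import dataclass, field


@dataclass
class ToyNode:
    """Stands in for a machine managed by a NodeManager."""
    name: str
    free_mb: int


@dataclass
class ToyScheduler:
    """Stands in for the Scheduler: it only hands out resources, it tracks no task status."""
    nodes: list
    granted: list = field(default_factory=list)

    def allocate(self, app_id, memory_mb):
        """Grant a container on the first node with enough free memory, or None."""
        for node in self.nodes:
            if node.free_mb >= memory_mb:
                node.free_mb -= memory_mb
                container = (app_id, node.name, memory_mb)
                self.granted.append(container)
                return container
        return None

    def release(self, container):
        """Return a container's memory to its node."""
        _, node_name, memory_mb = container
        for node in self.nodes:
            if node.name == node_name:
                node.free_mb += memory_mb
        self.granted.remove(container)


def toy_application_master(scheduler, app_id, task_memory_mb, num_tasks):
    """Stands in for an ApplicationMaster: negotiate one container per task."""
    containers = []
    for _ in range(num_tasks):
        container = scheduler.allocate(app_id, task_memory_mb)
        if container is None:
            break  # cluster is full; a real AM would wait and retry
        containers.append(container)
    return containers


if __name__ == "__main__":
    scheduler = ToyScheduler([ToyNode("n1", 4096), ToyNode("n2", 2048)])
    print(toy_application_master(scheduler, "app-1", 2048, 4))
```

The point of the sketch is the separation: the scheduler knows nothing about tasks or their progress, and the per-application code knows nothing about other applications.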

Here is an architectural view of YARN:

One of the crucial implementation details for MapReduce within the new YARN system that I’d like to point out is that we have reused the existing MapReduce framework without any major surgery. This was very important to ensure compatibility for existing MapReduce applications and users. More on this later.

The next post will dive further into the intricacies of the architecture and its benefits such as significantly better scaling, support for multiple data processing frameworks (MapReduce, MPI etc.) and cluster utilization.

 

Thinking about the HDFS vs. Other Storage Technologies

As Apache Hadoop has risen in visibility and ubiquity we’ve seen a lot of other technologies and vendors put forth as replacements for some or all of the Hadoop stack. Recently, GigaOM listed eight technologies that can be used to replace HDFS (Hadoop Distributed File System) in some use cases. HDFS is not without flaws, but I predict a rosy future for HDFS.  Here is why…

To compare HDFS to other technologies, one must first ask the question: what is HDFS good at?

  • Extreme low cost per byte
    HDFS uses commodity direct attached storage and shares the cost of the network & computers it runs on with the MapReduce / compute layers of the Hadoop stack. HDFS is open source software, so if an organization chooses, it can be used with zero licensing and support costs. This cost advantage lets organizations store and process orders of magnitude more data per dollar than traditional SAN or NAS systems, which is the price point of many of these other systems.  In big data deployments, the cost of storage often determines the viability of the system.
  • Very high bandwidth to support MapReduce workloads
    HDFS can deliver data into the compute infrastructure at a huge data rate, which is often a requirement of big data workloads. HDFS can easily exceed 2 gigabits per second per computer into the map-reduce layer, on a very low cost shared network. Hadoop can go much faster on higher speed networks, but 10gigE, IB, SAN and other high-end technologies double the cost of a deployed cluster. These technologies are optional for HDFS.  2+ gigabits per second per computer may not sound like a lot, but this means that today’s large Hadoop clusters can easily read/write more than a terabyte of data per second continuously to the MapReduce layer.
  • Rock solid data reliability
    When deploying large distributed systems like Hadoop, the laws of probability are not on your side. Things will break every day, often in new and creative ways.  Devices will fail and data will be lost or subtly mutated. The design of HDFS is focused on taming this beast. It was designed from the ground up to correctly store and deliver data while under constant assault from the gremlins that huge scale out unleashes in your data center. And it does this in software, again at low cost. Smart design is the easy part; the difficult part is hardening a system in real use cases.  The only way you can prove a system is reliable is to run it for years against a variety of production applications at full scale.  Hadoop has been proven in thousands of different use cases and cluster sizes, from startups to Internet giants and governments.
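The bandwidth claim above is simple to check. This sketch converts a per-node rate in gigabits per second into an aggregate in decimal terabytes per second; the 4,000-node cluster size is an assumption picked for illustration, and the helper name is hypothetical.

```python
def aggregate_terabytes_per_second(nodes, gigabits_per_node=2.0):
    """Aggregate cluster bandwidth in decimal TB/s from a per-node rate in Gbit/s."""
    gigabytes_per_second = nodes * gigabits_per_node / 8  # 8 bits per byte
    return gigabytes_per_second / 1000                    # 1000 GB per TB


if __name__ == "__main__":
    for nodes in (1000, 4000, 5000):
        tb_s = aggregate_terabytes_per_second(nodes)
        print(f"{nodes} nodes at 2 Gbit/s each: {tb_s:.2f} TB/s")
```

At 2 gigabits per second per computer, a cluster needs about 4,000 nodes to reach a terabyte per second in aggregate.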

How does the HDFS competition stack up?  
This is an article about Hadoop, so I’m not going to call out the other systems by name, but I assert that each of the systems listed in the “8 ways” article falls short of Hadoop in at least one of the above dimensions. Let me list some of the failure modes:

  • System not designed for Hadoop’s scale
    Many systems simply don’t work at Hadoop scale. They haven’t been designed or proven to work with very large data or many commodity nodes. They often will not scale up to petabytes of data or thousands of nodes. If you have a small use-case and value other attributes, such as integration with existing apps in your enterprise, maybe this is a good trade-off, but something that works well in a 10 node test system may fail utterly as your system scales up. Other systems don’t scale operationally or rely on non-scalable hardware. Traditional NAS storage is a simple example of this problem. A NAS can replace Hadoop in a small cluster. But as the cluster scales up, cost and bandwidth issues come to the fore.
  • Systems that don’t use commodity hardware or open source software
    Many proprietary software / non-commodity hardware solutions are well tested and great at what they were designed to do. But, these solutions cost more than free software on commodity hardware. For small projects, this may be ok, but most activities have a finite budget and a system that allows much more data to be stored and used at the same cost often becomes the obvious choice. The disruptive cost advantage of Hadoop & HDFS is fundamental to the current success and growing popularity of the platform. Many Hadoop competitors simply don’t offer the same cost advantage.  Vendor price lists speak for themselves in this area (where the prices are even published).
  • Not designed for MapReduce’s I/O patterns
    Many of these systems are not designed from the ground up for Hadoop’s big sequential scans & writes.  Sometimes the limitation is in hardware. Sometimes it is in software. Systems that don’t organize their data for large reads cannot keep up with MapReduce’s data rates. Many databases and NoSQL stores are simply not optimized for pumping data into MapReduce.
  • Unproven technology
    Hadoop is interesting because it is used in production at extreme scale in the most demanding big data use cases in the world. As a result thousands of issues have been identified and fixed. This represents several hundred person-centuries of software development investment. It is easy to design a novel alternative system. A paper, a prototype or even a history of success in a related domain or a small set of use cases does not prove that a system is ready to take on Hadoop. Tellingly, along with listing some new and interesting systems, the “8 ways” article says goodbye to some systems that have previously been considered HDFS contenders by vocal advocates. I’ve got a rolodex full of folks who used to work on such systems who are now major players in the Apache Hadoop community.

It is easy to find example use cases where some other storage system is a better choice than Hadoop. But I assert that HDFS is the best system available today to do exactly what it was built for: being Hadoop’s storage system. It delivers rock solid data reliability and very high sequential read/write bandwidth, at the lowest cost possible. As a result, HDFS is, and I predict will remain, THE storage infrastructure for the vast majority of Hadoop clusters.

~E14

High Availability and Hadoop 1.0 – Perfect Together

In Shaun Connolly’s post about balancing community innovation and enterprise stability, he discussed the importance of an open source project forging ahead with big improvements that are expected to be initially buggy and incomplete functionally but then stabilize over time. In the case of Apache Hadoop 2.0, currently in community Alpha release, the big improvements have been underway for the past 3 years and include such things as:

  1. Next-gen MapReduce (aka YARN) that opens up Hadoop’s job processing architecture to other application workloads beyond MapReduce,
  2. New HDFS pipeline to support append and flush,
  3. HDFS federation and performance improvements that enable Hadoop to scale to thousands more nodes in a cluster, and
  4. High availability improvements that address some of the single point of failure issues that are often used as examples of how Hadoop may not be as enterprise-ready as it could be.

In the case of high availability (HA), it can take many months or years to get these types of solutions rock solid. While Hadoop 2.0 contains important HA-related features such as HDFS hot standby, we want to make sure we give it time to complete its community release process and allow extra time after that for bugs to be found and fixed to harden it for broad enterprise production use.

Read More

Hortonworks Data Platform v1.0 Download Now Available

If you haven’t yet noticed, we have made Hortonworks Data Platform v1.0 available for download from our website. Previously, Hortonworks Data Platform was only available for evaluation for members of the Technology Preview Program or via our Virtual Sandbox (hosted on Amazon Web Services). Moving forward and effective immediately, Hortonworks Data Platform is available to the general public.

Hortonworks Data Platform is a 100% open source data management platform, built on Apache Hadoop. As we have stated on many occasions, we are absolutely committed to the Apache Hadoop community and the Apache development process. As such, all code developed by Hortonworks has been contributed back to the respective Apache projects.

Version 1.0 of Hortonworks Data Platform includes Apache Hadoop 1.0.3, from the latest stable line of Hadoop as defined by the Apache Hadoop community. In addition to the core Hadoop components (including MapReduce and HDFS), we have included the latest stable releases of essential projects including HBase 0.92.1, Hive 0.9.0, Pig 0.9.2, Sqoop 1.4.1, Oozie 3.1.3 and ZooKeeper 3.3.4. All of the components have been tested and certified to work together. We have also added tools that simplify the installation and configuration steps in order to improve the experience of getting started with Apache Hadoop.

Read More

Introducing Hortonworks Data Platform v1.0

I wanted to take this opportunity to share some important news. Today, Hortonworks announced version 1.0 of the Hortonworks Data Platform, a 100% open source data management platform based on Apache Hadoop. We believe strongly that Apache Hadoop, and therefore Hortonworks Data Platform, will become the foundation for the next generation enterprise data architecture, helping companies to load, store, process, manage and ultimately benefit from the growing volume and variety of data entering and flowing throughout their organizations. The imminent release of Hortonworks Data Platform v1.0 represents a major step forward for achieving this vision.

You can read the full press release here. You can also read what many of our partners have to say about this announcement here. We were extremely pleased that industry leaders such as Attunity, Dataguise, Datameer, Karmasphere, Kognitio, MarkLogic, Microsoft, NetApp, StackIQ, Syncsort, Talend, 10gen, Teradata and VMware all expressed their support and excitement for Hortonworks Data Platform.

Those who have followed Hortonworks since our initial launch already know that we are absolutely committed to open source and the Apache Software Foundation. You will be glad to know that our commitment remains the same today. We don’t hold anything back. No proprietary code is being developed at Hortonworks.

Read More

Apache Hadoop 2.0 (Alpha) Released

As the release manager for the Apache Hadoop 2.0 release, it gives me great pleasure to share that the Apache Hadoop community has just released Apache Hadoop 2.0.0 (alpha)! While only an alpha release (read: not ready to run in production), it is still an important step forward as it represents the very first release that delivers new and important capabilities, including:

Read More

Executive Video Series: Introduction to HDFS

The latest video in the Hortonworks Executive Video Series features Sanjay Radia, Hortonworks co-founder and Apache Hadoop PMC member. Sanjay is well known in HDFS circles, having contributed to Hadoop for the past 4+ years. In this video, Sanjay gives a good overview of HDFS, the primary storage system for Hadoop, and provides some insight into both the 0.23 release and what can be expected from HDFS over the rest of 2012. He hits on some key elements such as federation, snapshots and improving the overall storage efficiency of HDFS.

If you would like to learn more about HDFS and where it is heading, make sure to attend Hadoop Summit next month in San Jose. At the conference, Sanjay will be presenting HDFS – What is New and Future together with Suresh Srinivas of Hortonworks, as well as Apache Hadoop and Virtual Machines together with Richard McDougall of VMware.

Read More
