Category Archives: HBase


Introduction to HBase Mean Time to Recover (MTTR)

The following post is from Nicolas Liochon and Devaraj Das with thanks to all members of the HBase team.

HBase is an always-available service and remains available in the face of machine failures and rack failures. Machines in the cluster run RegionServer daemons. When a RegionServer crashes or the machine goes offline, the regions it was hosting go offline as well. The focus of the MTTR work in HBase is to detect such abnormalities and to restore access to the offline regions as early as possible.

In talking with customers and users, it turned out that MTTR for HBase regions is one of their most significant concerns. Many improvements have been implemented recently. In this blog post and a couple that follow it, we will go over the work that the HBase team at Hortonworks, and the community at large, has done in the area of MTTR. We will also talk about some of it at HBaseCon 2013 in June.

This post explains how HBase manages MTTR and introduces some of the settings available in the released versions of HBase and HDFS.

How HBase is resilient to failures while being consistent

HBase ensures consistency by having a single server responsible for a subset of data. Namely, a region is managed by a single region server at a time.

The resiliency to failures comes from HDFS, as data written in HDFS is replicated on several nodes:

  • HBase writes the data in HFiles, stored in HDFS. HDFS replicates the blocks of these files, by default 3 times.
  • HBase uses a commit log (or Write-Ahead-Log, WAL), and this commit log is also written to HDFS and replicated, again 3 times by default.

Steps in the failure detection and recovery process

  • Identifying that a node is down: a node can stop responding simply because it is overloaded, or because it is dead.
  • Recovering the writes in progress: this means reading the commit log and recovering the edits that were not yet flushed.
  • Reassigning the regions: the region server was previously handling a set of regions. This set must be reallocated to other region servers, depending on their respective workload.

What is the pain point? Until the detection and recovery steps have completed, clients of the affected regions are blocked. Expediting this process, so that clients see less downtime for their data while data consistency is preserved, is what MTTR is all about.

Detecting node failures

There are multiple ways for a region server to die. The first is a clean stop, i.e., the administrator calls the stop function on the region server. This allows the region server to properly close its regions and tell the HMaster that the shutdown is in progress. In this case the commit log is purged and the HMaster starts assigning the regions immediately.

Another way for a region server to stop is the silent death of the machine, for example if the network card dies or the Ethernet cable is unplugged. In this case, the region server cannot raise an alarm. HBase handles this with the help of ZooKeeper: each region server is connected to ZooKeeper, and the master watches these connections. ZooKeeper itself manages a heartbeat with a timeout, so on a timeout the HMaster declares the region server dead and starts the recovery process.
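The timeout-based detection described above can be modelled with a small Python sketch. This is a simplified illustration, not ZooKeeper or HBase code: the class `SessionTracker`, its methods and the injectable `clock` are hypothetical names. The point it shows is that a silent server is only reported once the session timeout has elapsed, so the timeout directly bounds detection time.

```python
import time
from typing import Callable, Dict, List


class SessionTracker:
    """Timeout-based failure detector (a simplified model of ZooKeeper sessions)."""

    def __init__(self, session_timeout_s: float,
                 clock: Callable[[], float] = time.monotonic) -> None:
        self.session_timeout_s = session_timeout_s
        self.clock = clock
        # server name -> clock reading at its last heartbeat
        self.last_heartbeat: Dict[str, float] = {}

    def heartbeat(self, server: str) -> None:
        # A live region server keeps refreshing its session.
        self.last_heartbeat[server] = self.clock()

    def expired_servers(self) -> List[str]:
        # A server silent for longer than the timeout is declared dead; in HBase
        # the master would now start recovery for it. Before that, nothing happens.
        now = self.clock()
        return sorted(server for server, seen in self.last_heartbeat.items()
                      if now - seen > self.session_timeout_s)
```

With a 180-second timeout, a machine that silently dies right after its last heartbeat is not reported until more than 180 seconds later, whatever else is happening in the cluster.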

Recovering in-progress writes

There is a single semantic commit log, consisting of multiple files, for all the user regions of a region server. When a region server dies, its commit log must be recovered. The recovery is done in parallel: as a first step, random region servers pick up commit log files (from the well-known commit log directory) and split them by region into separate files in HDFS. The regions are then reassigned to random region servers, and each region server reads the edits from the respective log split file(s) to recover the correct region state.

The difficulty arises when it’s not a simple process crash, but a node failure. The region server on the crashed node would have written its blocks to the local DataNode (the standard HDFS client behavior). Assuming a replication factor of three, when a box is lost, you lose not only a region server but also one of the three replicas of every block it wrote. Doing the split means reading these blocks. As one replica out of three is dead, each block read has a 33% chance of being directed to the dead replica. Moreover, the split process writes new files. Each of these files is replicated 3 times, and any of these replicas can be assigned to the dead DataNode: the write then fails after a timeout and is retried on another DataNode, slowing the recovery.
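The log-splitting step can be sketched in Python as a group-by over the edits of one commit log, followed by a per-region replay. This is a minimal illustration under stated assumptions, not HBase's implementation: `WalEdit`, `split_log` and `replay` are hypothetical names, each per-region list stands in for a split file in HDFS, and we assume each region knows the highest sequence id it had already flushed to an HFile, so only newer edits are re-applied.

```python
from collections import defaultdict
from typing import Dict, Iterable, List, NamedTuple


class WalEdit(NamedTuple):
    region: str       # region the edit belongs to
    sequence_id: int  # position of the edit in the commit log
    row: bytes
    value: bytes


def split_log(edits: Iterable[WalEdit]) -> Dict[str, List[WalEdit]]:
    """Group the edits of one commit log by region, preserving log order."""
    per_region: Dict[str, List[WalEdit]] = defaultdict(list)
    for edit in edits:
        per_region[edit.region].append(edit)
    return dict(per_region)


def replay(split_file: List[WalEdit], flushed_up_to: int) -> Dict[bytes, bytes]:
    """Re-apply only the edits that never made it into an HFile."""
    memstore: Dict[bytes, bytes] = {}
    for edit in split_file:
        if edit.sequence_id > flushed_up_to:
            memstore[edit.row] = edit.value
    return memstore
```

In the real system, the reads in `split_log` and the writes of each split file are exactly where a dead DataNode replica causes the timeouts described above.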

Assigning the regions

Here, the job is to reassign as fast as possible. Assignment relies on ZooKeeper, and requires synchronisation between the master and the region servers through ZooKeeper.

The MTTR improvements

Detecting node failures

First, it’s possible to lower the default timeout value. By default, HBase is configured with a 3-minute ZooKeeper (ZK) session timeout. This ensures that Garbage Collection (GC) won’t interfere (GC pauses lead to ZK timeouts, and thus to false failure detections). For production systems, it’s more sensible to configure it to one minute, or to 30 seconds if you really care about MTTR. A reasonable minimum is around 20 seconds, even if some users have reported using less. So you can change zookeeper.session.timeout to 60000 (milliseconds) in hbase-site.xml. You also need to tune your GC settings appropriately (incremental, generational GC with good sizes for the young and old generations, etc.; this is a topic by itself) so that GC pauses never last longer than the ZK timeout.
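The rules of thumb above can be turned into a tiny sanity check. This Python sketch is only an illustration: the function name `check_zk_timeout` is hypothetical, the 20-second floor is the guideline from the text rather than an enforced limit, and `max_gc_pause_ms` is assumed to come from your own GC logs.

```python
from typing import List


def check_zk_timeout(session_timeout_ms: int, max_gc_pause_ms: int) -> List[str]:
    """Return warnings for a proposed ZooKeeper session timeout."""
    warnings: List[str] = []
    if session_timeout_ms < 20_000:
        # Below ~20 s, false failure detections become likely.
        warnings.append("timeout below ~20 s: risk of false failure detection")
    if max_gc_pause_ms >= session_timeout_ms:
        # A GC pause this long would expire the session of a healthy server.
        warnings.append("GC pauses can exceed the timeout: tune the GC first")
    return warnings
```

For example, `check_zk_timeout(60000, 5000)` returns no warnings, while a 10-second timeout combined with 15-second pauses triggers both.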

Recovering in-progress writes

In standard cases, there are enough surviving region servers to split all the commit log files in parallel. So the real issue is to be directed only to the live HDFS replicas. The solution is to configure HDFS so that it detects failures faster than HBase does: if HBase has a timeout of 60 seconds, HDFS should consider a node unusable after 20 seconds.

Here we must detail how HDFS handles dead nodes. HDFS failure detection also relies on a heartbeat and a timeout, managed by the NameNode. In HDFS, when a node is declared dead, the replicas it contained are re-replicated to the surviving DataNodes. This is an expensive process and, when multiple nodes die simultaneously, it can trigger “replication storms”: all the replicas are replicated again, leading to an overloaded system, then to non-responding nodes, then to more nodes being declared dead, then to more blocks being replicated, and so on. For this reason, HDFS waits a long time before starting this recovery process: a little more than 10 minutes (10.5 minutes with the default heartbeat settings). This is an issue for low-latency software such as HBase: going to dead DataNodes means hitting timeouts.

In the latest HDFS versions 1.0.4 or 1.2, and branches 2 and 3, it’s possible to use a special state: ‘stale’. An HDFS node is stale when it has not sent a heartbeat for more than a configurable amount of time. A node in this state is used only as a last resort for reads, and is excluded for writes. Activating these settings therefore makes the recovery much faster. In HDFS 1.1, only the read path takes the stale status into account, but versions 1.2, 2.0 and 3.0 use it for both reads and writes.

The property names have changed between releases. In hdfs-site.xml, they are [via the Apache JIRAs HDFS-3912, HDFS-4350]:

<!-- stale mode - 1.2+ -->

<property>
   <name>dfs.namenode.avoid.read.stale.datanode</name>
   <value>true</value>
</property>

<property>
   <name>dfs.namenode.avoid.write.stale.datanode</name>
   <value>true</value>
</property>

<property>
   <name>dfs.namenode.write.stale.datanode.ratio</name>
   <value>1.0f</value>
</property>

<!-- stale mode - branch 1.1.1+ -->

<property>
   <name>dfs.namenode.check.stale.datanode</name>
   <value>true</value>
</property>
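To make the stale-state rules described above concrete, here is a minimal Python sketch of how a NameNode-like component could order replicas for reads and choose targets for writes. It is a simplified model, not HDFS code: `DataNode`, `is_stale`, `order_for_read` and `targets_for_write` are hypothetical names, `stale_interval_s` stands in for the configurable stale interval (20 seconds in the example above), and the write side ignores dfs.namenode.write.stale.datanode.ratio.

```python
from dataclasses import dataclass
from typing import List


@dataclass
class DataNode:
    name: str
    last_heartbeat_s: float  # time of the node's last heartbeat to the NameNode


def is_stale(node: DataNode, now_s: float, stale_interval_s: float) -> bool:
    return now_s - node.last_heartbeat_s > stale_interval_s


def order_for_read(replicas: List[DataNode], now_s: float,
                   stale_interval_s: float) -> List[DataNode]:
    # Stale replicas are kept as a last resort: the stable sort moves them last.
    return sorted(replicas, key=lambda n: is_stale(n, now_s, stale_interval_s))


def targets_for_write(candidates: List[DataNode], now_s: float,
                      stale_interval_s: float) -> List[DataNode]:
    # Stale nodes are excluded from new write pipelines altogether.
    return [n for n in candidates if not is_stale(n, now_s, stale_interval_s)]
```

Because the stale interval is much shorter than the 10-minute dead-node delay, a log-splitting reader is steered away from the crashed node's replica long before HDFS declares that node dead.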

Assigning the regions

This is pure HBase internals. In HBase 0.94+, the assignment process has been improved to assign more regions with less internal synchronisation, especially in the master [example - Apache JIRA HBASE-7247].

Conclusion

There are no global failures in HBase: if a region server fails, all the other regions are still available. For a given data subset, the MTTR was often considered to be around ten minutes. This rule of thumb actually came from a common case where the recovery took time because it was trying to use replicas on a dead DataNode: ten minutes is the time taken by HDFS to declare a node dead. With the new stale mode in HDFS, this is no longer the case, and the recovery is now bounded by HBase alone. If you care about MTTR, with the settings mentioned here, most cases will take less than 2 minutes between the actual failure and the data being available again in another region server.

Apache HBase 0.94.5 is out!

Last week, the HBase community released 0.94.5, which is the most stable release of HBase so far. The release includes 76 jira issues resolved, with 61 bug fixes, 8 improvements, and 2 new features.

Most of the bug fixes targeted the REST server, replication, region assignment, the secure client, flaky unit tests, 0.92 compatibility and various stability improvements. Some of the interesting patches in this release are:
[HBASE-3996] – Support multiple tables and scanners as input to the mapper in map/reduce jobs
[HBASE-5416] – Improve performance of scans with some kind of filters.
[HBASE-7757] – Add web UI to REST server and Thrift server
[HBASE-7748] – Add DelimitedKeyPrefixRegionSplitPolicy
[HBASE-6669] – Add BigDecimalColumnInterpreter for doing aggregations using AggregationClient
[HBASE-7728] – Deadlock occurs between hlog roller and hlog syncer

The release candidate has been extensively tested by Hortonworks and many others in the community. You can roll out the 0.94.5 bits using rolling upgrade on top of 0.92 or 0.94 releases. In addition, Apache HBase 0.94.5 will be incorporated into an upcoming update to HDP 1.2.

You can download the new release from here, and find full release notes here.

Last, but not least, we would like to thank Lars Hofhansl, who is the release manager of 0.94 branch for driving the release train, and all 30 individuals, who have contributed to this release.

Apache HBase Region Splitting and Merging

For this post, we take a technical deep-dive into one of the core areas of HBase. Specifically, we will look at how Apache HBase distributes load through regions, and manages region splitting. HBase stores rows of data in tables. Tables are split into chunks of rows called “regions”. Those regions are distributed across the cluster, hosted and made available to client processes by the RegionServer process. A region is a contiguous range within the key space, meaning all rows in the table that sort between the region’s start key and end key are stored in the same region. Regions are non-overlapping, i.e. a single row key belongs to exactly one region at any point in time. A region is only served by a single region server at any point in time, which is how HBase guarantees strong consistency within a single row. Together with the -ROOT- and .META. regions, a table’s regions effectively form a 3-level B-Tree for the purposes of locating a row within a table.
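Because regions are sorted, non-overlapping key ranges, finding the region that holds a row is a binary search on region start keys. The Python sketch below illustrates the idea with an in-memory list; it is not how an HBase client actually queries .META., and `locate_region` is a hypothetical helper. It assumes the start keys are sorted and that the first region starts at the empty key.

```python
import bisect
from typing import List


def locate_region(start_keys: List[bytes], row: bytes) -> int:
    """Return the index of the region whose [start_key, end_key) range holds row."""
    # The last start key that is <= row identifies the region; start keys are
    # inclusive, end keys exclusive.
    return bisect.bisect_right(start_keys, row) - 1
```

For a table pre-split at 'a', 'b' and 'c', the start keys are `[b"", b"a", b"b", b"c"]`, so `b"abc"` falls in region 1 and `b"b"` in region 2.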

A region, in turn, consists of many “Stores”, which correspond to column families. A store contains one memstore and zero or more store files. The data for each column family is stored and accessed separately.

A table typically consists of many regions, which are in turn hosted by many region servers. Thus, regions are the physical mechanism used to distribute the write and query load across region servers. When a table is first created, HBase, by default, will allocate only one region for the table. This means that initially, all requests will go to a single region server, regardless of the number of region servers. This is the primary reason why initial phases of loading data into an empty table cannot utilize the whole capacity of the cluster.

Pre-splitting

The reason HBase creates only one region for the table is that it cannot possibly know how to create the split points within the row key space. Making such decisions depends heavily on the distribution of the keys in your data. Rather than taking a guess and leaving you to deal with the consequences, HBase provides you with tools to manage this from the client. With a process called pre-splitting, you can create a table with many regions by supplying the split points at table creation time. Since pre-splitting ensures that the initial load is more evenly distributed throughout the cluster, you should always consider using it if you know your key distribution beforehand. However, pre-splitting also carries a risk of creating regions that do not truly distribute the load evenly because of data skew, or in the presence of very hot or large rows. If the initial set of region split points is chosen poorly, you may end up with heterogeneous load distribution, which will in turn limit your cluster’s performance.

There is no short answer for the optimal number of regions for a given load, but you can start with a low multiple of the number of region servers as the number of splits, then let automated splitting take care of the rest.

One issue with pre-splitting is calculating the split points for the table. You can use the RegionSplitter utility for this. RegionSplitter creates the split points by using a pluggable SplitAlgorithm. HexStringSplit and UniformSplit are two predefined algorithms. The former can be used if the row keys have a prefix of hexadecimal strings (for example, if you are using hashes as prefixes). The latter divides up the key space evenly, assuming the keys are random byte arrays. You can also implement your own custom SplitAlgorithm and use it from the RegionSplitter utility.

$ hbase org.apache.hadoop.hbase.util.RegionSplitter test_table HexStringSplit -c 10 -f f1

where -c 10 specifies the requested number of regions as 10, and -f specifies the column families you want in the table, separated by “:”. The tool will create a table named “test_table” with 10 regions:

13/01/18 18:49:32 DEBUG hbase.HRegionInfo: Current INFO from scan results = {NAME => 'test_table,,1358563771069.acc1ad1b7962564fc3a43e5907e8db33.', STARTKEY => '', ENDKEY => '19999999', ENCODED => acc1ad1b7962564fc3a43e5907e8db33,}
13/01/18 18:49:32 DEBUG hbase.HRegionInfo: Current INFO from scan results = {NAME => 'test_table,19999999,1358563771096.37ec12df6bd0078f5573565af415c91b.', STARTKEY => '19999999', ENDKEY => '33333332', ENCODED => 37ec12df6bd0078f5573565af415c91b,}
...
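The boundaries in the output above come from dividing the 8-digit hexadecimal key range into equal slices. The Python sketch below is a simplified re-implementation of that idea, not RegionSplitter’s own code; the function name `hex_string_splits` and its default range "00000000" to "FFFFFFFF" are assumptions. It reproduces the first two boundaries shown above, 19999999 and 33333332.

```python
from typing import List


def hex_string_splits(num_regions: int, first: str = "00000000",
                      last: str = "FFFFFFFF") -> List[str]:
    """Divide a hex key range into num_regions even slices; return the split points."""
    lo, hi = int(first, 16), int(last, 16)
    width = len(first)
    step = (hi - lo + 1) // num_regions  # integer size of each slice
    # num_regions regions need num_regions - 1 split points, zero-padded lowercase hex.
    return [format(lo + step * (i + 1), f"0{width}x") for i in range(num_regions - 1)]
```

Calling `hex_string_splits(10)` yields nine split points, which is what a 10-region table needs.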

If you have split points at hand, you can also use the HBase shell, to create the table with the desired split points.

hbase(main):015:0> create 'test_table', 'f1', SPLITS=> ['a', 'b', 'c']

or

$ echo -e "a\nb\nc" > /tmp/splits
hbase(main):015:0> create 'test_table', 'f1', SPLITSFILE=>'/tmp/splits'

For optimum load distribution, you should think about your data model and key distribution when choosing the split algorithm or split points. Regardless of the method you choose to create the table with a predetermined number of regions, you can now start loading data into the table and see that the load is distributed throughout your cluster. You can let automated splitting take over once data ingest starts, and continuously monitor the total number of regions for the table.

Auto splitting

Regardless of whether pre-splitting is used or not, once a region gets to a certain limit, it is automatically split into two regions. If you are using HBase 0.94 (which comes with HDP-1.2), you can configure when HBase decides to split a region, and how it calculates the split points, via the pluggable RegionSplitPolicy API. There are a couple of predefined region split policies: ConstantSizeRegionSplitPolicy, IncreasingToUpperBoundRegionSplitPolicy, and KeyPrefixRegionSplitPolicy.

The first one is the default and only split policy for HBase versions before 0.94. It splits a region when the total data size for one of the stores (corresponding to a column family) in the region gets bigger than the configured “hbase.hregion.max.filesize”, which has a default value of 10GB. This split policy is ideal in cases where you have done pre-splitting and are interested in getting a lower number of regions per region server.

The default split policy for HBase 0.94 and trunk is IncreasingToUpperBoundRegionSplitPolicy, which does more aggressive splitting based on the number of regions hosted in the same region server. The split policy uses a max store file size of min(R^2 * “hbase.hregion.memstore.flush.size”, “hbase.hregion.max.filesize”), where R is the number of regions of the same table hosted on the same region server. So, for example, with the default memstore flush size of 128MB and the default max store size of 10GB, the first region on the region server will be split just after the first flush at 128MB. As the number of regions hosted in the region server increases, it will use increasing split sizes: 512MB, 1152MB, 2GB, 3.2GB, 4.6GB, 6.2GB, etc. After reaching 9 regions, the split size will go beyond the configured “hbase.hregion.max.filesize”, at which point the 10GB split size will be used from then on. For both of these algorithms, regardless of when splitting occurs, the split point used is the row key that corresponds to the midpoint in the “block index” for the largest store file in the largest store.
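The sequence of split sizes quoted above follows directly from the formula. This Python sketch evaluates min(R^2 * flush size, max file size) with the default values from the text; it illustrates the arithmetic only and is not the policy class itself (`increasing_split_size` is a hypothetical name).

```python
MB = 1024 * 1024


def increasing_split_size(regions_on_server: int,
                          flush_size: int = 128 * MB,
                          max_filesize: int = 10 * 1024 * MB) -> int:
    """Split threshold when R regions of the same table live on this region server."""
    r = regions_on_server
    return min(r * r * flush_size, max_filesize)
```

For R = 1 to 4 this gives 128MB, 512MB, 1152MB and 2048MB; at R = 9, 81 * 128MB = 10368MB exceeds 10GB (10240MB), so the cap applies from then on.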

KeyPrefixRegionSplitPolicy is a curious addition to the HBase arsenal. You can configure the length of the prefix used to group your row keys, and this split policy ensures that regions are not split in the middle of a group of rows having the same prefix. If you have set prefixes for your keys, then you can use this split policy to ensure that rows having the same row key prefix always end up in the same region. This grouping of records is sometimes referred to as “Entity Groups” or “Row Groups”. This is a key feature when considering use of the “local transactions” feature in your application design.
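One way to keep a prefix group together, sketched below in Python, is to truncate the candidate split key to the configured prefix length. This is an illustration under the assumption of fixed-length prefixes, with a hypothetical function name; it is not presented as the policy’s actual source. Every row starting with that prefix sorts at or after the truncated key, so the whole group lands in the upper daughter.

```python
def prefix_split_point(mid_key: bytes, prefix_length: int) -> bytes:
    """Truncate a candidate split key to its first prefix_length bytes."""
    # Rows sharing this prefix are all >= the truncated key, so they stay together.
    return mid_key[:prefix_length]
```

With a 7-byte prefix, a midpoint of `b"user042|2013-01-18"` becomes `b"user042"`: `b"user041|x"` goes to the lower daughter, and every `user042` row goes to the upper one.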

You can configure the default split policy to be used by setting the configuration “hbase.regionserver.region.split.policy”, or by configuring the table descriptor. For the brave souls, you can also implement your own custom split policy and plug it in at table creation time, or by modifying an existing table:

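// AwesomeSplitPolicy is your own RegionSplitPolicy subclass; admin is an HBaseAdmin
HTableDescriptor tableDesc = new HTableDescriptor("example-table");
tableDesc.setValue(HTableDescriptor.SPLIT_POLICY, AwesomeSplitPolicy.class.getName());
tableDesc.addFamily(new HColumnDescriptor("f1")); // add column families etc.
admin.createTable(tableDesc);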

If you are doing pre-splitting and want to manage region splits manually, you can also disable region splits by setting “hbase.hregion.max.filesize” to a high number and setting the split policy to ConstantSizeRegionSplitPolicy. However, you should use a safeguard value of around 100GB, so that regions do not grow beyond a region server’s capabilities. You can consider disabling automated splitting and relying on the initial set of regions from pre-splitting if, for example, you are using uniform hashes for your key prefixes and you can ensure that the read/write load to each region, as well as its size, is uniform across the regions in the table.

Forced Splits

HBase also enables clients to force split an online table from the client side. For example, the HBase shell can be used to split all regions of the table, or split a region, optionally by supplying a split point.

hbase(main):024:0> split 'b07d0034cbe72cb040ae9cf66300a10c', 'b'
0 row(s) in 0.1620 seconds

With careful monitoring of your HBase load distribution, if you see that some regions are getting uneven loads, you may consider manually splitting those regions to even out the load and improve throughput. Another reason to do manual splits is when the initial splits for the table turn out to be suboptimal and you have disabled automated splits. That might happen, for example, if the data distribution changes over time.

How Region Splits are implemented

As write requests are handled by the region server, they accumulate in an in-memory storage system called the “memstore”. Once the memstore fills, its contents are written to disk as additional store files. This event is called a “memstore flush”. As store files accumulate, the RegionServer will “compact” them into combined, larger files. After each flush or compaction finishes, a region split request is enqueued if the RegionSplitPolicy decides that the region should be split into two. Since all data files in HBase are immutable, when a split happens, the newly created daughter regions do not rewrite all the data into new files. Instead, they create small symlink-like files, named Reference files, which point to either the top or the bottom part of the parent store file according to the split point. A reference file is used just like a regular data file, but it exposes only half of the records. A region can only be split if there are no more references to the immutable data files of its parent region. Those reference files are cleaned up gradually by compactions, so that the region stops referring to its parent’s files and can be split further.
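The effect of a reference file can be sketched in Python as a view over the parent’s sorted keys, cut at the split point. This is a simplified model under stated assumptions: `read_half` is a hypothetical name, the parent store file is represented as a sorted list of row keys, and the bottom half is taken to hold keys below the split key while the top half holds the split key and above. Nothing is copied; each daughter just sees its slice.

```python
import bisect
from typing import List


def read_half(parent_keys: List[bytes], split_key: bytes, top: bool) -> List[bytes]:
    """Return the half of a sorted parent store file visible through a reference."""
    cut = bisect.bisect_left(parent_keys, split_key)  # first key >= split_key
    return parent_keys[cut:] if top else parent_keys[:cut]
```

Once compactions in a daughter rewrite its data into new files, that daughter no longer needs this view of the parent’s file.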

Although splitting the region is a local decision made at the RegionServer, the split process itself must coordinate with many actors. The RegionServer notifies the Master before and after the split, updates the .META. table so that clients can discover the new daughter regions, and rearranges the directory structure and data files in HDFS. Splitting is a multi-task process. To enable rollback in case of an error, the RegionServer keeps an in-memory journal about the execution state. The steps taken by the RegionServer to execute the split are illustrated by Figure 1. Each step is labeled with its step number. Actions from RegionServers or Master are shown in red, while actions from the clients are shown in green.

1. RegionServer decides locally to split the region, and prepares the split. As a first step, it creates a znode in zookeeper under /hbase/region-in-transition/region-name in SPLITTING state.
2. The Master learns about this znode, since it has a watcher for the parent region-in-transition znode.
3. RegionServer creates a sub-directory named “.splits” under the parent’s region directory in HDFS.
4. RegionServer closes the parent region, forces a flush of the cache and marks the region as offline in its local data structures. At this point, client requests coming to the parent region will throw NotServingRegionException. The client will retry with some backoff.
5. RegionServer creates the region directories under the .splits directory for daughter regions A and B, and creates the necessary data structures. Then it splits the store files, in the sense that it creates two Reference files per store file in the parent region. Those reference files point to the parent region’s files.
6. RegionServer creates the actual region directory in HDFS, and moves the reference files for each daughter.
7. RegionServer sends a Put request to the .META. table, sets the parent as offline in the .META. table, and adds information about the daughter regions. At this point, there won’t be individual entries in .META. for the daughters. Clients will see that the parent region is split if they scan .META., but won’t know about the daughters until they appear in .META.. Also, if this Put to .META. succeeds, the parent will be effectively split. If the RegionServer fails before this RPC succeeds, the Master and the next region server opening the region will clean up the dirty state about the region split. After the .META. update, though, the region split will be rolled forward by the Master.
8. RegionServer opens daughters in parallel to accept writes.
9. RegionServer adds the daughters A and B to .META. together with information that it hosts the regions. After this point, clients can discover the new regions, and issue requests to the new region. Clients cache the .META. entries locally, but when they make requests to the region server or .META., their caches will be invalidated, and they will learn about the new regions from .META..
10. RegionServer updates znode /hbase/region-in-transition/region-name in zookeeper to state SPLIT, so that the master can learn about it. The balancer can freely re-assign the daughter regions to other region servers if it chooses so.
11. After the split, .META. and HDFS will still contain references to the parent region. Those references will be removed when compactions in the daughter regions rewrite the data files. Garbage collection tasks in the master periodically check whether the daughter regions still refer to the parent’s files. If not, the parent region will be removed.
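The journal-and-rollback behavior described in the steps above can be sketched in Python: completed steps are journaled, a failure before the .META. update undoes them in reverse order, and a failure after it requires rolling forward. This is a hypothetical model, not the RegionServer’s code; `execute_split`, the two exception classes and the step names used with it are illustrative assumptions.

```python
from typing import Callable, List, Tuple

Step = Tuple[str, Callable[[], None], Callable[[], None]]  # (name, action, undo)


class SplitRolledBack(Exception):
    """A step failed before the point of no return; completed steps were undone."""


class RollForwardRequired(Exception):
    """A step failed after the point of no return; the split must be completed."""


def execute_split(steps: List[Step], point_of_no_return: str) -> List[str]:
    journal: List[Tuple[str, Callable[[], None]]] = []  # completed steps, in order
    for name, action, undo in steps:
        try:
            action()
        except Exception as err:
            if any(done == point_of_no_return for done, _ in journal):
                # e.g. .META. already records the split: finish it instead.
                raise RollForwardRequired(name) from err
            for _, done_undo in reversed(journal):
                done_undo()  # undo completed steps, most recent first
            raise SplitRolledBack(name) from err
        journal.append((name, undo))
    return [name for name, _ in journal]
```

For example, if creating the daughter directories fails after the parent was closed, the sketch reopens the parent before removing the .splits directory, mirroring the reverse order of the original steps.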

Region Merges

Unlike region splitting, HBase at this point does not provide usable tools for merging regions. Although there are HMerge and Merge tools, they are not well suited for general usage. There is currently no support for online tables or auto-merging functionality. However, with issues like OnlineMerge, Master-initiated automatic region merges, and ZK-based read/write locks for table operations, we are working to stabilize region splits and enable better support for region merges. Stay tuned!

Conclusion

As you can see, under the hood HBase does a lot of housekeeping to manage region splits and do automated sharding through regions. However, HBase also provides the necessary tools around region management, so that you can manage the splitting process. You can also control precisely when and how region splits happen via a RegionSplitPolicy.

The number of regions in a table, and how those regions are split, are crucial factors in understanding and tuning your HBase cluster load. If you can estimate your key distribution, you should create the table with pre-splitting to get the optimum initial load performance. You can start with a low multiple of the number of region servers as the initial number of regions, and let automated splitting take over. If you cannot correctly estimate the initial split points, it is better to just create the table with one region, start some initial load with automated splitting, and use IncreasingToUpperBoundRegionSplitPolicy. However, keep in mind that the total number of regions will stabilize over time, and the current set of region split points will be determined from the data that the table has received so far. You may want to monitor the load distribution across the regions at all times, and if the load distribution changes over time, use manual splitting or set more aggressive region split sizes. Lastly, you can try out the upcoming online merge feature and contribute your use case.

Big Graph Data on Hortonworks Data Platform


The Hortonworks Data Platform (HDP) conveniently integrates numerous Big Data tools in the Hadoop ecosystem. As such, it provides cluster-oriented storage, processing, monitoring, and data integration services. HDP simplifies the deployment and management of a production Hadoop-based system.

In Hadoop, data is represented as key/value pairs. In HBase, data is represented as a collection of wide rows. These atomic structures make global data processing (via MapReduce) and row-specific reading/writing (via HBase) simple. However, writing queries is nontrivial if the data has a complex, interconnected structure that needs to be analyzed (see Hadoop joins and HBase joins). Without an appropriate abstraction layer, processing highly structured data is cumbersome. Indeed, choosing the right data representation and associated tools opens up otherwise unimaginable possibilities. One such data representation that naturally captures complex relationships is a graph (or network). This post presents Aurelius‘ Big Graph Data technology suite in concert with Hortonworks Data Platform. Moreover, for a real-world grounding, a GitHub clone is described in this context to help the reader understand how to use these technologies for building scalable, distributed, graph-based systems.

Aurelius Graph Cluster and Hortonworks Data Platform Integration

The Aurelius Graph Cluster can be used in concert with Hortonworks Data Platform to provide users a distributed graph storage and processing system with the management and integration benefits provided by HDP. Aurelius’ graph technologies include Titan, a highly scalable graph database optimized for serving real-time results to thousands of concurrent users, and Faunus, a distributed graph analytics engine that is optimized for batch processing graphs represented across a multi-machine cluster.

In an online social system, for example, there typically exists a user base that is creating things and various relationships amongst these things (e.g. likes, authored, references, stream). Moreover, they are creating relationships amongst themselves (e.g. friend, group member). To capture and process this structure, a graph database is useful. When the graph is large and under heavy transactional load, a distributed graph database such as Titan/HBase can be used to provide real-time services such as searches, recommendations, rankings, scorings, etc. Next, periodic offline global graph statistics can be leveraged. Examples include identifying the most connected users, or tracking the relative importance of particular trends. Faunus/Hadoop serves this requirement. Graph queries/traversals in Titan and Faunus are simple, one-line commands that are optimized both semantically and computationally for graph processing. They are expressed using the Gremlin graph traversal language. The roles that Titan, Faunus, and Gremlin play in HDP are diagrammed below.

Aurelius and HDP Integration

A Graph Representation of GitHub

GitHub is an online source code service where over 2 million people collaborate on over 4 million projects. However, GitHub provides more than just revision control. In the last 4 years, GitHub has become a massive online community for software collaboration. Some of the biggest software projects in the world use GitHub (e.g. the Linux kernel).

GitHub is growing rapidly — 10,000 to 30,000 events occur each hour (e.g. a user contributing code to a repository). Hortonworks Data Platform is suited to storing, analyzing, and monitoring the state of GitHub. However, it lacks specific tools for processing this data from a relationship-centric perspective. Representing GitHub as a graph is natural because GitHub connects people, source code, contributions, projects, and organizations in diverse ways. Thinking purely in terms of key/value pairs and wide rows obfuscates the underlying relational structure which can be leveraged for more complex real-time and batch analytic algorithms.


GitHub provides 18 event types, which range from new commits and fork events, to opening new tickets, commenting, and adding members to a project. The activity is aggregated in hourly archives, [each of which] contains a stream of JSON encoded GitHub events. (via githubarchive.org)

The aforementioned events can be represented according to the popular property graph data model. A graph schema describing the types of “things” and relationships between them is diagrammed below. A parse of the raw data according to this schema yields a graph instance.

GitHub Schema

Deploying a Graph-Based GitHub

To integrate the Aurelius Graph Cluster with HDP, Whirr is used to launch a cluster of 4 m1.xlarge machines on Amazon EC2. Detailed instructions for this process are provided on the Aurelius Blog, with the exception that a modified Whirr properties file must be used for HDP. A complete HDP Whirr solution is currently in development. To add Aurelius technologies to an existing HDP cluster, simply download Titan and Faunus, which interface with installed components such as Hadoop and HBase without further configuration.

The 5,830 hourly GitHub Archive files from mid-March 2012 to mid-November 2012 contain 31 million GitHub events. The archive files are parsed to generate a graph. For example, when a GitHub push event is parsed, vertices of type user, commit, and repository are generated. An edge labeled pushed links the user to the commit, and an edge labeled to links the commit to the repository. The user vertex has properties such as user name and email address; the commit vertex has properties such as the commit's unique SHA identifier and its timestamp; and the repository vertex has properties such as its URL and the programming language used. In this way, the 31 million events give rise to 27 million vertices and 79 million edges (a relatively small graph). Complete instructions for parsing the data are in the githubarchive-parser documentation. Once the configuration options are reviewed, launching the automated parallel parser is simple.

$ export LC_ALL="C"
$ export JAVA_OPTIONS="-Xmx1G"
$ python AutomatedParallelParser.py batch
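The push-event mapping described above can be sketched in a few lines of Python. This is a minimal illustration, not the githubarchive-parser's code: the simplified event layout (`actor`, `repo`, `payload.commits`) and the names `Graph` and `parse_push_event` are hypothetical.

```python
from dataclasses import dataclass, field


@dataclass
class Graph:
    """Toy in-memory property graph (hypothetical, for illustration only)."""
    vertices: dict = field(default_factory=dict)  # vertex id -> property dict
    edges: list = field(default_factory=list)     # (out vertex id, label, in vertex id)

    def add_vertex(self, vid, **props):
        # Merge properties so repeated events for one user/repository share a vertex.
        self.vertices.setdefault(vid, {}).update(props)

    def add_edge(self, out_id, label, in_id):
        self.edges.append((out_id, label, in_id))


def parse_push_event(event, graph):
    """Turn one simplified (assumed-format) push event into vertices and edges."""
    login = event["actor"]["login"]
    repo_url = event["repo"]["url"]
    user_id, repo_id = "user:" + login, "repo:" + repo_url
    graph.add_vertex(user_id, type="User", name=login, email=event["actor"].get("email"))
    graph.add_vertex(repo_id, type="Repository", url=repo_url,
                     language=event["repo"].get("language"))
    for commit in event["payload"]["commits"]:
        commit_id = "commit:" + commit["sha"]
        graph.add_vertex(commit_id, type="Commit", sha=commit["sha"],
                         date=event["created_at"])
        graph.add_edge(user_id, "pushed", commit_id)  # user -pushed-> commit
        graph.add_edge(commit_id, "to", repo_id)      # commit -to-> repository


# Usage with a made-up event:
event = {
    "created_at": "2012-11-15T10:00:00Z",
    "actor": {"login": "okram", "email": "okram@example.com"},
    "repo": {"url": "https://github.com/example/gremlin", "language": "Java"},
    "payload": {"commits": [{"sha": "abc123"}, {"sha": "def456"}]},
}
g = Graph()
parse_push_event(event, g)
```

One push with two commits yields four vertices (user, repository, two commits) and four edges; keying vertices by a stable id is what lets later events reuse the same user and repository vertices.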

The generated vertex and edge data is imported into the Titan/HBase cluster using the BatchGraph wrapper of the Blueprints graph API (a simple, single-threaded insertion tool).

$ export JAVA_OPTIONS="-Xmx12G"
$ gremlin -e ImportGitHubArchive.groovy vertices.txt edges.txt
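BatchGraph's role, buffering mutations into periodic commits and mapping the external vertex ids found in the input files to the ids the database assigns, can be illustrated with a small single-threaded loader. The class `BufferedLoader` and its parameters are hypothetical; it only simulates commits and is not the Blueprints API.

```python
class BufferedLoader:
    """Hypothetical BatchGraph-style loader: id mapping plus buffered commits."""

    def __init__(self, buffer_size=3):
        self.buffer_size = buffer_size
        self.id_map = {}    # external id (from vertices.txt) -> internal id
        self.pending = 0    # mutations since the last (simulated) commit
        self.commits = 0    # number of simulated transaction commits
        self.edges = []
        self._next_id = 0

    def _mutated(self):
        self.pending += 1
        if self.pending >= self.buffer_size:
            self.commit()

    def commit(self):
        # In a real store this would flush a transaction; here we just count it.
        if self.pending:
            self.commits += 1
            self.pending = 0

    def add_vertex(self, external_id):
        self.id_map[external_id] = self._next_id
        self._next_id += 1
        self._mutated()

    def add_edge(self, out_ext, label, in_ext):
        # Edges name vertices by external id; the map resolves internal ids.
        self.edges.append((self.id_map[out_ext], label, self.id_map[in_ext]))
        self._mutated()


loader = BufferedLoader(buffer_size=3)
for vid in ["u1", "c1", "r1"]:
    loader.add_vertex(vid)          # third mutation triggers a commit
loader.add_edge("u1", "pushed", "c1")
loader.add_edge("c1", "to", "r1")
loader.commit()                     # flush the remaining two mutations
```

Batching amortizes per-transaction overhead, and because vertices are loaded before edges, every edge can resolve both endpoints through the id map.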

Titan: Distributed Graph Database

Titan is a distributed graph database that leverages existing storage systems for its persistence. Currently, Titan provides out-of-the-box support for Apache HBase and Cassandra (see documentation). Graph storage and processing in a clustered environment are made possible by techniques that efficiently represent a graph within a BigTable-style data system and efficiently process that graph using linked-list walking and vertex-centric indices. Moreover, for the developer, Titan provides native support for the Gremlin graph traversal language. This section demonstrates various Gremlin traversals over the parsed GitHub data.

The following Gremlin snippet determines which repositories Marko Rodriguez (okram) has committed to the most. The query first locates the vertex with name okram and then takes outgoing pushed-edges to his commits. For each of those commits, the outgoing to-edge is traversed to the repository that commit was pushed to. Next, the name of each repository is retrieved, and those names are grouped and counted. The resulting side-effect count map is emitted, sorted in decreasing order of count, and displayed. A graphical example of the gremlins' walk is diagrammed below.

gremlin> g = TitanFactory.open('bin/hbase.local')                
==>titangraph[hbase:127.0.0.1]
gremlin> g.V('name','okram').out('pushed').out('to').github_name.groupCount.cap.next().sort{-it.value}
==>blueprints=413
==>gremlin=69
==>titan=49
==>pipes=49
==>rexster=40
==>frames=26
==>faunus=23
==>furnace=9
==>tinkubator=5
==>homepage=1

GitHub Gremlin Traversal
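For readers new to Gremlin, the okram query can be mimicked over a toy in-memory graph in Python. The edge data, the helper `out`, and the use of repository names as vertex ids (instead of a `github_name` property) are assumptions for illustration; `Counter` plays the role of `groupCount`, and the final sort mirrors `sort{-it.value}`.

```python
from collections import Counter

# Hypothetical toy data: outgoing adjacency keyed by (vertex, edge label).
OUT = {
    ("okram", "pushed"): ["c1", "c2", "c3"],
    ("c1", "to"): ["blueprints"],
    ("c2", "to"): ["blueprints"],
    ("c3", "to"): ["gremlin"],
}


def out(vertices, label):
    """Follow outgoing edges with the given label from every vertex in the stream."""
    return [w for v in vertices for w in OUT.get((v, label), [])]


def repos_committed_to(user):
    repos = out(out([user], "pushed"), "to")               # .out('pushed').out('to')
    counts = Counter(repos)                                # .groupCount.cap.next()
    return sorted(counts.items(), key=lambda kv: -kv[1])   # .sort{-it.value}


result = repos_committed_to("okram")
```

Each repository appears once per commit that reaches it, so the count is the number of commits okram pushed to that repository.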

The above query can be taken two steps further to determine Marko’s collaborators: if two people have pushed commits to the same repository, they are collaborators. Because many people may commit to a repository, and a collaborator has typically pushed numerous commits, the search is capped at 2,500 collaborator paths. One of the most important aspects of graph traversal is understanding the combinatorial path explosion that can occur when traversing multiple hops through a graph (see Loopy Lattices).

gremlin> g.V('name','okram').out('pushed').out('to').in('to').in('pushed').hasNot('name','okram')[0..2500]
   .name.groupCount.cap.next().sort{-it.value}[0..4]
==>lvca=877
==>spmallette=504
==>sgomezvillamor=424
==>mbroecheler=356
==>joshsh=137
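The path cap matters because each extra hop multiplies the number of paths. A sketch of the collaborator query using lazy Python generators shows how truncating the stream (here with `itertools.islice`, analogous to the `[0..2500]` range filter) bounds the work. The toy edges and the helpers `step` and `collaborators` are hypothetical.

```python
from collections import Counter
from itertools import islice

# Hypothetical toy data: (out vertex, label, in vertex).
EDGES = [
    ("okram", "pushed", "c1"), ("c1", "to", "blueprints"),
    ("lvca", "pushed", "c2"), ("c2", "to", "blueprints"),
    ("lvca", "pushed", "c3"), ("c3", "to", "blueprints"),
    ("spmallette", "pushed", "c4"), ("c4", "to", "blueprints"),
]


def step(stream, label, direction):
    """Lazily walk one hop along `label` edges; direction is 'out' or 'in'."""
    for v in stream:
        for a, lab, b in EDGES:
            if lab != label:
                continue
            if direction == "out" and a == v:
                yield b
            elif direction == "in" and b == v:
                yield a


def collaborators(user, max_paths):
    s = step(step([user], "pushed", "out"), "to", "out")   # my repositories
    s = step(step(s, "to", "in"), "pushed", "in")          # everyone who pushed there
    s = (v for v in s if v != user)                        # hasNot('name', user)
    # islice stops pulling paths after max_paths, so the upstream walk stops too.
    return Counter(islice(s, max_paths)).most_common()
```

Because every step is a generator, `collaborators("okram", 2)` stops after two paths, returning `[("lvca", 2)]` without ever reaching spmallette; only raising the cap surfaces the rest.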

Complex traversals are easy to formulate with the data in this representation. For example, Titan can be used to generate followship recommendations. There are numerous ways to express a recommendation (with varying semantics). A simple one is: “Recommend me people to follow based on people who watch the same repositories as me. The more repositories I watch in common with someone, the higher they should be ranked.” The traversal below starts at Marko and traverses to all the repositories that Marko watches. It then traverses to everyone else (not Marko) who watches those repositories, counts those people, and returns the top 5 names of the sorted result set. In fact, Marko and Stephen (spmallette) are long-time collaborators and thus have similar tastes in software.

gremlin> g.V('name','okram').out('watched').in('watched').hasNot('name','okram').name.groupCount
   .cap.next().sort{-it.value}[0..4]
==>spmallette=3
==>alex-wajam=3
==>crimeminister=2
==>redgetan=2
==>snicaise=2
gremlin> g.V('name','okram').out('created').has('type','Comment').count()
==>159
gremlin> g.V('name','okram').out('created').has('type','Issue').count()  
==>176
gremlin> g.V('name','okram').out('edited').count()                     
==>85

The remaining traversals above, also rooted at okram, are self-describing. Finally, note that Titan is optimized for local, ego-centric traversals: starting from a particular source vertex (or small set of vertices), a path description is followed and a computation is derived from the explicit paths walked. For global graph analyses (where the source vertex set is the entire graph), a batch processing framework such as Faunus is used.
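The followship recommendation from earlier in this section reduces to counting shared watched repositories. A minimal Python sketch over hypothetical toy data (the `WATCHED` map and the `recommend` function are illustrative assumptions) makes the ranking explicit. Counting paths, as the Gremlin query does, equals counting shared repositories as long as each user watches a repository at most once.

```python
from collections import Counter

# Hypothetical toy data: user -> set of repositories watched.
WATCHED = {
    "okram": {"titan", "faunus", "rexster"},
    "spmallette": {"titan", "faunus", "rexster", "gremlin"},
    "alex": {"titan", "faunus"},
    "bob": {"rails"},
}


def recommend(user, top=5):
    counts = Counter()
    for repo in WATCHED[user]:                     # out('watched')
        for other, repos in WATCHED.items():       # in('watched')
            if other != user and repo in repos:    # hasNot('name', user)
                counts[other] += 1                 # groupCount
    return counts.most_common(top)                 # sort{-it.value}[0..4]
```

Here `recommend("okram")` ranks spmallette (three shared repositories) above alex (two), while bob shares none and is never counted.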

Faunus: Graph Analytics Engine

Every Titan traversal begins at a small set of vertices (or edges). Titan is not designed for global analyses, which involve processing the entire graph structure. The Hadoop component of Hortonworks Data Platform provides a reliable backend for global queries via Faunus. Gremlin traversals in Faunus are compiled down to MapReduce jobs, where the first job’s InputFormat is Titan/HBase. To avoid interfering with the production Titan/HBase instance, a snapshot of the live graph is typically generated and stored in HDFS, Hadoop’s distributed file system, as a SequenceFile available for repeated analysis. The most general SequenceFile (with all vertices, edges, and properties), i.e. a full graph dump, is created below.

faunus$ cat bin/titan-seq.properties 
faunus.graph.input.format=com.thinkaurelius.faunus.formats.titan.hbase.TitanHBaseInputFormat
hbase.zookeeper.quorum=10.68.65.161
hbase.mapreduce.inputtable=titan
hbase.mapreduce.scan.cachedrows=75
faunus.graph.output.format=org.apache.hadoop.mapreduce.lib.output.SequenceFileOutputFormat
faunus.sideeffect.output.format=org.apache.hadoop.mapreduce.lib.output.TextOutputFormat
faunus.output.location=full-seq
faunus.output.location.overwrite=true

faunus$ bin/gremlin.sh

         \,,,/
         (o o)
-----oOOo-(_)-oOOo-----
gremlin> g = FaunusFactory.open('bin/titan-seq.properties')
==>faunusgraph[titanhbaseinputformat]
gremlin> g._().toString()
==>[IdentityMap]
gremlin> g._()
12/12/13 09:19:53 INFO mapreduce.FaunusCompiler: Compiled to 1 MapReduce job(s)
12/12/13 09:19:55 INFO mapred.JobClient:  map 0% reduce 0%
12/12/13 09:21:26 INFO mapred.JobClient:  map 1% reduce 0%
12/12/13 09:21:36 INFO mapred.JobClient:  map 2% reduce 0%
12/12/13 09:21:43 INFO mapred.JobClient:  map 3% reduce 0%
...
gremlin> hdfs.ls()
==>rwx------ ubuntu supergroup 0 (D) .staging
==>rwxr-xr-x ubuntu supergroup 0 (D) full-seq
gremlin> hdfs.ls('full-seq/job-0')
==>rw-r--r-- ubuntu supergroup 0 _SUCCESS
==>rwxr-xr-x ubuntu supergroup 0 (D) _logs
==>rw-r--r-- ubuntu supergroup 243768636 part-m-00000
==>rw-r--r-- ubuntu supergroup 125250887 part-m-00001
==>rw-r--r-- ubuntu supergroup 331912876 part-m-00002
==>rw-r--r-- ubuntu supergroup 431617929 part-m-00003
...

Given the generated SequenceFile, vertices are counted by type and edges by label. This is by definition a global operation, since every element of the graph must be visited.

gremlin> g.V.type.groupCount
==>Gist         780626
==>Issue        1298935
==>Organization 36281
==>Comment      2823507
==>Commit       20338926
==>Repository   2075934
==>User         983384
==>WikiPage     252915
gremlin> g.E.label.groupCount                                           
==>deleted        170139
==>on             7014052
==>owns           180092
==>pullRequested  930796
==>pushed         27538088
==>to             27719774
==>added          181609
==>created        10063346
==>downloaded     122157
==>edited         276609
==>forked         1015435
==>of             536816
==>appliedForkTo  1791
==>followed       753451
==>madePublic     26602
==>watched        2784640
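Counting by type is a classic MapReduce aggregation. The following plain-Python sketch shows the general shape of such a job (map emits `(type, 1)`, reduce sums per key); it is a generic illustration with hypothetical data and function names, not the job Faunus actually generates.

```python
from collections import defaultdict
from itertools import chain


def map_phase(split):
    """Map: emit (type, 1) for each vertex record in one input split."""
    for vertex in split:
        yield vertex["type"], 1


def reduce_phase(pairs):
    """Shuffle + reduce: group the pairs by key and sum the partial counts."""
    totals = defaultdict(int)
    for key, n in pairs:
        totals[key] += n
    return dict(totals)


# Two hypothetical input splits (think part-m-00000 and part-m-00001).
splits = [
    [{"type": "User"}, {"type": "Commit"}, {"type": "Commit"}],
    [{"type": "Repository"}, {"type": "Commit"}],
]
counts = reduce_phase(chain.from_iterable(map_phase(s) for s in splits))
```

Each split is processed independently by its mapper, which is why the count scales to the full graph dump; only the small per-key totals cross the network.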

Since GitHub is collaborative in a way similar to Wikipedia, a few users contribute a lot and many users contribute little or nothing at all. To determine the distribution of contributions, Faunus can compute the out-degree distribution of pushed-edges, which correspond to users pushing commits to repositories. This is equivalent to Gremlin visiting each user vertex, counting its outgoing pushed-edges, and returning the distribution of those counts.

gremlin> g.V.sideEffect('{it.degree = it.outE("pushed").count()}').degree.groupCount
==>1	57423
==>10	8856
==>100	527
==>1000	9
==>1004	5
==>1008	6
==>1011	6
==>1015	6
==>1019	3
==>1022	9
==>1026	2
==>1033	6
==>1037	4
==>104	462
==>1040	3
==>...

When the degree distribution is plotted on log-scaled axes, the results resemble the Wikipedia contribution distribution, as expected. This is a common theme in most natural graphs: real-world graphs are not random structures; they are composed of a few “hubs” and numerous “satellites.”
GitHub pushed-edge out-degree distribution (log-log plot)
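The out-degree distribution and its log-log view can be sketched in Python over a hypothetical edge list. Unlike the Faunus query, which visits every vertex, this sketch only sees vertices with at least one pushed-edge, so degree 0 is omitted; the function names are illustrative.

```python
import math
from collections import Counter


def out_degree_distribution(edges, label):
    """Map out-degree -> number of vertices having that many `label` out-edges."""
    degree = Counter(src for src, lab, _ in edges if lab == label)
    return Counter(degree.values())


def log_log_points(distribution):
    """(log10 degree, log10 vertex count) pairs, ready for plotting on log axes."""
    return [(math.log10(d), math.log10(n)) for d, n in sorted(distribution.items())]


edges = [
    ("u1", "pushed", "c1"), ("u1", "pushed", "c2"), ("u1", "pushed", "c3"),
    ("u2", "pushed", "c4"), ("u3", "pushed", "c5"),
    ("c1", "to", "r1"),  # ignored: not a pushed-edge
]
dist = out_degree_distribution(edges, "pushed")
points = log_log_points(dist)
```

Here two users have degree 1 and one user has degree 3; the same two-level `Counter` pattern (degree per vertex, then vertices per degree) is what the `sideEffect(...).degree.groupCount` query expresses.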

More sophisticated queries can be performed by first extracting a slice of the original graph that contains only the relevant information. These slices can be saved to HDFS for subsequent traversals. For example, to find the most central co-watched projects on GitHub, the primary graph is stripped down to only the watched-edges between users and repositories. The final traversal below walks the “co-watched” graph twice and counts the number of paths that pass through each repository. The repositories are then sorted by path count to show which repositories are most central (important, respected) according to the watches subgraph.

gremlin> g.E.has('label','watched').keep.V.has('type','Repository','User').keep
...
12/12/13 11:08:13 INFO mapred.JobClient:   com.thinkaurelius.faunus.mapreduce.sideeffect.CommitVerticesMapReduce$Counters
12/12/13 11:08:13 INFO mapred.JobClient:     VERTICES_DROPPED=19377850
12/12/13 11:08:13 INFO mapred.JobClient:     VERTICES_KEPT=2074099
12/12/13 11:08:13 INFO mapred.JobClient:   com.thinkaurelius.faunus.mapreduce.sideeffect.CommitEdgesMap$Counters
12/12/13 11:08:13 INFO mapred.JobClient:     OUT_EDGES_DROPPED=55971128
12/12/13 11:08:13 INFO mapred.JobClient:     OUT_EDGES_KEPT=1934706
...
gremlin> g = g.getNextGraph()
gremlin> g.V.in('watched').out('watched').in('watched').out('watched').property('_count',Long.class)
   .order(F.decr,'github_name')
==>backbone	4173578345
==>html5-boilerplate	4146508400
==>normalize.css	3255207281
==>django	3168825839
==>three.js	3078851951
==>Modernizr	2971383230
==>rails	2819031209
==>httpie	2697798869
==>phantomjs	2589138977
==>homebrew	2528483507
...
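The very large counts above come from counting paths rather than enumerating them: a path count can be propagated along edges at each step, which is what the `_count` property in the query reflects. The following sketch reproduces that idea in Python for the `in('watched').out('watched')` walk over a hypothetical watch list; it illustrates path-count propagation and is not Faunus internals.

```python
from collections import defaultdict

# Hypothetical watch list: (user, repository) pairs, one per watched-edge.
WATCHES = [("u1", "backbone"), ("u1", "rails"), ("u2", "backbone"),
           ("u3", "backbone"), ("u3", "django")]


def propagate(counts, direction):
    """Push each vertex's path count one hop along (or against) watched-edges."""
    nxt = defaultdict(int)
    for user, repo in WATCHES:
        if direction == "in":      # repository -> user (against the edge)
            nxt[user] += counts.get(repo, 0)
        else:                      # user -> repository (along the edge)
            nxt[repo] += counts.get(user, 0)
    return nxt


def cowatch_centrality(rounds=2):
    # Users start with count 1 too, but in('watched') from a user reaches
    # nothing, so seeding only repositories gives the same result.
    counts = {repo: 1 for _, repo in WATCHES}
    for _ in range(rounds):
        counts = propagate(counts, "in")
        counts = propagate(counts, "out")
    # Sort by path count (descending), breaking ties by name.
    return sorted(counts.items(), key=lambda kv: (-kv[1], kv[0]))
```

With two rounds, backbone collects 19 paths while django and rails collect 7 each. Each round multiplies counts by the co-watch structure, which is why real GitHub counts reach into the billions.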

Conclusion

This post discussed the use of Hortonworks Data Platform in concert with the Aurelius Graph Cluster to store and process the graph data generated by the online social coding system GitHub. The example data set used throughout was provided by GitHub Archive, an ongoing record of events in GitHub. While the dataset currently afforded by GitHub Archive is relatively small, it continues to grow each day. The Aurelius Graph Cluster has been demonstrated in practice to support graphs with hundreds of billions of edges. As more organizations recognize the graph structure within their Big Data, the Aurelius Graph Cluster is there to provide both real-time and batch graph analytics.

Acknowledgments

The authors wish to thank Steve Loughran for his help with Whirr and HDP. Moreover, Russell Jurney requested this post and steadfastly ensured it was delivered.

Related Material

Hawkins, P., Aiken, A., Fisher, K., Rinard, M., Sagiv, M., “Data Representation Synthesis,” PLDI’11, June 2011.

Pham, R., Singer, L., Liskin, O., Filho, F. F., Schneider, K., “Creating a Shared Understanding of Testing Culture on a Social Coding Site,” Leibniz Universität Hannover, Software Engineering Group: Technical Report, September 2012.

Adler, B. T., de Alfaro, L., Pye, I., Raman, V., “Measuring Author Contributions to the Wikipedia,” WikiSym ’08: Proceedings of the 4th International Symposium on Wikis, Article No. 15, September 2008.

Rodriguez, M.A., Mallette, S.P., Gintautas, V., Broecheler, M., “Faunus Provides Big Graph Data Analytics,” Aurelius Blog, November 2012.

Rodriguez, M.A., LaRocque, D., “Deploying the Aurelius Graph Cluster,” Aurelius Blog, October 2012.

Ho, R., “Graph Processing in Map Reduce,” Pragmatic Programming Techniques Blog, July 2010.

Authors


Vadas Gintautas Marko A. Rodriguez

HBase Futures

As we have said here, Hortonworks has been steadily increasing its investment in HBase. HBase’s adoption in the enterprise has been increasing. To continue this trend, we feel HBase needs investments in the following areas:

  1. Reliability and High Availability (all data always available, and recovery from failures is quick)
  2. Autonomous operation (minimum operator intervention)
  3. Wire compatibility (to support rolling upgrades across a couple of versions at least)
  4. Cross data-center replication (for disaster recovery)
  5. Snapshots and backups (be able to take periodic snapshots of certain/all tables and be able to restore them at a later point if required)
  6. Monitoring and Diagnostics (which regionserver is hot or what caused an outage)

Significant work has happened in each of the areas outlined above in the 0.94 and 0.96 (currently trunk) branches. For example, the MTTR (mean time to recover) work happening in HBASE-5843 will significantly improve data availability. HBASE-5305 addresses wire compatibility, and HBASE-6055 is the work underway on snapshots. We believe that by solving the above problems, HBase will gain much wider adoption in the enterprise and will be considered a very viable option for the use cases it supports.

Doing the above would open HBase to many more enterprise users. Going forward, we envisage the need for:

  1. Improved clients (asynchronous clients, and clients in multiple languages)
  2. Cell-level security (access control for every cell in a table)
  3. Multi-tenancy (HBase becomes a viable shared platform for multiple applications using it)
  4. Secondary indexing functionality

The above are some of the areas that Hortonworks is investing in as well. Stay tuned for further updates on these topics.

HBase at Hortonworks: An Update

HBase is a critical component of the Apache Hadoop ecosystem and a core component of the Hortonworks Data Platform. HBase enables a host of low-latency Hadoop use cases. As a publishing platform, HBase exposes data refined in Hadoop to outside systems. As an online column store, HBase lets application workloads perform random-access reads and writes on data that remains directly accessible to Hadoop MapReduce.

The HBase community is moving forward aggressively, improving HBase in many ways. We are in the process of integrating HBase 0.94 into our upcoming HDP 1.1 refresh. This “minor upgrade” will include nearly 200 bug fixes and quite a few performance improvements, and will be wire-compatible with HBase 0.92 (shipped in HDP 1.0). Here are some notable changes:

  1. HBASE-4128 – Data Block Encoding of KeyValues (aka delta encoding / prefix compression) [PERFORMANCE]
  2. HBASE-4465 – Lazy-seek optimization for StoreFile scanners [PERFORMANCE]
  3. HBASE-5074 – support checksums in HBase block cache [PERFORMANCE]
  4. HBASE-5128 – [uber hbck] Online automated repair of table integrity and region consistency problems [OPERABILITY]
  5. HBASE-3584 – Allow atomic put/delete in one call [FEATURE]
  6. HBASE-5229 – Provide basic building blocks for “multi-row” local transactions [FEATURE]

And 0.94 is only the start. Expect to see a huge set of additional features, bug fixes, and performance and operational improvements to HBase in the coming months. As more of our customers have deployed HBase, it has become an increasingly important component of HDP 1. As a result, we’ve really been ramping up our investment in HBase this year, with a focus on enhancing HBase stability and operability. What follows is a summary of Hortonworkers’ recent HBase contributions.

1. Reliability improvements

We have established an automated test harness for testing HBase nightly. The harness automatically deploys HBase with a ‘production-like’ configuration. After the cluster has been set up, a few heavy-duty jobs are run. This has uncovered numerous bugs in the 0.92.x line.

Some of them are:

  • HBASE-5986: Clients can see holes in the META table when regions are being split
  • HBASE-6160: META entries from daughters can be deleted before parent entries
  • HBASE-6679: RegionServer aborts due to race between compaction and split
  • HBASE-6060: Regions’s in OPENING state from failed regionservers takes a long time to recover
  • HBASE-6649: TestReplication.queueFailover occasionally fails [Part-1]
  • HBASE-6758: The replication-executor should make sure the file that it is replicating is closed before declaring success on that file

2. Test Infrastructure Improvements

One of the biggest needs in the community is a good testing framework for HBase. As HBase becomes more popular as a NoSQL data store, we need to make sure that the system is highly available and reliable in the face of common node failures, and that it can withstand the intense, high-stress workloads users expect in production environments.

Towards this end, we have been building an automated test framework inspired by Netflix’s ChaosMonkey tool. It runs a series of tests while killing and restarting HBase servers, and then validates that the test results are correct. This brings the availability and reliability aspects of the system to the fore. For example, if a RegionServer is killed, another RegionServer (or a set of RegionServers) should take over serving the data that the killed RegionServer was serving.
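A toy simulation makes the kill, take-over, and validate cycle concrete. Everything here is hypothetical (region and server names, round-robin reassignment standing in for the HBase master, the functions `assign` and `chaos_run`); the real framework drives actual HBase processes.

```python
import random


def assign(regions, servers):
    """Round-robin regions over servers (a stand-in for the master's assignment)."""
    live = sorted(servers)
    return {r: live[i % len(live)] for i, r in enumerate(sorted(regions))}


def chaos_run(regions, servers, rounds, seed=0):
    """Repeatedly kill a random server, reassign its regions, validate, restart it."""
    assert len(servers) >= 2, "need at least one survivor to take over regions"
    rng = random.Random(seed)
    live = set(servers)
    assignment = assign(regions, live)
    for _ in range(rounds):
        victim = rng.choice(sorted(live))
        live.discard(victim)                                # kill one RegionServer
        survivors = sorted(live)
        orphans = sorted(r for r, s in assignment.items() if s == victim)
        for i, r in enumerate(orphans):                     # survivors take over
            assignment[r] = survivors[i % len(survivors)]
        # Validation: no region may be left on a dead server (i.e. offline).
        assert all(s in live for s in assignment.values()), "region left offline"
        live.add(victim)   # restart; it comes back empty (no rebalancing here)
    return assignment


regions = [f"region-{i}" for i in range(10)]
final = chaos_run(regions, ["rs1", "rs2", "rs3"], rounds=5)
```

The seeded `random.Random` keeps each run reproducible, which matters when a failed validation has to be replayed and debugged.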

Using the APIs provided by this testing framework, one can convert many of the tests in the HBase codebase to run in either unit test mode or in this new challenging “real cluster mode”. The test framework is part of the HBase codebase (via HBASE-6241), and many candidate tests have been identified that can be ported to use the new framework.

For details, please visit HBASE-6201. Slides are available here.

3. Windows Port of HBase

The Microsoft Windows port and certification of HBase is an ongoing joint development effort involving Hortonworks and Microsoft engineers. We recently reached an important milestone: all of the hbase-0.94 unit tests now pass on Windows. Work is underway to commit all the patches to HBase mainline under the umbrella JIRA HBASE-6814. We are well on the way to our goal of having HBase run equally well on Windows and Unix, opening up the Apache HBase community to a whole new universe of potential users and contributors.

4. HBase with NameNode HA setup and validation

We’ve been working to validate that HBase runs well with the new Apache Hadoop 1.0 HA features. The HBase HA testing blog is here.

5. Wire-compatibility work targeted for the 0.96.x release

We have done substantial work to move all protocols in HBase including the RPC implementation to use Google’s Protocol Buffers. Most of the work is captured in this umbrella jira – HBASE-5305.

All of the above is just what we’ve been doing recently and Hortonworkers are only a small fraction of the HBase contributor base.  When one factors in all the great contributions coming from across the Apache HBase community, we predict 2013 is going to be a great year for HBase.  HBase is maturing fast, becoming both more operationally reliable and more feature rich.

Full stack HA in Hadoop 1: HBase’s Resilience to Namenode Failover

In this blog, I’ll cover how we tested Full Stack HA with NameNode HA in Hadoop 1, with Hadoop and HBase as components of the stack.

Yes, NameNode HA is finally available in the Hadoop 1 line. The test was done with Hadoop branch-1 and HBase 0.92.x on a cluster of roughly ten nodes. The aim was to keep a really busy HBase cluster up while the cluster’s NameNode repeatedly went up and down. Note that HBase remains functional while the NameNode is down. The outage affects only those operations that require a trip to the NameNode (for example, rolling the WAL, compaction, or flush), and only the end users of those operations (for example, a user calling the HBase get API may not be affected if that get doesn't require opening a new file).

HBase was kept busy by running a load test, LoadTestTool (available in the 0.92 branch), with a set of arguments (number of reader/writer threads, sizes of rows, etc.) selected to induce significant pressure on the HBase cluster. In addition, the HBase configuration was artificially modified so that HBase would make many trips to the NameNode for file operations (low flush thresholds, very low major compaction frequency). For the test, the NameNode was repeatedly brought up and down (specifically, a loop of “bring down the NameNode, let it remain down for a short period of time, bring up the NameNode, let it remain up for another period of time”). This stop-start pattern had some randomness built into it.
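A randomized stop-start loop of this kind could be scripted along the following lines. The sketch is hypothetical: the `stop`/`start` callables would wrap whatever actually stops and starts the NameNode on the test cluster (for instance, a call to Hadoop 1's `hadoop-daemon.sh`), and the time ranges are placeholders rather than the values used in the test.

```python
import random


def namenode_schedule(cycles, down_range, up_range, seed=None):
    """Yield (action, seconds): stop, stay down a random time, start, stay up."""
    rng = random.Random(seed)
    for _ in range(cycles):
        yield "stop", rng.uniform(*down_range)
        yield "start", rng.uniform(*up_range)


def run_schedule(schedule, stop, start, sleep):
    """Drive the loop; pass time.sleep as `sleep` on a real cluster."""
    for action, seconds in schedule:
        (stop if action == "stop" else start)()
        sleep(seconds)


# Dry run with recording stubs instead of a real NameNode:
events, waits = [], []
run_schedule(namenode_schedule(3, (30, 60), (60, 120), seed=42),
             stop=lambda: events.append("stop"),
             start=lambda: events.append("start"),
             sleep=waits.append)
```

Injecting `sleep` and the start/stop actions keeps the schedule logic testable without a cluster, and a fixed seed makes a failing sequence of outages repeatable.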

The cluster kept up reasonably well with this load and failure mode, but we also saw HBase RegionServers being lost somewhat randomly. A close analysis of the NameNode and RegionServer logs showed that file lengths were not recorded correctly in the edit logs. This turned out to be a known issue, addressed in HDFS-1108, and the fix was backported to the Hadoop 1.0.x line. It should be noted that the HA team at Hortonworks had also fixed other issues; as is our usual practice, these fixes were applied to Apache Hadoop trunk, backported to the Hadoop 1.x line, and will also be backported to the 2-alpha line.

With the above fix in HDFS, the tests were rerun. The cluster remained up without any RegionServer losses for more than 48 hours. No glitches!

Well, to be precise, the cluster eventually started behaving oddly because the DataNodes ran out of space: the HBase load generation had successfully filled up the HDFS capacity in spite of repeated NameNode restarts. (I should file some JIRAs to handle that more gracefully!) While my tests did not use automated NameNode failover, one can now configure the NameNode in Hadoop 1 to fail over automatically using industry-proven solutions, as described in Sanjay’s post, so the HBase community can start deploying NameNode HA with HBase remaining resilient as the NameNode fails over.

Sanjay’s blog gives more details on how to deploy NameNode HA. Please get in touch with me (ddas@hortonworks.com) or Sanjay (sanjay@hortonworks.com) if you need more details on NameNode HA, Full Stack HA with respect to HBase or any part of the above tests.