Category Archives: Apache Hadoop


Hortonworks Sandbox — the Fastest On Ramp to Apache Hadoop

Go from Zero to Big Data in 15 Minutes!

Today Hortonworks announced the availability of the Hortonworks Sandbox, an easy-to-use, flexible and comprehensive learning environment that will provide you with the fastest on-ramp to learning and exploring enterprise Apache Hadoop.

The Hortonworks Sandbox is:

  • A free download
  • A complete, self-contained virtual machine with Apache Hadoop pre-configured
  • A personal, portable and standalone Hadoop environment
  • A set of hands-on, step-by-step tutorials that allow you to learn and explore Hadoop on your own

The Hortonworks Sandbox is designed to help close the gap between people wanting to learn and evaluate Hadoop, and the complexities of spinning up an evaluation cluster of Hadoop. The Hortonworks Sandbox provides a powerful combination of hands-on, step-by-step tutorials paired with an easy-to-use web interface designed to lower the learning curve for people who just want to explore and evaluate Hadoop as quickly as possible.

One of our key focus areas is enabling Hadoop as an enterprise-viable platform that is easy to use and consume by our customers and the broader ecosystem. Over the past year or so, we have seen the complex and disjointed experience people face trying to learn Hadoop; the Sandbox gives you the fastest on-ramp to Apache Hadoop. We want the Sandbox to deliver an integrated, easy-to-use, easily updateable learning environment. Ongoing updates to the tutorials are planned, delivering new, interesting hands-on exercises, exploring different features and use cases.

These tutorials are built based on the experience gained training thousands of people in our Hortonworks University Training classes. As we continue to build out the Sandbox, we will provide additional levels of sophistication – think of it as the Hadoop 101, 201 and 301 levels of learning. And, the process of updating the tutorials is easy through the click of the “Update” button, initiating a lightweight download of just the tutorial content.

The Sandbox is a single node implementation of the Hortonworks Data Platform (HDP) 1.2 that behaves just like a normal Hadoop environment, which allows you to add your own datasets in an isolated protected environment to evaluate the use of Hadoop in your own data architectures.

Use the Sandbox to:

  • Explore Hadoop on your own
  • Plan out the integration points of your proof of concept project
  • Prepare for a more complex pilot deployment

When you are ready, you can download and deploy the Hortonworks Data Platform with the confidence that you have thought through exactly how and where Hadoop can help.

What can you expect from us in the coming months with the Hortonworks Sandbox?

  1. Join us for a special launch webinar on February 5, “Go from Zero to Big Data in 15 Minutes”. I will be hosting this webinar with one of our awesome Solution Engineers who will give you a sneak peek at some cool use cases for the Sandbox.
  2. New tutorials released on roughly a monthly basis.
  3. Demos and exercises showing the integration with tools and applications from our ecosystem partners like Teradata, Alteryx, Datameer, and Microsoft. How cool would it be to run Excel on top of a personal Hadoop environment? Well, that’s coming, so check back often.

I’m excited that you will be able to go from Zero to Big Data in 15 Minutes in a simple, easy-to-use fashion. And, I’m eager to hear your feedback – please let me know what you think of the Sandbox, what kinds of tutorials you would like to see and I would love to hear about your creative uses of the Sandbox. Leave your comments on this blog, Tweet out using #hwsandbox, comment in the Sandbox Forum, or email. The Hortonworks Sandbox is free and available for download here.

Hadoop in Perspective: Systems for Scientific Computing

When the term scientific computing comes up in a conversation it’s usually just the occasional science geek who shows signs of recognition. But although most people have little or no knowledge of the field’s existence, it has been around since the second half of the twentieth century and has played an increasingly important role in many technological and scientific developments. Internet search engines, DNA analysis, weather forecasting, seismic analysis, renewable energy, and aircraft modeling are just a small number of examples where scientific computing is nowadays indispensable.

Apache Hadoop is a newcomer in scientific computing, and is welcomed as a great new addition to already existing systems. In this post I mean to give an introduction to systems for scientific computing, and I make an attempt at giving Hadoop a place in this picture. I start by discussing arguably the most important concept in scientific computing: parallel computing; what is it, how does it work, and what tools are available? Then I give an overview of the systems that are available for scientific computing at SURFsara, the Dutch center for academic IT and home to some of the world’s most powerful computing systems. I end with a short discussion on the questions that arise when there are many different systems to choose from.


Apache Hive 0.10.0 is Now Available

We are pleased to announce the release of Apache Hive version 0.10.0. More than 350 JIRA issues have been fixed with this release. A few of the most important fixes include:

Cube and Rollup: Hive now has support for creating cubes with rollups. Thanks to Namit!

List Bucketing: This is an optimization that lets you better handle skew in your tables. Thanks to Gang!

Better Windows Support: Several Hive 0.10.0 fixes support running Hive natively on Windows. There is no more cygwin dependency. Thanks to Kanna!

‘Explain’ Adds More Info: Now you can run an explain dependency, and the explain plan will contain all the tables and partitions touched by the query. Thanks to Sambavi!

Improved Authorization: The metastore can now optionally do authorization checks on the server side instead of on the client, providing you with a better security profile. Thanks to Sushanth!

Faster Simple Queries: Some simple queries that don’t require aggregations, and therefore MapReduce jobs, can now run faster. Thanks to Navis!

Better YARN Support: This release contains additional work aimed at making Hive work well with Hadoop YARN. While not all test cases are passing yet, there has been a lot of good progress made with this release. Thanks to Zhenxiao!

Union Optimization: Hive queries with unions will now result in a lower number of MapReduce jobs under certain conditions. Thanks to Namit!

Undo Your Drop Table: While not really truly ‘undo’, you can now reinstate your table after dropping it. Thanks to Andrew!

Show Create Table: This lets you see how you created your table. Thanks to Feng!

Support for Avro Data: Hive now has built-in support for reading/writing Avro data. Thanks to Jakob!

Skewed Joins: Hive’s support for joins involving skewed data is now improved. Thanks to Namit!

Robust Connection Handling at the Metastore Layer: Connection handling between a metastore client and server, and also between a metastore server and the database layer, has been improved. Thanks to Bhushan and Jean!

More Statistics: It’s now possible to collect and store scalar-valued statistics for your tables and partitions. This will enable better query planning in upcoming releases. Thanks to Shreepadma!

Better-Looking HWI: HWI now uses the Bootstrap JavaScript library. It looks really slick. Thanks to Hugo!

If you are excited about some of these new features, I recommend that you download hive-0.10 from: Hive 0.10 Release.

The full Release Notes are available here: Hive 0.10.0 Release Notes

This release saw contributions from many different people. We have numerous folks reporting bugs, writing patches for new features, fixing bugs, testing patches, helping users on mailing lists etc. We would like to give a big thank you to everyone who made hive-0.10 possible.

-Ashutosh Chauhan

Apache Pig 0.10.1 Released

We are pleased to announce that Apache Pig 0.10.1 was recently released. This is primarily a maintenance release focused on stability and bug fixes. In fact, Pig 0.10.1 includes 42 new JIRA fixes since the Pig 0.10.0 release.

Some of the notable changes include:

  • Source code-only distribution

In the download section for Pig 0.10.1, you will now find a source-only tarball (pig-0.10.1-src.tar.gz) alongside the traditional full tarball, rpm and deb distributions.

  • Better support for Apache Hadoop 0.23.x/2.x

Starting with Pig 0.10.1, the Pig team will publish Maven artifacts for Hadoop 0.23.x/2.x (PIG-2907). Note that if you are using Hadoop 0.23.x/2.x, you will need different Pig Maven artifacts than those for Hadoop 0.20.x/1.x. Here is the dependency information for retrieving the Pig Maven artifacts for Hadoop 0.23.x/2.x:

<dependency>
  <groupId>org.apache.pig</groupId>
  <artifactId>pig</artifactId>
  <version>0.10.1</version>
  <classifier>h2</classifier>
</dependency>

In addition, the Pig team fixed a number of bugs specific to Hadoop 0.23.x/2.x (including PIG-3035, PIG-2783, PIG-2761, PIG-2912, and PIG-2791).

  • Better support for Oracle JDK 7

All unit tests for Pig 0.10.1 now pass with Oracle JDK7 (PIG-2908).

  • End-to-end (e2e) test and unit test fixes

We continue to improve Pig e2e testing. With the latest enhancements, we are able to significantly reduce runtime for Pig e2e tests (PIG-2711). We are trying hard to make e2e tests pass on all platforms (PIG-2859, PIG-2783, PIG-2745).

We have also included some fixes for unit tests (PIG-2908, PIG-2650, PIG-3099, PIG-2960) to make sure unit tests pass on all currently supported platforms.

  • Other fixes

There are a number of other important bug fixes in the core Pig code, UDF and documentation. Details can be found in this document.

Special thanks to the Apache Pig community for doing all of this great work to make these improvements happen!

~ Daniel Dai

Big Data Security Part Three: PacketPig Finding Zero Day Attacks

Introduction

This is part three of a Big Data Security blog series. You can read the previous two posts here: Part One / Part Two.

When Russell Jurney and I first teamed up to write these posts we wanted to do something that no one had done before to demonstrate the power of Big Data, the simplicity of Pig and the kind of Big Data Security Analytics we perform at Packetloop. Packetpig was modified to support Amazon’s Elastic Map Reduce (EMR) so that we could process a 600GB set of full packet captures. All that we needed was a canonical Zero Day attack to analyse. We were in luck!

In August 2012 a vulnerability in Oracle JRE 1.7 created huge publicity when it was disclosed that a number of Zero Day attacks had been reported to Oracle in April but had still not been addressed in late August 2012. To make matters worse, Oracle’s scheduled patch for JRE was months away (October 16). This position subsequently changed and a number of out-of-band patches for JRE were released for what became known as CVE-2012-4681 on the 30th of August.

The vulnerability exposed around 1 Billion systems to exploitation and the exploit was 100% effective on Windows, Mac OSX and Linux. A number of security researchers were already seeing the exploit in the wild as it was incorporated into exploit packs for the delivery of malware.

What is a Zero Day?

Put simply it’s any vulnerability that can be exploited without an available mitigation. The mitigation most people measure Zero Days by is a patch from the software vendor (in this case Oracle).

If we look at the timeline of this exploit, you can see how long it remained a Zero Day:

  • The Bug was introduced to JRE on July 28th 2011.
  • It was Disclosed to the public on April 2nd 2012.
  • The Exploit was available in the Metasploit Framework on August 26th 2012, with other PoCs publicly available around the same time.
  • Detection was available via Snort IDS/IPS on August 28th 2012.
  • Lastly a Patch was available from Oracle on 30th August 2012.

If you compare the date the Bug was introduced and the date of the Patch, the Zero Day time is 399 days. Comparing the date of Disclosure with the Patch date is still a staggering 150 days. To put this in perspective, a software bug that affects around 1 Billion devices could be exploited for well over a year and certainly was being seen in the wild. Whether you take the view that the Zero Day period is around 150 days (from disclosure) or over a year (from introduction), both are extremely scary.
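
The two intervals are easy to check with a little date arithmetic. The following is a minimal sketch that uses only the dates from the timeline above; the variable names are illustrative.

```python
from datetime import date

# Dates taken from the timeline above.
bug_introduced = date(2011, 7, 28)
disclosed = date(2012, 4, 2)
patched = date(2012, 8, 30)

# Subtracting two dates yields a timedelta; .days is the whole-day count.
days_from_introduction = (patched - bug_introduced).days
days_from_disclosure = (patched - disclosed).days

print(days_from_introduction)  # 399
print(days_from_disclosure)    # 150
```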

So how can you tell whether you were exploited using this JRE bug in the last 6 months or year? How can you prove your network or important systems haven’t been exploited using this vulnerability?

Finding Zero Day attacks

Packetpig provides you with the ability to search vast amounts of network packet captures for Zero Day attacks. To demonstrate this I executed the Metasploit Exploit for the JRE bug against a Windows XP workstation and recorded the packet capture. I then went and hid this 500KB capture amongst 600GB of Full Packet Captures from a system we monitor on the Internet. Every packet is captured to an S3 bucket so we can quickly scan the S3 bucket for Zero Days using Amazon’s Elastic Map Reduce.

So for the purpose of this demonstration as soon as the Snort Signatures were updated on the 28th of August I downloaded them. This allowed me to scan the 600GB of packet captures with the old signatures (in this case 2905) and then again with the new signatures (in this case 2931).

Let’s run through the Packetpig job ‘snort_comparison.pig‘ to see how this was done. The key to understanding the job is that we use the Packetpig SnortLoader() to scan the network packet captures with the old signatures and again with the new signatures. Anything in the old signature scan is removed from the new signature scan leaving only the Zero Day attacks.

In the same way as our last post, we set up a number of variables using an include.pig file. After that we define old_snort_conf and new_snort_conf:

%DEFAULT includepath pig/include.pig
RUN $includepath;
 
%DEFAULT time 60
 
-- for local mode: uncomment the next line and comment the one after that
--%DEFAULT old_snort_conf 'lib/snort-2905/etc/snort.conf'
%DEFAULT old_snort_conf '/mnt/var/lib/snort-2905/etc/snort.conf'
 
-- for local mode: uncomment the next line and comment the one after that
--%DEFAULT new_snort_conf 'lib/snort-2931/etc/snort.conf'
%DEFAULT new_snort_conf '/mnt/var/lib/snort-2931/etc/snort.conf'

The SnortLoader() is used with the old snort.conf and the new snort.conf to scan the packet captures:

snort_old_alerts =
    LOAD '$pcap'
    USING com.packetloop.packetpig.loaders.pcap.detection.SnortLoader('$old_snort_conf')
    AS (
        ts:long,
        sig:chararray,
        priority:int,
        message:chararray,
        proto:chararray,
        src:chararray,
        sport:int,
        dst:chararray,
        dport:int
);
 
snort_new_alerts =
    LOAD '$pcap'
    USING com.packetloop.packetpig.loaders.pcap.detection.SnortLoader('$new_snort_conf')
    AS (
        ts:long,
        sig:chararray,
        priority:int,
        message:chararray,
        proto:chararray,
        src:chararray,
        sport:int,
        dst:chararray,
        dport:int
);

Next we group (COGROUP) the old and the new Snort scans and filter out any signatures that appear in both:

snort_joined = COGROUP snort_old_alerts BY sig, snort_new_alerts BY sig;
new_only_filtered = FILTER snort_joined BY (COUNT(snort_old_alerts) == 0);
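
The COGROUP-and-filter step amounts to keeping only the signatures that the old rule set never fired on. A rough Python sketch of the same logic on in-memory alerts follows. The sample alert tuples and the function name are made up for illustration and are not part of Packetpig.

```python
from collections import defaultdict

def new_only_alerts(old_alerts, new_alerts):
    """Return alerts from the new scan whose signature never fired in the old scan.

    Each alert is a (sig, message) tuple; real Packetpig alerts carry more fields.
    """
    old_sigs = {sig for sig, _ in old_alerts}
    grouped = defaultdict(list)
    for alert in new_alerts:
        sig = alert[0]
        if sig not in old_sigs:  # same effect as COUNT(snort_old_alerts) == 0
            grouped[sig].append(alert)
    return dict(grouped)

# Hypothetical sample data, not real Snort output.
old = [("1_100_1", "known scan")]
new = [("1_100_1", "known scan"), ("1_200_1", "new exploit"), ("1_200_1", "new exploit")]

result = new_only_alerts(old, new)
# Per-signature counts, comparable to the summary relation in the Pig job.
summary = {sig: len(alerts) for sig, alerts in result.items()}
print(summary)  # {'1_200_1': 2}
```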

Lastly we re-project the data and then store it. The snort_comparison_new/part-r-00000 file is a verbose version of snort_comparison_summary/part-r-00000.

new_only_flattened = FOREACH new_only_filtered GENERATE FLATTEN(snort_new_alerts);
new_only_summary = FOREACH new_only_filtered GENERATE group, COUNT(snort_new_alerts);
 
STORE new_only_flattened INTO '$output/snort_comparison_new';
STORE new_only_summary INTO '$output/snort_comparison_summary';

To demonstrate this in practice I test the job on a small number of packet captures on my local development laptop. Watch the video to see how to do it.

Next I take it to the cloud and use 80 x m2.4xlarge instances to process 600GB of full packet captures to find the Oracle JRE 1.7 attack. The 80 nodes spin up, install all the Packetpig software (bootstrap) and then go to work crunching the network packet captures. Check out the video to see the full process.

Proper Care and Feeding of Drives in a Hadoop Cluster: A Conversation with StackIQ’s Dr. Bruno

In a recent blog post, Hortonworks’ Steve Loughran discussed Apache Hadoop’s preference for JBOD-configured storage vs. the allure of RAID-0. As more enterprises move beyond the science experiment stage and begin deploying Hadoop into their production environments, they are learning that Hadoop is quite different from other services in their data centers, such as web, mail, and database servers. They are learning that to achieve optimal performance, you need to pay particular attention to configuring the underlying hardware.

To find out more, we had a chat with Dr. Greg Bruno, VP of Engineering, and co-founder of StackIQ, a Hortonworks partner, about the real life implications of managing hard drives (HDDs) in a modern Hadoop cluster.

Q. Why isn’t it considered good practice to configure drives in Hadoop clusters as RAID-0 disk arrays?

A. Hadoop prefers a set of separate disks to the same set managed as a RAID-0 disk array. Read speeds are particularly important to the performance of a Hadoop cluster, and in his post, Steve makes the point that since drive speeds vary, and RAID-0 reads occur at the speed of the slowest disk in the array, a RAID-0 configuration may well be slower than a non-RAID configuration. The bigger issue, in my opinion, is reliability. If a set of disks is configured as a RAID-0 array, then one disk failure in that array will take that entire volume down, and if all the disks in a node are configured as a single RAID-0 array, then a single disk failure will take all the node’s data down. By configuring multiple disks in a RAID-0 array, you magnify the probability of that volume going offline due to a single disk failure and you maximize the amount of data that goes offline when that single failure occurs.
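
To put a rough number on that magnification, here is a small sketch. It assumes disks fail independently and with the same probability over some period, which is a simplification, and the 3% figure is purely illustrative rather than taken from the interview.

```python
def raid0_volume_failure_probability(per_disk_failure_prob, disks_in_array):
    """Probability that a RAID-0 volume is lost, i.e. at least one member disk fails.

    Assumes independent, identically likely disk failures (a simplification).
    """
    return 1.0 - (1.0 - per_disk_failure_prob) ** disks_in_array

p = 0.03  # hypothetical per-disk failure probability over the period of interest
for n in (1, 4, 12):
    # The more disks striped together, the likelier the whole volume goes offline.
    print(n, round(raid0_volume_failure_probability(p, n), 4))
```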

Q: Modern servers have a lot of disks. What’s the impact of losing a single disk when you have 12 3TB drives in each node?

A: When a single drive fails when Hadoop is configured in its default state, the ENTIRE NODE gets taken offline. Back when servers typically had 6 x 1.5TB drives in them, losing a single disk would cause the loss of 0.02% of total storage in a typical 10PB, three-replica setup. With today’s hardware, typically 12 x 3TB drives per node, losing a single disk results in the loss of five times as much data.

Q: Aren’t today’s HDDs much more reliable than they used to be? Is it worth the extra work to handle the rare cases when a drive fails?

A: While drives are much more reliable than they used to be, they are still the cause of the lion’s share of support tickets in a Hadoop cluster. In fact, according to Bharath Mundlapudi, a Core Hadoop Engineer while working at Yahoo, disk drive failures account for fully 50% of siteops trouble tickets. That’s more than three times the next highest source of tickets.

Q: What does that represent in real terms?

A: It represents a lot of work for systems administrators. How much depends on the size and age of the cluster in question. For example, Facebook, which has some very large clusters, reports that their failure detection and automated repair system is doing the work of approximately 200 full time system administrators.

Q: OK, but not many organizations have clusters that large. What about a typical enterprise setup?

A: Our experience indicates that a 1,000-node cluster containing 12,000 drives for a total raw storage capacity of 48 petabytes can expect about 3 drive failures a day in its third year of operation. Drive failure rates rise as the devices age. For a 500-node cluster, you’re looking at a drive failure every 17 hours or so.

Q: Doesn’t this make it hard for the cluster operator to manage? How do they keep up?

A: Without the right tools and methodology, it is very difficult for cluster operators to manage clusters at scale. They typically have to write scripts to scan the cluster, detect disk failures, and report them. Then, once the offending drive has been replaced, commands must be run for the controller to recognize the new drive, OS commands need to be executed to format the drive, and then some Hadoop commands are required to add the disk back to the configuration.

Q: Presumably it’s not quite as challenging for StackIQ customers?

A: StackIQ’s mission is to make cluster operation as painless as possible, which is why we have developed tools to manage the entire lifecycle of the disk. While we haven’t figured out how to get our software to physically pull a bad drive and replace it with a new one, we automate the rest of it: the initial deployment of the drive, detecting and reporting errors, and re-integrating the replacement drive into the configuration.

One of the features we’ve developed in StackIQ’s management software automatically configures chassis with LSI MegaRaid controllers into “JBODs”, that is, every disk in the chassis will be configured as an individual device.

In addition, a user can specify which disk they want in the chassis to be the boot disk via an attribute (e.g., “bootdisk0”) and if an optional secondary boot disk attribute is specified (“bootdisk1”), then our code will configure both those disks as a “mirror” (RAID1) while still making all the other non-boot disks available to Hadoop as individual disks. A recent StackIQ customer made their purchasing decision on this feature alone, as they recently went through the painful exercise of changing a mid-size cluster’s RAID configuration by booting each server, one by one, catching a key press at the controller prompt, and fixing the configuration by hand. Not a fun exercise when you are under the gun from management to get a production cluster online.

Q: With that many drive failures, clusters will be chewing through disks at a brisk rate. That could get expensive. That works out to something like 1000 drives/year X $100/drive = $100k per year just for replacement drives.

A: True, which speaks to the need for software that makes the most efficient use of your resources. Intelligent, automated cluster management software can find faulty drives automatically and bring up a replacement drive quickly.
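
The figures quoted in this conversation can be tied together with some back-of-the-envelope arithmetic. The sketch below assumes the failure rate scales linearly with node count, which is a simplification; the 3 failures a day and the $100 drive price are the numbers used above.

```python
HOURS_PER_DAY = 24
DAYS_PER_YEAR = 365

def hours_between_failures(failures_per_day):
    """Average gap between drive failures, in hours."""
    return HOURS_PER_DAY / failures_per_day

def yearly_replacement_cost(failures_per_day, cost_per_drive):
    """Approximate yearly spend on replacement drives."""
    return failures_per_day * DAYS_PER_YEAR * cost_per_drive

failures_1000_nodes = 3.0                     # per day, from the interview
failures_500_nodes = failures_1000_nodes / 2  # linear-scaling assumption

print(hours_between_failures(failures_500_nodes))         # 16.0
print(yearly_replacement_cost(failures_1000_nodes, 100))  # 109500.0
```

Sixteen hours is in line with the "17 hours or so" quoted for a 500-node cluster, and roughly 1,095 drives a year is close to the 1,000 drives used in the cost estimate.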

Q: Doesn’t automation take control out of the hands of the skilled cluster operators?

A: We believe it should be up to the cluster operator to set policies on how much automation to incorporate into their workflows. Our software reflects that philosophy, letting operators choose from a range of policies that go all the way from having the operator run all the commands manually, all the way to a fully automated repair where all the operator needs to do is push in the new drive and let StackIQ’s software do the rest.

Q: Can’t this be done with a simple command script that runs on all nodes?

A: That might be workable in a homogeneous environment, where all the nodes are the same. But in the real world, different nodes require different configurations. Even the disks are likely configured differently in nodes within the clusters. Handling those variables in a static script would be very difficult to accomplish. For example, if your cluster expands over time, you may be adding chassis with different drive configurations. Static scripts wouldn’t be able to deal with this situation. The StackIQ management software has intimate knowledge of the hardware and software in the cluster, so it knows exactly how to handle each drive in each node across the entire cluster, even in a heterogeneous environment.

Conclusion

So there you have it. The folks behind StackIQ cluster management software agree with Steve Loughran’s recommendation to forgo RAID-0 for Hadoop clusters. In fact, they provide the management tools to make it easier to do. So take the advice of our experts, and configure your cluster servers as “Just a Bunch of Disks.”

For more information on StackIQ, please visit their website or follow their Twitter handle (@StackIQ). You can also follow Dr. Greg Bruno directly on his Twitter handle (@itsDrBruno).

~ Lisa Sensmeier

Big Graph Data on Hortonworks Data Platform


The Hortonworks Data Platform (HDP) conveniently integrates numerous Big Data tools in the Hadoop ecosystem. As such, it provides cluster-oriented storage, processing, monitoring, and data integration services. HDP simplifies the deployment and management of a production Hadoop-based system.

In Hadoop, data is represented as key/value pairs. In HBase, data is represented as a collection of wide rows. These atomic structures make global data processing (via MapReduce) and row-specific reading/writing (via HBase) simple. However, writing queries is nontrivial if the data has a complex, interconnected structure that needs to be analyzed (see Hadoop joins and HBase joins). Without an appropriate abstraction layer, processing highly structured data is cumbersome. Indeed, choosing the right data representation and associated tools opens up otherwise unimaginable possibilities. One such data representation that naturally captures complex relationships is a graph (or network). This post presents Aurelius’ Big Graph Data technology suite in concert with Hortonworks Data Platform. Moreover, for a real-world grounding, a GitHub clone is described in this context to help the reader understand how to use these technologies for building scalable, distributed, graph-based systems.

Aurelius Graph Cluster and Hortonworks Data Platform Integration

The Aurelius Graph Cluster can be used in concert with Hortonworks Data Platform to provide users a distributed graph storage and processing system with the management and integration benefits provided by HDP. Aurelius’ graph technologies include Titan, a highly-scalable graph database optimized for serving real-time results to thousands of concurrent users, and Faunus, a distributed graph analytics engine that is optimized for batch processing graphs represented across a multi-machine cluster.

In an online social system, for example, there typically exists a user base that is creating things and various relationships amongst these things (e.g. likes, authored, references, stream). Moreover, they are creating relationships amongst themselves (e.g. friend, group member). To capture and process this structure, a graph database is useful. When the graph is large and it is under heavy transactional load, then a distributed graph database such as Titan/HBase can be used to provide real-time services such as searches, recommendations, rankings, scorings, etc. Next, periodic offline global graph statistics can be leveraged. Examples include identifying the most connected users, or tracking the relative importance of particular trends. Faunus/Hadoop serves this requirement. Graph queries/traversals in Titan and Faunus are simple, one-line commands that are optimized both semantically and computationally for graph processing. They are expressed using the Gremlin graph traversal language. The roles that Titan, Faunus, and Gremlin play in HDP are diagrammed below.

Aurelius and HDP Integration

A Graph Representation of GitHub

GitHub is an online source code service where over 2 million people collaborate on over 4 million projects. However, GitHub provides more than just revision control. In the last 4 years, GitHub has become a massive online community for software collaboration. Some of the biggest software projects in the world use GitHub (e.g. the Linux kernel).

GitHub is growing rapidly — 10,000 to 30,000 events occur each hour (e.g. a user contributing code to a repository). Hortonworks Data Platform is suited to storing, analyzing, and monitoring the state of GitHub. However, it lacks specific tools for processing this data from a relationship-centric perspective. Representing GitHub as a graph is natural because GitHub connects people, source code, contributions, projects, and organizations in diverse ways. Thinking purely in terms of key/value pairs and wide rows obfuscates the underlying relational structure which can be leveraged for more complex real-time and batch analytic algorithms.


GitHub provides 18 event types, which range from new commits and fork events, to opening new tickets, commenting, and adding members to a project. The activity is aggregated in hourly archives, [each of which] contains a stream of JSON encoded GitHub events. (via githubarchive.org)

The aforementioned events can be represented according to the popular property graph data model. A graph schema describing the types of “things” and relationships between them is diagrammed below. A parse of the raw data according to this schema yields a graph instance.

GitHub Schema

Deploying a Graph-Based GitHub

To integrate the Aurelius Graph Cluster with HDP, Whirr is used to launch a cluster of four m1.xlarge machines on Amazon EC2. Detailed instructions for this process are provided on the Aurelius Blog, with the exception that a modified Whirr properties file must be used for HDP. A complete HDP Whirr solution is currently in development. To add Aurelius technologies to an existing HDP cluster, simply download Titan and Faunus, which interface with installed components such as Hadoop and HBase without further configuration.

5830 hourly GitHub Archive files between mid-March 2012 and mid-November 2012 contain 31 million GitHub events. The archive files are parsed to generate a graph. For example, when a GitHub push event is parsed, vertices with the types user, commit, and repository are generated. An edge with label pushed links the user to the commit and an edge with label to links the commit to the repository. The user vertex has properties such as user name and email address, the commit vertex has properties such as the unique sha sum identifier for the commit and its timestamp, and the repository vertex has properties like its URL and the programming language used. In this way, the 31 million events give rise to 27 million vertices and 79 million edges (a relatively small graph). Complete instructions for parsing the data are in the githubarchive-parser documentation. Once the configuration options are reviewed, launching the automated parallel parser is simple.

$ export LC_ALL="C"
$ export JAVA_OPTIONS="-Xmx1G"
$ python AutomatedParallelParser.py batch
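
The push-event mapping described above can be sketched in a few lines of Python. This is only an illustration of the vertex and edge layout, not the githubarchive-parser's actual code; the event dictionary, its field names, and the identifier scheme are hypothetical.

```python
def parse_push_event(event):
    """Turn one simplified push event into property-graph vertices and edges.

    Returns (vertices, edges). Vertices map an id to a dict of properties;
    edges are (out_vertex_id, label, in_vertex_id) tuples.
    """
    user_id = "user:" + event["user_name"]
    commit_id = "commit:" + event["sha"]
    repo_id = "repository:" + event["repo_url"]

    vertices = {
        user_id: {"type": "user", "name": event["user_name"], "email": event["email"]},
        commit_id: {"type": "commit", "sha": event["sha"], "timestamp": event["timestamp"]},
        repo_id: {"type": "repository", "url": event["repo_url"], "language": event["language"]},
    }
    edges = [
        (user_id, "pushed", commit_id),  # user -pushed-> commit
        (commit_id, "to", repo_id),      # commit -to-> repository
    ]
    return vertices, edges

# Hypothetical event; real GitHub Archive records are richer JSON documents.
sample_event = {
    "user_name": "alice",
    "email": "alice@example.org",
    "sha": "abc123",
    "timestamp": "2012-06-01T12:00:00Z",
    "repo_url": "https://example.org/repo",
    "language": "Java",
}

vertices, edges = parse_push_event(sample_event)
print(len(vertices), len(edges))  # 3 2
```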

The generated vertex and edge data is imported into the Titan/HBase cluster using the BatchGraph wrapper of the Blueprints graph API (a simple, single threaded insertion tool).

$ export JAVA_OPTIONS="-Xmx12G"
$ gremlin -e ImportGitHubArchive.groovy vertices.txt edges.txt

Titan: Distributed Graph Database

Titan is a distributed graph database that leverages existing storage systems for its persistence. Currently, Titan provides out-of-the-box support for Apache HBase and Cassandra (see documentation). Graph storage and processing in a clustered environment is made possible because of numerous techniques to both efficiently represent a graph within a BigTable-style data system and to efficiently process that graph using linked-list walking and vertex-centric indices. Moreover, for the developer, Titan provides native support for the Gremlin graph traversal language. This section will demonstrate various Gremlin traversals over the parsed GitHub data.

The following Gremlin snippet determines which repositories Marko Rodriguez (okram) has committed to the most. The query first locates the vertex with name okram and then takes outgoing pushed-edges to his commits. For each of those commits, the outgoing to-edges are traversed to the repository that commit was pushed to. Next, the name of the repository is retrieved and those names are grouped and counted. The side-effect count map is outputted, sorted in decreasing order, and displayed. A graphical example demonstrating gremlins walking is diagrammed below.

gremlin> g = TitanFactory.open('bin/hbase.local')                
==>titangraph[hbase:127.0.0.1]
gremlin> g.V('name','okram').out('pushed').out('to').github_name.groupCount.cap.next().sort{-it.value}
==>blueprints=413
==>gremlin=69
==>titan=49
==>pipes=49
==>rexster=40
==>frames=26
==>faunus=23
==>furnace=9
==>tinkubator=5
==>homepage=1
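For readers new to Gremlin, the same walk (user, then pushed-edges, then to-edges, then group and count) can be imitated over plain dictionaries. This is only a sketch with made-up toy data; the names `pushed`, `to_repo` and `repositories_by_commit_count` are hypothetical and it says nothing about how Titan executes the query.

```python
from collections import Counter

# Toy data (made up): user -> commits pushed, commit -> repository it went to.
pushed = {"alice": ["c1", "c2", "c3"], "bob": ["c4"]}
to_repo = {"c1": "blueprints", "c2": "blueprints", "c3": "gremlin", "c4": "gremlin"}


def repositories_by_commit_count(user):
    """Follow pushed-edges, then to-edges, and group-count repository names."""
    counts = Counter(to_repo[commit] for commit in pushed.get(user, []))
    # most_common() sorts by decreasing count, like sort{-it.value}.
    return counts.most_common()


print(repositories_by_commit_count("alice"))
```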

[Figure: GitHub Gremlin traversal]

The above query can be taken two steps further to determine Marko’s collaborators. If two people have pushed commits to the same repository, then they are collaborators. Because many people may commit to a repository, and a collaborator has typically pushed numerous commits, a maximum of 2500 such collaborator paths are searched. One of the most important aspects of graph traversing is understanding the combinatorial path explosions that can occur when traversing multiple hops through a graph (see Loopy Lattices).

gremlin> g.V('name','okram').out('pushed').out('to').in('to').in('pushed').hasNot('name','okram')[0..2500]
   .name.groupCount.cap.next().sort{-it.value}[0..4]
==>lvca=877
==>spmallette=504
==>sgomezvillamor=424
==>mbroecheler=356
==>joshsh=137
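The path explosion is easy to see on a toy example. The sketch below uses made-up data and hypothetical helper names; it enumerates one result per four-hop path and caps the number of paths examined, which is the role the range filter plays in the traversal above.

```python
from collections import Counter
from itertools import islice

# Toy data (made up): who pushed which commit, and where each commit went.
pushed = {"alice": ["c1", "c2"], "bob": ["c3", "c4"], "carol": ["c5"]}
to_repo = {"c1": "r1", "c2": "r1", "c3": "r1", "c4": "r1", "c5": "r1"}


def collaborator_paths(user):
    """Yield one collaborator name per path user -> commit -> repo -> commit -> user."""
    for commit in pushed[user]:
        repo = to_repo[commit]
        for other, commits in pushed.items():
            if other == user:
                continue  # same role as hasNot('name', user)
            for other_commit in commits:
                if to_repo[other_commit] == repo:
                    yield other


def top_collaborators(user, max_paths=2500, top=5):
    # islice caps the number of paths examined, bounding the work done.
    counts = Counter(islice(collaborator_paths(user), max_paths))
    return counts.most_common(top)


print(top_collaborators("alice"))
```

With only five commits in one repository, alice already generates six paths (two of her commits times three commits by others), which is why a cap matters on a real graph.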

Complex traversals are easy to formulate with the data in this representation. For example, Titan can be used to generate followship recommendations. There are numerous ways to express a recommendation (with varying semantics). A simple one is: “Recommend me people to follow based on people who watch the same repositories as me. The more repositories I watch in common with someone, the higher they should be ranked.” The traversal below starts at Marko, then traverses to all the repositories that Marko watches. It then traverses to everyone else (not Marko) who watches those repositories, counts those people, and returns the top 5 names of the sorted result set. In fact, Marko and Stephen (spmallette) are long-time collaborators and thus have similar tastes in software.

gremlin> g.V('name','okram').out('watched').in('watched').hasNot('name','okram').name.groupCount
   .cap.next().sort{-it.value}[0..4]
==>spmallette=3
==>alex-wajam=3
==>crimeminister=2
==>redgetan=2
==>snicaise=2
gremlin> g.V('name','okram').out('created').has('type','Comment').count()
==>159
gremlin> g.V('name','okram').out('created').has('type','Issue').count()  
==>176
gremlin> g.V('name','okram').out('edited').count()                     
==>85
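The co-watching recommendation described earlier boils down to counting shared repositories. Here is a minimal Python sketch under that reading; the `watched` data and the `recommend` helper are made up for illustration.

```python
from collections import Counter

# Toy data (made up): user -> set of watched repositories.
watched = {
    "alice": {"r1", "r2", "r3"},
    "bob": {"r1", "r2"},
    "carol": {"r3"},
    "dave": {"r9"},
}


def recommend(user, top=5):
    """Rank other users by the number of watched repositories shared with user."""
    counts = Counter()
    for other, repos in watched.items():
        if other != user:
            shared = len(watched[user] & repos)
            if shared:
                counts[other] = shared
    return counts.most_common(top)


print(recommend("alice"))
```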

A few self-describing traversals are presented above that are rooted at okram. Finally, note that Titan is optimized for local/ego-centric traversals. That is, from a particular source vertex (or small set of vertices), use some path description to yield a computation based on the explicit paths walked. For doing global graph analyses (where the source vertex set is the entire graph), a batch processing framework such as Faunus is used.

Faunus: Graph Analytics Engine

Every Titan traversal begins at a small set of vertices (or edges). Titan is not designed for global analyses, which involve processing the entire graph structure. The Hadoop component of Hortonworks Data Platform provides a reliable backend for global queries via Faunus. Gremlin traversals in Faunus are compiled down to MapReduce jobs, where the first job’s InputFormat is Titan/HBase. In order to not interfere with the production Titan/HBase instance, a snapshot of the live graph is typically generated and stored in Hadoop’s distributed file system HDFS as a SequenceFile available for repeated analysis. The most general SequenceFile (with all vertices, edges, and properties) is created below (i.e. a full graph dump).

faunus$ cat bin/titan-seq.properties 
faunus.graph.input.format=com.thinkaurelius.faunus.formats.titan.hbase.TitanHBaseInputFormat
hbase.zookeeper.quorum=10.68.65.161
hbase.mapreduce.inputtable=titan
hbase.mapreduce.scan.cachedrows=75
faunus.graph.output.format=org.apache.hadoop.mapreduce.lib.output.SequenceFileOutputFormat
faunus.sideeffect.output.format=org.apache.hadoop.mapreduce.lib.output.TextOutputFormat
faunus.output.location=full-seq
faunus.output.location.overwrite=true

faunus$ bin/gremlin.sh

         \,,,/
         (o o)
-----oOOo-(_)-oOOo-----
gremlin> g = FaunusFactory.open('bin/titan-seq.properties')
==>faunusgraph[titanhbaseinputformat]
gremlin> g._().toString()
==>[IdentityMap]
gremlin> g._()
12/12/13 09:19:53 INFO mapreduce.FaunusCompiler: Compiled to 1 MapReduce job(s)
12/12/13 09:19:55 INFO mapred.JobClient:  map 0% reduce 0%
12/12/13 09:21:26 INFO mapred.JobClient:  map 1% reduce 0%
12/12/13 09:21:36 INFO mapred.JobClient:  map 2% reduce 0%
12/12/13 09:21:43 INFO mapred.JobClient:  map 3% reduce 0%
...
gremlin> hdfs.ls()
==>rwx------ ubuntu supergroup 0 (D) .staging
==>rwxr-xr-x ubuntu supergroup 0 (D) full-seq
gremlin> hdfs.ls('full-seq/job-0')
==>rw-r--r-- ubuntu supergroup 0 _SUCCESS
==>rwxr-xr-x ubuntu supergroup 0 (D) _logs
==>rw-r--r-- ubuntu supergroup 243768636 part-m-00000
==>rw-r--r-- ubuntu supergroup 125250887 part-m-00001
==>rw-r--r-- ubuntu supergroup 331912876 part-m-00002
==>rw-r--r-- ubuntu supergroup 431617929 part-m-00003
...

Given the generated SequenceFile, the vertices and edges are counted by type and label, which is by definition a global operation.

gremlin> g.V.type.groupCount
==>Gist         780626
==>Issue        1298935
==>Organization 36281
==>Comment      2823507
==>Commit       20338926
==>Repository   2075934
==>User         983384
==>WikiPage     252915
gremlin> g.E.label.groupCount                                           
==>deleted        170139
==>on             7014052
==>owns           180092
==>pullRequested  930796
==>pushed         27538088
==>to             27719774
==>added          181609
==>created        10063346
==>downloaded     122157
==>edited         276609
==>forked         1015435
==>of             536816
==>appliedForkTo  1791
==>followed       753451
==>madePublic     26602
==>watched        2784640

Since GitHub is collaborative in a way similar to Wikipedia, there are a few users who contribute a lot, and many users who contribute little or none at all. To determine the distribution of contributions, Faunus can be used to compute the out degree distribution of pushed-edges, which correspond to users pushing commits to repositories. This is equivalent to Gremlin visiting each user vertex, counting all of the outgoing pushed-edges, and returning the distribution of counts.

gremlin> g.V.sideEffect('{it.degree = it.outE("pushed").count()}').degree.groupCount
==>1	57423
==>10	8856
==>100	527
==>1000	9
==>1004	5
==>1008	6
==>1011	6
==>1015	6
==>1019	3
==>1022	9
==>1026	2
==>1033	6
==>1037	4
==>104	462
==>1040	3
==>...
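The two-step nature of a degree distribution (first a count per vertex, then a count of those counts) is sketched below in Python on made-up data. It is not how Faunus computes the result, which runs as MapReduce jobs; the variable names are hypothetical.

```python
from collections import Counter

# Toy data (made up): one entry per pushed-edge, naming the pushing user.
pushed_edges = ["alice", "alice", "alice", "bob", "carol", "dave", "dave", "dave"]

# Step 1: out-degree per user (number of pushed-edges leaving each user).
out_degree = Counter(pushed_edges)

# Step 2: the distribution, i.e. how many users have each out-degree.
distribution = Counter(out_degree.values())

for degree, users in sorted(distribution.items()):
    print(degree, users)
```

Note that this sketch sorts degrees numerically, whereas the listing above appears to be ordered as text (1, 10, 100, 1000, ...).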

When the degree distribution is plotted using log-scaled axes, the results are similar to the Wikipedia contribution distribution, as expected. This is a common theme in most natural graphs: real-world graphs are not random structures and are composed of few “hubs” and numerous “satellites.”
[Figure: out-degree distribution of GitHub pushed-edges]

More sophisticated queries can be performed by first extracting a slice of the original graph that only contains relevant information. These slices can be saved to HDFS for subsequent traversals. For example, to calculate the most central co-watched project on GitHub, the primary graph is stripped down to only watched-edges between users and repositories. The final traversal below walks the “co-watched” graph two times and counts the number of paths that have gone through each repository. The repositories are sorted by their path counts in order to express which repositories are most central/important/respected according to the watches subgraph.

gremlin> g.E.has('label','watched').keep.V.has('type','Repository','User').keep
...
12/12/13 11:08:13 INFO mapred.JobClient:   com.thinkaurelius.faunus.mapreduce.sideeffect.CommitVerticesMapReduce$Counters
12/12/13 11:08:13 INFO mapred.JobClient:     VERTICES_DROPPED=19377850
12/12/13 11:08:13 INFO mapred.JobClient:     VERTICES_KEPT=2074099
12/12/13 11:08:13 INFO mapred.JobClient:   com.thinkaurelius.faunus.mapreduce.sideeffect.CommitEdgesMap$Counters
12/12/13 11:08:13 INFO mapred.JobClient:     OUT_EDGES_DROPPED=55971128
12/12/13 11:08:13 INFO mapred.JobClient:     OUT_EDGES_KEPT=1934706
...
gremlin> g = g.getNextGraph()
gremlin> g.V.in('watched').out('watched').in('watched').out('watched').property('_count',Long.class)
   .order(F.decr,'github_name')
==>backbone	4173578345
==>html5-boilerplate	4146508400
==>normalize.css	3255207281
==>django	3168825839
==>three.js	3078851951
==>Modernizr	2971383230
==>rails	2819031209
==>httpie	2697798869
==>phantomjs	2589138977
==>homebrew	2528483507
...
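To see why the path counts get so large, here is a small Python sketch of the same idea on a made-up watch graph: start with one path at every repository, then hop repository to watcher to repository twice, carrying path counts along. The data and the helper `co_watch_step` are hypothetical, and the sketch ignores everything about how Faunus distributes the work.

```python
from collections import Counter

# Toy data (made up): user -> repositories watched.
watched = {"alice": ["r1", "r2"], "bob": ["r1"], "carol": ["r2", "r3"]}

# Reverse index: repository -> users watching it.
watchers = {}
for user, repos in watched.items():
    for repo in repos:
        watchers.setdefault(repo, []).append(user)


def co_watch_step(path_counts):
    """One in('watched').out('watched') hop: repository -> watcher -> repository."""
    result = Counter()
    for repo, paths in path_counts.items():
        for user in watchers.get(repo, []):
            for next_repo in watched[user]:
                result[next_repo] += paths
    return result


# Start with one path at every repository, then walk the co-watched graph twice.
counts = Counter({repo: 1 for repo in watchers})
for _ in range(2):
    counts = co_watch_step(counts)

print(counts.most_common())
```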

Conclusion

This post discussed the use of Hortonworks Data Platform in concert with the Aurelius Graph Cluster to store and process the graph data generated by the online social coding system GitHub. The example data set used throughout was provided by GitHub Archive, an ongoing record of events in GitHub. While the dataset currently afforded by GitHub Archive is relatively small, it continues to grow each day. The Aurelius Graph Cluster has been demonstrated in practice to support graphs with hundreds of billions of edges. As more organizations realize the graph structure within their Big Data, the Aurelius Graph Cluster is there to provide both real-time and batch graph analytics.

Acknowledgments

The authors wish to thank Steve Loughran for his help with Whirr and HDP. Moreover, Russell Jurney requested this post and, in a steadfast manner, ensured it was delivered.

Related Material

Hawkins, P., Aiken, A., Fisher, K., Rinard, M., Sagiv, M., “Data Representation Synthesis,” PLDI’11, June 2011.

Pham, R., Singer, L., Liskin, O., Filho, F. F., Schneider, K., “Creating a Shared Understanding of Testing Culture on a Social Coding Site,” Leibniz Universität Hannover, Software Engineering Group: Technical Report, September 2012.

Adler, B. T., de Alfaro, L., Pye, I., Raman, V., “Measuring Author Contributions to the Wikipedia,” WikiSym ’08 Proceedings of the 4th International Symposium on Wikis, Article No. 15, September 2008.

Rodriguez, M.A., Mallette, S.P., Gintautas, V., Broecheler, M., “Faunus Provides Big Graph Data Analytics,” Aurelius Blog, November 2012.

Rodriguez, M.A., LaRocque, D., “Deploying the Aurelius Graph Cluster,” Aurelius Blog, October 2012.

Ho, R., “Graph Processing in Map Reduce,” Pragmatic Programming Techniques Blog, July 2010.

Authors


Vadas Gintautas, Marko A. Rodriguez

Hadoop Summit Europe Call for Papers Ends this Friday, November 30th

The Hadoop Summit Europe official call for papers ends this Friday, November 30th – so be sure to get your session submissions in this week!

Hadoop Summit Europe takes place March 20-21 at the Beurs van Berlage in Amsterdam, Netherlands. You still have time to submit an abstract!

The four content tracks are:

Applied Hadoop

Sessions in this track focus on applications, tools, algorithms and data science as well as areas of advanced research and emerging applications that use and extend the Hadoop platform. Sessions will cover examples of innovative data processing applications and algorithms for performing the most common statistical analysis as well as supporting the latest advances in artificial intelligence and machine learning.

Operating Hadoop

This track focuses on the deployment and operations of Hadoop clusters with an emphasis on tips, tricks, and best practices. Sessions will cover the full deployment lifecycle from installation, configuration, and initial production deployment to large-scale roll out. Reference architectures that maximize performance while minimizing costs will also be covered.

Hadoop Futures

This track takes a technical look at the key open source projects and research efforts driving innovation in and around the Hadoop platform. Attendees will hear from the technical leads, committers, and expert users who are actively driving the roadmaps, key features, and advanced technology research.

Integrating Hadoop

For many, Hadoop success will largely depend on the ability to integrate with existing data-driven and data management technologies. Whether it is streaming, batch or real-time interaction, these integration points are what expose the value of Hadoop to the rest of the enterprise. This track focuses on Hadoop + enterprise (in particular databases, data warehouses, NoSQL, etc.). Sessions will explore these key integration points and will provide deployment and production examples of successful Hadoop integration within the enterprise today.

Announcing Chairs for Hadoop Summit Europe

Track Chairs have been named for Hadoop Summit Europe. Track Chairs will, in turn, select their track committees who, as a team, will decide which sessions are to be presented at Hadoop Summit Europe. They are as follows:

Operating Hadoop – Evert Lammerts, SARA

I joined Sara as a technical consultant in October 2008. In 2009 I started experimenting with non-traditional distributed processing and storage platforms, mainly Hadoop. I’m currently the lead for Hadoop and related big data services. I’m also the organizer of the Dutch Hadoop User Group meetup, jury member for the 2012-2013 Norvig Web Data Science Award, and chair of the Operating Hadoop track of the first European Hadoop Summit.

Before joining Sara I lived in Hungary for four years, where I finished my studies in Software Engineering at MTA SZTAKI. During those years I also worked as a short-term expert for the Dutch ministry of Agriculture, Nature Management, and Fisheries, in a Twinning project with our counterpart in Serbia. Right now I’m back in The Netherlands.

Applied Hadoop – Isabel Drost-Fromm, Mahout

Isabel Drost is a member of the Apache Software Foundation. She is the founder of the Berlin Buzzwords Conference and of the Apache Hadoop Get Together in Berlin, and co-organiser of the first European NoSQL meetup. Isabel co-founded Apache Mahout and is an active Apache Mahout committer. Isabel is actively engaged with communities of various Apache projects, e.g. Lucene and Hadoop. She is a regular speaker at renowned conferences on topics related to free software development, scalability, big data, Hadoop and Mahout. Currently Isabel Drost works for Nokia Gate 5 GmbH as a Software Developer.

Integrating Hadoop – Lars George, Cloudera

Lars George has been involved with HBase since 2007, and became an HBase committer in 2009. He has spoken at many Hadoop User Group meetings, and conferences such as FOSDEM, QCon, and Hadoop World. He also started the Munich OpenHUG meetings. He now works for Cloudera to support Hadoop and HBase in and around Europe through technical support, consulting work, and training. He is also the author of O’Reilly’s “HBase – The Definitive Guide”.

Hadoop Futures – Steve Loughran, Hortonworks, Hadoop

Steve Loughran is a member of technical staff at Hortonworks, where he works on leading-edge developments within the Hadoop ecosystem, including service availability, cloud infrastructure integration, and emerging layers in the Hadoop stack.

Previously, he worked at HP Laboratories on large-scale distributed systems, including cloud computing infrastructures, dynamic Hadoop clusters and configuration management.

He is the author of Ant in Action, a member of the Apache Software Foundation, an active committer on the Hadoop core projects, and an inactive committer on Apache Ant and Axis.

He lives and works in Bristol, England.

Call for Abstracts is now Open!

The call for abstracts is now open. To submit an abstract, go here: http://hadoopsummit.org/amsterdam/call-for-papers/. The deadline for submission is November 30th, so hurry now!

Big Data Security Part Two: Introduction to PacketPig

Introduction

Packetpig is the tool behind Packetloop. In Part One of the Introduction to Packetpig I discussed the background and motivation behind the Packetpig project and problems Big Data Security Analytics can solve. In this post I want to focus on the code and teach you how to use our building blocks to start writing your own jobs.

The ‘building blocks’ are the Packetpig custom loaders that allow you to access specific information in packet captures. There are a number of them, but the two I will focus on in this post are:

  • PacketLoader() allows you to access protocol information (Layer-3 and Layer-4) from packet captures.
  • SnortLoader() inspects traffic using Snort Intrusion Detection software.

Calculating Bandwidth and Binning Time

The PacketLoader() provides access to IP, TCP and UDP headers for each packet in the capture. A great example of its use is the ‘binning.pig’ script. This script allows you to calculate the bandwidth used by TCP and UDP packets, as well as total bandwidth, over any period you define. You might want to calculate these totals every minute, hour, day, week or month to produce a graph.

Firstly run the binning script using the following command.

./pigrun.py -x local -r data/web.pcap -f pig/examples/binning.pig

Then open up output/binning/part-r-00000 in a text editor to see the output.

Now let’s walk through the script. Firstly let’s include all the JARs required for Packetpig and binning.pig to run:

%DEFAULT includepath pig/include.pig
RUN $includepath;

Then set the amount of time you want to bin your values into. In this case I want to output the values every minute (60 seconds), but I could easily change this to an hour (3600 seconds) by commenting and uncommenting the following lines:

%DEFAULT time 60
--%DEFAULT time 3600

Then I load the data out of the packet captures into quite a large schema using the PacketLoader():

packets = load '$pcap' using com.packetloop.packetpig.loaders.pcap.packet.PacketLoader() AS (
    ts,
    ip_version:int,
    ip_header_length:int,
    ip_tos:int,
    ip_total_length:int,
    ip_id:int,
    ip_flags:int,
    ip_frag_offset:int,
    ip_ttl:int,
    ip_proto:int,
    ip_checksum:int,
    ip_src:chararray,
    ip_dst:chararray,
    tcp_sport:int,
    tcp_dport:int,
    tcp_seq_id:long,
    tcp_ack_id:long,
    tcp_offset:int,
    tcp_ns:int,
    tcp_cwr:int,
    tcp_ece:int,
    tcp_urg:int,
    tcp_ack:int,
    tcp_psh:int,
    tcp_rst:int,
    tcp_syn:int,
    tcp_fin:int,
    tcp_window:int,
    tcp_len:int,
    udp_sport:int,
    udp_dport:int,
    udp_len:int,
    udp_checksum:chararray
);

This is a very rich data model, and by leveraging the timestamp (ts), the size of the IP packet (ip_total_length), and the TCP (tcp_len) and UDP (udp_len) lengths, we can calculate total and per-protocol bandwidth at any interval. The beauty of Pig is that I could easily home in on specific hosts by grouping on the Source IP, Destination IP and Destination Port – but let’s keep things simple in this post.

The ip_proto field allows me to filter all packets based on protocol. TCP is IP protocol 6 and UDP is IP protocol 17.

tcp = FILTER packets BY ip_proto == 6;
udp = FILTER packets BY ip_proto == 17;

Once filtered we can bin each packet into a time period and then project a summary of the data with the size of all TCP packets in that time period (bin) summed.

tcp_grouped = GROUP tcp BY (ts / $time * $time);
tcp_summary = FOREACH tcp_grouped GENERATE group, SUM(tcp.tcp_len) AS tcp_len;

And then the same for UDP.

udp_grouped = GROUP udp BY (ts / $time * $time);
udp_summary = FOREACH udp_grouped GENERATE group, SUM(udp.udp_len) AS udp_len;

To calculate the total bandwidth of all IP packets we bin all packets using the same time period and then sum ip_total_length.

bw_grouped = GROUP packets BY (ts / $time * $time);
bw_summary = FOREACH bw_grouped GENERATE group, SUM(packets.ip_total_length) AS bw;

The output we were looking for is basically comma-separated values for timestamp, TCP bandwidth, UDP bandwidth and total bandwidth. This is produced by a final join and projection.

joined = JOIN tcp_summary BY group, udp_summary BY group, bw_summary BY group;
summary = FOREACH joined GENERATE tcp_summary::group, tcp_len, udp_len, bw;

It may seem a little cryptic but basically the JOIN statement is joining using the group that all the summaries share which is the time period. If you ILLUSTRATE the joined variable you will see the data is there but not in the format we are looking for.

| joined | tcp_summary::group:int | tcp_summary::tcp_len:long | udp_summary::group:int | udp_summary::udp_len:long | bw_summary::group:int | bw_summary::bw:long |
----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------
| | 1322644980 | 2080 | 1322644980 | 81 | 1322644980 | 2305 |
----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------

However, the summary projection generates the output the way we want it, and we store that in a CSV format using PigStorage(',').

STORE summary INTO '$output/binning' USING PigStorage(',');
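If the binning arithmetic is unfamiliar, the following Python sketch mirrors the logic of the script on a few made-up packets. It is not part of Packetpig; the function `bin_bandwidth` and the sample tuples are hypothetical. One deliberate difference: Pig’s JOIN is an inner join, so a time bin without both TCP and UDP traffic would not appear in the Pig output, whereas this sketch keeps every bin and reports zero for the missing protocol.

```python
from collections import defaultdict

TCP, UDP = 6, 17  # IP protocol numbers used in the script above


def bin_bandwidth(packets, bin_seconds=60):
    """packets: iterable of (ts, ip_proto, ip_total_length, tcp_len, udp_len)."""
    bins = defaultdict(lambda: {"tcp": 0, "udp": 0, "bw": 0})
    for ts, proto, ip_len, tcp_len, udp_len in packets:
        # Integer division floors ts to the start of its bin.
        start = ts // bin_seconds * bin_seconds
        bins[start]["bw"] += ip_len
        if proto == TCP:
            bins[start]["tcp"] += tcp_len
        elif proto == UDP:
            bins[start]["udp"] += udp_len
    return [(start, b["tcp"], b["udp"], b["bw"]) for start, b in sorted(bins.items())]


# Made-up packets for illustration.
sample = [
    (1322644985, TCP, 1500, 1448, 0),
    (1322645010, UDP, 109, 0, 81),
    (1322645045, TCP, 52, 0, 0),
]
for row in bin_bandwidth(sample):
    print(",".join(str(value) for value in row))
```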

Threat Detection

The SnortLoader() can be used to replay all conversations through Snort IDS and output attacks that it finds. The SnortLoader() can also take a snort.conf as a parameter so you can scan packet captures with specific Snort versions.

Run the basic snort.pig script to get an idea of the output.

./pigrun.py -x local -r data/web.pcap -f pig/examples/snort.pig

Now let’s run through the snort.pig script. Again we include all the JARs we need for Packetpig.

%DEFAULT includepath pig/include.pig
RUN $includepath;

The script is constructed so that you can pass parameters to either scan all traffic for attacks or zero in on specific source and destination IP addresses. By leaving most of these null we inspect all traffic. Also note we are again binning time every 60 seconds. Lastly, Packetpig includes a number of versions of Snort. The default snort.conf we include ensures you use the latest one.

%DEFAULT time 60
%DEFAULT src null
%DEFAULT dst null
%DEFAULT sport null
%DEFAULT dport null
%DEFAULT snortconfig 'lib/snort/etc/snort.conf'

The SnortLoader() receives the snortconfig parameter, starts inspecting the packet capture for attacks, and provides them back to you in a defined schema.

snort_alerts =
  LOAD '$pcap'
  USING com.packetloop.packetpig.loaders.pcap.detection.SnortLoader('$snortconfig')
  AS (
    ts:long,
    sig:chararray,
    priority:int,
    message:chararray,
    proto:chararray,
    src:chararray,
    sport:int,
    dst:chararray,
    dport:int
  );

Using this schema you can access the timestamp (ts), Snort Signature ID (sig), Severity/Priority (priority), Description of the attack (message), Protocol (proto), and the Source (src), Source Port (sport), Destination (dst) and Destination Port (dport) of the attack.

If you run the script and open output/snort/part-m-00000 you will see a number of attacks matching the schema output of the SnortLoader(). One thing to note is that Snort uses Priority 1 for the highest severity, Priority 2 for the next highest, and so on.

1322645240 120_3 3 (http_inspect) NO CONTENT-LENGTH OR TRANSFER-ENCODING IN HTTP RESPONSE TCP 184.84.221.18 80 192.168.0.19 34299
1322645387 139_1 2 (spp_sdf) SDF Combination Alert DIVERT 184.84.221.18 0 192.168.0.19 0
1322645603 120_3 3 (http_inspect) NO CONTENT-LENGTH OR TRANSFER-ENCODING IN HTTP RESPONSE TCP 74.125.237.27 80 192.168.0.19 41791
1322645907 120_3 3 (http_inspect) NO CONTENT-LENGTH OR TRANSFER-ENCODING IN HTTP RESPONSE TCP 199.181.254.21 80 192.168.0.19 54222
1322645689 120_3 3 (http_inspect) NO CONTENT-LENGTH OR TRANSFER-ENCODING IN HTTP RESPONSE TCP 74.125.237.123 80 192.168.0.19 42514
1322645739 138_5 2 SENSITIVE-DATA Email Addresses TCP 74.125.237.123 80 192.168.0.19 42514

The snort.pig script is our most basic example, but hopefully you are already thinking about what you could filter on (e.g. Severity) as well as re-projecting the data you access out of SnortLoader() to find the top ten attackers and top ten victims.
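As a rough idea of what such a follow-up job would compute, here is a Python sketch over a handful of made-up alerts laid out like the SnortLoader() schema. It treats the source address as the attacker and the destination as the victim, which is a simplification, and the helper `top_talkers` is hypothetical rather than part of Packetpig.

```python
from collections import Counter

# Made-up alerts following the SnortLoader() schema:
# (ts, sig, priority, message, proto, src, sport, dst, dport)
alerts = [
    (1322645240, "120_3", 3, "example low-priority alert", "TCP", "10.0.0.1", 80, "192.168.0.19", 34299),
    (1322645387, "139_1", 2, "example alert", "TCP", "10.0.0.1", 80, "192.168.0.19", 41000),
    (1322645739, "138_5", 2, "example alert", "TCP", "10.0.0.2", 80, "192.168.0.19", 42514),
    (1322645800, "1_1000", 1, "example high-priority alert", "TCP", "10.0.0.1", 443, "192.168.0.20", 50000),
]


def top_talkers(alerts, max_priority=2, n=10):
    """Return (top attackers, top victims) among alerts at or above a severity."""
    # Snort priority 1 is the most severe, so "at least as severe" means <=.
    severe = [alert for alert in alerts if alert[2] <= max_priority]
    attackers = Counter(alert[5] for alert in severe).most_common(n)
    victims = Counter(alert[7] for alert in severe).most_common(n)
    return attackers, victims


attackers, victims = top_talkers(alerts)
print("attackers:", attackers)
print("victims:", victims)
```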

In my next post I will show you how to find Zero Day attacks in past network packet captures.

Apache ZooKeeper 3.4.5 Released!


Apache ZooKeeper™ release 3.4.5 is now available. This is a bug fix release including 3 bug fixes. Following is a summary of the critical issues fixed in the release.

ZOOKEEPER-1550: ZooKeeperSaslClient does not finish anonymous login on OpenJDK

ZOOKEEPER-1376: zkServer.sh does not correctly check for $SERVER_JVMFLAGS

ZOOKEEPER-1560: Zookeeper client hangs on creation of large nodes.

Stability of 3.4.5

Note that Apache ZooKeeper™ 3.4.5 is marked as the current stable release. This release fixes the disconnect issue (ZOOKEEPER-1550) that was unfortunately introduced in the ZooKeeper™ 3.4.4 release.

Acknowledgements

Thanks to everyone who contributed towards the release including our users who reported the bugs in 3.4.5.

Learn More

You can learn more about ZooKeeper™ on the ZooKeeper™ Wiki, or in the documentation for ZooKeeper™ 3.4.5.

Why not RAID-0? It’s about Time and Snowflakes

A recurrent question on the various Hadoop mailing lists is “why does Hadoop prefer a set of separate disks to the same set managed as a RAID-0 disk array?”

It’s about time and snowflakes.

JBOD and the Allure of RAID-0

In Hadoop clusters, we recommend treating each disk separately, in a configuration that is known, somewhat disparagingly, as “JBOD”: Just a Bunch of Disks.

In comparison, RAID-0 (a bit of a misnomer, as there is no redundancy) stripes data across all the disks in the array. This promises some advantages:

  • Higher IO rates on small accesses
  • Higher bandwidth on larger accesses, especially write operations
  • No hot spot where a single disk is overloaded because its data is more in demand

In RAID-0, data is striped across disks. When data needs to be written, it is divided up into small blocks (64KB or more). One of these blocks is written to each disk simultaneously. When the data is read back, all the blocks can again be read from all disks simultaneously. The result of this is that your disk bandwidth increases with the size of the array. If you had eight disks mounted as RAID-0, then the theoretical maximum write and read bandwidth is eight times that of a single disk.
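A toy model makes the striping layout concrete. The Python sketch below assigns blocks to disks round-robin and reads them back in the same order; it is a simplification for illustration (the function names and the tiny block size are made up) and not how any particular RAID controller is implemented.

```python
def stripe(data, num_disks, block_size):
    """Split data into blocks and assign them to disks round-robin."""
    disks = [[] for _ in range(num_disks)]
    for index in range(0, len(data), block_size):
        block_number = index // block_size
        disks[block_number % num_disks].append(data[index:index + block_size])
    return disks


def read_back(disks):
    """Reassemble the original data by reading blocks in round-robin order."""
    blocks = []
    longest = max(len(disk) for disk in disks)
    for row in range(longest):
        for disk in disks:
            if row < len(disk):
                blocks.append(disk[row])
    return b"".join(blocks)


payload = bytes(range(20))
layout = stripe(payload, num_disks=4, block_size=2)
print([len(disk) for disk in layout])
print(read_back(layout) == payload)
```

Because consecutive blocks of every file land on different disks, removing any one disk from `layout` leaves holes throughout the data, which is the root of the reliability problem discussed next.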

With the disk controllers built into modern servers, RAID-0 is an option that can be turned on: so why not?

Reliability

Reliability is one issue.

Disks can get slower as they age, as they start to get read errors and have to retry reading bits of the disk platter. A slow disk is a warning sign that maybe you should think about replacing that disk. With Hadoop in a JBOD setting, you can unmount the disk; the Datanode will notice it is missing and report to the Namenode that all the data on it needs re-replication. If you have a RAID-0 disk, everything across all disks is missing – you need to add a new disk to bring the array back up to size, reformat all the disks, and bring up the Datanode without any storage. Over time it will pick up more data, from rebalancing and jobs run on it.

You have to do that whenever any of the disks fails – the more disks you have, the more common it is.

Before panicking – disk failures are rare. Google’s 2007 paper, Failure Trends in a Large Disk Drive Population, reported that in their datacenters, 1.7% of disks failed in the first year of their life, while three-year-old disks were failing at a rate of 8.6%. About 9% isn’t a good number. Returning to the hypothetical eight-disk server, the probability of each disk lasting the year would be:

1 - 0.086 = 0.914

The probability that all disks make it to their next birthday becomes:

0.914^8 = 0.487

If those Google numbers matched that of the disks in your servers – and weren’t due to a really bad batch of disks – then during that third year, about half the datanodes would lose all their data and need to be rebuilt. If you have one of the latest twelve-disk servers, things get even worse.

Hadoop copes with reliability by duplicating data across servers: if one copy of an HDFS data block (64 MB or greater) is lost or corrupt, there are usually two copies elsewhere to recover from.
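The arithmetic above generalizes to any array size. Here is a short Python sketch that assumes disk failures are independent and uses the 8.6% year-three rate quoted from the Google paper; the function name is made up.

```python
def all_disks_survive(annual_failure_rate, num_disks):
    """Probability that every disk survives the year, assuming independent failures."""
    return (1 - annual_failure_rate) ** num_disks


for disks in (1, 8, 12):
    p = all_disks_survive(0.086, disks)
    print(f"{disks:2d} disks: survive={p:.3f} lose_array={1 - p:.3f}")
```

Under the same assumptions, the twelve-disk case comes out at roughly a one-in-three chance of the whole array surviving the year.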

Only now, with all the data in a server lost, the amount of data to replicate on a disk failure increases linearly with the number of disks in each server, while the probability of the server failing also increases. Before, each of those year-three disks would have had a failure probability of 8.6%, with the amount of data to re-replicate being the size of one disk: 1-3 TB.

That eight-disk server would have to transmit 8-24 TB of data, and do it eight times as frequently. That’s going to be slightly more noticeable.

If you do want to use RAID-0 storage, configuring an eight-disk server as four pairs of RAID-0 storage is much less risky. The IO performance could be double that of a single drive, while the risk of a pair failing would be much lower than that of an eight-disk array, and the cost of recovering the data is also very much reduced.
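To put rough numbers on that trade-off, the Python sketch below estimates the expected volume of data to re-replicate per server per year for three layouts. It is a back-of-the-envelope model under stated assumptions: independent failures at the 8.6% rate used above, 2 TB disks (a made-up value inside the 1-3 TB range mentioned), and at most one loss per array per year. The function name is hypothetical.

```python
def expected_rereplication_tb(num_disks, disks_per_array, disk_tb, afr):
    """Expected TB re-replicated per server per year.

    Assumes independent failures and that losing one disk loses its whole array.
    """
    arrays = num_disks // disks_per_array
    p_array_lost = 1 - (1 - afr) ** disks_per_array
    return arrays * p_array_lost * disks_per_array * disk_tb


for label, width in (("JBOD", 1), ("RAID-0 pairs", 2), ("RAID-0 x8", 8)):
    tb = expected_rereplication_tb(8, width, disk_tb=2, afr=0.086)
    print(f"{label:13s} {tb:.2f} TB/year")
```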

Disk failures, then, are the first reason you don’t want to use RAID-0 storage – now to the second.

Every Disk is a Unique Snowflake

Hadoop job performance depends on disk bandwidth, especially the read bandwidth.

On RAID-0 storage the disk accesses go at the rate of the slowest disk. It’s always been believed that the disks would all start out at the same speed, and only degrade over time. Recent research shows things are worse than this: the current generations of hard disks vary in performance from day one.

The 2011 paper, Disks Are Like Snowflakes: No Two Are Alike, measured the performance of modern disk drives, and discovered that they can vary in data IO rates by 20%, even when they are all writing to the same part of the hard disk. This is because the latest manufacturing processes produce disks with different surface characteristics – altering the density at which the disks can store data. Rather than set the disk electronics up to only support the lowest measured performance, or to discard the slowest disks, manufacturers now use a technique called Adaptive Zoning. The newly manufactured disks are calibrated to their performance, and the controllers configured to drive each zone in the disk at the highest rate that zone supports.

This is profound – and it’s not something that the disk manufacturers have been publicising. Modern CPUs are “binned” into parts that support different rates – but those rates are published and you get to pay more for the faster parts. Here the speed of the disk varies, and you just have to hope your parts are the fast ones. In the experiments the authors of the paper conducted, some disks could deliver 105 Megabytes of data a second, with the actual range being 90-111 MB/s.

If you have eight disks, some will be faster than the others, right from day one. And your RAID-0 storage will deliver the performance of the slowest disk right from the day you unpack it from its box and switch it on.
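A simple model shows the cost of being tied to the slowest disk. In the Python sketch below, the eight per-disk rates are made up but sit inside the 90-111 MB/s range quoted above; the RAID-0 figure assumes every stripe waits for the slowest disk, and the JBOD figure is an idealized upper bound that assumes all disks are kept busy independently.

```python
def raid0_bandwidth(disk_rates_mb_s):
    """Striped transfers finish when the slowest disk finishes its share."""
    return min(disk_rates_mb_s) * len(disk_rates_mb_s)


def jbod_bandwidth(disk_rates_mb_s):
    """Independent disks each run at their own rate."""
    return sum(disk_rates_mb_s)


# Made-up rates within the 90-111 MB/s range quoted above.
rates = [90, 95, 100, 102, 105, 105, 108, 111]
print("RAID-0:", raid0_bandwidth(rates), "MB/s")
print("JBOD:  ", jbod_bandwidth(rates), "MB/s")
```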

That is the other reason why we don’t recommend configuring your servers’ storage as one large RAID-0 array.

Summary

To summarize: RAID-0 storage appears to increase disk I/O times, but it will deliver data at the rate of the slowest disk in the array – an array whose disk speeds can vary by up to 20%. When a single disk eventually fails, all the data on the server is lost, forcing you to reformat all the disks and waiting for HDFS to repopulate the server with new data.

  1. We use JBOD storage for all our worker nodes – and recommend that our customers do too.
  2. If anyone does insist on RAID-0 storage, restrict it to pairs of disks, to keep the risk and cost of failures down.

Update: Single Drive RAID-0

Some people asked us about RAID-0 and single drives, so we’d like to clarify this: Hadoop works perfectly well if you configure each drive as a single RAID-0 volume.

This configuration comes about with disk controllers that expect everything to be RAIDed, or if the controller can support JBOD or RAID modes, and you declare one pair of disks as RAID-1. Why would anyone want to declare two disks as RAID-1 – mirrored disks? If you put the OS on that RAID volume then the failure of either of those disks will not stop the server. In the very large 500+ node clusters people don’t do this: they have enough spare servers around, and would rather have the extra storage and bandwidth. On smaller clusters, having the OS on a mirrored pair of disks downgrades a server failure from a serious problem (re-replication of all the server’s data, new disk needed in a few hours) to a task (get a replacement disk and swap it into the server when you get a chance).

The problems with RAID-0 (amplified data replication on a disk failure, performance of the slowest disk) increase with the number of disks. With a single disk, you get exactly the same numbers as you would if the disk controller considered it a JBOD drive.
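A rough way to see how both problems scale with array width is sketched below. It is an illustration under stated assumptions rather than a benchmark: the disk rates are hypothetical values within the 90-111 MB/s range mentioned earlier, the 2 TB disk size is invented, each array is assumed to run at the pace of its slowest member, and every disk is assumed to be full when one fails.

```python
# Hypothetical rates (MB/s) for eight disks and a hypothetical disk size.
DISK_RATES_MB_S = [90, 96, 99, 102, 105, 105, 108, 111]
DISK_SIZE_TB = 2


def split_into_arrays(rates, width):
    """Group adjacent disks into RAID-0 arrays of `width` disks each."""
    return [rates[i:i + width] for i in range(0, len(rates), width)]


def server_throughput(rates, width):
    """Sum over arrays; each array is limited by its slowest disk."""
    return sum(len(array) * min(array) for array in split_into_arrays(rates, width))


def data_lost_per_disk_failure(width, disk_size_tb=DISK_SIZE_TB):
    """One failed disk takes the whole array with it (disks assumed full)."""
    return width * disk_size_tb


if __name__ == "__main__":
    for width in (1, 2, 8):
        print(
            f"{width} disk(s) per array:",
            server_throughput(DISK_RATES_MB_S, width), "MB/s,",
            data_lost_per_disk_failure(width), "TB to re-replicate per failure",
        )
```

With these made-up numbers, single-drive volumes match JBOD at 816 MB/s and 2 TB lost per failure, pairs give 804 MB/s and 4 TB, and one eight-disk array gives 720 MB/s and 16 TB. The pairing here groups adjacent entries in the list, so a different pairing of the same disks would change the middle figure slightly.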

ApacheCon EU Day One Roundup – Part 1

Hackathon and Aeromuseum Reception

ApacheCon Europe kicked off yesterday with an all-day Hackathon followed by a committers’ reception at the Sinsheim Technik Museum, which has, among other large aircraft, a Concorde in Air France livery. My favorite was the diesel engine from a U-Boat – and its enormous drive-shaft and pistons.

Taking the Guesswork out of Hadoop Infrastructure

Winding a rented Opel through its gears along village roads for half an hour from my hotel-out-of-a-black-forest-fairy-tale, I made it to ApacheCon EU’s first day of sessions mid-way through the first talk by Steve Watt, ‘Taking the Guesswork out of Hadoop Infrastructure.’ Steve talked about the harsh reality of fitting hardware to a given workload using Hadoop with the quote: “We’ve profiled our Hadoop applications so we know what type of infrastructure we need.” — Said No One, Ever. Steve covered ways to instrument your cluster and outlined practical ways to test and tune your Hadoop and HBase clusters.

He also discussed ‘System on a Chip and Hadoop,’ which brings to mind the recent debate about Hadoop-specific hardware solutions.

Discussions in the hallways centered around long-term trends and shifting economics around cluster computing. With the PC rapidly being replaced by mobile devices and tablets, will the economies of scale for large clusters of PCs change? Will the growth of cloud-computing replace the desktop PC and continue to drive economy of scale? Or, will custom solutions start to make headway over commodity hardware over the next five years as the desktop and notebook PC disappear, driving up the cost of PC-based servers and making custom hardware more competitive? Will the economies of scale and power-efficiency of mobile and tablet chips replace the PC processor in Hadoop clusters? Fun stuff to contemplate!



Chart from MobileRodie.

The chart below would indicate that PC nodes will remain competitive, but that mobile-derived hardware may get cheap enough to compete as well! Or perhaps I’m dreaming :)

Enabling Elastic, Multi-tenant, Highly Available Hadoop on Demand

Next up was Richard McDougall with Enabling Elastic, Multi-tenant, Highly Available Hadoop on Demand, which covered the ins and outs of Hadoop with virtualization. We’ve talked previously on the Hortonworks blog about virtualization as a part of Hadoop NameNode HA on Hadoop 1.

Virtualizing Hadoop data nodes on Amazon EC2 or VMWare has posed a major tradeoff in performance in the past, and VMWare is hard at work getting that penalty down to 10% for VMWare virtualized Hadoop clusters. Project Serengeti was founded with this goal in mind.

Extending lifespan with Hadoop and R

Radek Maciaszek presented Extending lifespan with Hadoop and R, which covered his project to identify aging-related genes using R and Hadoop at the UCL Institute of Healthy Aging.

Inside Hadoop Development

Hortonworks’ own Steve Loughran presented Inside Hadoop Development.

That’s it for now. I’ll summarize the rest of the day up next!

Agile Data European Megatour, then Home to Atlanta!

Agile Data hits the road this month, crossing Europe with the good news about Hadoop and teaching Hadoop users how to build value from data by using Hadoop to build analytics applications.

We’ll be giving out discount coupons to Hadoop Summit Europe, which is March 20-21st in Amsterdam!

  1. 11/3 – Agile Data @ The Warsaw Hadoop Users Group
  2. 11/5 to 11/6 – Attending ApacheCon Europe 2012 in Sinsheim, Germany. Say hello!
  3. 11/7 – Agile Data @ The France Hadoop Users Group in Paris
  4. 11/8 – Agile Data @ Netherlands Hadoop Users Group in Utrecht
  5. 11/12 – Agile Data @ Hadoop Users Group UK in London.
  6. 11/13 – Agile Data @ HP Labs in Bristol, England.
  7. 11/15 – Agile Data @ The combined Data Science ATL / Atlanta Hadoop Users Group
  8. 11/16 – Agile Data @ The Emory Library
  9. 11/19 – Agile Data @ The Atlanta MongoDB Users Group

I’m writing this from Warsaw, the first stop on my tour. This is my first time in Poland, and I’m excited to be speaking tonight at the Warsaw HUG and look forward to hearing about Hadoop in Poland. Tomorrow I’ll be checking out the sights, so let me know if you’d like to volunteer as tour guide in exchange for free, on-the-spot consulting!

You can view the incomplete book on O’Reilly OFPS here – I’ll be updating it daily for the next three weeks, so check Chapter 10, where I use Pig to build a graphical model in an attempt to improve my wife’s response rate to my emails :) . Code examples for the book are available here on github.

If you can’t make one of the talks, check out the slides below from my DC-HUG presentation, and help spread the good news!



DINOSAURS ARE REAL: Microsoft WOWs audience with HDInsight at Strata NYC (Hortonworks Inside)

You don’t see many demos like the one given by Shawn Bice (Microsoft) today in the Regent Parlor of the New York Hilton, at Strata NYC. “Drive Smarter Decisions with Microsoft Big Data,” was different.

For starters – everything worked like clockwork. Live demos of new products are notorious for failing on-stage, even if they work in production. And although Microsoft was presenting about a Java-based platform at a largely open-source event… it was standing room only, with the crowd overflowing out the doors.

Shawn demonstrated working with Apache Hadoop from Excel, through Power Pivot, to Hive (with sampling-driven early results!?) and out to import third party data-sets. To get the full effect of what he did, you’re going to have to view a screencast or try it out, but to give you an idea of what the first proper interface on Hadoop feels like…

There was a comedian who had a bit about… remember when you saw Jurassic Park for the first time? No matter how old you were, your child-like response was, “DINOSAURS ARE REAL!!!!!!$!!$##!” Our reaction to Jurassic Park was CGI technology disrupting cinema, provoking the same kind of reaction early cinema had on viewers who felt real concern that the horse or train approaching would run them over. At least that’s what I learned wasting a lottery-funded academic scholarship on film classes at a state university before having the good sense to fail out and use my time productively.

That feeling you got when you saw your first CGI raptor is what Microsoft’s demo was like, except it went… “HADOOP IS IN EXCEL!!$%!%!%!$????!!!”

This is a serious thing for me, because I hooked up Pig and Excel years ago:

Which is a crappy demo of Hadoop connecting to Excel, but which gives me mucho moral authority to state that Microsoft’s demo was the right way to hook data to Excel. Take it from someone that spent half of his twenties trying to build web applications that could compete against Excel: until data is in Excel… it ain’t real. With Microsoft’s new offering… big data just got real.

To put this into perspective:

And just so you know I’m not bullshitting you about Hadoop and Big Data and Raptors and next thing you know you’re checking for your wallet and nodding awkwardly and trying to find a pause in this lunatic rant to get the hell out of here, I’ll just come out and tell you:

I have a raptor named lame-o-saurus in a Cowboy Curtis hat permanently tattooed on my body. Again, we resort to visualization (mind the hair):

To summarize:

  1. I am the world’s primary authority on the wrong way to hook Hadoop to Excel.
  2. I have strange tattoos which affirm the validity of my metaphors.
  3. Microsoft has fundamentally altered Big Data with their HDInsight offering.
  4. Yesterday, a breakthrough happened in the Regent Parlor of the Hilton, NYC.

Visicalc… we’ve come such a long way.
