Posts by Russell Jurney:


Big Data Defined

‘Big Data’ has become a hot buzzword, but a poorly defined one. Here we will define it.

Wikipedia defines Big Data in terms of the problems posed by the awkwardness of legacy tools in supporting massive datasets:

In information technology, big data[1][2] is a collection of data sets so large and complex that it becomes difficult to process using on-hand database management tools or traditional data processing applications.

It is better to define ‘Big Data’ in terms of opportunity, in terms of transformative economics. Big Data is the opportunity space created by new open source, distributed systems from the consumer internet space.

Specifically, a Big Data system has four properties:

  • It uses local storage to be fast but inexpensive
  • It uses clusters of commodity hardware to be inexpensive
  • It uses free software to be inexpensive
  • It is open source to avoid expensive vendor lock-in

Cheap storage means logging enormous volumes of data to many disks is easy. Processing this data is less so. Distributed systems which have the above four properties are disruptive because they are approximately 100 times cheaper than other systems for processing large volumes of data, and because they deliver high I/O performance for the buck.

Apache Hadoop is one such system. Hadoop ties together a cluster of commodity machines with local storage using free and open source software to store and process vast amounts of data at a fraction of the cost of any other system.

SAN Storage    NAS Filers    Local Storage
$2-10/GB       $1-5/GB       $0.05/GB

It is out of this cost differential that our opportunity arises: to log every shred of data we can in the cheapest place possible. To provide access to this data across the organization. To mine our data for value. To undergo the transformative processes that unabridged access to data provides, enabling bigger, better, faster, more profound insight than ever before.
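
To make the cost differential concrete, here is a minimal Python sketch that turns the per-gigabyte prices from the table above into a total cost for one dataset. The 100 TB dataset size is a hypothetical figure chosen for illustration, and the function name storage_cost is not from any library.

```python
# Per-GB price ranges (low, high) in dollars, taken from the table above.
PRICE_PER_GB = {
    "SAN Storage": (2.00, 10.00),
    "NAS Filers": (1.00, 5.00),
    "Local Storage": (0.05, 0.05),
}

def storage_cost(size_gb, price_range):
    """Return the (low, high) dollar cost of storing size_gb gigabytes."""
    low, high = price_range
    return size_gb * low, size_gb * high

# Hypothetical dataset size for illustration: 100 TB, using 1 TB = 1,000 GB.
DATASET_GB = 100 * 1000

local_low, _ = storage_cost(DATASET_GB, PRICE_PER_GB["Local Storage"])
for name, price_range in PRICE_PER_GB.items():
    low, high = storage_cost(DATASET_GB, price_range)
    print(f"{name}: ${low:,.0f} to ${high:,.0f} "
          f"({low / local_low:.0f}x to {high / local_low:.0f}x local storage)")
```

Under these assumptions, SAN storage costs 40 to 200 times as much as local disks and NAS filers 20 to 100 times as much, which is the same order of magnitude as the "approximately 100 times cheaper" figure quoted earlier.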

This is a working definition of Big Data.

What do you think? What is your definition of Big Data?

Understanding Hadoop 2.0

In this post, we’ll explain the difference between Hadoop 1.0 and 2.0. After all, what is Hadoop 2.0? What is YARN?

For starters – what is Hadoop and what is 1.0? The Apache Hadoop project is the core of an entire ecosystem of projects. It consists of four modules (see here):

  • Hadoop Common: The common utilities that support the other Hadoop modules.
  • Hadoop Distributed File System (HDFS™): A distributed file system that provides high-throughput access to application data.
  • Hadoop YARN: A framework for job scheduling and cluster resource management.
  • Hadoop MapReduce: A YARN-based system for parallel processing of large data sets.

Hadoop 1.0 is based on the Hadoop 0.20.205 branch (it went 0.18 -> 0.19 -> 0.20 -> 0.20.2 -> 0.20.205 -> 1.0). Hard to follow? Check out this chart. Not hard for an open source developer, but obscure for an enterprise product – so everyone agreed to call 0.20.205 ‘1.0’, the project having matured to that point.

Hadoop 2.0 is from the Hadoop 0.23 branch, with major components rewritten to support features like High Availability and MapReduce 2.0 (YARN), and to enable Hadoop to scale out past 4,000 machines per cluster. Specifically, Hadoop 2.0 adds (see here):

  • HDFS Federation – multiple independent namenodes, each managing part of the filesystem namespace
  • MapReduce NextGen aka YARN aka MRv2 – which transforms Hadoop into a full blown platform as a service. See here.

Hadoop 1.0 is rock-solid, while Hadoop 2.0 is still in active development and is considered alpha. Work continues to stabilize Hadoop 2.0, so stay tuned!

HOWTO use Hive to SQLize your own Tweets – Part Two: Loading Hive, SQL Queries

In part one of this series, we covered how to download your tweet archive from Twitter, ETL it into json/newline format, and extract a Hive schema. In this post, we will load our tweets into Hive and query them to learn about our little world.

To load our tweet-JSON into Hive, we’ll use the rcongiu Hive-JSON-Serde. Download and build it via:

wget http://www.datanucleus.org/downloads/maven2/javax/jdo/jdo2-api/2.3-ec/jdo2-api-2.3-ec.jar
mvn install:install-file -DgroupId=javax.jdo -DartifactId=jdo2-api \
  -Dversion=2.3-ec -Dpackaging=jar -Dfile=jdo2-api-2.3-ec.jar
mvn package

Find the jar it generated via:

find .|grep jar
./target/json-serde-1.1.4-jar-with-dependencies.jar
./target/json-serde-1.1.4.jar

Run hive, and create our table with the following commands:

add jar /path/to/my/Hive-Json-Serde/target/json-serde-1.1.4-jar-with-dependencies.jar;

create table tweets (
   created_at string,
   entities struct <
      hashtags: array <struct <
            text: string>>,
      media: array <struct <
            media_url: string,
            media_url_https: string,
            sizes: array >,
            url: string>>,
      urls: array <struct <
            url: string>>,
      user_mentions: array <struct <
            name: string,
            screen_name: string>>>,
   geo struct <
      coordinates: array ,
      type: string>,
   id bigint,
   id_str string,
   in_reply_to_screen_name string,
   in_reply_to_status_id bigint,
   in_reply_to_status_id_str string,
   in_reply_to_user_id int,
   in_reply_to_user_id_str string,
   retweeted_status struct <
      created_at: string,
      entities: struct <
         hashtags: array <struct <
               text: string>>,
         media: array <struct <
               media_url: string,
               media_url_https: string,
               sizes: array >,
               url: string>>,
         urls: array <struct <
               url: string>>,
         user_mentions: array <struct <
               name: string,
               screen_name: string>>>,
      geo: struct <
         coordinates: array ,
         type: string>,
      id: bigint,
      id_str: string,
      in_reply_to_screen_name: string,
      in_reply_to_status_id: bigint,
      in_reply_to_status_id_str: string,
      in_reply_to_user_id: int,
      in_reply_to_user_id_str: string,
      source: string,
      text: string,
      user: struct <
         id: int,
         id_str: string,
         name: string,
         profile_image_url_https: string,
         protected: boolean,
         screen_name: string,
         verified: boolean>>,
   source string,
   text string,
   user struct <
      id: int,
      id_str: binary,
      name: string,
      profile_image_url_https: string,
      protected: boolean,
      screen_name: string,
      verified: boolean>
)
ROW FORMAT SERDE 'org.openx.data.jsonserde.JsonSerDe'
STORED AS TEXTFILE;

Load it full of data from the tweet JSON file we created last tutorial:

LOAD DATA LOCAL INPATH '/path/to/all_tweets.json' OVERWRITE INTO TABLE tweets;

Verify our data loaded with a count:

SELECT COUNT(*) from tweets;
OK
24655
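
As a cross-check on Hive's count, the sketch below counts the records in a JSON/newline file directly. It is a minimal illustration and not part of the original tutorial: the function name count_json_records is hypothetical, and the demo runs on a small temporary file with made-up records rather than the real all_tweets.json.

```python
import json
import os
import tempfile

def count_json_records(path):
    """Count non-empty lines in a file, checking that each parses as JSON."""
    count = 0
    with open(path) as f:
        for line in f:
            if line.strip():
                json.loads(line)  # raises ValueError if the line is malformed
                count += 1
    return count

# Demo on a tiny temporary file with two made-up records.
with tempfile.NamedTemporaryFile("w", suffix=".json", delete=False) as tmp:
    tmp.write(json.dumps({"id": 1, "text": "first"}) + "\n")
    tmp.write(json.dumps({"id": 2, "text": "second"}) + "\n")
    demo_path = tmp.name

demo_count = count_json_records(demo_path)
os.remove(demo_path)
print(demo_count)
```

If the number printed for your own file matches what SELECT COUNT(*) reports, every line was loaded as one row.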

Our tweets are loaded! Some fun queries to run:

  • Sample some tweets:

SELECT text FROM tweets LIMIT 5;

Which gets us:

OK
Paddled out, tried to psyche myself into wave for 30 minutes...
Waves twice as tall as me are scary
No waves here yet, nap time
Doin 80 on i10w
Gustav and panama city beach here I come

  • Top people we reply to:

SELECT in_reply_to_screen_name,
  COUNT(*) as total from tweets 
  GROUP BY in_reply_to_screen_name 
  ORDER BY total DESC 
  LIMIT 30;

Which gets us the top N people I reply to:

OK
NULL	13447
sanjay	356
Urvaksh	282
ChrisDiehl	268
pfreet	230
mikeschinkel	222
mmealling	193
keithmcgreggor	191
peteskomoroch	183
semil	183
...
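
For intuition, the GROUP BY and ORDER BY above do roughly what the following Python sketch does over the JSON/newline records. The sample records are made up; in practice the lines would come from all_tweets.json. The function name top_replied_to is hypothetical. As in Hive, tweets that are not replies end up under a null key (None here).

```python
import json
from collections import Counter

def top_replied_to(lines, n=30):
    """Count tweets per in_reply_to_screen_name and return the n most common."""
    counts = Counter()
    for line in lines:
        tweet = json.loads(line)
        # A missing field behaves like NULL in the Hive query.
        counts[tweet.get("in_reply_to_screen_name")] += 1
    return counts.most_common(n)

# Made-up sample records standing in for lines of all_tweets.json.
sample_lines = [
    json.dumps({"text": "no reply here"}),
    json.dumps({"text": "@alice hi", "in_reply_to_screen_name": "alice"}),
    json.dumps({"text": "@alice again", "in_reply_to_screen_name": "alice"}),
    json.dumps({"text": "@bob hello", "in_reply_to_screen_name": "bob"}),
]

top = top_replied_to(sample_lines)
print(top)
```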

Hive has some built-in n-gram analysis utilities, documented here, that we can use. For example:

SELECT sentences(lower(text)) FROM tweets;
[["dear","twitter","send","me","my","tweets","plz","you","promised","me"]]
[["pig","eye","for","the","sql","guy","http","t.co","vjx4rcugix","via","sharethis"]]
[["rt","hortonworks","pig","eye","for","the","sql","guy","with","mortardata","http","t.co","vnkwsswnkv","hadoop"]]

We can use these to do n-gram analysis:

SELECT ngrams(sentences(lower(text)), 3, 10) FROM tweets;

Which is kind of amusing:

[{"ngram":["http","instagr.am","p"],"estfrequency":136.0},
{"ngram":["i","want","to"],"estfrequency":100.0},
{"ngram":["on","hacker","news"],"estfrequency":92.0},
{"ngram":["you","have","to"],"estfrequency":66.0},
{"ngram":["a","lot","of"],"estfrequency":65.0},
{"ngram":["i","need","to"],"estfrequency":63.0},
{"ngram":["is","looking","for"],"estfrequency":59.0},
{"ngram":["hortonworks","is","looking"],"estfrequency":59.0},
{"ngram":["there","is","no"],"estfrequency":56.0},
{"ngram":["is","there","a"],"estfrequency":53.0}]

You can see common phrases, as well as Hortonworks job offerings that are auto-tweeted, and of course – ‘on hacker news’ – talking about Hacker News :)

We can also check out our tweets that are RTs.

SELECT retweeted_status.user.screen_name, COUNT(*) as total 
  FROM tweets 
  WHERE retweeted_status.user is not null 
  GROUP BY retweeted_status.user.screen_name 
  ORDER BY total desc 
  LIMIT 20;

This gets me:

OK
peteskomoroch	99
hortonworks	97
ChrisDiehl	56
newsycombinator	55
newsyc20	38
adamnash	31
bradfordcross	29
kjurney	29

Once we have our tweets in Hive, there’s no limit to what we can do to them! This is what Hive excels at.

HOWTO use Hive to SQLize your own Tweets – Part One: ETL and Schema Discovery

Note: Continued in part two

Your Twitter Archive

Twitter has a new feature, Your Twitter Archive, that enables any user to download their tweets as an archive. To find this feature, look at the bottom of your account settings page. There should be an option for ‘Your Twitter archive,’ which will generate your tweets as a JSON/JavaScript web application and send them to you by email as a zip file.

Be patient: this process can take several days, in particular if you’ve lots of tweets (I personally have 24K tweets, and it took 4-5 days to get my tweets).

After a few hours or days, you’ll receive an email with a download link. Download your tweets, and unzip them to reveal their contents.


Digging In: ETL

There is a file called tweets.csv, but that is not the file we are interested in. It has very little detail. The files we are interested in, which contain all the tweet data, are in the data/js/tweets directory. There is one file per month, laid out like this:

2008_08.js	2009_03.js	2009_10.js	2010_05.js	2010_12.js	2011_07.js	2012_02.js	2012_09.js
2008_09.js	2009_04.js	2009_11.js	2010_06.js	2011_01.js	2011_08.js	2012_03.js	2012_10.js
2008_10.js	2009_05.js	2009_12.js	2010_07.js	2011_02.js	2011_09.js	2012_04.js	2012_11.js
2008_11.js	2009_06.js	2010_01.js	2010_08.js	2011_03.js	2011_10.js	2012_05.js	2012_12.js
2008_12.js	2009_07.js	2010_02.js	2010_09.js	2011_04.js	2011_11.js	2012_06.js	2013_01.js
2009_01.js	2009_08.js	2010_03.js	2010_10.js	2011_05.js	2011_12.js	2012_07.js	2013_02.js
2009_02.js	2009_09.js	2010_04.js	2010_11.js	2011_06.js	2012_01.js	2012_08.js	2013_03.js

Inconveniently, the first line of each file is javascript:

Grailbird.data.tweets_2008_08 =

The first thing we’ll need to do is remove that line. Once we do so, the file is a large JSON array. Once we have an array, we can easily convert to the JSON format that Hive expects: one JSON object per line.

I’ve created a python script that removes the first line of text, and converts and prints a one-json-object-per-newline format here: convert.py.

#!/usr/bin/env python

import glob
import json
import os
import sys

tweet_base = sys.argv[1]

# Open tweets: one .js file per month
os.chdir(os.path.join(tweet_base, "data", "js", "tweets"))
for filename in glob.glob("*.js"):
  with open(filename) as f:
    lines = f.readlines()

  # Chop the first line (it's JavaScript), parse the resulting array of tweets
  tweets = json.loads(''.join(lines[1:]))

  # Now print, one JSON object per line
  for tweet in tweets:
    print(json.dumps(tweet))

# Done - we've printed the tweets, one JSON record per line

To run the code:

python convert.py ~/Downloads/tweets > all_tweets.json

Amusingly, my first tweet is about being terrified surfing Hurricane Gustav:

{
    "entities": {
        "user_mentions": [],
        "media": [],
        "hashtags": [],
        "urls": []
    },
    "text": "Paddled out, tried to psyche myself into wave for 30 minutes, then was afraid to come in for 30 more. \"I jusy want to go home.\"",
    "created_at": "Mon Sep 01 01:16:47 +0000 2008",
    "source": "Twinkle",
    "id_str": "905266904",
    "geo": {},
    "id": 905266904,
    "user": {
        "verified": false,
        "name": "Russell Jurney",
        "profile_image_url_https": "http://si0.twimg.com/profile_images/2964060639/9a98c1eb08f57472743caa4a5ae3260b_normal.jpeg",
        "protected": false,
        "id_str": "15831927",
        "id": 15831927,
        "screen_name": "rjurney"
    }
}

Schema Discovery

To load our tweets into Hive, we need a schema. JSON data has no explicit schema, so we must infer one. Along these lines, Hortonworks co-founder Owen O’Malley created a tool, available on github as hive-json, that extracts a Hive schema from a collection of JSON documents. Given a collection of documents, the output schema is the superset of all their individual schemas, which makes for a reasonable SQL schema: optional fields are often null.

bin/find-json-schema ~/Downloads/all_tweets.json

Reading /Users/rjurney/Software/hive_tweets/tweets.json
24655 records read

create table tbl (
   created_at string,
   entities struct <
      hashtags: array <struct <
            text: string>>,
      media: array <struct <
            media_url: string,
            media_url_https: string,
            sizes: array >,
            url: string>>,
      urls: array <struct <
            url: string>>,
      user_mentions: array <struct <
            name: string,
            screen_name: string>>>,
   geo struct <
      coordinates: array ,
      type: string>,
   id bigint,
   id_str string,
   in_reply_to_screen_name string,
   in_reply_to_status_id bigint,
   in_reply_to_status_id_str string,
   in_reply_to_user_id int,
   in_reply_to_user_id_str string,
   retweeted_status struct <
      created_at: string,
      entities: struct <
         hashtags: array <struct <
               text: string>>,
         media: array <struct <
               media_url: string,
               media_url_https: string,
               sizes: array >,
               url: string>>,
         urls: array <struct <
               url: string>>,
         user_mentions: array <struct <
               name: string,
               screen_name: string>>>,
      geo: struct <
         coordinates: array ,
         type: string>,
      id: bigint,
      id_str: string,
      in_reply_to_screen_name: string,
      in_reply_to_status_id: bigint,
      in_reply_to_status_id_str: string,
      in_reply_to_user_id: int,
      in_reply_to_user_id_str: string,
      source: string,
      text: string,
      user: struct <
         id: int,
         id_str: string,
         name: string,
         profile_image_url_https: string,
         protected: boolean,
         screen_name: string,
         verified: boolean>>,
   source string,
   text string,
   user struct <
      id: int,
      id_str: binary,
      name: string,
      profile_image_url_https: string,
      protected: boolean,
      screen_name: string,
      verified: boolean>
)
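
The idea of a superset schema can be sketched in a few lines of Python. This is only an illustration of the concept, not the algorithm hive-json actually uses: the function names infer and merge, the type names chosen, and the rule that conflicting types fall back to string are all assumptions.

```python
import json

def infer(value):
    """Map one JSON value to a nested schema: a type name, a dict, or a list."""
    if isinstance(value, bool):  # check bool before int: bool is an int subclass
        return "boolean"
    if isinstance(value, int):
        return "bigint"
    if isinstance(value, float):
        return "double"
    if isinstance(value, str):
        return "string"
    if isinstance(value, dict):
        return {key: infer(val) for key, val in value.items()}
    if isinstance(value, list):
        element = None
        for item in value:
            element = merge(element, infer(item))
        return [element]
    return None  # JSON null: type unknown so far

def merge(a, b):
    """Combine two schemas into their superset."""
    if a is None:
        return b
    if b is None:
        return a
    if isinstance(a, dict) and isinstance(b, dict):
        return {key: merge(a.get(key), b.get(key)) for key in {**a, **b}}
    if isinstance(a, list) and isinstance(b, list):
        return [merge(a[0], b[0])]
    if a == b:
        return a
    return "string"  # assumed fallback when two documents disagree on a type

# Made-up documents: each has a field the other lacks.
docs = [
    '{"id": 1, "text": "hello", "user": {"id": 7, "verified": false}}',
    '{"id": 2, "text": "hi", "geo": {"type": "Point", "coordinates": [1.5, 2.5]}}',
]

schema = None
for doc in docs:
    schema = merge(schema, infer(json.loads(doc)))
print(json.dumps(schema, indent=2))
```

The merged schema contains both user and geo even though no single document has both, which is why optional fields come back as null for many rows.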

So we’ve got a JSON/newline version of our tweets, and a schema for them in Hive. In our next post, we’ll use the Hive-JSON-Serde to load the tweets and begin our analysis!

Continued in part two

Touring Ambari

Hot on the heels of the release of the new version of Sandbox, I thought it would be worth a look at Ambari as it is now integrated into the Sandbox VM. You can download the Hortonworks Sandbox and try it out for yourself!

Apache Ambari is a web-based tool for provisioning, managing, and monitoring Apache Hadoop clusters. It greatly reduces the complexity of running Apache Hadoop. Ambari is a fully open source Apache project that provides a graphical interface to Hadoop.


The Ambari Dashboard serves as a home page for your cluster, displaying key metrics and linking you through to particular services on the cluster.


Heatmaps show which parts of your cluster are the least or most active, which can help with capacity and load management.


The Ambari Services interface lets you monitor cluster-wide services on your Hadoop cluster.


The Ambari Hosts interface lets you drill down to individual hosts that make up your cluster.


The Ambari Jobs interface lets you examine the individual applications and jobs that make up your Hadoop workload.


The Ambari Users interface helps you administer new users on your Hadoop cluster. You can try it out by downloading the new Hortonworks Sandbox. We hope you enjoyed this post; please let us know by commenting!

HOWTO install Hadoop on Windows

Installing the Hortonworks Data Platform for Windows couldn’t be easier. Let’s take a look at how to install a one-node cluster on your Windows Server 2012 machine. Let us know if you’d like more content like this.

To start, download the HDP for Windows MSI at http://hortonworks.com/thankyou-hdp11-win/. It is about 460MB, and will take a moment to download. Documentation for the download is available here.

As indicated in the documentation here, first we must install Microsoft Visual C++ 2010 Redistributable Package (x64), available here.

Download and install .NET from here if you haven’t already.

We need to set up Java, which you can get here, along with the JAVA_HOME variable that Hadoop requires. Make sure to install Java somewhere without a space in the path: “Program Files” will not work!

To set up JAVA_HOME, open the file browser, right-click Computer, and go to Properties -> Advanced System Settings -> Environment variables. Then create a new System variable called JAVA_HOME that points to your Java install (in this case, C:\java\jdk1.6.0_31).


Finally, we need to download Python from here and add it to the Path environment variable, much as we set JAVA_HOME. Go to Computer -> Properties -> Advanced System Settings -> Environment variables. Then append the Python install path, for example C:\Python27, to the Path variable after a ‘;’.


Verify your path is set up by opening a new shell and typing python, which should run the Python interpreter. Type quit() to exit. Now we’re ready for our configuration.
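
With Python on the path, a quick preflight check for the JAVA_HOME rule above can be sketched as follows. This is an illustrative helper, not part of the HDP installer; the function name check_java_home and the example paths passed to it are hypothetical.

```python
import os

def check_java_home(env):
    """Return a list of problems with JAVA_HOME in the given environment mapping."""
    problems = []
    java_home = env.get("JAVA_HOME")
    if not java_home:
        problems.append("JAVA_HOME is not set")
    elif " " in java_home:
        problems.append("JAVA_HOME contains a space: " + java_home)
    return problems

# Check the real environment, then two made-up examples.
print(check_java_home(os.environ))
print(check_java_home({"JAVA_HOME": r"C:\java\jdk1.6.0_31"}))
print(check_java_home({"JAVA_HOME": r"C:\Program Files\Java\jdk1.6.0_31"}))
```

An empty list means the variable is set and its path contains no spaces.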

Next, open the file clusterproperties.txt in Notepad; we will set it up for a simple, one-node cluster. Note: first we need to discover our hostname, and enter it into our config instead of something generic like ‘localhost.’ Use the hostname command, for example:

hostname
WIN-4VLBRQK8FA8

We then place this hostname in our config. Be sure to replace the example value with your own hostname!

#Log directory
HDP_LOG_DIR=c:\hadoop\logs

#Data directory
HDP_DATA_DIR=c:\hadoop\data

#Hosts
NAMENODE_HOST=WIN-4VLBRQK8FA8
SECONDARY_NAMENODE_HOST=WIN-4VLBRQK8FA8
JOBTRACKER_HOST=WIN-4VLBRQK8FA8
HIVE_SERVER_HOST=WIN-4VLBRQK8FA8
OOZIE_SERVER_HOST=WIN-4VLBRQK8FA8
TEMPLETON_HOST=WIN-4VLBRQK8FA8
SLAVE_HOSTS=WIN-4VLBRQK8FA8

#Database host
DB_FLAVOR=derby
DB_HOSTNAME=WIN-4VLBRQK8FA8

#Hive properties
HIVE_DB_NAME=hive
HIVE_DB_USERNAME=hive
HIVE_DB_PASSWORD=hive

#Oozie properties
OOZIE_DB_NAME=oozie
OOZIE_DB_USERNAME=oozie
OOZIE_DB_PASSWORD=oozie
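
Since the same hostname appears eight times, the file is easy to generate rather than edit by hand. The sketch below reproduces the single-node config shown above for a given hostname. The function name cluster_properties is hypothetical, and the script assumes that Python's socket.gethostname() returns the same name as the hostname command.

```python
import socket

HOST_KEYS = [
    "NAMENODE_HOST",
    "SECONDARY_NAMENODE_HOST",
    "JOBTRACKER_HOST",
    "HIVE_SERVER_HOST",
    "OOZIE_SERVER_HOST",
    "TEMPLETON_HOST",
    "SLAVE_HOSTS",
]

def cluster_properties(hostname):
    """Render the single-node config shown above for the given hostname."""
    lines = [
        "#Log directory",
        r"HDP_LOG_DIR=c:\hadoop\logs",
        "",
        "#Data directory",
        r"HDP_DATA_DIR=c:\hadoop\data",
        "",
        "#Hosts",
    ]
    lines += [key + "=" + hostname for key in HOST_KEYS]
    lines += [
        "",
        "#Database host",
        "DB_FLAVOR=derby",
        "DB_HOSTNAME=" + hostname,
        "",
        "#Hive properties",
        "HIVE_DB_NAME=hive",
        "HIVE_DB_USERNAME=hive",
        "HIVE_DB_PASSWORD=hive",
        "",
        "#Oozie properties",
        "OOZIE_DB_NAME=oozie",
        "OOZIE_DB_USERNAME=oozie",
        "OOZIE_DB_PASSWORD=oozie",
    ]
    return "\n".join(lines) + "\n"

# socket.gethostname() is assumed to match the output of the hostname command.
print(cluster_properties(socket.gethostname()))
```

Redirecting the output to clusterproperties.txt gives a file you can pass to the installer in the next step.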

And finally, install HDP for Windows:

msiexec.exe /i "hdp-1.1.0-160.winpkg.msi" /lv install.log HDP_LAYOUT=c:\Users\Administrator\Downloads\clusterproperties.txt HDP_DIR=c:\HDP DESTROY_DATA="yes"

This will bring up an MSI install window. When it is done, to verify your installation, check the HDP_DIR it was installed to:

dir c:\HDP

You should see files, such as ‘start_local_hdp_services.cmd’. Run this file:

.\start_local_hdp_services.cmd

With services up, you’re in good shape to run the SmokeTests.

Run-SmokeTests.cmd

Which will fire off a mapreduce job right away. Congratulations, you’re Hadooping on Windows!


If you’d like to learn more about Hadoop, check out the Hortonworks Sandbox, a fully capable virtual machine for you to learn Hadoop with.

Apache Pig 0.11 Released!

Apache Pig version 0.11 was released last week. An Apache Pig blog post summarized the release. New features include:

  • A DateTime datatype, documentation here.
  • A RANK function, documentation here.
  • A CUBE operator, documentation here.
  • Groovy UDFs, documentation here.

And many improvements. Oink it up for Pig 0.11! Hortonworks’ Daniel Dai gave a talk on Pig 0.11 at Strata NY.

Pig, ToJson, and Redis to publish data with Flask


Pig can easily stuff Redis full of data. To do so, we’ll need to convert our data to JSON. We’ve previously talked about pig-to-json in JSONize anything in Pig with ToJson. Once we convert our data to JSON, we can use the pig-redis project to load Redis.

Build the pig to json project:

git clone git@github.com:rjurney/pig-to-json.git
cd pig-to-json
ant

Then run our Pig code:

/* Load Avro jars and define shortcut */
register /me/Software/pig/build/ivy/lib/Pig/avro-1.5.3.jar
register /me/Software/pig/build/ivy/lib/Pig/json-simple-1.1.jar
register /me/Software/pig/contrib/piggybank/java/piggybank.jar
define AvroStorage org.apache.pig.piggybank.storage.avro.AvroStorage();

register /me/Software/pig-to-json/dist/lib/pig-to-json.jar
register /me/Software/pig-redis/dist/pig-redis.jar

-- Enron emails are available at https://s3.amazonaws.com/rjurney_public_web/hadoop/enron.avro
emails = load '/me/Data/enron.avro' using AvroStorage();

json_test = foreach emails generate message_id, com.hortonworks.pig.udf.ToJson(tos) as bag_json;

store json_test into 'dummy-name' using com.hackdiary.pig.RedisStorer('kv', 'localhost');

Now run our Flask web server:

python server.py


Code for this post is available here: https://github.com/rjurney/enron-pig-tojson-redis-node.

Imperative and Declarative Hadoop: TPC-H in Pig and Hive

According to the Transaction Processing Performance Council, TPC-H is:

The TPC Benchmark™H (TPC-H) is a decision support benchmark. It consists of a suite of business oriented ad-hoc queries and concurrent data modifications. The queries and the data populating the database have been chosen to have broad industry-wide relevance. This benchmark illustrates decision support systems that examine large volumes of data, execute queries with a high degree of complexity, and give answers to critical business questions.

TPC-H was implemented for Hive in HIVE-600 and for Pig in PIG-2397 by Hortonworks intern Jie Li. In going over this work, I was struck by how it outlined differences between Pig and SQL.

There seems to be a tendency for simple SQL to provide greater clarity than Pig. At some point, as the TPC-H queries become more demanding, complex SQL seems to have less clarity than the comparable Pig. Let’s take a look.

Q1, the pricing summary report, is fairly simple, and a SQL GROUP BY is a good fit:

DROP TABLE lineitem;
DROP TABLE q1_pricing_summary_report;

-- create tables and load data
CREATE EXTERNAL TABLE lineitem (
    L_ORDERKEY INT, 
    L_PARTKEY INT, 
    L_SUPPKEY INT, 
    L_LINENUMBER INT, 
    L_QUANTITY DOUBLE, 
    L_EXTENDEDPRICE DOUBLE, 
    L_DISCOUNT DOUBLE, 
    L_TAX DOUBLE, 
    L_RETURNFLAG STRING, 
    L_LINESTATUS STRING, 
    L_SHIPDATE STRING, 
    L_COMMITDATE STRING, 
    L_RECEIPTDATE STRING, 
    L_SHIPINSTRUCT STRING, 
    L_SHIPMODE STRING, 
    L_COMMENT STRING) 
    ROW FORMAT DELIMITED FIELDS TERMINATED BY '|' STORED AS TEXTFILE LOCATION '/tpch/lineitem';

-- create the target table
CREATE TABLE q1_pricing_summary_report ( 
    L_RETURNFLAG STRING, 
    L_LINESTATUS STRING, 
    SUM_QTY DOUBLE, 
    SUM_BASE_PRICE DOUBLE, 
    SUM_DISC_PRICE DOUBLE, 
    SUM_CHARGE DOUBLE, 
    AVE_QTY DOUBLE, 
    AVE_PRICE DOUBLE, 
    AVE_DISC DOUBLE, 
    COUNT_ORDER INT);

set mapred.min.split.size=536870912;

-- the query
INSERT OVERWRITE TABLE q1_pricing_summary_report 
SELECT 
    L_RETURNFLAG, 
    L_LINESTATUS, 
    SUM(L_QUANTITY), 
    SUM(L_EXTENDEDPRICE), 
    SUM(L_EXTENDEDPRICE * (1-L_DISCOUNT)), 
    SUM(L_EXTENDEDPRICE * (1-L_DISCOUNT) * (1+L_TAX)), 
    AVG(L_QUANTITY),
    AVG(L_EXTENDEDPRICE), 
    AVG(L_DISCOUNT), 
    COUNT(1) 
FROM 
  lineitem 
WHERE 
  L_SHIPDATE<='1998-09-02' 
GROUP BY L_RETURNFLAG, L_LINESTATUS 
ORDER BY L_RETURNFLAG, L_LINESTATUS;

One thing to notice, though, is that with Hive we have to specify schemas twice – once for the load, and again for the result. Compare that to Pig, where we specify the schema once upon load, and then implicitly in the Pig code itself:

SET default_parallel $reducers;

LineItems = LOAD '$input/lineitem' USING PigStorage('|') AS (
    orderkey:long, 
    partkey:long, 
    suppkey:long, 
    linenumber:long, 
    quantity:double, 
    extendedprice:double, 
    discount:double, 
    tax:double, 
    returnflag, 
    linestatus, 
    shipdate, 
    commitdate, 
    receiptdate, 
    shipinstruct, 
    shipmode, 
    comment);

SubLineItems = FILTER LineItems BY shipdate <= '1998-09-02';

SubLine = FOREACH SubLineItems GENERATE 
    returnflag, 
    linestatus, 
    quantity, 
    extendedprice, 
    extendedprice * (1-discount) AS disc_price, 
    extendedprice * (1-discount) * (1+tax) AS charge, 
    discount;

StatusGroup = GROUP SubLine BY (returnflag, linestatus);

PriceSummary = FOREACH StatusGroup GENERATE 
    group.returnflag AS returnflag, 
    group.linestatus AS linestatus, 
    SUM(SubLine.quantity) AS sum_qty, 
    SUM(SubLine.extendedprice) AS sum_base_price, 
    SUM(SubLine.disc_price) as sum_disc_price, 
    SUM(SubLine.charge) as sum_charge, 
    AVG(SubLine.quantity) as avg_qty, 
    AVG(SubLine.extendedprice) as avg_price, 
    AVG(SubLine.discount) as avg_disc, 
    COUNT(SubLine) as count_order;

SortedSummary = ORDER PriceSummary BY returnflag, linestatus;

STORE SortedSummary INTO '$output/Q1out';
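
To check your reading of either version of Q1, it can help to run the same logic on a handful of rows. The Python sketch below mirrors the filter, group and aggregate steps on made-up line items. The function name pricing_summary and the sample data are hypothetical; the output column names follow the Pig script.

```python
from collections import defaultdict

# Made-up line items for illustration:
# (returnflag, linestatus, quantity, extendedprice, discount, tax, shipdate)
line_items = [
    ("A", "F", 10.0, 100.0, 0.10, 0.05, "1995-03-01"),
    ("A", "F", 30.0, 300.0, 0.00, 0.10, "1996-07-15"),
    ("N", "O", 5.0, 50.0, 0.20, 0.00, "1998-08-30"),
    ("N", "O", 7.0, 70.0, 0.00, 0.00, "1998-12-01"),  # after the cutoff
]

def pricing_summary(rows, cutoff="1998-09-02"):
    """Group rows by (returnflag, linestatus) and compute the Q1 aggregates."""
    groups = defaultdict(list)
    for flag, status, qty, price, disc, tax, shipdate in rows:
        if shipdate <= cutoff:  # yyyy-mm-dd strings sort chronologically
            groups[(flag, status)].append((qty, price, disc, tax))

    summary = []
    for flag, status in sorted(groups):
        items = groups[(flag, status)]
        count = len(items)
        sum_qty = sum(qty for qty, _, _, _ in items)
        sum_base_price = sum(price for _, price, _, _ in items)
        sum_disc = sum(disc for _, _, disc, _ in items)
        summary.append({
            "returnflag": flag,
            "linestatus": status,
            "sum_qty": sum_qty,
            "sum_base_price": sum_base_price,
            "sum_disc_price": sum(p * (1 - d) for _, p, d, _ in items),
            "sum_charge": sum(p * (1 - d) * (1 + t) for _, p, d, t in items),
            "avg_qty": sum_qty / count,
            "avg_price": sum_base_price / count,
            "avg_disc": sum_disc / count,
            "count_order": count,
        })
    return summary

for row in pricing_summary(line_items):
    print(row)
```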

Things change as the queries get more complex. With the use of temporary tables, the schema creation overhead starts to dominate, and the SQL becomes quite complex. Take a look at Q22, the Global Sales Opportunity Report:

DROP TABLE customer;
DROP TABLE orders;
DROP TABLE q22_customer_tmp;
DROP TABLE q22_customer_tmp1;
DROP TABLE q22_orders_tmp;
DROP TABLE q22_global_sales_opportunity;

-- create tables and load data
create external table customer (
    C_CUSTKEY INT, 
    C_NAME STRING, 
    C_ADDRESS STRING, 
    C_NATIONKEY INT, 
    C_PHONE STRING, 
    C_ACCTBAL DOUBLE, 
    C_MKTSEGMENT STRING, 
    C_COMMENT STRING
) ROW FORMAT DELIMITED FIELDS TERMINATED BY '|' STORED AS TEXTFILE LOCATION '/tpch/customer';

create external table orders (
    O_ORDERKEY INT, 
    O_CUSTKEY INT, 
    O_ORDERSTATUS STRING, 
    O_TOTALPRICE DOUBLE, 
    O_ORDERDATE STRING, 
    O_ORDERPRIORITY STRING, 
    O_CLERK STRING, 
    O_SHIPPRIORITY INT, 
    O_COMMENT STRING) 
ROW FORMAT DELIMITED FIELDS TERMINATED BY '|' STORED AS TEXTFILE LOCATION '/tpch/orders';

-- create target tables
create table q22_customer_tmp(c_acctbal double, c_custkey int, cntrycode string);
create table q22_customer_tmp1(avg_acctbal double);
create table q22_orders_tmp(o_custkey int);
create table q22_global_sales_opportunity(cntrycode string, numcust int, totacctbal double);

-- the query
insert overwrite table q22_customer_tmp
select 
  c_acctbal, c_custkey, substr(c_phone, 1, 2) as cntrycode
from 
  customer
where 
  substr(c_phone, 1, 2) = '13' or
  substr(c_phone, 1, 2) = '31' or
  substr(c_phone, 1, 2) = '23' or
  substr(c_phone, 1, 2) = '29' or
  substr(c_phone, 1, 2) = '30' or
  substr(c_phone, 1, 2) = '18' or
  substr(c_phone, 1, 2) = '17';

insert overwrite table q22_customer_tmp1
select
  avg(c_acctbal)
from
  q22_customer_tmp
where
  c_acctbal > 0.00;

insert overwrite table q22_orders_tmp
select 
  o_custkey 
from 
  orders
group by 
  o_custkey;

insert overwrite table q22_global_sales_opportunity
select
  cntrycode, count(1) as numcust, sum(c_acctbal) as totacctbal
from
(
  select cntrycode, c_acctbal, avg_acctbal from
  q22_customer_tmp1 ct1 join
  (
    select cntrycode, c_acctbal from
      q22_orders_tmp ot 
      right outer join q22_customer_tmp ct 
      on
        ct.c_custkey = ot.o_custkey
    where
      o_custkey is null
  ) ct2
) a
where
  c_acctbal > avg_acctbal
group by cntrycode
order by cntrycode;

The Pig is comparatively simple:

SET default_parallel $reducers;

customer = load '$input/customer' USING PigStorage('|') as (
    c_custkey:long,
    c_name:chararray, 
    c_address:chararray, 
    c_nationkey:int, 
    c_phone:chararray, 
    c_acctbal:double, 
    c_mktsegment:chararray, 
    c_comment:chararray);
orders = load '$input/orders' USING PigStorage('|') as (
    o_orderkey:long, 
    o_custkey:long, 
    o_orderstatus:chararray, 
    o_totalprice:double, 
    o_orderdate:chararray, 
    o_orderpriority:chararray, 
    o_clerk:chararray, 
    o_shippriority:long, 
    o_comment:chararray);

customer_filter = filter customer by c_acctbal>0.00 and SUBSTRING(c_phone, 0, 2) MATCHES '13|31|23|29|30|18|17';
customer_filter_group = group customer_filter all;
avg_customer_filter = foreach customer_filter_group generate AVG(customer_filter.c_acctbal) as avg_c_acctbal;

customer_sec_filter = filter customer by c_acctbal > avg_customer_filter.avg_c_acctbal and SUBSTRING(c_phone, 0, 2) MATCHES '13|31|23|29|30|18|17';
customer_orders_left = join customer_sec_filter by c_custkey left, orders by o_custkey;

customer_trd_filter = filter customer_orders_left by o_custkey is null;
customer_rows = foreach customer_trd_filter generate SUBSTRING(c_phone, 0, 2) as cntrycode, c_acctbal;

customer_result_group = group customer_rows by cntrycode;
customer_result = foreach customer_result_group generate group, COUNT(customer_rows) as numcust, SUM(customer_rows.c_acctbal) as totacctbal;
customer_result_inorder = order customer_result by group;

store customer_result_inorder into '$output/Q22out' USING PigStorage('|');
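
The steps of Q22 are easier to follow on a toy dataset. The Python sketch below walks through the same logic: keep customers in the listed country codes, average the positive balances, keep customers above that average who have no orders, then group by country code. The function name global_sales_opportunity and all sample data are made up for illustration.

```python
from collections import defaultdict

CODES = {"13", "31", "23", "29", "30", "18", "17"}

# Made-up customers: (custkey, phone, acctbal)
customers = [
    (1, "13-555-0001", 100.0),
    (2, "13-555-0002", 900.0),
    (3, "31-555-0003", 800.0),
    (4, "31-555-0004", -50.0),
    (5, "99-555-0005", 5000.0),  # country code not in the list
]
# Made-up set of customer keys that appear in the orders table.
customers_with_orders = {3}

def global_sales_opportunity(customers, customers_with_orders):
    """Return (cntrycode, numcust, totacctbal) rows, ordered by country code."""
    # Keep customers whose two-character phone prefix is a listed code.
    eligible = [(key, phone[:2], bal)
                for key, phone, bal in customers if phone[:2] in CODES]

    # Average balance over eligible customers with a positive balance.
    positive = [bal for _, _, bal in eligible if bal > 0.0]
    if not positive:
        return []
    avg_bal = sum(positive) / len(positive)

    # Above-average customers with no orders (the outer join + IS NULL step).
    result = defaultdict(lambda: [0, 0.0])
    for key, code, bal in eligible:
        if bal > avg_bal and key not in customers_with_orders:
            result[code][0] += 1
            result[code][1] += bal
    return [(code, n, total) for code, (n, total) in sorted(result.items())]

print(global_sales_opportunity(customers, customers_with_orders))
```

Here the average positive balance is 600, customers 2 and 3 are above it, and customer 3 is dropped for having orders, leaving one customer in country code 13.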

Both Pig and Hive have a place, and their own strengths. It is illustrative to compare these identical queries in the two systems, to see where you might want to hand off queries from Hive to Pig. HCatalog facilitates this handoff, as Pig can read from and write to Hive tables directly via HCatLoader and HCatStorer.

Hadoop Summit Europe Call for Papers Ends this Friday, November 30th

The Hadoop Summit Europe official call for papers ends this Friday, November 30th – so be sure to get your session submissions in this week!

Hadoop Summit Europe is March 20, 21 at the Beurs van Berlage in Amsterdam, Netherlands. You still have time to submit an abstract now!

The four content tracks are:

Applied Hadoop

Sessions in this track focus on applications, tools, algorithms and data science as well as areas of advanced research and emerging applications that use and extend the Hadoop platform. Sessions will cover examples of innovative data processing applications and algorithms for performing the most common statistical analysis as well as supporting the latest advances in artificial intelligence and machine learning.

Operating Hadoop

This track focuses on the deployment and operations of Hadoop clusters with an emphasis on tips, tricks, and best practices. Sessions will cover the full deployment lifecycle from installation, configuration, and initial production deployment to large-scale roll out. Reference architectures that maximize performance while minimizing costs will also be covered.

Hadoop Futures

This track takes a technical look at the key open source projects and research efforts driving innovation in and around the Hadoop platform. Attendees will hear from the technical leads, committers, and expert users who are actively driving the roadmaps, key features, and advanced technology research.

Integrating Hadoop

For many, Hadoop success will largely depend on the ability to integrate with existing data-driven and data management technologies. No matter if it is streaming, batch or real time interaction, these integration points are what expose the value of Hadoop to the rest of the enterprise. This track focuses on Hadoop + enterprise (in particular databases, data warehouses, NoSQL, etc.). Sessions will explore these key integration points and will provide deployment and production examples of successful Hadoop integration within the enterprise today.

Announcing Chairs for Hadoop Summit Europe

Track Chairs have been named for Hadoop Summit Europe. Track Chairs will, in turn, select their track committees who, as a team, will decide which sessions are to be presented at Hadoop Summit Europe. They are as follows:

Operating Hadoop – Evert Lammerts, SARA

I joined Sara as a technical consultant in October 2008. In 2009 I started experimenting with non-traditional distributed processing and storage platforms, mainly Hadoop. I’m currently the lead for Hadoop and related big data services. I’m also the organizer of the Dutch Hadoop User Group meetup, jury member for the 2012-2013 Norvig Web Data Science Award, and chair of the Operating Hadoop track of the first European Hadoop Summit.

Before joining Sara I lived in Hungary for four years, where I finished my studies in Software Engineering at MTA SZTAKI. During those years I also worked as a short-term expert for the Dutch Ministry of Agriculture, Nature Management, and Fisheries, in a Twinning project with our counterpart in Serbia. Right now I’m back in The Netherlands.

Applied Hadoop – Isabel Drost-Fromm, Mahout

Isabel Drost is a member of the Apache Software Foundation. She is the founder of the Berlin Buzzwords Conference and of the Apache Hadoop Get Together in Berlin, and co-organiser of the first European NoSQL meetup. Isabel co-founded Apache Mahout and is an active Apache Mahout committer. She is actively engaged with the communities of various Apache projects, e.g. Lucene and Hadoop, and is a regular speaker at renowned conferences on topics related to free software development, scalability, big data, Hadoop and Mahout. Currently Isabel works for Nokia Gate 5 GmbH as a Software Developer.

Integrating Hadoop – Lars George, Cloudera

Lars George has been involved with HBase since 2007, and became an HBase committer in 2009. He has spoken at many Hadoop User Group meetings and at conferences such as FOSDEM, QCon, and Hadoop World. He also started the Munich OpenHUG meetings. He now works for Cloudera to support Hadoop and HBase in and around Europe through technical support, consulting work, and training. He is also the author of O’Reilly’s “HBase – The Definitive Guide”.

Hadoop Futures – Steve Loughran, Hortonworks, Hadoop

Steve Loughran is a member of technical staff at Hortonworks, where he works on leading-edge developments within the Hadoop ecosystem, including service availability, cloud infrastructure integration, and emerging layers in the Hadoop stack.

Previously, he worked at HP Laboratories on large-scale distributed systems, including cloud computing infrastructures, dynamic Hadoop clusters and configuration management.

He is the author of Ant in Action, a member of the Apache Software Foundation, an active committer on the Hadoop core projects, and an inactive committer on Apache Ant and Axis.

He lives and works in Bristol, England.

Call for Abstracts is Now Open!

The call for abstracts is now open. To submit an abstract, go here: http://hadoopsummit.org/amsterdam/call-for-papers/. The deadline for submission is November 30th, so hurry now!

ApacheCon EU Day One Roundup – Part 1

Hackathon and Aeromuseum Reception

ApacheCon Europe kicked off yesterday with an all-day Hackathon followed by a committers’ reception at the Sinsheim Technik Museum, which has, among other large aircraft, a Concorde in Air France livery. My favorite was the diesel engine from a U-Boat, with its enormous drive-shaft and pistons.

Taking the Guesswork out of Hadoop Infrastructure

Winding a rented Opel through its gears along village roads for half an hour from my hotel-out-of-a-Black-Forest-fairy-tale, I made it to ApacheCon EU’s first day of sessions midway through the first talk, Steve Watt’s ‘Taking the Guesswork out of Hadoop Infrastructure.’ Steve talked about the harsh reality of fitting hardware to a given Hadoop workload, summed up in the quote: “We’ve profiled our Hadoop applications so we know what type of infrastructure we need.” – Said No One, Ever. Steve covered ways to instrument your cluster and outlined practical ways to test and tune your Hadoop and HBase clusters.

He also discussed ‘System on a Chip and Hadoop,’ which brings to mind the recent debate about Hadoop-specific hardware solutions.

Discussions in the hallways centered around long-term trends and shifting economics around cluster computing. With the PC rapidly being replaced by mobile devices and tablets, will the economies of scale for large clusters of PCs change? Will the growth of cloud-computing replace the desktop PC and continue to drive economy of scale? Or, will custom solutions start to make headway over commodity hardware over the next five years as the desktop and notebook PC disappear, driving up the cost of PC-based servers and making custom hardware more competitive? Will the economies of scale and power-efficiency of mobile and tablet chips replace the PC processor in Hadoop clusters? Fun stuff to contemplate!



Chart from MobileRodie.

The chart below would indicate that PC nodes will remain competitive, but that mobile-derived hardware may get cheap enough to compete as well! Or perhaps I’m dreaming :)

Enabling Elastic, Multi-tenant, Highly Available Hadoop on Demand

Next up was Richard McDougall with ‘Enabling Elastic, Multi-tenant, Highly Available Hadoop on Demand,’ which covered the ins and outs of Hadoop with virtualization. We’ve talked previously on the Hortonworks blog about virtualization as a part of Hadoop NameNode HA on Hadoop 1.

Virtualizing Hadoop data nodes on Amazon EC2 or VMware has carried a major performance penalty in the past, and VMware is hard at work getting that penalty down to 10% for VMware-virtualized Hadoop clusters. Project Serengeti was founded with this goal in mind.

Extending lifespan with Hadoop and R

Radek Maciaszek presented ‘Extending lifespan with Hadoop and R,’ which covered his project to identify aging-related genes using R and Hadoop at the UCL Institute of Healthy Aging.

Inside Hadoop Development

Hortonworks’ own Steve Loughran presented Inside Hadoop Development.

That’s it for now. I’ll summarize the rest of the day next!

Agile Data European Megatour, then Home to Atlanta!

Agile Data hits the road this month, crossing Europe with the good news about Hadoop and teaching Hadoop users how to build value from data by using Hadoop to build analytics applications.

We’ll be giving out discount coupons to Hadoop Summit Europe, which is March 20-21 in Amsterdam!

  1. 11/3 – Agile Data @ The Warsaw Hadoop Users Group
  2. 11/5 to 11/6 – Attending ApacheCon Europe 2012 in Sinsheim, Germany. Say hello!
  3. 11/7 – Agile Data @ The France Hadoop Users Group in Paris
  4. 11/8 – Agile Data @ Netherlands Hadoop Users Group in Utrecht
  5. 11/12 – Agile Data @ Hadoop Users Group UK in London.
  6. 11/13 – Agile Data @ HP Labs in Bristol, England.
  7. 11/15 – Agile Data @ The combined Data Science ATL / Atlanta Hadoop Users Group
  8. 11/16 – Agile Data @ The Emory Library
  9. 11/19 – Agile Data @ The Atlanta MongoDB Users Group

I’m writing this from Warsaw, the first stop on my tour. This is my first time in Poland, and I’m excited to be speaking tonight at the Warsaw HUG and look forward to hearing about Hadoop in Poland. Tomorrow I’ll be checking out the sights, so let me know if you’d like to volunteer as tour guide in exchange for free, on-the-spot consulting!

You can view the incomplete book on O’Reilly OFPS here. I’ll be updating it daily for the next three weeks, so check Chapter 10, where I use Pig to build a graphical model in an attempt to improve my wife’s response rate to my emails :) . Code examples for the book are available here on GitHub.

If you can’t make one of the talks, check out the slides below from my DC-HUG presentation, and help spread the good news!



DINOSAURS ARE REAL: Microsoft WOWs audience with HDInsight at Strata NYC (Hortonworks Inside)

You don’t see many demos like the one given by Shawn Bice (Microsoft) today in the Regent Parlor of the New York Hilton, at Strata NYC. “Drive Smarter Decisions with Microsoft Big Data,” was different.

For starters – everything worked like clockwork. Live demos of new products are notorious for failing on-stage, even if they work in production. And although Microsoft was presenting about a Java-based platform at a largely open-source event… it was standing room only, with the crowd overflowing out the doors.

Shawn demonstrated working with Apache Hadoop from Excel, through Power Pivot, to Hive (with sampling-driven early results!?) and out to import third-party datasets. To get the full effect of what he did, you’re going to have to view a screencast or try it out, but to give you an idea of what the first proper interface on Hadoop feels like…

There was a comedian who had a bit about… remember when you saw Jurassic Park for the first time? No matter how old you were, your child-like response was, “DINOSAURS ARE REAL!!!!!!$!!$##!” Our reaction to Jurassic Park was a reaction to CGI technology disrupting cinema, the same kind of reaction early cinema provoked in viewers who felt real concern that the horse or train approaching would run them over. At least that’s what I learned wasting a lottery-funded academic scholarship on film classes at a state university before having the good sense to fail out and use my time productively.

That feeling you got when you saw your first CGI raptor is what Microsoft’s demo was like, except it went… “HADOOP IS IN EXCEL!!$%!%!%!$????!!!”

This is a serious thing for me, because I hooked up Pig and Excel years ago:

Which is a crappy demo of Hadoop connecting to Excel, but which gives me mucho moral authority to state that Microsoft’s demo was the right way to hook data to Excel. Take it from someone who spent half of his twenties trying to build web applications that could compete against Excel: until data is in Excel… it ain’t real. With Microsoft’s new offering… big data just got real.

To put this into perspective:

And just so you know I’m not bullshitting you about Hadoop and Big Data and Raptors and next thing you know you’re checking for your wallet and nodding awkwardly and trying to find a pause in this lunatic rant to get the hell out of here, I’ll just come out and tell you:

I have a raptor named lame-o-saurus in a Cowboy Curtis hat permanently tattooed on my body. Again, we resort to visualization (mind the hair):

To summarize:

  1. I am the world’s primary authority on the wrong way to hook Hadoop to Excel.
  2. I have strange tattoos which affirm the validity of my metaphors.
  3. Microsoft has fundamentally altered Big Data with their HDInsight offering.
  4. Yesterday, a breakthrough happened in the Regent Parlor of the Hilton, NYC.

VisiCalc… we’ve come such a long way.
