Posts by Russell Jurney:


Strata NYC Reporting: Monday @ Big Data Camp, Tuesday @ Strata Retrospective

This is Russell Jurney, your Big Data reporter on the ground here at Strata NYC/Hadoop World at the New York Hilton. Monday night’s main event was Big Data Camp. As in any unconference, the best action was in the hallway, meeting people you only know by reputation or from Twitter. Highlights were:

  • Microsoft’s demonstration of Excel -> Power Pivot -> Hortonworks Data Platform
    In light of today’s announcement, the Hadoop market just got MUCH bigger :)

  • Druid: Real-Time Analytics at a Billion Rows Per Second by Eric Tschetter, Co-founder of Metamarkets
    In-RAM stores are an interesting new development as RAM becomes cheaper and cheaper, and they can augment a Hadoop-centric workload.

  • The Horrors Hidden in Your Models by Steven Hillion
    This talk stressed the importance of unit testing your statistical models and finding areas where they fall over, then working with customers to understand the problem. A humorous use case involving a hoax ‘finger-in-chili’ incident was examined.

Tuesday’s tutorial sessions were great. My favorites were:

Check back tomorrow for coverage of Wednesday’s technical sessions!

Miss Piggy Takes Manhattan: Pig Meetup at Strata NYC on Wed, Oct 24th

There will be a Pig meetup at Strata NYC/Hadoop World, at 6:30PM on Wed, Oct 24th in the Bryant Room of the Hilton New York. This will also be the inaugural meeting of the NYC Pig User Group, which Doug Daniels of Pig contributor Mortar Data was good enough to organize. We look forward to future Pig meetups in NYC!

Hortonworks’ own Daniel Dai @daijy, VP of Apache Pig, will present on new features in Pig 0.11. You can view a summary of JIRA tickets for Pig 0.11 here. New features include the CUBE operator, a new RANK operator, the addition of a DateTime type, speed improvements via SchemaTuple, and many others.

More information is available on the Pig meetup page: http://www.meetup.com/PigUser/events/85047782/.

Those of you too young to understand the Miss Piggy reference should look here.

YARN Meetup at Hortonworks on Friday, Oct 12

Hortonworks is hosting an Apache YARN Meetup on Friday, Oct 12, to solicit feedback on the YARN APIs. We’ve covered YARN before in a four-part series: parts one, two, three and four.

YARN, or “Apache Hadoop NextGen MapReduce,” has come a long way this year. It is now a full-fledged sub-project of Apache Hadoop and has already been deployed on a massive 2,000-node cluster at Yahoo. Many projects, both open source and otherwise, such as Storm and S4, are being ported to YARN, and many of them are in fairly advanced stages. Several individuals are also implementing one-off or ad hoc applications on YARN.

This meetup is a good time for YARN developers to catch up and talk more about YARN, its current status, and its medium-term and long-term roadmap.

Agenda includes:

  • YARN committers from Yahoo will present on current YARN deployments at Yahoo, including lessons learned, stability, etc.
  • Hortonworks YARN committers will talk about upcoming features such as RM Restart, Container Re-use for MR, Multi-resource scheduling, etc.
  • Chris Riccomini from LinkedIn will talk about his experiences building new applications on top of YARN.

A WebEx session will be available, so attendees from all over the world can participate. Follow the meetup page for more information and updates to the agenda.

If you would like to add to the agenda, please get in touch with Arun, or leave a comment on the meetup page.

More information is available on meetup.com here: http://www.meetup.com/Hadoop-Contributors/events/85353562/.

Pig Macro for TF-IDF Makes Topic Summarization 2 Lines of Pig

In a recent post we used Pig to summarize documents via the Term-Frequency, Inverse Document Frequency (TF-IDF) algorithm.

In this post, we’re going to turn that code into a Pig macro that can be called in one line of code:

import 'tfidf.macro';
my_tf_idf_scores = tf_idf(id_body, 'message_id', 'body');

Our macro, in the file tfidf.macro, looks just like our Pig script, with a couple of new lines. Note the use of macro parameters for input and output, preceded by the ‘$’ character: $in_relation, $out_relation, $id_field and $text_field. These let us apply the macro to any relation with a unique identifier field and a text body field. You can get it on GitHub here. The file which tests the macro is here. The code that the macro generates is here.

DEFINE tf_idf(in_relation, id_field, text_field) RETURNS out_relation {
  token_records = foreach $in_relation generate $id_field, FLATTEN(TOKENIZE($text_field)) as tokens;
 
  /* Calculate the term count per document */
  doc_word_totals = foreach (group token_records by ($id_field, tokens)) generate 
    FLATTEN(group) as ($id_field, token), 
    COUNT_STAR(token_records) as doc_total;
 
  /* Calculate the document size */
  pre_term_counts = foreach (group doc_word_totals by $id_field) generate
    group AS $id_field,
    FLATTEN(doc_word_totals.(token, doc_total)) as (token, doc_total), 
    SUM(doc_word_totals.doc_total) as doc_size;
 
  /* Calculate the TF */
  term_freqs = foreach pre_term_counts generate $id_field as $id_field,
    token as token,
    ((double)doc_total / (double)doc_size) AS term_freq;
 
  /* Get count of documents using each token, for idf */
  token_usages = foreach (group term_freqs by token) generate
    FLATTEN(term_freqs) as ($id_field, token, term_freq),
    COUNT_STAR(term_freqs) as num_docs_with_token;
 
  /* Get document count */
  just_ids = foreach $in_relation generate $id_field;
  ndocs = foreach (group just_ids all) generate COUNT_STAR(just_ids) as total_docs;
 
  /* Note the use of Pig Scalars to calculate idf */
  $out_relation = foreach token_usages {
    idf    = LOG((double)ndocs.total_docs/(double)num_docs_with_token);
    tf_idf = (double)term_freq * idf;
    generate $id_field as $id_field,
      token as score,
      (chararray)tf_idf as value:chararray;
  };
};

Note that to debug macros, we can use the -r flag, which expands the code the macro generates into a .expanded file. It is worth pointing out that this takes us from 37 lines of Pig to 2 lines of Pig. Macros facilitate code modularization, re-use and sharing.
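
If you want to sanity-check the numbers the macro produces on a tiny dataset, the same arithmetic can be sketched in plain Python. This is a rough sketch under a few assumptions, not a port of the macro: it assumes Python 3, it splits on whitespace only (Pig’s TOKENIZE also splits on a few other characters), and the function name tf_idf and the sample documents are made up for illustration. Like Pig’s LOG, math.log is the natural logarithm.

```python
import math
from collections import Counter


def tf_idf(docs):
    """Return {doc_id: {token: score}} for a dict of {doc_id: text}.

    Follows the macro's steps: TF = term count / document size,
    IDF = ln(total docs / docs containing the token).
    """
    # Term counts per document (doc_word_totals in the macro)
    counts = {doc_id: Counter(text.split()) for doc_id, text in docs.items()}

    # Number of documents containing each token (num_docs_with_token)
    docs_with_token = Counter()
    for doc_counts in counts.values():
        docs_with_token.update(doc_counts.keys())

    total_docs = len(docs)  # ndocs.total_docs in the macro
    scores = {}
    for doc_id, doc_counts in counts.items():
        doc_size = sum(doc_counts.values())  # doc_size in the macro
        scores[doc_id] = {
            token: (count / doc_size) * math.log(total_docs / docs_with_token[token])
            for token, count in doc_counts.items()
        }
    return scores


# Hypothetical two-document corpus, purely for illustration
scores = tf_idf({"a": "pig pig hadoop", "b": "hadoop cassandra"})
for doc_id, token_scores in scores.items():
    print(doc_id, token_scores)
```

A token that appears in every document (here, hadoop) gets an IDF of zero, which is exactly why TF-IDF surfaces the distinctive words of a document rather than the common ones.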

Are you sharing enough Hadoop code in your enterprise?

JSONize Anything in Pig with ToJson

The need for a ToJson EvalFunc

When integrating Pig with different NoSQL ‘databases,’ or when publishing data from Hadoop, it can be convenient to JSONize your data. Although Pig has JsonStorage, there hasn’t been a ToJson EvalFunc. This has been inconvenient: in our post about Pig and ElasticSearch, creating JSON for ElasticSearch to index required tricks like this:

store enron_emails into '/tmp/enron_emails_elastic' using JsonStorage();
json_emails = load '/tmp/enron_emails_elastic' AS (json_record:chararray);
 
/* Now we can store our email json data to elasticsearch for indexing with message_id. */
store json_emails into 'es://enron/email?json=true&size=1000' USING
  com.infochimps.elasticsearch.pig.ElasticSearchStorage('/me/elasticsearch-0.18.6/config/elasticsearch.yml', '/me/elasticsearch-0.18.6/plugins');

Note how we store as JSON via JsonStorage, then load as a chararray to get the entire record as JSON. It would be more convenient to convert Pig bags and tuples to JSON directly. This would let us retain an ID field as key, and only JSONize our record for that key as a string.

ToJson to the Rescue!

That is precisely what the ToJson EvalFunc in pig-to-json does. It takes a bag or tuple or nested combination thereof and returns a JSON string.

Our data looks like so:

emails: {message_id: chararray,
         date: chararray,
         from: (address: chararray,name: chararray),
         subject: chararray,
         body: chararray,
         tos: {ARRAY_ELEM: (address: chararray,name: chararray)},
         ccs: {ARRAY_ELEM: (address: chararray,name: chararray)},
         bccs: {ARRAY_ELEM: (address: chararray,name: chararray)}
}

We can JSONify bags:

emails = load '/me/Data/enron.avro' using AvroStorage();
emails = limit emails 10;
json_test = foreach emails generate message_id, com.hortonworks.pig.udf.ToJson(tos) as bag_json;
dump json_test;

Which gets us JSON arrays of JSON objects:

(<589.1075842593084.JavaMail.evans@thyme>,[{"address":"gerald.nemec@enron.com","name":null}])
(<614.1075847580822.JavaMail.evans@thyme>,[{"address":"jfichera@saberpartners.com","name":null},{"address":"barry.goode@gov.ca.gov","name":null},{"address":"hoffman@blackstone.com","name":null},{"address":"joseph.fichera@gov.ca.gov","name":null}])
(<735.1075840186524.JavaMail.evans@thyme>,[{"address":"kam.keiser@enron.com","name":"Kam Keiser"},{"address":"mike.grigsby@enron.com","name":"Mike Grigsby"}])
(<758.1075842602845.JavaMail.evans@thyme>,[{"address":"joan.quick@enron.com","name":null}])
(<765.1075860359973.JavaMail.evans@thyme>,[{"address":"jay_dudley@pgn.com","name":null},{"address":"michele_farrell@pgn.com","name":null}])
...

We can JSONify tuples:

emails2 = load '/me/Data/enron.avro' using AvroStorage();
emails2 = limit emails2 10;
json_test2 = foreach emails2 generate message_id, com.hortonworks.pig.udf.ToJson(from) as tuple_json;
dump json_test2;

Which gets us JSON objects:

(<28.1075842613917.JavaMail.evans@thyme>,{"address":"emmye@dracospring.com","name":"\"Emmye\""})
(<85.1075854368299.JavaMail.evans@thyme>,{"address":"darron.giron@enron.com","name":null})
(<167.1075851646300.JavaMail.evans@thyme>,{"address":"jeff.dasovich@enron.com","name":"Jeff Dasovich"})
(<185.1075857304356.JavaMail.evans@thyme>,{"address":"chris.dorland@enron.com","name":"Chris Dorland"})
(<735.1075840186524.JavaMail.evans@thyme>,{"address":"m..love@enron.com","name":"Phillip M. Love"})

And we can JSONify more complex, nested structures:

-- This works for arbitrarily complex data structures as well
a = foreach (group emails by from.address) generate group as from_address, COUNT_STAR(emails) as sent_count, FLATTEN(emails.tos) as tos;
b = group a by from_address;
c = foreach b generate group as from_address, com.hortonworks.pig.udf.ToJson(a) as json_test;
store c into '/tmp/big_test_num';

Which gets us an array of JSON objects where one field refers to an array of other objects, and one field stores a number:

bc@ori.org	[{"tos":[{"address":"klay@enron.com","name":null}],"sent_count":1,"from_address":"bc@ori.org"}]
ben@crs.hn	[{"tos":[{"address":"klay@enron.com","name":null}],"sent_count":1,"from_address":"ben@crs.hn"}]
cba@uh.edu	[{"tos":[{"address":"rod.hayslett@enron.com","name":null}],"sent_count":1,"from_address":"cba@uh.edu"}]
cei@uh.edu	[{"tos":[{"address":"ceiinfo@uh.edu","name":"Shena Cherian"}],"sent_count":1,"from_address":"cei@uh.edu"}]
nc@mmf.com	[{"tos":[{"address":"kenneth.lay@enron.com","name":null}],"sent_count":1,"from_address":"nc@mmf.com"}]
nyb@uni.de	[{"tos":[{"address":"extramoney@mailman.enron.com","name":null}],"sent_count":1,"from_address":"nyb@uni.de"}]
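
To show what a downstream consumer sees, here is a minimal Python sketch that reads one line of the stored output above. It assumes the default tab delimiter between the key and the JSON string, and it uses only the standard library; the variable names are ours.

```python
import json

# One line of the stored output shown above: key, a tab, then a JSON array.
line = 'bc@ori.org\t[{"tos":[{"address":"klay@enron.com","name":null}],"sent_count":1,"from_address":"bc@ori.org"}]'

# Split on the first tab only, then decode the JSON payload
from_address, payload = line.split("\t", 1)
records = json.loads(payload)

for record in records:
    recipients = [to["address"] for to in record["tos"]]
    print(from_address, record["sent_count"], recipients)
```

The nested bag comes back as a list of dicts, the count as an integer, and null names as None, so no extra parsing is needed on the consuming side.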

We are JSONizing fields like crazy. It’s madness! Woohoo!

You can download the project on github here: https://github.com/rjurney/pig-to-json. To build it, you can:

ant clean; ant dist

ToJson Internals

Setting up a Pig EvalFunc project is easy. I copied my ivy.xml from some place, to import dependencies from Maven:

<ivy-module version="2.0">
  <info organisation="org.apache" module="hello-ivy"/>
  <dependencies>
    <dependency org="com.googlecode.json-simple" name="json-simple" rev="1.1"/>
    <dependency org="org.apache.pig" name="pig" rev="0.8.0"/>
    <dependency org="org.apache.hadoop" name="hadoop-core" rev="0.20.2"/>
    <dependency org="commons-lang" name="commons-lang" rev="2.5"/>
  </dependencies>
</ivy-module>

I also copied my build.xml from some place, and edited a few values to make it work with my class. The meat of the UDF is in ToJson.java, which I copied from some place else and edited. There’s an interesting bit here:

    public Schema outputSchema(Schema inputSchema) {
 
        // We must strip the outer {} or this schema won't parse on the back-end
        // This is a trick until PIG-2910 is fixed. Thanks to Thejas for this!
        String schemaString = inputSchema.toString().substring(1, inputSchema.toString().length() - 1);
 
        // Set the input schema for processing
        UDFContext context = UDFContext.getUDFContext();
        Properties udfProp = context.getUDFProperties(this.getClass());
 
        udfProp.setProperty("horton.json.udf.schema", schemaString);
 
        // Construct our output schema which is one field, that is a chararray
        return new Schema(new FieldSchema(null, DataType.CHARARRAY));
    }

We use EvalFunc.outputSchema to pass our input’s schema from Pig’s front end to its distributed back end using the UDFContext’s properties. Our output schema is always a chararray, as this is ToJson after all. We retrieve the input schema in our EvalFunc.exec method:

schema = Utils.getSchemaFromString(strSchema);
...
// Parse the schema from the string stored in the properties object.
Object field = input.get(0);
Object jsonObject = fieldToJson(field, schema.getFields().get(0));
String json = jsonObject.toString();
return json;
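
The recursive idea behind a schema-driven conversion like fieldToJson can be sketched in a few lines of Python. To be clear, this is not the code in ToJson.java: the tuple-based schema representation below is a hypothetical stand-in for Pig’s Schema and FieldSchema classes, and the function name field_to_json is ours. It only illustrates why the schema has to reach the back end: without field names, a tuple is just a list of values.

```python
import json


def field_to_json(value, schema):
    """Convert a Pig-like value to JSON-ready Python objects.

    `schema` is a hypothetical stand-in for Pig's FieldSchema:
      ("scalar",)                      -> value is returned as-is
      ("tuple", [(name, schema), ...]) -> dict keyed by field name
      ("bag", tuple_schema)            -> list of converted tuples
    """
    if value is None:
        return None
    kind = schema[0]
    if kind == "tuple":
        fields = schema[1]
        return {name: field_to_json(item, sub) for item, (name, sub) in zip(value, fields)}
    if kind == "bag":
        return [field_to_json(item, schema[1]) for item in value]
    return value


# Schema for the tos field: a bag of (address, name) tuples
SCALAR = ("scalar",)
ADDRESS = ("tuple", [("address", SCALAR), ("name", SCALAR)])
TOS = ("bag", ADDRESS)

tos = [("kam.keiser@enron.com", "Kam Keiser"), ("mike.grigsby@enron.com", None)]
tos_json = json.dumps(field_to_json(tos, TOS))
print(tos_json)
```

Tuples become JSON objects and bags become JSON arrays, which matches the shape of the output in the examples above.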

Pig UDFs are pretty easy to write, as you can see! Be lazy. Find an Apache-licensed Pig UDF on GitHub or in Piggybank and make it your own!

InfoQ: Hadoop and Metadata (Removing the Impedance Mis-match)

InfoQ has an article out today on HCatalog by Hortonworks’ own Alan Gates and Russell Jurney.

Apache Hadoop enables a revolution in how organizations process data, with the freedom and scale Hadoop provides enabling new kinds of applications building new kinds of value and delivering results from big data on shorter timelines than ever before. The shift towards a Hadoop-centric mode of data processing in the enterprise has, however, posed a challenge: how do we collaborate in the context of the freedom that Hadoop provides us? How do we share data which can be stored and processed in any format the user desires? Furthermore, how do we integrate between different tools and with other systems that make up the data center as computer?

Check out the article at InfoQ: http://www.infoq.com/articles/HadoopMetadata

Search Hadoop with Search-Hadoop.com

As the Hadoop ecosystem has exploded into many projects, searching for the right answers when questions arise can be a challenge. That’s why I was thrilled to hear about search-hadoop.com, from Sematext. It has a sister site called search-lucene where you can… search Lucene!

Search-Hadoop.com searches across projects – JIRAs, source code, mailing lists, wikis, etc. so you can see design and API docs, as well as questions, answers and general documentation. Filtering by project is a big help – but search-hadoop also lets you see the similarities between projects.

Search Hadoop runs on Solr 3.6.1, but will be moving to Solr 4.0 this Fall. Solr 4.0, aka SolrCloud, is a fully distributed version of Solr (indices are sharded and replicated) that uses ZooKeeper for coordination.

The autocomplete feature is particularly cool. It offers several groups of suggestions separated by a lovely thin pink line, so one can easily pick the suggestion to follow. The motivation is that people searching for info often have an idea what type of content they want to see – issues, ML messages, wiki pages, etc.

A couple of cool features: you can search by author by clicking on the author name in search results, e.g. http://search-hadoop.com/?q=&fc_author=Russell+Jurney. Queries starting with a project name are automatically limited to that project, e.g. http://search-hadoop.com/?q=pig+join will show only results from Pig.
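
If you want to generate these links from a script, the two URL patterns above can be built with Python’s standard library. This is a small sketch based only on the q and fc_author parameters visible in the example URLs; the helper name search_url is ours, and any other parameters the site accepts are not covered here.

```python
from urllib.parse import urlencode

BASE_URL = "http://search-hadoop.com/"


def search_url(query="", author=None):
    """Build a search URL from a query and an optional author filter."""
    params = {"q": query}
    if author:
        params["fc_author"] = author
    # urlencode escapes spaces as '+', matching the URLs shown above
    return BASE_URL + "?" + urlencode(params)


author_url = search_url(author="Russell Jurney")
project_url = search_url("pig join")
print(author_url)
print(project_url)
```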

HCatalog Meetup at Twitter

Representatives from Twitter, Yahoo, LinkedIn, Hortonworks and IBM met at Twitter HQ on Thursday to talk HCatalog. Committers from HCatalog, Pig and Hive were on hand to discuss the state of HCatalog and its future.

Apache HCatalog is a table and storage management service for data created using Apache Hadoop.

A central theme was using HCatalog to enable sharing and use of legacy data and diverse formats like TSV, JSON, RCFile, Protobuf, Thrift and Avro, among diverse tools like Pig, Hive, Cascading, SQL-H and JAQL.

A key issue discussed was the mechanics of HCatalog’s integration with Hive as the project develops and matures. Some HCatalog users use Hive, and some do not, but HCatalog relies on the Hive metastore regardless. As usual in open source, each organization has its own set of problems, perspectives and priorities, and the discussion centered on finding a common path forward.

One thing was clear: HCatalog is HOT! An increasing number of organizations are adopting HCatalog for managing data and systems integration around Hadoop.

Pig as Duct Tape, Part Three: TF-IDF Topics with Cassandra, Python Streaming and Flask

Series Introduction

Apache Pig is a dataflow-oriented scripting interface to Hadoop. Pig enables you to manipulate data as tuples in simple pipelines without thinking about the complexities of MapReduce.

But Pig is more than that. Pig has emerged as the ‘duct tape’ of Big Data, enabling you to send data between distributed systems in a few lines of code. In this series, we’re going to show you how to use Hadoop and Pig to connect different distributed systems to enable you to process data from wherever and to wherever you like.

Working code for this post as well as setup instructions for the tools we use and their environment variables are available at https://github.com/rjurney/enron-python-flask-cassandra-pig and you can download the Enron emails we use in the example in Avro format at http://s3.amazonaws.com/rjurney.public/enron.avro. You can run our example Pig scripts in local mode (without Hadoop) with the -x local flag: pig -x local. This enables new Hadoop users to try out Pig without a Hadoop cluster.

Parts one and two can get you started using Pig if you’re not familiar with it.

Introduction

In this post, we are going to mine topics from the Enron email message bodies via TF-IDF using Pig Streaming with Python’s NLTK library and store them in Cassandra to be served in a REST API via Python and Flask.

Cassandra

Apache Cassandra is a multi-node, linearly scalable column store capable of serving intense I/O loads while remaining available for batch processing with Hadoop. In this example, we’ll be using it in local mode.

An excellent getting started guide for Cassandra is available in the Pygmalion project’s Getting-Started Guide, and more information on building Cassandra is available in the Cassandra Wiki. An excellent presentation on Pig and Cassandra is here.

You can download Cassandra here: http://www.apache.org/dyn/closer.cgi?path=/cassandra/1.1.5/apache-cassandra-1.1.5-src.tar.gz

wget http://mirror.olnevhost.net/pub/apache/cassandra/1.1.5/apache-cassandra-1.1.5-src.tar.gz
tar -xvzf apache-cassandra-1.1.5-src.tar.gz
cd apache-cassandra-1.1.5-src
ant
sudo bin/cassandra -f

Now you can monitor Cassandra in that shell. You can also access the Cassandra CLI via bin/cassandra-cli. The help command is helpful. Let’s set up a keyspace and column family for our data.

$ bin/cassandra-cli
 
create keyspace enron;
use enron;
create column family email_topics with
    comparator = UTF8Type and
    default_validation_class = UTF8Type and
    key_validation_class = UTF8Type;

CassandraStorage

Cassandra includes support for loading and storing data using Pig via the CassandraStorage UDF (User Defined Function).

CassandraStorage expects certain environment variables to be set. You can edit and run env.sh to set up your environment.

export PIG_INITIAL_ADDRESS=localhost
export PIG_RPC_PORT=9160
export PIG_PARTITIONER=org.apache.cassandra.dht.RandomPartitioner
export CASSANDRA_HOME=/me/Software/apache-cassandra-1.1.5-src

Once you build Cassandra and set up your environment, you can load the relevant jars for CassandraStorage in Pig.

register /me/Software/apache-cassandra-1.1.5-src/build/apache-cassandra*.jar
register /me/Software/apache-cassandra-1.1.5-src/lib/*.jar
register /me/Software/apache-cassandra-1.1.5-src/build/lib/jars/*.jar
 
define CassandraStorage org.apache.cassandra.hadoop.pig.CassandraStorage();

The Enron Emails as Avros

As usual, we’ll be working with the Enron emails in Avro format, available here. Note that we limit the data to 100 emails to speed processing up in local mode.

/* AvroStorage */
register /me/Software/pig/build/ivy/lib/Pig/avro-1.5.3.jar
register /me/Software/pig/build/ivy/lib/Pig/json-simple-1.1.jar
register /me/Software/pig/contrib/piggybank/java/piggybank.jar
 
define AvroStorage org.apache.pig.piggybank.storage.avro.AvroStorage();
 
emails = load '/me/Data/enron.avro' using AvroStorage();
emails = filter emails by message_id is not null;
 
/* Limit the count of emails we process if we are working in local mode */
emails = limit emails 100;
 
id_body = foreach emails generate message_id, body;

Munging Message Bodies with Python Streaming and NLTK

Pig Streaming lets you send Pig relations through arbitrary executables – even simple shell commands. Pig doesn’t care. Pigs eat anything. The syntax is simple enough:

define test_stream `token_extractor.py` SHIP ('token_extractor.py');
cleaned_words = stream id_body through test_stream as (message_id:chararray, token_strings:chararray);

Much of the power of Python for data processing lies with its nltk and scipy libraries… and of course, with regular expressions. There is an excellent book on Python’s NLTK, or the “Natural Language ToolKit.” I’ve created a Python class called TokenExtractor that cleans up the body of the email so that it can be split on whitespace for tokenization by Pig’s TOKENIZE. Nothing special here, just munging and nltk.tokenize. To learn more about data munging and data science, check out Mike Driscoll’s (@medriscoll) post, ‘Three Sexy Skills of Data Geeks.’

import sys

def main():
  te = TokenExtractor()
  for line in sys.stdin:
    # Split on the first tab only, in case the body itself contains tabs
    message_id, body = line.split('\t', 1)
    tokens = te.tokenize(body)
    no_urls = te.remove_urls(tokens)
    lowers = te.lower(no_urls)
    no_punc = te.remove_punctuation(lowers)
    no_newlines = te.remove_endlines(no_punc)
    no_shorts = te.short_filter(no_newlines)
    print(message_id + "\t" + " ".join(no_shorts))
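
The TokenExtractor class itself isn’t shown here, so if you just want to try the streaming step, below is a rough stand-in that follows the same contract: tab-separated message_id and body in, tab-separated message_id and cleaned tokens out. It is a sketch under assumptions, not the class from the repo: it swaps NLTK for plain regular expressions, and clean_body, stream and the minimum token length of 3 are hypothetical choices of ours.

```python
import re
import sys

URL_RE = re.compile(r"https?://\S+")
NON_WORD_RE = re.compile(r"[^a-z0-9\s]")


def clean_body(body, min_length=3):
    """Lowercase, drop URLs and punctuation, keep tokens of min_length or more."""
    text = URL_RE.sub(" ", body.lower())
    text = NON_WORD_RE.sub(" ", text)
    return [token for token in text.split() if len(token) >= min_length]


def stream(lines, out=sys.stdout):
    """Apply clean_body to each tab-separated (message_id, body) line."""
    for line in lines:
        message_id, _, body = line.rstrip("\n").partition("\t")
        out.write(message_id + "\t" + " ".join(clean_body(body)) + "\n")


if __name__ == "__main__":
    # Under Pig Streaming you would pass sys.stdin instead of this sample list.
    stream(["<1@example>\tHello, World of Pig\n"])
```

Keeping the cleaning logic in a plain function makes it easy to test on a laptop before shipping the script to the cluster with SHIP.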

Next I use Pig’s TOKENIZE to generate the actual tokens. This is a little unusual – tokenize once in our Python, tokenize again, but it works – so ship it! Python cleans the strings and Pig creates tuples for us, which is convenient. This gives us pairs of message_ids and tokens.

token_records = foreach cleaned_words generate message_id, FLATTEN(TOKENIZE(token_strings)) as tokens;

Note that we have trimmed all fields besides message_id and tokens. We need to limit our data size to ensure efficiency, as we have just exploded our data to be larger than it originally was – every single word in every single message body now has an RFC-2822 message ID attached to it.

Term Frequency-Inverse Document Frequency (TF-IDF) in Apache Pig

Now that we’ve got tokens (words) and their message ids, we’re going to calculate relevant topics per email with a simple algorithm called ‘Term Frequency/Inverse Document Frequency.’ Which is to say we’ll compute the top N sorted TF-IDF values for each email – to see what they’re about!

Wikipedia is very informative on TF-IDF. An excellent tutorial on TF-IDF in Pig is available here, and we borrow from it. We should all thank Jacob Perkins, @thedatachef, for all the awesome Pig code on his blog and on GitHub.

We will be using the words ‘token’, ‘term’, and ‘word’ interchangeably below.

  1. Term Counts per Document
    First we group by document and token, and count the number of times each word is used in each document.

    /* Calculate the term count per document */
    doc_word_totals = foreach (group token_records by (message_id, tokens)) generate
                        flatten(group) as (message_id, token),
                        COUNT_STAR(token_records) as doc_total;
  2. Document Size: Words per Document
    Next we calculate the size of each document in words, to normalize the term counts above. Naturally, a single occurrence of a word is less significant in a long document than it would be in a shorter one. We group by message ID and take a SUM of all word counts for that document. We use FLATTEN to project that document total alongside the per-token counts we just calculated, so that we can normalize these counts in the next step.

    /* Calculate the document size */
    pre_term_counts = foreach (group doc_word_totals by message_id) generate
                        group AS message_id,
                        FLATTEN(doc_word_totals.(token, doc_total)) as (token, doc_total),
                        SUM(doc_word_totals.doc_total) as doc_size;
  3. Term Frequency per Document
    Now we divide term count by document length to arrive at term frequency per document, our numerator.

    /* Calculate the TF */
    term_freqs = foreach pre_term_counts generate message_id as message_id,
                   token as token,
                   ((double)doc_total / (double)doc_size) AS term_freq;
  4. Count of Documents with each Term
    Now we start on our denominator. We group our term frequencies by term, and count the documents containing each term. We project this document count alongside the term frequencies themselves for easy calculation in our next step.

    /* Get count of documents using each token, for idf */
    token_usages = foreach (group term_freqs by token) generate
                     FLATTEN(term_freqs) as (message_id, token, term_freq),
                     COUNT_STAR(term_freqs) as num_docs_with_token;
  5. Total Document Count
    Our next step requires a document count, which we get with a GROUP ALL and a count.

    /* Get document count */
    just_ids = foreach emails generate message_id;
    ndocs = foreach (group just_ids all) generate COUNT_STAR(just_ids) as total_docs;
  6. Calculate TF-IDF
    Now we calculate TF-IDF using the relations we’ve prepared. Note the use of a Pig scalar to calculate our IDF term, which is the LOG of the total document count divided by the number of documents with that token.

    Also note that we took a shortcut and cast our tf_idf score as a chararray to simplify our Cassandra schema. We also used the field names score and value, which CassandraStorage expects.

    /* Note the use of Pig Scalars to calculate idf */
    tfidf_all = foreach token_usages {
      idf = LOG((double)ndocs.total_docs/(double)num_docs_with_token);
      tf_idf = (double)term_freq * idf;
      generate message_id as message_id,
        token as score,
        (chararray)tf_idf as value:chararray;
    };
  7. Prepare and Store our Data in Cassandra
    Next, we store the top ten terms by TF-IDF score per document in Cassandra, keyed by the message ID.

    /* Get the top 10 Tf*Idf scores per message */
    per_message_cassandra = foreach (group tfidf_all by message_id) {
      /* Note: value is a chararray, so this sort compares strings, not numbers */
      sorted = order tfidf_all by value desc;
      top_10_topics = limit sorted 10;
      generate group, top_10_topics.(score, value);
    };

    store per_message_cassandra into 'cassandra://enron/email_topics' USING CassandraStorage();
  8. Sample Message IDs to Query Cassandra
    We need a key (message ID) to fetch our values in Cassandra, so we grab ten records and dump them to the console.

    /* This will give you some message_id keys to fetch in Cassandra, and some message bodies to compare topics to. */
    samples = limit just_ids 10;
    dump samples;
    
    (<28.1075842613917.JavaMail.evans@thyme>)
    (<85.1075854368299.JavaMail.evans@thyme>)
    (<167.1075851646300.JavaMail.evans@thyme>)
    (<185.1075857304356.JavaMail.evans@thyme>)
    (<431.1075859137859.JavaMail.evans@thyme>)
    (<589.1075842593084.JavaMail.evans@thyme>)
    (<614.1075847580822.JavaMail.evans@thyme>)
    (<735.1075840186524.JavaMail.evans@thyme>)
    (<758.1075842602845.JavaMail.evans@thyme>)
    (<765.1075860359973.JavaMail.evans@thyme>)

Verify our Data with Cassandra CLI

We can verify our data is present in Cassandra with cassandra-cli.

cd $CASSANDRA_HOME
bin/cassandra-cli
[default@unknown] use enron;
Authenticated to keyspace: enron
[default@enron] get email_topics['<431.1075859137859.JavaMail.evans@thyme>'];
=> (column=bankruptcy, value=0.02577520626485872, timestamp=1348084758725000)
=> (column=end, value=0.018016096034710077, timestamp=1348084758725000)
=> (column=left, value=0.024021461379613435, timestamp=1348084758725000)
=> (column=palmer, value=0.017183470843239148, timestamp=1348084758725000)
=> (column=party, value=0.05155041252971744, timestamp=1348084758725000)
=> (column=phillip, value=0.018016096034710077, timestamp=1348084758725000)
=> (column=pl, value=0.018016096034710077, timestamp=1348084758725000)
=> (column=plove, value=0.02577520626485872, timestamp=1348084758725000)
=> (column=tonight, value=0.018016096034710077, timestamp=1348084758725000)
=> (column=your, value=0.017969743348148298, timestamp=1348084758725000)
Returned 10 results.
Elapsed time: 4 msec(s).

Note that the more documents we use (remember our LIMIT statement up top), the better topics we will get.

Pygmalion

Note that the Pygmalion project by Jeremy Hanna @jeromatron helps convert back and forth between Pig relations and Cassandra columns. Jeremy is an excellent open source citizen and was helpful with this post. Shout outs to Jeremy!

Accessing Cassandra from Python with pycassa

Python connects to Cassandra via the pycassa project. A good tutorial is available here. You can install pycassa with pip or easy_install. On Mac OS X Lion, the pip install failed so I used easy_install:

easy_install pycassa

Using Cassandra with pycassa is easy:

import pycassa
 
pool = pycassa.ConnectionPool('enron')
cf = pycassa.ColumnFamily(pool, 'email_topics')
 
cf.get('<431.1075859137859.JavaMail.evans@thyme>') # Replace me
# OrderedDict([(u'bankruptcy', u'0.02577520626485872'), (u'end', u'0.018016096034710077'), (u'left', u'0.024021461379613435'), (u'palmer', u'0.017183470843239148'), (u'party', u'0.05155041252971744'), (u'phillip', u'0.018016096034710077'), (u'pl', u'0.018016096034710077'), (u'plove', u'0.02577520626485872'), (u'tonight', u'0.018016096034710077'), (u'your', u'0.017969743348148298')])
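
Because we stored the scores as chararrays, they come back from Cassandra as strings, and the columns are ordered by token name rather than by score. A small sketch of ranking them numerically on the Python side follows. It works on a plain OrderedDict so you can run it without a Cassandra node; rank_topics is a hypothetical helper of ours, not part of pycassa.

```python
from collections import OrderedDict


def rank_topics(columns, limit=10):
    """Return (token, score) pairs sorted by numeric score, highest first."""
    # float() also accepts scientific notation such as '9.5E-4'
    scored = [(token, float(value)) for token, value in columns.items()]
    return sorted(scored, key=lambda pair: pair[1], reverse=True)[:limit]


# A few of the columns returned by cf.get() above
columns = OrderedDict([
    ("bankruptcy", "0.02577520626485872"),
    ("party", "0.05155041252971744"),
    ("your", "0.017969743348148298"),
])
top = rank_topics(columns)
print(top)
```

Converting to float before sorting matters: comparing the raw strings would rank '9.5E-4' above '0.05', even though it is the smaller number.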

Serving Topics as JSON via a REST API with Flask

The Flask project is a lightweight web application framework in Python. Getting started with Flask is easy.

from flask import Flask
import pycassa
import json
 
pool = pycassa.ConnectionPool('enron')
cf = pycassa.ColumnFamily(pool, 'email_topics')
 
app = Flask(__name__)
 
@app.route("/message/topics/<message_id>")
def topics(message_id):
  return json.dumps(cf.get(message_id))
 
if __name__ == "__main__":
  app.run(debug=True)

Finally, navigate your browser to your new web service for Enron email topics: http://localhost:5000/message/topics/%3C431.1075859137859.JavaMail.evans@thyme%3E. You may have to use a message_id printed to the console on your own system, as the LIMIT statements in our respective Pig scripts may have selected different subsets of the emails.
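
The angle brackets in a message ID have to be percent-encoded in the URL, which is where the %3C and %3E above come from. If you are calling the service from a script rather than a browser, a minimal sketch with the standard library looks like this. It assumes Flask’s default development address of localhost:5000, as in the URL above.

```python
from urllib.parse import quote

message_id = "<431.1075859137859.JavaMail.evans@thyme>"

# Keep '@' readable and percent-encode the angle brackets
path = "/message/topics/" + quote(message_id, safe="@")
url = "http://localhost:5000" + path
print(url)
```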

Conclusion

We’ve used Pig to extract structured topics and scores from unstructured text via a simple algorithm that we’ve implemented in a distributed manner on top of Hadoop… without knowing or thinking about MapReduce. We used arbitrary tools to clean up text via Pig streaming. We’ve used Pig to store that data into yet another distributed system, Cassandra, and we’ve served it as simple JSON via a web service using Python and Flask.

Hadoop excels at extracting structured data from unstructured data, and Pig is fine duct tape indeed!

Hadoop Features Large at Stanford XLDB

Hadoop featured prominently at Stanford’s annual XLDB conference last week, as representatives from academia and industry gathered to discuss Extremely Large Databases. The conference program, with slides, is available: http://www-conf.slac.stanford.edu/xldb2012/ProgramC.asp. A highly technical lineup presented on Big Data in biology and physics; cloud computing and Hive in particular were topic areas.

Hortonworks’ own Ashutosh Chauhan @ashutoshchauhan, an Apache Pig, Hive and HCatalog committer, presented ‘Hive vs Pig: Similarities and Differences’ (slides).

Twitter Analytics Presents Hadoop and Pig at UC Berkeley

Twitter Analytics presented their distributed infrastructure, including Hadoop and Pig, at a UC Berkeley iSchool special course called INFO 290: Analyzing Big Data with Twitter. Twitter is a major contributor to many Apache projects. The course was over-subscribed and was a great success, as students got to learn from practicing data scientists using Hadoop on truly massive datasets. The entire lecture series is available here.

Bill Graham @billgraham, a Data Systems Engineer at Twitter Analytics and Apache Pig committer, presented an Introduction to Hadoop. His slides are available here. His presentation gives a comprehensive introduction to Apache Hadoop including its history, motivation, practice and operation.

Jonathan Coveney @jco, a Data Systems Engineer at Twitter Analytics and Apache Pig committer, presented Pig at Twitter. Slides for this presentation are available here. His presentation gives a comprehensive explanation of Apache Pig’s philosophy, use and intricacies. It is one of the most thorough introductions to Pig I’ve seen and will serve as excellent documentation for beginners and intermediate Pig users alike.

Hats off to Twitter for their contribution to Apache open source and education. More Pig talks and papers are available on the Pig Confluence here.

Four New Installments in ‘The Future of Apache Hadoop’ Webinar Series

During the ‘Future of Apache Hadoop’ webinar series, Hortonworks founders and core committers will discuss the future of Hadoop and related projects including Apache Pig, Apache Ambari, Apache Zookeeper and Apache Hadoop YARN.

Apache Hadoop has rapidly evolved to become the leading platform for managing, processing and analyzing big data. Consequently there is a thirst for knowledge on the future direction for Hadoop related projects. The Hortonworks webinar series will feature core committers of the Apache projects discussing the essential components required in a Hadoop Platform, current advances in Apache Hadoop, relevant use-cases and best practices on how to get started with the open source platform. Each webinar will include a live Q&A with the individuals at the center of the Apache Hadoop movement.

This four-part webinar series is now open for registration, and the schedule will include:

  • Wednesday, September 12 at 10:00 a.m. PT / 1:00 p.m. ET
  • Pig Out on Hadoop
    With: Alan Gates, Hortonworks founder and contributor to Apache Pig and HCatalog projects.
    Register here.

  • Wednesday, September 26 at 10:00 a.m. PT / 1:00 p.m. ET
  • Deployment and Management of Hadoop Clusters with Ambari
    With: Matt Foley, committer and PMC member of the Apache Hadoop Project and member of Technical Staff at Hortonworks.
    Register here.

  • Wednesday, October 17 at 10:00 a.m. PT / 1:00 p.m. ET
  • Scaling Apache Zookeeper for the Next Generation of Hadoop Applications
    With: Mahadev Konar, Hortonworks founder and contributor to the Apache Pig and HCatalog projects
    Register here.

  • Wednesday, October 31 at 10:00 a.m. PT / 1:00 p.m. ET
  • YARN: The Future of Data Processing with Apache Hadoop
    With: Arun C. Murthy, Hortonworks founder and VP of Apache Hadoop at Apache Software Foundation, the lead of the MapReduce project and YARN.
    Register here.

For more information, please register.

Previous webinars on “The Future of Apache Hadoop” are available here.

A press release is available here.

Click to Tweet: @Hortonworks unveils four new live webinars, with Q&A, on “The Future of Apache Hadoop” series http://bit.ly/OM0XpE #BigData #Hadoop

Recap of the August Pig Hackathon at Hortonworks

The August Pig Hackathon brought Pig users from Hortonworks, Yahoo, Cloudera, Visa, Kaiser Permanente, and LinkedIn to Hortonworks HQ in Sunnyvale, CA to talk and work on Apache Pig.

Hackers hacking away at the August 2012 Pig Hackathon at Hortonworks in Sunnyvale, CA

Jonathan Coveney and Bill Graham from Twitter walked newer Pig users through how Pig translates a Pig Latin script to MapReduce jobs and went over how to read the output of EXPLAIN. For those interested, Hortonworks founder Alan Gates covers this in Chapter 1 of Programming Pig.

Thejas Nair walked through how to contribute patches to Pig and how to work with committers to get the patches in. You can learn more about this on the Pig Wiki.

The group talked through the proposal for a new EvalFunc interface that would make it much easier to write UDFs (User Defined Functions) for Pig. Part of what makes Pig so powerful is its extensibility, and making that even easier would make Pig a better tool. A discussion in JIRA ticket PIG-2421 is available if you want to collaborate on improving Pig’s eval funcs.

Alan Gates presented some thoughts on building a generic DAG (directed acyclic graph) execution and optimization engine that could be used by Pig and Hive and that would take advantage of new features in Hadoop 2.0. This would reduce duplication between the projects as well as allow users to share UDFs between them. We covered using Pig and Hive together and via HCatalog in previous posts.

You don’t have to be a Pig expert to attend a Pig meetup – all levels of proficiency are invited. Committers love to meet new users that appreciate their work. One attendee said, “There were many pig commiters at the meetup. The Twitter and HortonWorks people were very helpful.”

To find out about more Pig meetups, join the Pig User group on meetup. We can’t wait to see you there!

Pig as Hadoop Connector, Part Two: HBase, JRuby and Sinatra

Series Introduction

Apache Pig is a dataflow oriented, scripting interface to Hadoop. Pig enables you to manipulate data as tuples in simple pipelines without thinking about the complexities of MapReduce.

But Pig is more than that. Pig has emerged as the ‘duct tape’ of Big Data, enabling you to send data between distributed systems in a few lines of code. In this series, we’re going to show you how to use Hadoop and Pig to connect different distributed systems to enable you to process data from wherever and to wherever you like.

Working code for this post as well as setup instructions for the tools we use are available at https://github.com/rjurney/enron-jruby-sinatra-hbase-pig and you can download the Enron emails we use in the example in Avro format at http://s3.amazonaws.com/rjurney.public/enron.avro. You can run our example Pig scripts in local mode (without Hadoop) with the -x local flag: pig -x local. This enables new Hadoop users to try out Pig without a Hadoop cluster.

Part one of this series on MongoDB is available here: http://hortonworks.com/blog/pig-as-connector-part-one-pig-mongodb-and-node-js/.

Introduction

Hadoop is about freedom as much as scale: providing you disk spindles and processor cores together to process your data with whatever tool you choose. Unleash your creativity. Pig as duct tape facilitates this freedom, enabling you to connect distributed systems at scale in minutes, not hours. In this post we’ll demonstrate how you can turn raw data into a web service using Hadoop, Pig, HBase, JRuby and Sinatra. In doing so we will demonstrate yet another way to use Pig as connector to publish data you’ve processed on Hadoop.

Apache HBase is the Hadoop database. It has emerged as the dominant database used with Hadoop, supporting billions of rows and millions of columns. HBase is a highly available, fast column store, supporting realtime workloads while providing easy access to your data to and from Hadoop via scans. A really great introduction to HBase’s data model is here: http://jimbojw.com/wiki/index.php?title=Understanding_Hbase_and_BigTable.

JRuby is Ruby implemented in Java. JRuby has emerged as an invaluable tool in helping enterprises with aging Java infrastructures bring new life into their codebase by wrapping complex Java with simple Ruby to provide more productive interfaces for application developers. Using JRuby in this example also helps us to learn the HBase Java APIs, as we’ll be calling them directly from JRuby. We’ll be using JRuby to serve data from HBase as a web service, with a lightweight, ‘no frills’ web framework called Sinatra.

Booting HBase

An excellent HBase quickstart tutorial is available here: http://hbase.apache.org/book/quickstart.html. For more detailed information, check out the HBase Reference Guide – a full blown book on Apache HBase. We’ll be booting HBase in local mode for testing. HBase runs on top of Hadoop and Zookeeper in production, but local mode takes care of that for us for experimentation.

We’ll be using the latest stable version of HBase, version 0.94.1.

$ wget http://archive.apache.org/dist/hbase/hbase-0.94.1/hbase-0.94.1.tar.gz
$ tar -xvzf hbase-0.94.1.tar.gz
$ sudo mkdir /var/hbase

Now edit hbase-0.94.1/conf/hbase-site.xml to include our hbase working directory:

<property>
  <name>hbase.rootdir</name>
  <value>file:///var/hbase</value>
</property>

Launch HBase in local mode:

$ cd hbase-0.94.1
$ bin/start-hbase.sh

Now check out the “Shell Exercises” section of the HBase Book: http://hbase.apache.org/book/quickstart.html#shell_exercises. Let’s create a new table for testing. Start the HBase shell (really JIRB under the covers). The help command is there to guide us.

$ bin/hbase shell
...
1.8.7-p352 :002 > help

Let’s create our first HBase table called ‘enron’ with a single column family called ‘email’. We might make another column family later for an organizational chart or extracted entities named ‘people.’ Column families are groups of columns.

create 'enron', 'email'
0 row(s) in 1.7900 seconds

Verify that it’s there with list.

list 'enron'
TABLE                                                                                                                                
enron                                                                                                                                
1 row(s) in 0.0690 seconds

We can put, get and scan records easily. The beauty of HBase is that we can update records in realtime from our application, and then scan them in batch using Hadoop without worrying about stale data.

> put 'enron', 'row1', 'email:address', 'bob@enron.com'
0 row(s) in 0.0190 seconds
> put 'enron', 'row2', 'email:address', 'stevo@enron.com'
0 row(s) in 0.0190 seconds
 
> get 'enron', 'row2'
COLUMN                             CELL
 email:address                     timestamp=1345691920800, value=stevo@enron.com
1 row(s) in 0.0110 seconds
 
> scan 'enron'
ROW                                COLUMN+CELL
 row1                              column=email:address, timestamp=1345691847565, value=bob@enron.com
 row2                              column=email:address, timestamp=1345691920800, value=stevo@enron.com
2 row(s) in 0.1190 seconds

Note that HBase doesn’t care what kind of data we store into it, and it returns a timestamp. HBase can hold a history of values for each cell, and we can even use this feature in our applications to store historical data!
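To make the idea of versioned cells concrete, here is a toy in-memory model in Python. It is only a sketch of the data model (row, then column, then a list of timestamped values), not of how HBase stores anything; the ToyTable class, the second address and its timestamp are all made up for illustration:

```python
from collections import defaultdict

class ToyTable:
    """Toy model of HBase cells: row -> column -> [(timestamp, value), ...]."""

    def __init__(self):
        self.rows = defaultdict(lambda: defaultdict(list))

    def put(self, row, column, value, timestamp):
        versions = self.rows[row][column]
        versions.append((timestamp, value))
        versions.sort(reverse=True)  # keep the newest version first

    def get(self, row, column, versions=1):
        """Return up to `versions` (timestamp, value) pairs, newest first."""
        return self.rows[row][column][:versions]

table = ToyTable()
table.put("row1", "email:address", "bob@enron.com", timestamp=1345691847565)
# Hypothetical later update to the same cell (address and timestamp invented).
table.put("row1", "email:address", "robert@enron.com", timestamp=1345691999999)

print(table.get("row1", "email:address"))              # latest value only
print(table.get("row1", "email:address", versions=2))  # value history
```

A plain get returns the newest value, while asking for more versions exposes the history of the cell.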

Storing Data in HBase with Pig

Pig supports HBase via HBaseStorage. An excellent guide is here, and here is a good presentation on Pig and HBase at Twitter from 2010.

We need to tell Pig where to find HBase via the HBASE_HOME environment variable.

$ echo 'export HBASE_HOME=/me/hbase-0.94.1' >> ~/.bash_profile
$ source ~/.bash_profile

We also need to replace the HBase jar distributed with Pig with 0.94.1.

$ rm /me/pig/build/ivy/lib/Pig/hbase-0.90.0.jar
$ cp target/hbase-0.94.1.jar /me/pig/build/ivy/lib/Pig/

Now we can load records in Avro, process them and store them in HBase.

/* Load Avro jars and define shortcut */
register /me/pig/build/ivy/lib/Pig/avro-1.5.3.jar
register /me/pig/build/ivy/lib/Pig/json-simple-1.1.jar
register /me/pig/contrib/piggybank/java/piggybank.jar
define AvroStorage org.apache.pig.piggybank.storage.avro.AvroStorage();
 
/* HBaseStorage libraries */
register /me/pig/build/ivy/lib/Pig/hbase-0.94.1.jar
register /me/pig/build/ivy/lib/Pig/zookeeper-3.3.3.jar
register /me/pig/build/ivy/lib/Pig/guava-11.0.jar
 
/* Load JRuby to validate emails and create UUIDs for HBase rowIds */
register 'udf.rb' using jruby as udfs;
 
emails = load '/me/tmp/enron.avro' using AvroStorage();
/* Project all unique combinations of from/to for each email to more than one person */
from_to = foreach emails generate from.address as from_address, FLATTEN(tos.(address)) as to_address;
 
/* Group by email from/to pairs and count emails between those addresses.
   Also, generate a UUID for storing rows in HBase. */
by_pair = group from_to by (from_address, to_address);
sent_counts = foreach by_pair generate udfs.uuid() as id, 
                                       FLATTEN(group) as (from_address, to_address), 
                                       COUNT_STAR(from_to) as total_sent;
 
/* Store to the HBase table 'enron' using a UUID as row key with the loadKey option. */
store sent_counts into 'enron' using 
     org.apache.pig.backend.hadoop.hbase.HBaseStorage('address.pairs:from_address address.pairs:to_address address.pairs:total_sent', 'loadKey true');

ILLUSTRATE shows us our dataflow:

-----------------------------------------------------------------------
| from_to     | from_address:chararray     | to_address:chararray     | 
-----------------------------------------------------------------------
|             | jane.mcbride@enron.com     | tana.jones@enron.com     | 
|             | jane.mcbride@enron.com     | tana.jones@enron.com     | 
-----------------------------------------------------------------------
-----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------
| by_pair     | group:tuple(from_address:chararray,to_address:chararray)             | from_to:bag{:tuple(from_address:chararray,to_address:chararray)}                                 | 
-----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------
|             | (jane.mcbride@enron.com, tana.jones@enron.com)                       | {(jane.mcbride@enron.com, tana.jones@enron.com), (jane.mcbride@enron.com, tana.jones@enron.com)} | 
-----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------
----------------------------------------------------------------------------------------------------------------------------------------
| sent_counts     | id:chararray                         | from_address:chararray     | to_address:chararray     | total_sent:long     | 
----------------------------------------------------------------------------------------------------------------------------------------
|                 | 8d0434e4-b50d-47ac-8278-fc1bb16ad8e4 | jane.mcbride@enron.com     | tana.jones@enron.com     | 2                   | 
----------------------------------------------------------------------------------------------------------------------------------------
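For readers new to Pig, the same dataflow can be mirrored in a few lines of plain Python. This is only a single-machine sketch to show what each step computes; the emails list is a simplified stand-in for the Avro records, not their real schema:

```python
import uuid
from collections import Counter

# Simplified stand-ins for the Avro records: a sender plus a list of recipients.
emails = [
    {"from": "jane.mcbride@enron.com", "tos": ["tana.jones@enron.com"]},
    {"from": "jane.mcbride@enron.com", "tos": ["tana.jones@enron.com"]},
]

# FLATTEN: one (from, to) pair per recipient of each email.
from_to = [(e["from"], to) for e in emails for to in e["tos"]]

# GROUP BY + COUNT_STAR: count how often each pair occurs.
pair_counts = Counter(from_to)

# Attach a random UUID per pair, like the id column in sent_counts.
sent_counts = [
    (str(uuid.uuid4()), frm, to, total)
    for (frm, to), total in pair_counts.items()
]
print(sent_counts)
```

The difference, of course, is that Pig runs the grouping across a Hadoop cluster, while this version has to fit in one process.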

Loading Data from HBase with Pig

Loading data from HBase in Pig is easy, and you can pick individual columns or filter the data. Let’s look at the top 100 most prolific email relationships, which could be a measure of how strong a relationship is.

/* HBaseStorage shortcut */
register /me/pig/build/ivy/lib/Pig/hbase-0.94.1.jar
register /me/pig/build/ivy/lib/Pig/zookeeper-3.3.3.jar
register /me/pig/build/ivy/lib/Pig/guava-11.0.jar
 
/* Grab the top 100 most prolific email relationships from HBase and dump them. */
address_pairs = LOAD 'hbase://enron' using 
  org.apache.pig.backend.hadoop.hbase.HBaseStorage('address.pairs:from_address address.pairs:to_address address.pairs:total_sent')
  as (from_address:chararray, to_address:chararray, total_sent:long);
sorted = order address_pairs by total_sent DESC;
top_100 = limit sorted 100;
dump top_100;
(pete.davis@enron.com,pete.davis@enron.com,4489)
(vince.kaminski@enron.com,vkaminski@aol.com,1143)
(jeff.dasovich@enron.com,susan.mara@enron.com,935)
(jeff.dasovich@enron.com,paul.kaufman@enron.com,879)
...
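The ordering and limiting steps amount to a "top N by total_sent" query. As a rough single-machine analogy in Python, using rows copied from the output above (deliberately shuffled) and a hypothetical top_n helper:

```python
import heapq

# Rows copied from the dump output above, deliberately out of order.
address_pairs = [
    ("jeff.dasovich@enron.com", "susan.mara@enron.com", 935),
    ("pete.davis@enron.com", "pete.davis@enron.com", 4489),
    ("jeff.dasovich@enron.com", "paul.kaufman@enron.com", 879),
    ("vince.kaminski@enron.com", "vkaminski@aol.com", 1143),
]

def top_n(rows, n):
    """Return the n rows with the largest total_sent, largest first."""
    return heapq.nlargest(n, rows, key=lambda row: row[2])

for row in top_n(address_pairs, 2):
    print(row)
```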

Pig UDFs in JRuby

Check out the id:chararray field in the above example. We created that with a JRuby UDF.

Pig added JRuby UDFs in version 0.10.0. Writing UDFs in JRuby is much simpler than in Java. Our UDF class in udf.rb looks like this:

require 'pigudf'
require 'lib/data_utils'
 
# Refer to our Utils class to share JRuby code between Pig and Sinatra
class Udfs < PigUdf
  outputSchema "uuid:chararray"
  def uuid()
    DataUtils.uuid()
  end
end

Notice how we employ an external utility class, which in turn calls Java’s java.util.UUID.randomUUID().toString() method. We put our code in a utility class called DataUtils so that it might be shared with other applications, like our Sinatra web app. Code sharing between Hadoop and other systems using JRuby is efficient.

Our utility class looks like this:

# Magic line
require 'java'
 
import java.util.UUID
 
class DataUtils
  # Create Unique IDs - code adapted from https://github.com/jdamick/uuid/blob/master/lib/uuid.rb
  def self.uuid()
    self.generate()
  end
 
  def self.generate()
    java.util.UUID.randomUUID().toString()
  end
end
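If you are more at home in Python than Java, java.util.UUID.randomUUID() produces the same kind of identifier as Python's uuid.uuid4(): a random (version 4) UUID. A tiny sketch, where new_row_key is a hypothetical name of ours:

```python
import uuid

def new_row_key():
    """Return a random (version 4) UUID as a 36-character string."""
    return str(uuid.uuid4())

key = new_row_key()
print(key)
```

Because the keys are random, they carry no meaning and do not sort in any useful order.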

Using ILLUSTRATE lets us see our UDF code run on real data, without waiting on Hadoop jobs to finish. This is great for development!

HBase and JRuby

You can download JRuby at http://jruby.org/download or better yet, install it via rvm, which you can install via the instructions here: https://rvm.io/rvm/install/.

$ rvm install jruby

JRuby can use the Java native HBase client, which is fast and efficient (the HBase shell is actually a modified JRuby Interactive Ruby Shell). Thrift and JSON APIs are provided for other languages. Details about JRuby and HBase are available at http://wiki.apache.org/hadoop/Hbase/JRuby, although the example uses old APIs that we’ve updated for this example.

A great resource to see JRuby in action against HBase is in the HBase shell itself: https://github.com/apache/hbase/tree/trunk/hbase-server/src/main/ruby.

To connect to HBase in JRuby, we’ll need to set up our CLASSPATH to import the HBase jars.

cd $HBASE_HOME
wget http://central.maven.org/maven2/org/jruby/jruby-complete/1.6.7.2/jruby-complete-1.6.7.2.jar
export CLASSPATH=$CLASSPATH:`java -jar $HBASE_HOME/jruby-complete-1.6.7.2.jar -e "puts Dir.glob('$HBASE_HOME/{.,build,lib}/*.jar').join(':')"`
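The Ruby one-liner above just collects every jar in a few directories and joins the paths with colons. Here is a sketch of the same idea in Python, in case the shell quoting is hard to read; hbase_classpath is a hypothetical helper, and falling back to the current directory when HBASE_HOME is unset is our assumption:

```python
import glob
import os

def hbase_classpath(hbase_home):
    """Join every jar under hbase_home, build/ and lib/ into a classpath string."""
    jars = []
    for subdir in (".", "build", "lib"):
        jars.extend(sorted(glob.glob(os.path.join(hbase_home, subdir, "*.jar"))))
    return os.pathsep.join(jars)

# Falls back to the current directory if HBASE_HOME is not set (an assumption).
print(hbase_classpath(os.environ.get("HBASE_HOME", ".")))
```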

Now let’s import the Java HBase client classes in JRuby and connect to HBase. table.rb from the HBase shell is helpful: https://github.com/apache/hbase/blob/trunk/hbase-server/src/main/ruby/hbase/table.rb.

$ jirb

We begin by importing the relevant Java classes into JRuby.

# Adapted from obsolete example at http://wiki.apache.org/hadoop/Hbase/JRuby
 
include Java
import org.apache.hadoop.hbase.HBaseConfiguration
import org.apache.hadoop.hbase.HColumnDescriptor
import org.apache.hadoop.hbase.HConstants
import org.apache.hadoop.hbase.HTableDescriptor
import org.apache.hadoop.hbase.client.HBaseAdmin
import org.apache.hadoop.hbase.client.HTable
import org.apache.hadoop.hbase.client.Get
import org.apache.hadoop.hbase.client.Put
import org.apache.hadoop.hbase.client.ResultScanner
import org.apache.hadoop.hbase.util.Writables
import org.apache.hadoop.hbase.util.Bytes
import org.apache.hadoop.io.Text

Next, we take a cue from the HBase shell and create a to_string utility method.

# Make a String of the passed kv
def to_string(column, kv, maxlength = -1)
  if kv.isDelete
    val = "timestamp=#{kv.getTimestamp}, type=#{org.apache.hadoop.hbase.KeyValue::Type::codeToType(kv.getType)}"
  else
    val = "timestamp=#{kv.getTimestamp}, value=#{org.apache.hadoop.hbase.util.Bytes::toStringBinary(kv.getValue)}"
  end
  val
end

Connecting to HBase is easy. Let’s wrap the code in a connect method.

# Connect to HBase and our table
def connect(table_name)
  @conf = HBaseConfiguration.create
  admin = HBaseAdmin.new(@conf)
  @table = HTable.new(@conf, table_name)
end

Fetching a record is fairly easy. Let’s make a get method.

def get_key(key)
  my_get = Get.new(key.to_java_bytes)
  result = @table.get(my_get)
  result_ary = []
  for kv in result.list
    family = String.from_java_bytes(kv.get_family)
    qualifier = org.apache.hadoop.hbase.util.Bytes::toStringBinary(kv.get_qualifier)
    column = "#{family}:#{qualifier}"
    value = to_string(column, kv, -1)
    timestamp = kv.get_timestamp
    str_value = org.apache.hadoop.hbase.util.Bytes::toStringBinary(kv.get_value)
    result_ary << str_value.to_s
  end
  result_ary
end

I’ve wrapped connect and get in a simple JRuby HBase Client: https://github.com/rjurney/enron-jruby-sinatra-hbase-pig/blob/master/lib/hbase_client.rb.

A simple unit test verifies things work:

$ jruby test/hbase_client.rb
require 'lib/hbase_client'
 
hclient = HBaseClient.new
hclient.connect('enron')
hclient.get('row1')
1345691847565
bob@enron.com

JRuby and Sinatra

Sinatra is a simple Ruby framework for web applications. It is summarized nicely in its README. Installing Sinatra is easy.

jgem install sinatra

Our sinatra app, sinatra.rb, is simple enough:

require 'rubygems'
require 'sinatra'
require 'json'
require 'lib/hbase_client'
require 'lib/data_utils'
 
hclient = HBaseClient.new
hclient.connect('enron')
 
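# Define the literal route first: Sinatra matches routes in the order they are
# defined, so '/:message_id' would otherwise capture requests for '/create_uuid'.
get '/create_uuid' do
  "This web service shares code with a Pig JRuby UDF to produce this UUID: " + DataUtils.uuid
end
 
get '/:message_id' do |message_id|
  JSON hclient.get(message_id)
end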

Run our app: jruby ./web.rb and navigate to our UUID web service.

Now pick out a message ID from a scan in the HBase shell and fetch it as JSON:

> scan 'enron'
 ffffe8b6-2abb-43ba-aad8-6ee8c4526 column=address.pairs:from_address, timestamp=1345904219080, value=stanley.horton@enron.com
 2b1                                                                                                                                 
 ffffe8b6-2abb-43ba-aad8-6ee8c4526 column=address.pairs:to_address, timestamp=1345904219080, value=maria.pavlou@enron.com            
 2b1                                                                                                                                 
 ffffe8b6-2abb-43ba-aad8-6ee8c4526 column=address.pairs:total_sent, timestamp=1345904219080, value=1                                 
 2b1                                                                                                                                 
310192 row(s) in 246.2150 seconds
> get 'enron', 'ffffe8b6-2abb-43ba-aad8-6ee8c45262b1'
COLUMN                             CELL                                                                                              
 address.pairs:from_address        timestamp=1345904219080, value=stanley.horton@enron.com                                           
 address.pairs:to_address          timestamp=1345904219080, value=maria.pavlou@enron.com                                             
 address.pairs:total_sent          timestamp=1345904219080, value=1                                                                  
3 row(s) in 0.0300 seconds
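One way to picture the row as a JSON document is to key each cell value by its column qualifier. The following Python sketch does that with the cells from the get output above. Note that this record shape is our own illustration and cells_to_record is a hypothetical helper; the JRuby client in this post returns a flat array of values instead:

```python
import json

# (column, timestamp, value) cells copied from the `get` output above.
cells = [
    ("address.pairs:from_address", 1345904219080, "stanley.horton@enron.com"),
    ("address.pairs:to_address", 1345904219080, "maria.pavlou@enron.com"),
    ("address.pairs:total_sent", 1345904219080, "1"),
]

def cells_to_record(cells):
    """Key each cell value by its column qualifier (the part after 'family:')."""
    record = {}
    for column, _timestamp, value in cells:
        _family, qualifier = column.split(":", 1)
        record[qualifier] = value
    return record

print(json.dumps(cells_to_record(cells), sort_keys=True))
```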

Our web service publishes stats on messages between email addresses from HBase.

Conclusion

Starting with emails in Avro format, we have processed our data using Pig and published it to HBase, where a simple JRuby Sinatra app serves it as JSON. We’ve also managed to share UUID code between our JRuby Pig UDF and our Sinatra web application.

About the Author

Russell Jurney is Hortonworks Hadoop Evangelist and the author of the book Agile Data (O’Reilly, Dec 2012), which teaches a flexible toolset and methodology for building effective analytics applications using Apache Hadoop and cloud computing.

Pig as Hadoop Connector, Part One: Pig, MongoDB and Node.js

Series Introduction

Apache Pig is a dataflow oriented, scripting interface to Hadoop. Pig enables you to manipulate data as tuples in simple pipelines without thinking about the complexities of MapReduce.

But Pig is more than that. Pig has emerged as the ‘duct tape’ of Big Data, enabling you to send data between distributed systems in a few lines of code. In this series, we’re going to show you how to use Hadoop and Pig to connect different distributed systems, to enable you to process data from wherever and to wherever you like.

Working code for this post as well as setup instructions for the tools we use are available at https://github.com/rjurney/enron-node-mongo and you can download the Enron emails we use in the example in Avro format at http://s3.amazonaws.com/rjurney.public/enron.avro. You can run our example Pig scripts in local mode (without Hadoop) with the -x local flag: pig -x local. This enables new Hadoop users to try out Pig without a Hadoop cluster.

Part two of this series on HBase and JRuby is available here: http://hortonworks.com/blog/pig-as-hadoop-connector-part-two-hbase-jruby-and-sinatra/.

Introduction

In this post we’ll be using Hadoop, Pig, mongo-hadoop, MongoDB and Node.js to turn Avro records into a web service. We do so to illustrate Pig’s ability to act as glue between distributed systems, and to show how easy it is to publish data from Hadoop to the web.

Pig and Avro

Pig’s Avro support is solid in Pig 0.10.0. To use AvroStorage, we need only load piggybank.jar, and the jars for avro and json-simple. A shortcut to AvroStorage is convenient as well. Note that all paths are relative to your Pig install path. We load Avro support into Pig like so:

/* Load Avro jars and define shortcut */
register /me/pig/build/ivy/lib/Pig/avro-1.5.3.jar
register /me/pig/build/ivy/lib/Pig/json-simple-1.1.jar
register /me/pig/contrib/piggybank/java/piggybank.jar
define AvroStorage org.apache.pig.piggybank.storage.avro.AvroStorage(); /* Shortcut */

MongoDB’s Java Driver

To connect to MongoDB, we’ll need the MongoDB Java Driver. You can download it here: https://github.com/mongodb/mongo-java-driver/downloads. We’ll load this jar in our Pig script.

Mongo-Hadoop

The mongo-hadoop project provides integration between MongoDB and Hadoop. You can download the latest version at https://github.com/mongodb/mongo-hadoop/downloads. Once you download and unzip the project, you’ll need to build it with sbt.

./sbt package

This will produce the following jars:

$ find .|grep jar
./core/target/mongo-hadoop-core-1.1.0-SNAPSHOT.jar
./pig/target/mongo-hadoop-pig-1.1.0-SNAPSHOT.jar
./target/mongo-hadoop-1.1.0-SNAPSHOT.jar

We load these MongoDB libraries in Pig like so:

/* MongoDB libraries and configuration */
register /me/mongo-hadoop/mongo-2.7.3.jar /* MongoDB Java Driver */
register /me/mongo-hadoop/core/target/mongo-hadoop-core-1.1.0-SNAPSHOT.jar
register /me/mongo-hadoop/pig/target/mongo-hadoop-pig-1.1.0-SNAPSHOT.jar
 
/* Set speculative execution off so we don't have the chance of duplicate records in Mongo */
set mapred.map.tasks.speculative.execution false
set mapred.reduce.tasks.speculative.execution false
define MongoStorage com.mongodb.hadoop.pig.MongoStorage(); /* Shortcut */
set default_parallel 5 /* By default, let's have 5 reducers */

Writing to MongoDB

Loading Avro data and storing records to MongoDB are one-liners in Pig.

avros = load 'enron.avro' using AvroStorage();
store avros into 'mongodb://localhost/enron.emails' using MongoStorage();

From Avro to Mongo in One Line

I’ve automated loading Avros and storing them to MongoDB in the script at https://github.com/rjurney/enron-node-mongo/blob/master/avro_to_mongo.pig, using Pig’s parameter substitution:

avros = load '$avros' using AvroStorage();
store avros into '$mongourl' using MongoStorage();

We can then call our script like this, and it will load our Avros to Mongo:

pig -l /tmp -x local -v -w -param avros=enron.avro \
   -param mongourl='mongodb://localhost/enron.emails' avro_to_mongo.pig
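If parameter substitution is new to you, Python's string.Template happens to use the same $name placeholder style, so it makes a handy analogy. This is only a sketch of the idea, not of how Pig's preprocessor is implemented:

```python
from string import Template

# The two statements from avro_to_mongo.pig, with $avros and $mongourl placeholders.
script = Template(
    "avros = load '$avros' using AvroStorage();\n"
    "store avros into '$mongourl' using MongoStorage();\n"
)

# Values mirroring the -param flags in the command above.
params = {"avros": "enron.avro", "mongourl": "mongodb://localhost/enron.emails"}

print(script.substitute(params))
```

The substituted text is the same pair of statements we wrote by hand in the previous section.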

We can verify our data is in MongoDB like so:

$ mongo enron

MongoDB shell version: 2.0.2
connecting to: enron

> show collections
emails
system.indexes

> db.emails.findOne({message_id: "<3607504.1075843446517.JavaMail.evans@thyme>"})
{
	"_id" : ObjectId("502b4ae703643a6a49c8d180"),
	"message_id" : "<3607504.1075843446517.JavaMail.evans@thyme>",
	"date" : "2001-04-25T12:35:00.000Z",
	"from" : {
		"address" : "jeff.dasovich@enron.com",
		"name" : "Jeff Dasovich"
	},
	"subject" : null,
	"body" : "Breathitt's hanging tough, siding w/Hebert, standing for markets.  Jeff",
	"tos" : [
		{
			"address" : "7409949@skytel.com",
			"name" : null
		}
	],
	"ccs" : [ ],
	"bccs" : [ ]
}

To the Web with Node.js

We’ve come this far, so we may as well publish our data on the web via a simple web service. Let’s use Node.js to fetch a record from MongoDB by message ID, and then return it as JSON. To do this, we’ll use Node’s mongodb package. Installation instructions are available in our github project.

Our node application is simple enough. We listen for an http request on port 1337, and use the messageId parameter to query an email by message id.

// Dependencies
var mongodb = require("mongodb"),
    http = require('http'),
    url = require('url');

// Set up Mongo
var Db = mongodb.Db,
    Server = mongodb.Server;

// Connect to the MongoDB 'enron' database and its 'emails' collection
var db = new Db("enron", new Server("127.0.0.1", 27017, {}));
db.open(function(err, n_db) { db = n_db });
var collection = db.collection("emails");

// Setup a simple API server returning JSON
http.createServer(function (req, res) {
  var inUrl = url.parse(req.url, true);
  var messageId = inUrl.query.messageId;

  // Given a message ID, find one record that matches in MongoDB
  collection.findOne({message_id: messageId}, function(err, item) {
    // Return 404 on error, or when no email matches the message ID
    if(err || !item) {
      if(err) console.log("Error:" + err);
      res.writeHead(404);
      res.end();
      return;
    }
    // Return 200/json on success; res.end() writes the body
    // (http.ServerResponse has no send() method)
    res.writeHead(200, {'Content-Type': 'application/json'});
    res.end(JSON.stringify(item));
  });

}).listen(1337, '127.0.0.1');

console.log('Server running at http://127.0.0.1:1337/');
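The messageId travels in the query string, so its angle brackets need to be percent-encoded on the way in and decoded on the server. Here is a sketch of that round trip using only the Python 3 standard library. Note that urlencode also encodes the @ sign as %40, which decodes to the same ID:

```python
from urllib.parse import urlencode, urlsplit, parse_qs

message_id = "<3607504.1075843446517.JavaMail.evans@thyme>"

# Client side: build the request URL with a percent-encoded messageId.
url = "http://127.0.0.1:1337/?" + urlencode({"messageId": message_id})
print(url)

# Server side: parse the query string back into the original message ID,
# much as url.parse(req.url, true) does in the Node code above.
query = parse_qs(urlsplit(url).query)
decoded = query["messageId"][0]
print(decoded)
```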

Navigating to http://localhost:1337/?messageId=%3C3607504.1075843446517.JavaMail.evans@thyme%3E returns an Enron email as JSON.

We’ll leave the CSS as an exercise for your web developer, or you might try Bootstrap if you don’t have one.

Conclusion

The Hadoop Filesystem serves as a dumping ground for aggregating events. Apache Pig is a scripting interface to Hadoop MapReduce. We can manipulate and mine data on Hadoop, and when we’re ready to publish it to an application we use mongo-hadoop to store our records in MongoDB. From there, creating a web service is a few lines of JavaScript with Node.js – or your favorite web framework.

MongoDB is a popular NoSQL database for web applications. Using Hadoop and Pig we can aggregate and process logs at scale and publish new data-driven features back to MongoDB – or whatever our favorite database is.

Note: we should ensure that there is sufficient I/O between our Hadoop cluster and our MongoDB cluster, lest we overload Mongo with writes from Hadoop. Be careful out there! I have however verified that writing from an Elastic MapReduce Hadoop cluster to a replicated MongoHQ cluster (on Amazon EC2) works well.

About the Author

Russell Jurney is a data scientist and the author of the book Agile Data (O’Reilly, Dec 2012), which teaches a flexible toolset and methodology for building effective analytics applications using Apache Hadoop and cloud computing.
