Pig as Duct Tape, Part Three: TF-IDF Topics with Cassandra, Python Streaming and Flask

Series Introduction

Apache Pig is a dataflow-oriented scripting interface to Hadoop. Pig enables you to manipulate data as tuples in simple pipelines without thinking about the complexities of MapReduce.

But Pig is more than that. Pig has emerged as the ‘duct tape’ of Big Data, enabling you to send data between distributed systems in a few lines of code. In this series, we’re going to show you how to use Hadoop and Pig to connect different distributed systems to enable you to process data from wherever and to wherever you like.

Working code for this post, as well as setup instructions for the tools we use and their environment variables, is available at https://github.com/rjurney/enron-python-flask-cassandra-pig. You can download the Enron emails we use in the example in Avro format at http://s3.amazonaws.com/rjurney.public/enron.avro. You can run our example Pig scripts in local mode (without Hadoop) with the -x local flag: pig -x local. This enables new Hadoop users to try out Pig without a Hadoop cluster.

Parts one and two of this series can get you started using Pig if you’re not familiar with it.

Introduction

In this post, we are going to mine topics from the Enron email message bodies via TF-IDF using Pig Streaming with Python’s NLTK library and store them in Cassandra to be served in a REST API via Python and Flask.

Cassandra

Apache Cassandra is a multi-node, linearly scalable column store capable of serving intense I/O loads while remaining available for batch processing with Hadoop. In this example, we’ll be running a single Cassandra node locally.

An excellent getting started guide for Cassandra is available in the Pygmalion project’s Getting-Started Guide, and more information on building Cassandra is available in the Cassandra Wiki. An excellent presentation on Pig and Cassandra is here.

You can download Cassandra here: http://www.apache.org/dyn/closer.cgi?path=/cassandra/1.1.5/apache-cassandra-1.1.5-src.tar.gz

wget http://mirror.olnevhost.net/pub/apache/cassandra/1.1.5/apache-cassandra-1.1.5-src.tar.gz
tar -xvzf apache-cassandra-1.1.5-src.tar.gz
cd apache-cassandra-1.1.5-src
ant
sudo bin/cassandra -f

Now you can monitor Cassandra in that shell. Further, you may access the Cassandra CLI via bin/cassandra-cli. The help command is helpful. Let’s set up a keyspace and column family for our data.

$ bin/cassandra-cli
 
create keyspace enron;
use enron;
create column family email with
    comparator = UTF8Type and
    default_validation_class = UTF8Type and
    key_validation_class = UTF8Type;

CassandraStorage

Cassandra includes support for loading and storing data using Pig via the CassandraStorage UDF (User Defined Function).

CassandraStorage expects certain environment variables to be set. You can edit and run env.sh to set up your environment.

export PIG_INITIAL_ADDRESS=localhost
export PIG_RPC_PORT=9160
export PIG_PARTITIONER=org.apache.cassandra.dht.RandomPartitioner
export CASSANDRA_HOME=/me/Software/apache-cassandra-1.1.5-src

Once you’ve built Cassandra and set up your environment, you can load the relevant jars for CassandraStorage in Pig.

register /me/Software/apache-cassandra-1.1.5-src/build/apache-cassandra*.jar
register /me/Software/apache-cassandra-1.1.5-src/lib/*.jar
register /me/Software/apache-cassandra-1.1.5-src/build/lib/jars/*.jar
 
define CassandraStorage org.apache.cassandra.hadoop.pig.CassandraStorage();

The Enron Emails as Avros

As usual, we’ll be working with the Enron emails in Avro format, available here. Note that we limit the data to 100 emails to speed processing up in local mode.

/* AvroStorage */
register /me/Software/pig/build/ivy/lib/Pig/avro-1.5.3.jar
register /me/Software/pig/build/ivy/lib/Pig/json-simple-1.1.jar
register /me/Software/pig/contrib/piggybank/java/piggybank.jar
 
define AvroStorage org.apache.pig.piggybank.storage.avro.AvroStorage();
 
emails = load '/me/Data/enron.avro' using AvroStorage();
emails = filter emails by message_id is not null;
 
/* Limit the count of emails we process if we are working in local mode */
emails = limit emails 100;
 
id_body = foreach emails generate message_id, body;

Munging Message Bodies with Python Streaming and NLTK

Pig Streaming lets you send Pig relations through arbitrary executables – even simple shell commands. Pig doesn’t care. Pigs eat anything. The syntax is simple enough:

define test_stream `token_extractor.py` SHIP ('token_extractor.py');
cleaned_words = stream id_body through test_stream as (message_id:chararray, token_strings:chararray);

Much of the power of Python for data processing lies with its nltk and scipy libraries… and of course, with regular expressions. There is an excellent book on Python’s NLTK, the “Natural Language Toolkit.” I’ve created a Python class called TokenExtractor that cleans up the body of the email so that it can be split on whitespace for tokenization by Pig’s TOKENIZE. Nothing special here, just munging and nltk.tokenize. To learn more about data munging and data science, check out Mike Driscoll’s (@medriscoll) post, ‘Three Sexy Skills of Data Geeks.’

import sys

# TokenExtractor is defined in token_extractor.py (see the sketch below)
def main():
  te = TokenExtractor()
  # Pig streams each tuple to us as a tab-separated line on stdin
  for line in sys.stdin:
    message_id, body = line.split('\t', 1)
    tokens = te.tokenize(body)
    no_urls = te.remove_urls(tokens)
    lowers = te.lower(no_urls)
    no_punc = te.remove_punctuation(lowers)
    no_newlines = te.remove_endlines(no_punc)
    no_shorts = te.short_filter(no_newlines)
    print message_id + "\t" + " ".join(no_shorts)

if __name__ == '__main__':
  main()
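
The TokenExtractor class itself lives in token_extractor.py in the repository. Below is a minimal sketch of what such a class might look like using nltk, assuming the method names called in main() above – the real implementation in the repo may differ in its details.

import re
import string

from nltk.tokenize import WhitespaceTokenizer


class TokenExtractor(object):
  """Sketch of a token cleaner, assuming the method names used in main()."""

  def __init__(self):
    self.tokenizer = WhitespaceTokenizer()
    self.url_re = re.compile(r'^(http|https|ftp)://')

  def tokenize(self, text):
    # Split the raw email body on whitespace
    return self.tokenizer.tokenize(text)

  def remove_urls(self, tokens):
    return [t for t in tokens if not self.url_re.match(t)]

  def lower(self, tokens):
    return [t.lower() for t in tokens]

  def remove_punctuation(self, tokens):
    return [t.strip(string.punctuation) for t in tokens]

  def remove_endlines(self, tokens):
    return [t.replace('\n', '').replace('\r', '') for t in tokens]

  def short_filter(self, tokens):
    # Drop empty and very short tokens, which carry little topical signal
    return [t for t in tokens if len(t) > 2]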

Next I use Pig’s TOKENIZE to generate the actual tokens. This is a little unusual – we tokenize once in our Python and then tokenize again in Pig – but it works, so ship it! Python cleans the strings and Pig creates tuples for us, which is convenient. This gives us pairs of message_ids and tokens.

token_records = foreach cleaned_words generate message_id, FLATTEN(TOKENIZE(token_strings)) as tokens;

Note that we have trimmed all fields besides message_id and tokens. We need to limit our data size to ensure efficiency, as we have just exploded our data to be larger than it originally was – every single word in every single message body now has an RFC-2822 message ID attached to it.

Term Frequency-Inverse Document Frequency (TF-IDF) in Apache Pig

Now that we’ve got tokens (words) and their message ids, we’re going to calculate relevant topics per email with a simple algorithm called ‘Term Frequency-Inverse Document Frequency,’ which is to say we’ll compute the top N TF-IDF values for each email – to see what they’re about!

Wikipedia is very informative on TF-IDF. An excellent tutorial on TF-IDF in Pig is available here, and we borrow from it. We should all thank Jacob Perkins (@thedatachef) for all the awesome Pig code on his blog and on GitHub.

We will be using the words ‘token’, ‘term’, and ‘word’ interchangeably below.
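
Before we walk through the Pig, here is the arithmetic we are about to implement, as a tiny pure-Python sketch. The toy corpus below is made up purely for illustration; the Pig steps that follow compute these same quantities, just in a distributed fashion.

import math

# Toy corpus: message_id -> list of tokens, purely for illustration
docs = {
    'msg1': ['bankruptcy', 'party', 'party', 'tonight'],
    'msg2': ['party', 'meeting', 'tonight'],
    'msg3': ['bankruptcy', 'filing', 'meeting'],
}

total_docs = len(docs)

# How many documents contain each token (for the IDF denominator)
docs_with_token = {}
for tokens in docs.values():
  for token in set(tokens):
    docs_with_token[token] = docs_with_token.get(token, 0) + 1

# tf-idf = (term count / document size) * LOG(total docs / docs with term)
for message_id, tokens in docs.items():
  doc_size = float(len(tokens))
  for token in set(tokens):
    term_freq = tokens.count(token) / doc_size
    idf = math.log(total_docs / float(docs_with_token[token]))
    print message_id, token, term_freq * idf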

  1. Term Counts per Document

     First we group by document and token and take a count of the number of times a word is used in each document.

    /* Calculate the term count per document */
    doc_word_totals = foreach (group token_records by (message_id, tokens)) generate 
                        flatten(group) as (message_id, token), 
                        COUNT_STAR(token_records) as doc_total;
  2. Document Size: Words per Document

     Next we calculate the size of a document in terms of words – to normalize the term counts above. Naturally, if a document is long, a single occurrence of a word is less significant than it would be in a shorter document. We group by message ID and take a SUM of all word counts for that document. We use FLATTEN to project that document total with the per-token counts we just calculated so that we can normalize these counts in the next step.

    /* Calculate the document size */
    pre_term_counts = foreach (group doc_word_totals by message_id) generate
                        group AS message_id,
                        FLATTEN(doc_word_totals.(token, doc_total)) as (token, doc_total), 
                        SUM(doc_word_totals.doc_total) as doc_size;
  3. Term Frequency per Document

     Now we divide term count by document length to arrive at term frequency per document – our numerator.

    /* Calculate the TF */
    term_freqs = foreach pre_term_counts generate message_id as message_id,
                   token as token,
                   ((double)doc_total / (double)doc_size) AS term_freq;
  4. Count of Documents with each Term

     Now we start on our denominator. We group our term frequencies by term, and count the documents containing each term. We project this document count with the term frequencies themselves for easy calculation in our next step.

    /* Get count of documents using each token, for idf */
    token_usages = foreach (group term_freqs by token) generate
                     FLATTEN(term_freqs) as (message_id, token, term_freq),
                     COUNT_STAR(term_freqs) as num_docs_with_token;
  5. Total Document Count

     Our next step requires a document count, which we achieve with a GROUP ALL and a count.

    /* Get document count */
    just_ids = foreach emails generate message_id;
    ndocs = foreach (group just_ids all) generate COUNT_STAR(just_ids) as total_docs;
  6. Calculate TF-IDF

     Finally, we calculate TF-IDF using the relations we’ve prepared. Note the use of a Pig scalar to calculate our IDF term – which is the LOG of the total document count divided by the number of documents with that token.

     Also note that we took a shortcut and cast our tf_idf score as a chararray to simplify our Cassandra schema. Finally, we used the field names score and value, which CassandraStorage expects.

    /* Note the use of Pig Scalars to calculate idf */
    tfidf_all = foreach token_usages {
      idf = LOG((double)ndocs.total_docs/(double)num_docs_with_token);
      tf_idf = (double)term_freq * idf;
      generate message_id as message_id,
        token as score,
        (chararray)tf_idf as value:chararray;
    };
  7. Prepare and Store our Data in Cassandra

     Finally, we store the top ten terms by tf-idf score per document in Cassandra, keyed by the message ID.

    /* Get the top 10 Tf*Idf scores per message */
    per_message_cassandra = foreach (group tfidf_all by message_id) {
      sorted = order tfidf_all by value desc;
      top_10_topics = limit sorted 10;
      generate group, top_10_topics.(score, value);
    };
     
    store per_message_cassandra into 'cassandra://enron/email_topics' USING CassandraStorage();
  8. Sample Message IDs to Query Cassandra

     We need a key (message id) to fetch our values in Cassandra. So we grab ten records and dump them to console.

    /* This will give you some message_id keys to fetch in Cassandra, and some message bodies to compare topics to. */
    samples = limit just_ids 10;
    dump samples;
    (<28.1075842613917.JavaMail.evans@thyme>)
    (<85.1075854368299.JavaMail.evans@thyme>)
    (<167.1075851646300.JavaMail.evans@thyme>)
    (<185.1075857304356.JavaMail.evans@thyme>)
    (<431.1075859137859.JavaMail.evans@thyme>)
    (<589.1075842593084.JavaMail.evans@thyme>)
    (<614.1075847580822.JavaMail.evans@thyme>)
    (<735.1075840186524.JavaMail.evans@thyme>)
    (<758.1075842602845.JavaMail.evans@thyme>)
    (<765.1075860359973.JavaMail.evans@thyme>)

Verify our Data with Cassandra CLI

We can verify our data is present in Cassandra with cassandra-cli.

cd $CASSANDRA_HOME
bin/cassandra-cli
[default@unknown] use enron;
Authenticated to keyspace: enron
[default@enron] get email_topics['<431.1075859137859.JavaMail.evans@thyme>'];
=> (column=bankruptcy, value=0.02577520626485872, timestamp=1348084758725000)
=> (column=end, value=0.018016096034710077, timestamp=1348084758725000)
=> (column=left, value=0.024021461379613435, timestamp=1348084758725000)
=> (column=palmer, value=0.017183470843239148, timestamp=1348084758725000)
=> (column=party, value=0.05155041252971744, timestamp=1348084758725000)
=> (column=phillip, value=0.018016096034710077, timestamp=1348084758725000)
=> (column=pl, value=0.018016096034710077, timestamp=1348084758725000)
=> (column=plove, value=0.02577520626485872, timestamp=1348084758725000)
=> (column=tonight, value=0.018016096034710077, timestamp=1348084758725000)
=> (column=your, value=0.017969743348148298, timestamp=1348084758725000)
Returned 10 results.
Elapsed time: 4 msec(s).

Note that the more documents we use (remember our LIMIT statement up top), the better topics we will get.

Pygmalion

Note that the Pygmalion project by Jeremy Hanna (@jeromatron) helps convert back and forth between Pig relations and Cassandra columns. Jeremy is an excellent open source citizen and was helpful with this post. Shout outs to Jeremy!

Accessing Cassandra from Python with pycassa

Python connects to Cassandra via the pycassa project. A good tutorial is available here. You can install pycassa with pip or easy_install. On Mac OS X Lion, the pip install failed for me, so I used easy_install:

easy_install pycassa

Using Cassandra with pycassa is easy:

import pycassa
 
pool = pycassa.ConnectionPool('enron')
cf = pycassa.ColumnFamily(pool, 'email_topics')
 
cf.get('<431.1075859137859.JavaMail.evans@thyme>') # Replace me
# OrderedDict([(u'bankruptcy', u'0.02577520626485872'), (u'end', u'0.018016096034710077'), (u'left', u'0.024021461379613435'), (u'palmer', u'0.017183470843239148'), (u'party', u'0.05155041252971744'), (u'phillip', u'0.018016096034710077'), (u'pl', u'0.018016096034710077'), (u'plove', u'0.02577520626485872'), (u'tonight', u'0.018016096034710077'), (u'your', u'0.017969743348148298')])

Serving Topics as JSON via a REST API with Flask

The Flask project is a lightweight web application framework in Python. Getting started with Flask is easy.

from flask import Flask
import pycassa
import json
 
pool = pycassa.ConnectionPool('enron')
cf = pycassa.ColumnFamily(pool, 'email_topics')
 
app = Flask(__name__)
 
@app.route("/message/topics/<message_id>")
def topics(message_id):
  return json.dumps(cf.get(message_id))
 
if __name__ == "__main__":
  app.run(debug=True)

Finally, navigate your browser to your new web service for Enron email topics: http://localhost:5000/message/topics/%3C431.1075859137859.JavaMail.evans@thyme%3E. You may need to use a message_id printed to your console instead, since your Pig script may have limited a different subset of the emails than ours did.
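
If you would rather hit the service from code than from the browser, something like the following should work. This is just a sketch using Python 2’s urllib and urllib2 from the standard library; the message_id is an example and should be replaced with one from your own dump.

import urllib
import urllib2

# Replace with a message_id printed by the 'dump samples' step above
message_id = '<431.1075859137859.JavaMail.evans@thyme>'

# Percent-encode the angle brackets so the id survives as a URL path segment
url = 'http://localhost:5000/message/topics/' + urllib.quote(message_id, safe='')

print urllib2.urlopen(url).read()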

Conclusion

We’ve used Pig to extract structured topics and scores from unstructured text via a simple algorithm that we’ve implemented in a distributed manner on top of Hadoop… without knowing or thinking about MapReduce. We used arbitrary tools to clean up text via Pig streaming. We’ve used Pig to store that data into yet another distributed system, Cassandra, and we’ve served it as simple JSON via a web service using Python and Flask.

Hadoop excels at extracting structured data from unstructured data, and Pig is fine duct tape indeed!
