The Data Lifecycle, Part One: Avroizing the Enron Emails

Series Introduction

This is part one of a series of blog posts covering new developments in the Hadoop pantheon that enable productivity throughout the lifecycle of big data. Over the series we'll explore the full lifecycle of data in the enterprise: introducing new data sources to the Hadoop filesystem via ETL, processing this data in data-flows with Pig and Python to expose new and interesting properties, consuming this data as an analyst in Hive, and discovering and accessing these resources as analysts and application developers using HCatalog and Templeton.

The Berkeley Enron Emails

In this project we will convert a MySQL database of Enron emails into Avro document format for analysis on Hadoop with Pig. Complete code for this example is available here on GitHub.

Email is a rich source of information for many kinds of analysis. During the investigation of the Enron scandal of 2001, 517,431 messages from 114 inboxes of key Enron executives were collected. These emails were published and have become a common dataset for academics analyzing document collections and social networks. Andrew Fiore and Jeff Heer at UC Berkeley have cleaned this email set and provided it as a MySQL archive.

We hope this dataset can become a common reference set for examples and questions, since anonymizing one's own data for public forums can make asking questions and getting quality answers tricky and time consuming.

More information about the Enron emails is available online.

Load, Inspect and Query the Emails

A MySQL 5.5-compatible dump of the Enron emails is available here. To load it, you will need to create a new database called 'enron' and then apply the dump:

[bash]$ wget --no-check-certificate  https://s3.amazonaws.com/rjurney_public_web/images/enron.mysql.5.5.20.sql.gz
[bash]$ gzip -d enron.mysql.5.5.20.sql.gz
[bash]$ mysql -u root -e 'create database enron'
[bash]$ mysql -u root < enron.mysql.5.5.20.sql
[bash]$ mysql -u root enron
 
mysql> show tables;
 
+-----------------+
| Tables_in_enron |
+-----------------+
| bodies          |
| categories      |
| catgroups       |
| edgemap         |
| edges           |
| headers         |
| mailgraph       |
| messagecats     |
| messages        |
| people          |
| recipients      |
+-----------------+
11 rows in set (0.00 sec)

As we can see, this data is highly structured.

mysql> select * from messages limit 1;
+-----------+----------------------------------------------+---------------------+-----------+----------+--------------------+
| messageid | smtpid                                       | messagedt           | messagetz | senderid | subject            |
+-----------+----------------------------------------------+---------------------+-----------+----------+--------------------+
|         1 |  | 2001-10-31 05:23:56 | -0800 PST |        1 | Path 30 mitigation |
+-----------+----------------------------------------------+---------------------+-----------+----------+--------------------+
1 row in set (0.01 sec)

Querying a single email so that it comes back as the document we might see in our inbox is complex. And yet this is precisely the format that is most convenient for analysis. This is a limitation of highly structured, relational data. Let's select a single email as we might view it in raw format.

mysql> select m.smtpid as id,
       m.messagedt as date,
       s.email as sender,
       (select group_concat(CONCAT(r.reciptype, ':', p.email) SEPARATOR ', ') from recipients r join people p ON r.personid=p.personid where r.messageid = 511) as to_cc_bcc,
       m.subject as subject,
       SUBSTR(b.body, 1, 200) as body
            from messages m
            join people s
                on m.senderid=s.personid
            join bodies b
                on m.messageid=b.messageid
                    where m.messageid=511;
 
+-----------------------------------------------+---------------------+----------------------+------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+-------------------------------------+--------------------------------------------------------------------------------------------------------------+
| id                                            | date                | sender               | to_cc_bcc                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                        | subject                             | body                                                                                                         |
+-----------------------------------------------+---------------------+----------------------+------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+-------------------------------------+--------------------------------------------------------------------------------------------------------------+
|  | 2002-02-02 12:56:33 | pete.davis@enron.com | to:pete.davis@enron.com cc:albert.meyers@enron.com cc:bill.williams@enron.com cc:craig.dean@enron.com cc:geir.solberg@enron.com cc:john.anderson@enron.com cc:mark.guzman@enron.com cc:michael.mier@enron.com cc:pete.davis@enron.com cc:ryan.slinger@enron.com bcc:albert.meyers@enron.com bcc:bill.williams@enron.com bcc:craig.dean@enron.com bcc:geir.solberg@enron.com bcc:john.anderson@enron.com bcc:mark.guzman@enron.com bcc:michael.mier@enron.com bcc:pete.davis@enron.com bcc:ryan.slinger@enron.com | Schedule Crawler: HourAhead Failure | 
 
Start Date: 2/2/02; HourAhead hour: 11;  HourAhead schedule download failed. Manual intervention required. |
+-----------------------------------------------+---------------------+----------------------+------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+-------------------------------------+--------------------------------------------------------------------------------------------------------------+
1 row in set (0.04 sec)

This SQL is painful, to say the least. In contrast, with our data in Avro-encoded document format, we'll be able to access these emails much more easily, analyzing both their structured and unstructured components with whatever tools we prefer.
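As a preview of where we're headed, here is a minimal Python sketch of that document-style access. The names here are assumptions: it presumes the avro Python package and a part file from the emails.avro output we produce below, with field names matching the schema we will define.

from avro import datafile, io
 
# Each record is a complete email document: sender, recipients, subject
# and body arrive together, with no joins required.
reader = datafile.DataFileReader(open('part-m-00001.avro'), io.DatumReader())
for email in reader:
  print email['from']['address'], email['subject']
reader.close()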

Dumping MySQL to Tab-Delimited Text

Now that we’re comfortable with our data, lets query it for export.

  1. Get the emails and their senders:
    mysql> select m.smtpid as message_id, m.messagedt as date, s.email as from_address, s.name as from_name, m.subject as subject, b.body as body from messages m join people s on m.senderid=s.personid join bodies b on m.messageid=b.messageid limit 10;
     
    +----------------------+---------------------+----------------------+----------------------+----------------------+----------------------+
    | message_id           | date                | from_address         | from_name            | subject              | body                 |
    +----------------------+---------------------+----------------------+----------------------+----------------------+----------------------+
  2. Get the recipients of those emails, whether to, cc, or bcc:
    select m.smtpid, r.reciptype, p.email, p.name from messages m join recipients r on m.messageid=r.messageid join people p on r.personid=p.personid limit 10;
     
    +-----------------------------------------------+-----------+------------------------------+-------------------------------------+
    | smtpid                                        | reciptype | email                        | name                                |
    +-----------------------------------------------+-----------+------------------------------+-------------------------------------+
    |   | to        | mktstathourahead@caiso.com   | Market Status: Hour-Ahead/Real-Time |
    |  | to        | mktstathourahead@caiso.com   | Market Status: Hour-Ahead/Real-Time |
    |   | to        | mktstathourahead@caiso.com   | Market Status: Hour-Ahead/Real-Time |
    |  | to        | mktstathourahead@caiso.com   | Market Status: Hour-Ahead/Real-Time |
    |  | to        | mktstathourahead@caiso.com   | Market Status: Hour-Ahead/Real-Time |
    |  | to        | mktstathourahead@caiso.com   | Market Status: Hour-Ahead/Real-Time |
    |  | to        | 20participants@caiso.com     | ISO Market Participants             |
    |  | to        | isoclientrelations@caiso.com | ISO Client Relations                |
    |  | to        | bill.williams@enron.com      | Bill Williams III                   |
    +-----------------------------------------------+-----------+------------------------------+-------------------------------------+

We can run those same queries to dump the results as TSV, or “Tab Separated Values.” MySQL’s mysql client allows us to dump a query as TSV using the -e and -B options: -e executes a supplied query, and -B gives tab-delimited batch output. For simplicity’s sake, we’ll dump this data in more than one query.

Run these from the command line to perform the dumps.

[bash]$ mysql -u root -B -e "select m.smtpid as message_id, m.messagedt as date, s.email as from_address, s.name as from_name, m.subject as subject, b.body as body from messages m join people s on m.senderid=s.personid join bodies b on m.messageid=b.messageid;" enron > enron_messages.tsv
[bash]$ head enron_messages.tsv
 
message_id  date    from_address    from_name   subject body
    2001-10-31 05:23:56 marketopshourahead@caiso.com    CAISO Market Operations - Hour Ahead    Path 30 mitigation  System Notification: At 0115 PST, WACM terminated request for coordinated\noperation controllable devices for Path 30 USF mitigation.
   2001-10-31 04:04:37 marketopsrealtimebeep@caiso.com CAISO Market Operations - Realtime/BEEP Path 15 Internal path flows are now below limits.  BEEP has been returned to normal\nmode (unsplit operation) as of 0000 hours.  BEEP will dispatch as one zone.\nSent by Market Operations, inquiries please call the Real Time Desk.\n\n\nThe system conditions described in this communication are dynamic and\nsubject to change.  While the ISO has attempted to reflect the most current,\naccurate information available in preparing this notice, system conditions\nmay change suddenly with little or no notice.
 
[bash]$ mysql -u root -B -e "select m.smtpid, r.reciptype, p.email, p.name from messages m join recipients r on m.messageid=r.messageid join people p on r.personid=p.personid" enron > enron_recipients.tsv
[bash]$ head enron_recipients.tsv
 
smtpid  reciptype   email   name
    to  mktstathourahead@caiso.com  Market Status: Hour-Ahead/Real-Time
   to  mktstathourahead@caiso.com  Market Status: Hour-Ahead/Real-Time
    to  mktstathourahead@caiso.com  Market Status: Hour-Ahead/Real-Time
   to  mktstathourahead@caiso.com  Market Status: Hour-Ahead/Real-Time
   to  mktstathourahead@caiso.com  Market Status: Hour-Ahead/Real-Time
   to  mktstathourahead@caiso.com  Market Status: Hour-Ahead/Real-Time
   to  20participants@caiso.com    ISO Market Participants
   to  isoclientrelations@caiso.com    ISO Client Relations
   to  bill.williams@enron.com Bill Williams III
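One detail worth noting before we load these files: mysql -B escapes embedded newlines and tabs in the body column as \n and \t (you can see the literal \n sequences in the head output above), so every record occupies exactly one line. Here is a minimal Python sketch for sanity-checking the dump; the file name matches the dump above, and the unescaping shown is just one reasonable way to recover the raw body text:

# mysql -B guarantees one record per line by escaping newlines and tabs,
# so a simple line-oriented split is safe.
with open('enron_messages.tsv') as f:
  columns = f.readline().rstrip('\n').split('\t')
  for line in f:
    row = dict(zip(columns, line.rstrip('\n').split('\t')))
    body = row['body'].replace('\\n', '\n').replace('\\t', '\t')
    print row['subject'], len(body)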

ETL (Extract-Transform-Load) with Pig

We can now load our TSV exports in Pig. I prefer to use several parameters when I use Pig in local mode. The '-l /tmp' option puts my Pig logs in /tmp so they don't clutter my working directory. '-x local' tells Pig to run in local mode instead of Hadoop mode; this is very useful during development, as local mode on a sample of your data will be much faster than Hadoop mode on all of your data. '-v' enables verbose output, and '-w' enables warnings. These last two options are useful for debugging problems when working with a new dataset, so I use them most of the time.

[bash]$ pig -l /tmp -x local -v -w
 
grunt> enron_messages = LOAD '/me/enron-avro/enron_messages.tsv' AS (
 
     message_id:chararray,
     sql_date:chararray,
     from_address:chararray,
     from_name:chararray,
     subject:chararray,
     body:chararray
 
);
grunt> describe enron_messages
enron_messages: {message_id: chararray,sql_date: chararray,from_address: chararray,from_name: chararray,subject: chararray,body: chararray}
grunt> illustrate enron_messages
-----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------
| enron_messages     | message_id:chararray                          | sql_date:chararray    | from_address:chararray    | from_name:chararray    | subject:chararray                   | body:chararray                                                                                                  |
-----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------
|                    |  | 2002-01-25 12:56:33   | pete.davis@enron.com      | Pete Davis             | Schedule Crawler: HourAhead Failure | \n\nStart Date: 1/25/02; HourAhead hour: 11;  HourAhead schedule download failed. Manual intervention required. |
-----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------
 
grunt>

Data Processing with Pig

If Perl is the duct tape of the internet, then Pig is the duct tape of Big Data. Pig can easily transform data from one format to another. In this case, we’ll use Pig to transform raw TSV to semi-structured Avro records.

Avro-izing our Data with Pig

Now we’ve got our data in Pig, with a schema, lets documentize and save our data in Avro format to persist this schema for further analysis. To do so, we need to register the jars that Avro needs, as well as Piggybank for the AvroStorage UDF itself. We’ll also define a short form of the AvroStorage command, as the fully qualified name is java-long.

register /me/pig/contrib/piggybank/java/piggybank.jar
 
register /me/pig/build/ivy/lib/Pig/avro-1.5.3.jar
register /me/pig/build/ivy/lib/Pig/json-simple-1.1.jar
register /me/pig/build/ivy/lib/Pig/joda-time-1.6.jar
 
define AvroStorage org.apache.pig.piggybank.storage.avro.AvroStorage();
define CustomFormatToISO org.apache.pig.piggybank.evaluation.datetime.convert.CustomFormatToISO();
 
set default_parallel 10
set aggregate.warning true
rmf /enron/emails.avro
 
enron_messages = load '/enron/enron_messages.tsv' as (
     message_id:chararray,
     sql_date:chararray,
     from_address:chararray,
     from_name:chararray,
     subject:chararray,
     body:chararray
);
 
enron_recipients = load '/enron/enron_recipients.tsv' as (
    message_id:chararray,
    reciptype:chararray,
    address:chararray,
    name:chararray
);
 
split enron_recipients into tos IF reciptype=='to', ccs IF reciptype=='cc', bccs IF reciptype=='bcc';
 
headers = cogroup tos by message_id, ccs by message_id, bccs by message_id parallel 10;
with_headers = join headers by group, enron_messages by message_id parallel 10;
emails = foreach with_headers generate enron_messages::message_id as message_id,
                                  CustomFormatToISO(enron_messages::sql_date, 'yyyy-MM-dd HH:mm:ss') as date,
                                  TOTUPLE(enron_messages::from_address, enron_messages::from_name) as from:tuple(address:chararray, name:chararray),
                                  enron_messages::subject as subject,
                                  enron_messages::body as body,
                                  headers::tos.(address, name) as tos,
                                  headers::ccs.(address, name) as ccs,
                                  headers::bccs.(address, name) as bccs;
 
store emails into '/enron/emails.avro' using AvroStorage('{"fields": [{"doc": "", "type": ["null", "string"], "name": "message_id"}, {"type": ["string", "null"], "name": "date"}, {"fields": [{"doc": "", "type": ["null", "string"], "name": "name"}, {"doc": "", "type": ["null", "string"], "name": "address"}], "type": "record", "name": "from"}, {"type": ["string", "null"], "name": "subject"}, {"type": ["string", "null"], "name": "body"}, {"doc": "", "type": ["null", {"items": ["null", {"fields": [{"doc": "", "type": ["null", "string"], "name": "name"}, {"doc": "", "type": ["null", "string"], "name": "address"}], "type": "record", "name": "to"}], "type": "array"}], "name": "tos"}, {"doc": "", "type": ["null", {"items": ["null", {"fields": [{"doc": "", "type": ["null", "string"], "name": "name"}, {"doc": "", "type": ["null", "string"], "name": "address"}], "type": "record", "name": "cc"}], "type": "array"}], "name": "ccs"}, {"doc": "", "type": ["null", {"items": ["null", {"fields": [{"doc": "", "type": ["null", "string"], "name": "name"}, {"doc": "", "type": ["null", "string"], "name": "address"}], "type": "record", "name": "bcc"}], "type": "array"}], "name": "bccs"}], "type": "record", "name": "Email"}');
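The one-line schema literal above is dense, so here is its from record pretty-printed for readability (abridged; the to, cc and bcc records inside the tos, ccs and bccs arrays follow the same pattern):

{"name": "from",
 "type": "record",
 "fields": [
   {"doc": "", "type": ["null", "string"], "name": "name"},
   {"doc": "", "type": ["null", "string"], "name": "address"}
 ]}

Each field is a union with null, so records with missing names or addresses remain legal Avro.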
[bash]$ ls /enron/emails.avro
 
part-m-00001.avro       part-m-00004.avro       part-m-00006.avro       part-m-00009.avro
part-m-00002.avro       part-m-00005.avro       part-m-00007.avro
part-m-00000.avro       part-m-00003.avro       part-m-00008.avro

Cat Avro

We can cat these Avro-encoded files using a simple Python utility I wrote, called cat_avro. A less robust Ruby version of cat_avro is available here.

The script uses the Python Avro library and is pretty simple. Its core follows, with the surrounding setup sketched back in so the snippet runs on its own (the command-line handling shown is a plausible reconstruction, not the exact original):

import sys
import pprint
from avro import schema, datafile, io
 
# Sketched-in setup (elided in the original listing): open the Avro file
# named on the command line with a generic DatumReader; the writer's
# schema is read from the file header itself.
filename = sys.argv[1]
field_id = sys.argv[2] if len(sys.argv) > 2 else None  # optional: print one field
pp = pprint.PrettyPrinter()
df_reader = datafile.DataFileReader(open(filename), io.DatumReader())
 
i = 0
for record in df_reader:
  if i > 20:
    break
  i += 1
  if field_id:
    pp.pprint(record[field_id])
  else:
    pp.pprint(record)
 
[bash]$ cat_avro /enron/emails.avro/part-m-00001.avro
 
...
 
{u'bccs': [],
 u'body': u'Where is my new Oglethorpe sheet?',
 u'ccs': [],
 u'date': u'2001-01-10T07:28:00.000Z',
 u'from': {u'address': u'chris.germany@enron.com', u'name': u'Chris Germany'},
 u'message_id': u'',
 u'subject': u'Ogy',
 u'tos': [{u'address': u'jim.homco@enron.com', u'name': u'Jim Homco'}]}

The cat_avro utility prints 20 records and then the schema of the records.
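The schema itself comes from the file header, where Avro stores it as JSON metadata. With the same avro library, reading it is a one-liner (a sketch against the 1.5-era Python API):

# The writer's schema travels in the file header as JSON metadata.
print df_reader.get_meta('avro.schema')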

Loading avros with AvroStorage

Note that the schema is included with each data file, so it lives with the data. This is convenient: from now on we don't have to declare a schema as we load our data, as we did before.

grunt> emails = LOAD '/enron/emails.avro' USING AvroStorage();
grunt> describe emails
 
emails: {message_id: chararray,orig_date: chararray,datetime: chararray,from_address: chararray,from_name: chararray,subject: chararray,body: chararray,tos: {ARRAY_ELEM: (address: chararray,name: chararray)},ccs: {ARRAY_ELEM: (address: chararray,name: chararray)},bccs: {ARRAY_ELEM: (address: chararray,name: chararray)}}
 
grunt> illustrate emails
 
------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------
| emails     | message_id:chararray                         | orig_date:chararray    | datetime:chararray       | from_address:chararray    | from_name:chararray                       | subject:chararray     | body:chararray                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                    | tos:bag{ARRAY_ELEM:tuple(address:chararray,name:chararray)}             | ccs:bag{ARRAY_ELEM:tuple(address:chararray,name:chararray)}             | bccs:bag{ARRAY_ELEM:tuple(address:chararray,name:chararray)}             |
------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------
|            |  | 2000-09-05 13:05:00    | 2000-09-05T13:05:00.000Z | david@ddh-pd.com          | DDH Product Design, Inc." "David Hayslett | Family Reunion Photos | Rod,\n\n It was nice to talk to you this evening. It did sound like you\n had a cold. There is no way to protect from going from air\n conditioning to the outside heat/humidity then back into\n the air conditioning. Just try to get some rest and we'll think positive\n for some cooler weather for you.\n\n Attached pls. find the photos I spoke of. There were 30 of them and I\nnarrowed them to the family I could name. I'll write more later.\n It would be great if you all came out around the holidays!\n Love,\n\n Dave........... \n - Family_Reunion_2000.zip\n | {(hayslettr@yahoo.com, )}                                               | {(rod.hayslett@enron.com, )}                                            | {(rod.hayslett@enron.com, )}                                             |
------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------

Conclusion

We’ve seen how Pig can be used to take SQL data and convert it to well formed document data with Avro.  The Berkeley Enron emails are available in Avro document format here.


