Category Archives: Pig


Pig Eye for the SQL Guy

Cat Miller is an engineer at Mortar Data, a Hadoop-as-a-service provider, and creator of mortar, an open source framework for data processing.

Introduction

For anyone who came of programming age before cloud computing burst its way into the technology scene, data analysis has long been synonymous with SQL. A slightly awkward, declarative language whose production can more resemble logic puzzle solving than coding, SQL and the relational databases it builds on have been the pervasive standard for how to deal with data.

As the world has changed, so too has our data; an ever-increasing amount of data is now stored without a rigorous schema, or must be joined to outside data sets to be useful. Compounding this problem, often the amounts of data are so large that working with them on a traditional SQL database is so non-performant as to be impractical.

Enter Pig, a SQL-like language that gracefully tolerates inconsistent schemas, and that runs on Hadoop. (Hadoop is a massively parallel platform for processing the largest of data sets in reasonable amounts of time. Hadoop powers Facebook, Yahoo, Twitter, and LinkedIn, to name a few in a growing list.)

This then is a brief guide for the SQL developer diving into the waters of Pig Latin for the first time. Pig is similar enough to SQL to be familiar, but divergent enough to be disorienting to newcomers. The goal of this guide is to ease the friction in adding Pig to an existing SQL skillset.

What’s Similar?

The basic concepts in SQL map pretty well onto Pig. There are analogues for the major SQL keywords, and as a result you can write a query in your head as SQL and then translate it into Pig Latin without undue mental gymnastics.

WHERE → FILTER

The syntax is different, but conceptually this is still putting your data into a funnel to create a smaller dataset.

HAVING → FILTER

Because a FILTER is done in a separate step from a GROUP or an aggregation, the distinction between HAVING and WHERE doesn’t exist in Pig.
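
For readers who think more easily in a general-purpose language, here is a minimal Python model of that point. It is only a sketch of the semantics, not of how Pig executes, and the rows, thresholds, and helper names (pig_filter, avg_by_dept) are made up for illustration. The same filtering step is applied once to raw records, as SQL's WHERE would, and once to aggregated records, as SQL's HAVING would.

```python
from collections import defaultdict

# Hypothetical rows: (dept, sal). Values are invented for this sketch.
rows = [("a", 1000), ("a", 5000), ("b", 4000), ("b", 8000)]

def pig_filter(relation, predicate):
    """Model of FILTER relation BY predicate; it works on any relation."""
    return [record for record in relation if predicate(record)]

def avg_by_dept(relation):
    """Model of GROUP ... BY dept followed by FOREACH ... GENERATE group, AVG(...)."""
    bags = defaultdict(list)
    for dept, sal in relation:
        bags[dept].append(sal)
    return sorted((dept, sum(sals) / len(sals)) for dept, sals in bags.items())

# WHERE sal > 3000: filter the raw rows before grouping.
well_paid = pig_filter(rows, lambda r: r[1] > 3000)
# HAVING AVG(sal) > 5500: the same operation, applied after aggregation.
high_avg = pig_filter(avg_by_dept(well_paid), lambda r: r[1] > 5500)
print(well_paid)
print(high_avg)
```

Nothing distinguishes the two calls except where they sit in the pipeline, which is why Pig needs only one keyword.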

ORDER BY → ORDER

This keyword behaves pretty much the same in Pig as in SQL.

JOIN

In Pig, joins can have their execution specified, and they look a little different, but in essence these are the same joins you know from SQL, and you can think about them in the same way. There are INNER and OUTER joins, RIGHT and LEFT specifications, and even CROSS for those rare moments that you actually want a Cartesian product.

Because Pig is most appropriately used for data pipelines, there are often fewer distinct relations or tables than you would expect to see in a traditional normalized relational database.

Control over Execution

SQL performance tuning generally involves some fiddling with indexes, punctuated by the occasional yelling at an explain plan that has inexplicably decided to join the two largest tables first. It can mean getting a different plan the second time you run a query, or having the plan suddenly change after several weeks of use because the statistics have evolved, throwing your query’s performance into the proverbial toilet.

Various SQL implementations offer hints to combat this problem—you can use a hint to tell your SQL optimizer that it should use an index, or to force a given table to be first in the join order. Unfortunately, because hints are dependent on the particular SQL implementation, what you actually have at your disposal varies by platform.

Pig offers a few different ways to control the execution plan. The first is just the explicit ordering of operations. You can write your FILTER before your JOIN (the reverse of SQL’s order) and be clever about eliminating unused fields along the way, and have confidence that the executed order will not be worse.

Secondly, the philosophy of Pig is to allow users to choose implementations where multiple ones are possible. As a result, there are three specialized joins that can be used when the features of the data are known and make a regular join less appropriate. For regular joins, the order of the arguments dictates execution: the larger data set should appear last in this type of join.

As with SQL, in Pig you can pretty much ignore the performance tweaks until you can’t. Because of the explicit control of ordering, it can be useful to have a general sense of the “good” order to do things in, though Pig’s optimizer will also try to push up FILTERs and LIMITs, taking some of the pressure off.

What’s Different?

A Row By Any Other Name

The SQL paradigm is very straightforward—there are tables, and tables contain rows. Every select statement yields a set of rows, and each field in a row is a basic data type. Conceptually, the result of any SQL select statement can be imported into Excel without loss of information.

Pig introduces a mature nesting notion in tuples and data bags that changes the game significantly. Pig consists of data sets called relations (sometimes called aliases for the names they are given), and those contain records that are data tuples, which can in turn recursively contain data bags, data tuples, or data items. There is a distinct lack of flatness in Pig, and the best way to see it is to explore how GROUP works.

Not Your Grandmother’s GROUP

In the handling of GROUP, SQL and Pig diverge significantly. SQL’s GROUP doesn’t exist outside of the aggregation performed on it; you would never SELECT * GROUP BY field1, because it just doesn’t make sense. Because everything in SQL is a row, the grouping created isn’t persistent; only the data produced by aggregating over it remains.

Pig’s GROUP is an entirely different beast, albeit used for the same purpose. It is a persistent relation that can be used again and again, independent of what aggregations you might choose to perform on it later.

student_grades = GROUP grades by student;

This student_grades relation has two fields: one called group and populated with the value of student, the other called grades populated with a data bag containing tuples for all the entries with the same value of student.

group	grades
alyssa 	{< hacking101, 95 >, < english, 60 >}
ben	{< math, 90 >}

Now to do an aggregation, perform it on the student_grades alias.

average_grades = FOREACH student_grades GENERATE group, AVG(grades.value);
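
If the nesting is easier to see in a general-purpose language, the following Python sketch models what GROUP produces and how the later aggregation consumes it. It illustrates the semantics only and says nothing about how Pig executes the script. The sample rows and the field names course and value are assumptions chosen to match the table above; note that in Pig the bag holds the complete input tuples, so the student value is repeated inside each one.

```python
from collections import defaultdict

# Sample rows of a hypothetical grades relation: (student, course, value).
grades = [
    ("alyssa", "hacking101", 95),
    ("alyssa", "english", 60),
    ("ben", "math", 90),
]

def group_by_student(rows):
    """Model of GROUP grades BY student: one record per key, holding a bag of whole rows."""
    bags = defaultdict(list)
    for row in rows:
        bags[row[0]].append(row)
    # Each output record is (group, bag), mirroring the two fields Pig creates.
    return sorted(bags.items())

def average_grades(grouped):
    """Model of FOREACH student_grades GENERATE group, AVG(grades.value)."""
    return [(group, sum(r[2] for r in bag) / len(bag)) for group, bag in grouped]

student_grades = group_by_student(grades)
averages = average_grades(student_grades)
print(student_grades)
print(averages)
```

The grouped structure exists on its own before any aggregate is computed, which is exactly what SQL's GROUP BY never lets you see.
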
Procedural Paradise

This is the first thing any Google search on Pig will tell you, and it is the most glaringly obvious change from SQL. After having taught your brain for years how to turn an idea inside-out and mash all of its pieces into one query, Pig makes query writing feel like writing Java or C++. In addition to the obvious potential cognitive benefits, this has some technical ones as well.

Subquery Reuse

Ever write a query with a subselect, and then realize you actually needed to use that subtable twice in the query? Did you feel absolutely awful as you cut and pasted that subtable? Did you wonder whether your SQL plan would successfully manage to not calculate it twice? (Note: the WITH clause mitigates this pain in a lot of cases, but isn’t available in all flavors of SQL.)

Because in Pig Latin every step has a declared alias, reusing “subquery” tables is natural and intuitive, and generally does not involve building them twice.

Getting multiple queries out of one pipeline

In SQL you can find yourself in a place where you want to use the data, do some manipulation on it, and then take it in a few different directions. To do this in one query requires profligate use of JOIN, and enough parens to intimidate a LISP hacker.

In a Pig pipeline, any and all aliases produced along the way can be stored, and all it takes is adding a new STORE statement to the script.

User-Defined Functions

SQL has had decades for people to figure out what analytic functions they need for arbitrary data analysis, and so when you find yourself suddenly interested in extracting the day of the week from a date, that function is ready and waiting.

Pig’s list of built-in functions is growing, but is still dwarfed by what Oracle or MySQL provides. What turns this into a tolerable constraint is that Pig allows the user to define aggregate or analytic functions in other languages (Java, Python, and others) and then apply them in Pig quickly and without fuss.

REGISTER udf.jar;

new_data = FOREACH my_data GENERATE udf.ImportantFunction(field1);

The Well-Disguised SQLer

In general, if you can think about it in SQL, you can do it in Pig. Be aware of the nested data structures, have a cheat sheet for syntax, and relish the ability to write queries the way your brain thinks them, and not the way SQL demands.

As a final thought, let’s resurrect our old friend the emp table, and take a look at some SQL to Pig Latin examples.

Average Salary by Location
SQL
SELECT loc, AVG(sal) FROM emp JOIN dept USING(deptno) WHERE sal > 3000 GROUP BY loc;
Pig Latin
filtered_emp = FILTER emp BY sal > 3000;

emp_join_dept = JOIN filtered_emp BY deptno, dept BY deptno;

grouped_by_loc = GROUP emp_join_dept BY loc;

avg_salary = FOREACH grouped_by_loc GENERATE group, AVG(emp_join_dept.sal);
Ordered Average Salary by Location

Suppose now that the following is true.

● The ‘loc’ field is a string/varchar field, and we have two pieces of software that automatically populate it. One stores values as lowercase, one as uppercase. (If this seems like a contrived example to you, you have chosen your employers and software vendors well.)

The new parts of the queries are the UPPER(loc) call in both versions and, in the SQL, the subquery that wraps it.

SQL
SELECT standard_loc, AVG(sal) avg_salary FROM
(SELECT UPPER(loc) standard_loc, sal FROM emp JOIN dept USING(deptno) WHERE sal > 3000) std_table GROUP BY standard_loc;
Pig Latin
filtered_emp = FILTER emp BY sal > 3000;

emp_join_dept = JOIN filtered_emp BY deptno, dept BY deptno;

grouped_by_loc = GROUP emp_join_dept BY UPPER(loc);

avg_salary = FOREACH grouped_by_loc GENERATE group, AVG(emp_join_dept.sal);

This kind of change is friendlier to Pig because of the limitations in SQL’s GROUP BY clause; the trade-off of verboseness for clarity increases in value the more complex your query gets.

Above Average Salary for Location

Now suppose instead of arbitrarily selecting 3,000 as a threshold, which is going to overselect people living in large expensive cities, we want to select those employees who make more than twice the average for their location.

SQL

There are several ways to accomplish this, of course, but for illustration purposes the most vanilla version is shown here.

SELECT empno FROM 
(SELECT empno, UPPER(loc) standard_loc, sal FROM emp JOIN dept USING(deptno)) std_table1 JOIN
(SELECT standard_loc, AVG(sal) avg_salary FROM
(SELECT UPPER(loc) standard_loc, sal FROM emp JOIN dept USING(deptno)) std_table2 GROUP BY standard_loc) grp_table2
USING(standard_loc) WHERE sal > (2 * avg_salary);
Pig Latin
emp_join_dept = JOIN emp BY deptno, dept by deptno;

grouped_by_loc = GROUP emp_join_dept BY UPPER(loc);

loc_avg_salary = FOREACH grouped_by_loc GENERATE AVG(emp_join_dept.sal) as avg_salary, FLATTEN(emp_join_dept);

highly_paid = FILTER loc_avg_salary BY sal > (2 * avg_salary);

Here the dreaded subquery reuse rears its unattractive head in SQL and leads to a bit of a frankenquery. In contrast, the Pig script stays the same length, because we can effectively just shift the FILTER from the beginning to the end of our data flow. FLATTEN is a new entry to our function arena, for which there is no SQL analogue. What FLATTEN does is unnest tuples and data bags; in this example it’s taking the data bag ‘emp_join_dept’ created by the GROUP function, and removing the nesting so that the fields within it will be at the same level as avg_salary.
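
To make the effect of FLATTEN concrete, here is a small Python model of the last three Pig statements. It is a sketch of the semantics under stated assumptions, not Pig's implementation: the joined rows are reduced to (empno, loc, sal) and the values are invented for illustration.

```python
from collections import defaultdict

# Hypothetical joined rows: (empno, loc, sal). Values are made up.
emp_join_dept = [
    (1, "boston", 1000.0),
    (2, "BOSTON", 1000.0),
    (3, "Boston", 7000.0),
    (4, "DALLAS", 2000.0),
    (5, "dallas", 2000.0),
]

def group_by_upper_loc(rows):
    """Model of GROUP emp_join_dept BY UPPER(loc)."""
    bags = defaultdict(list)
    for row in rows:
        bags[row[1].upper()].append(row)
    return sorted(bags.items())

def avg_with_flatten(grouped):
    """Model of GENERATE AVG(bag.sal) AS avg_salary, FLATTEN(bag).

    FLATTEN emits one output row per tuple in the bag, each carrying the
    group-level average alongside the tuple's own fields.
    """
    out = []
    for _, bag in grouped:
        avg_salary = sum(r[2] for r in bag) / len(bag)
        for empno, loc, sal in bag:
            out.append((avg_salary, empno, loc, sal))
    return out

loc_avg_salary = avg_with_flatten(group_by_upper_loc(emp_join_dept))
# Model of FILTER loc_avg_salary BY sal > (2 * avg_salary).
highly_paid = [row for row in loc_avg_salary if row[3] > 2 * row[0]]
print(highly_paid)
```

After the flatten step every employee row sits next to its location's average, so the final comparison is an ordinary row-level filter.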

Run It Yourself for the Swine-Curious

If you want to try out some Pig examples hands-on, you can get a free Mortar account here. We’ve generated a one million row emp table data set, so to run these examples on your own all you need to do is:

    1. Sign up at app.mortardata.com for a free account.
    2. Go to Web Projects
    3. Select My Web Projects -> New Blank project
    4. To load the two data files, you need LOAD statements, and to store the resulting data you need a STORE statement, so in the end your pig script should look like this:
dept = LOAD 's3n://mortar-example-data/employee/dept.csv' USING org.apache.pig.piggybank.storage.CSVExcelStorage('SKIP_HEADER')
AS (deptno:int, dname:chararray, loc:chararray);

emp = LOAD 's3://mortar-example-data/employee/emp.csv' USING org.apache.pig.piggybank.storage.CSVExcelStorage('SKIP_HEADER')
AS (empno:int, ename:chararray, job:chararray, mgr:int, hiredate:chararray, sal:float, deptno:int);

-- Pig example code here

STORE avg_salary INTO 's3n://mortar-example-output-data/$MORTAR_EMAIL_S3_ESCAPED/emp_example' USING PigStorage('\t');
    5. Click Run and select a 2-node cluster. Even with one million rows, it runs in under three minutes. Sit back and watch the power of modern computing tackle the problems of the 1980s.

You can also try out Pig in the Hortonworks Sandbox.

Apache Pig 0.11 Released!

Apache Pig version 0.11 was released last week. An Apache Pig blog post summarized the release. New features include:

  • A DateTime datatype, documentation here.
  • A RANK function, documentation here.
  • A CUBE operator, documentation here.
  • Groovy UDFs, documentation here.

And many improvements. Oink it up for Pig 0.11! Hortonworks’ Daniel Dai also gave a talk on Pig 0.11 at Strata NY.

Pig, ToJson, and Redis to publish data with Flask


Pig can easily stuff Redis full of data. To do so, we’ll need to convert our data to JSON. We’ve previously talked about pig-to-json in JSONize anything in Pig with ToJson. Once we convert our data to JSON, we can use the pig-redis project to load Redis.

Build the pig-to-json project:

git clone git@github.com:rjurney/pig-to-json.git
cd pig-to-json
ant

Then run our Pig code:

/* Load Avro jars and define shortcut */
register /me/Software/pig/build/ivy/lib/Pig/avro-1.5.3.jar
register /me/Software/pig/build/ivy/lib/Pig/json-simple-1.1.jar
register /me/Software/pig/contrib/piggybank/java/piggybank.jar
define AvroStorage org.apache.pig.piggybank.storage.avro.AvroStorage();

register /me/Software/pig-to-json/dist/lib/pig-to-json.jar
register /me/Software/pig-redis/dist/pig-redis.jar

-- Enron emails are available at https://s3.amazonaws.com/rjurney_public_web/hadoop/enron.avro
emails = load '/me/Data/enron.avro' using AvroStorage();

json_test = foreach emails generate message_id, com.hortonworks.pig.udf.ToJson(tos) as bag_json;

store json_test into 'dummy-name' using com.hackdiary.pig.RedisStorer('kv', 'localhost');

Now run our Flask web server:

python server.py

Code for this post is available here: https://github.com/rjurney/enron-pig-tojson-redis-node.

Imperative and Declarative Hadoop: TPC-H in Pig and Hive

According to the Transaction Processing Performance Council, TPC-H is:

The TPC Benchmark™H (TPC-H) is a decision support benchmark. It consists of a suite of business oriented ad-hoc queries and concurrent data modifications. The queries and the data populating the database have been chosen to have broad industry-wide relevance. This benchmark illustrates decision support systems that examine large volumes of data, execute queries with a high degree of complexity, and give answers to critical business questions.

TPC-H was implemented for Hive in HIVE-600 and for Pig in PIG-2397 by Hortonworks intern Jie Li. In going over this work, I was struck by how it outlined differences between Pig and SQL.

There seems to be a tendency for simple SQL to provide greater clarity than Pig. At some point as the TPC-H queries become more demanding, complex SQL seems to have less clarity than the comparable Pig. Let’s take a look.

Q1, the pricing summary report, is fairly simple, and a SQL GROUP BY is a good fit:

DROP TABLE lineitem;
DROP TABLE q1_pricing_summary_report;

-- create tables and load data
CREATE EXTERNAL TABLE lineitem (
    L_ORDERKEY INT, 
    L_PARTKEY INT, 
    L_SUPPKEY INT, 
    L_LINENUMBER INT, 
    L_QUANTITY DOUBLE, 
    L_EXTENDEDPRICE DOUBLE, 
    L_DISCOUNT DOUBLE, 
    L_TAX DOUBLE, 
    L_RETURNFLAG STRING, 
    L_LINESTATUS STRING, 
    L_SHIPDATE STRING, 
    L_COMMITDATE STRING, 
    L_RECEIPTDATE STRING, 
    L_SHIPINSTRUCT STRING, 
    L_SHIPMODE STRING, 
    L_COMMENT STRING) 
    ROW FORMAT DELIMITED FIELDS TERMINATED BY '|' STORED AS TEXTFILE LOCATION '/tpch/lineitem';

-- create the target table
CREATE TABLE q1_pricing_summary_report ( 
    L_RETURNFLAG STRING, 
    L_LINESTATUS STRING, 
    SUM_QTY DOUBLE, 
    SUM_BASE_PRICE DOUBLE, 
    SUM_DISC_PRICE DOUBLE, 
    SUM_CHARGE DOUBLE, 
    AVE_QTY DOUBLE, 
    AVE_PRICE DOUBLE, 
    AVE_DISC DOUBLE, 
    COUNT_ORDER INT);

set mapred.min.split.size=536870912;

-- the query
INSERT OVERWRITE TABLE q1_pricing_summary_report 
SELECT 
    L_RETURNFLAG, 
    L_LINESTATUS, 
    SUM(L_QUANTITY), 
    SUM(L_EXTENDEDPRICE), 
    SUM(L_EXTENDEDPRICE * (1-L_DISCOUNT)), 
    SUM(L_EXTENDEDPRICE * (1-L_DISCOUNT) * (1+L_TAX)), 
    AVG(L_QUANTITY),
    AVG(L_EXTENDEDPRICE), 
    AVG(L_DISCOUNT), 
    COUNT(1) 
FROM 
  lineitem 
WHERE 
  L_SHIPDATE<='1998-09-02' 
GROUP BY L_RETURNFLAG, L_LINESTATUS 
ORDER BY L_RETURNFLAG, L_LINESTATUS;

One thing to notice, though, is that compared to Pig we have to specify schemas twice: once for the load, and again for the result. Compare that to Pig, where we specify the schema once upon load, and then implicitly in the Pig code itself:

SET default_parallel $reducers;

LineItems = LOAD '$input/lineitem' USING PigStorage('|') AS (
    orderkey:long, 
    partkey:long, 
    suppkey:long, 
    linenumber:long, 
    quantity:double, 
    extendedprice:double, 
    discount:double, 
    tax:double, 
    returnflag, 
    linestatus, 
    shipdate, 
    commitdate, 
    receiptdate, 
    shipinstruct, 
    shipmode, 
    comment);

SubLineItems = FILTER LineItems BY shipdate <= '1998-09-02';

SubLine = FOREACH SubLineItems GENERATE 
    returnflag, 
    linestatus, 
    quantity, 
    extendedprice, 
    extendedprice * (1-discount) AS disc_price, 
    extendedprice * (1-discount) * (1+tax) AS charge, 
    discount;

StatusGroup = GROUP SubLine BY (returnflag, linestatus);

PriceSummary = FOREACH StatusGroup GENERATE 
    group.returnflag AS returnflag, 
    group.linestatus AS linestatus, 
    SUM(SubLine.quantity) AS sum_qty, 
    SUM(SubLine.extendedprice) AS sum_base_price, 
    SUM(SubLine.disc_price) as sum_disc_price, 
    SUM(SubLine.charge) as sum_charge, 
    AVG(SubLine.quantity) as avg_qty, 
    AVG(SubLine.extendedprice) as avg_price, 
    AVG(SubLine.discount) as avg_disc, 
    COUNT(SubLine) as count_order;

SortedSummary = ORDER PriceSummary BY returnflag, linestatus;

STORE SortedSummary INTO '$output/Q1out';

Things change as the queries get more complex. With the use of temporary tables, the schema creation overhead starts to dominate, and the SQL becomes quite complex. Take a look at Q22, the Global Sales Opportunity Report:

DROP TABLE customer;
DROP TABLE orders;
DROP TABLE q22_customer_tmp;
DROP TABLE q22_customer_tmp1;
DROP TABLE q22_orders_tmp;
DROP TABLE q22_global_sales_opportunity;

-- create tables and load data
create external table customer (
    C_CUSTKEY INT, 
    C_NAME STRING, 
    C_ADDRESS STRING, 
    C_NATIONKEY INT, 
    C_PHONE STRING, 
    C_ACCTBAL DOUBLE, 
    C_MKTSEGMENT STRING, 
    C_COMMENT STRING
) ROW FORMAT DELIMITED FIELDS TERMINATED BY '|' STORED AS TEXTFILE LOCATION '/tpch/customer';

create external table orders (
    O_ORDERKEY INT, 
    O_CUSTKEY INT, 
    O_ORDERSTATUS STRING, 
    O_TOTALPRICE DOUBLE, 
    O_ORDERDATE STRING, 
    O_ORDERPRIORITY STRING, 
    O_CLERK STRING, 
    O_SHIPPRIORITY INT, 
    O_COMMENT STRING) 
ROW FORMAT DELIMITED FIELDS TERMINATED BY '|' STORED AS TEXTFILE LOCATION '/tpch/orders';

-- create target tables
create table q22_customer_tmp(c_acctbal double, c_custkey int, cntrycode string);
create table q22_customer_tmp1(avg_acctbal double);
create table q22_orders_tmp(o_custkey int);
create table q22_global_sales_opportunity(cntrycode string, numcust int, totacctbal double);

-- the query
insert overwrite table q22_customer_tmp
select 
  c_acctbal, c_custkey, substr(c_phone, 1, 2) as cntrycode
from 
  customer
where 
  substr(c_phone, 1, 2) = '13' or
  substr(c_phone, 1, 2) = '31' or
  substr(c_phone, 1, 2) = '23' or
  substr(c_phone, 1, 2) = '29' or
  substr(c_phone, 1, 2) = '30' or
  substr(c_phone, 1, 2) = '18' or
  substr(c_phone, 1, 2) = '17';

insert overwrite table q22_customer_tmp1
select
  avg(c_acctbal)
from
  q22_customer_tmp
where
  c_acctbal > 0.00;

insert overwrite table q22_orders_tmp
select 
  o_custkey 
from 
  orders
group by 
  o_custkey;

insert overwrite table q22_global_sales_opportunity
select
  cntrycode, count(1) as numcust, sum(c_acctbal) as totacctbal
from
(
  select cntrycode, c_acctbal, avg_acctbal from
  q22_customer_tmp1 ct1 join
  (
    select cntrycode, c_acctbal from
      q22_orders_tmp ot 
      right outer join q22_customer_tmp ct 
      on
        ct.c_custkey = ot.o_custkey
    where
      o_custkey is null
  ) ct2
) a
where
  c_acctbal > avg_acctbal
group by cntrycode
order by cntrycode;

The Pig is comparatively simple:

SET default_parallel $reducers;

customer = load '$input/customer' USING PigStorage('|') as (
    c_custkey:long,
    c_name:chararray, 
    c_address:chararray, 
    c_nationkey:int, 
    c_phone:chararray, 
    c_acctbal:double, 
    c_mktsegment:chararray, 
    c_comment:chararray);
orders = load '$input/orders' USING PigStorage('|') as (
    o_orderkey:long, 
    o_custkey:long, 
    o_orderstatus:chararray, 
    o_totalprice:double, 
    o_orderdate:chararray, 
    o_orderpriority:chararray, 
    o_clerk:chararray, 
    o_shippriority:long, 
    o_comment:chararray);

customer_filter = filter customer by c_acctbal>0.00 and SUBSTRING(c_phone, 0, 2) MATCHES '13|31|23|29|30|18|17';
customer_filter_group = group customer_filter all;
avg_customer_filter = foreach customer_filter_group generate AVG(customer_filter.c_acctbal) as avg_c_acctbal;

customer_sec_filter = filter customer by c_acctbal > avg_customer_filter.avg_c_acctbal and SUBSTRING(c_phone, 0, 2) MATCHES '13|31|23|29|30|18|17';
customer_orders_left = join customer_sec_filter by c_custkey left, orders by o_custkey;

customer_trd_filter = filter customer_orders_left by o_custkey is null;
customer_rows = foreach customer_trd_filter generate SUBSTRING(c_phone, 0, 2) as cntrycode, c_acctbal;

customer_result_group = group customer_rows by cntrycode;
customer_result = foreach customer_result_group generate group, COUNT(customer_rows) as numcust, SUM(customer_rows.c_acctbal) as totacctbal;
customer_result_inorder = order customer_result by group;

store customer_result_inorder into '$output/Q22out' USING PigStorage('|');

Both Pig and Hive have a place, and their own strengths. It is illustrative to compare these identical queries in the two systems, to see where you might want to hand off queries from Hive to Pig. HCatalog facilitates this handoff, as Pig can read from and write to Hive tables directly via HCatLoader and HCatStorer.

DataFu: The WD-40 of Big Data

If Pig is the “duct tape for big data”, then DataFu is the WD-40. Or something.

No, seriously, DataFu is a collection of Pig UDFs for data analysis on Hadoop. DataFu includes routines for common statistics tasks (e.g., median, variance), PageRank, set operations, and bag operations.

It’s helpful to understand the history of the library. Over the years, we developed several routines that were used across LinkedIn and were thrown together into an internal package we affectionately called “littlepiggy.” The unfortunate part, and this is true of many such efforts, is that the UDFs were ill-documented, ill-organized, and easily got broken when someone made a change. Along came PigUnit, which allowed UDF testing, so we spent the time to clean up these routines by adding documentation and rigorous unit tests. We thought the resulting “datafoo” package would help the community at large, and there you have DataFu.

So what can this library do for you? Let’s look at one of the classical examples that showcase the power and flexibility of Pig: sessionizing a click stream.

A = load 'clicks';
B = group A by user;
C = foreach B {
  C1 = order A by timestamp;
  generate user, Sessionize(C1);
}
D = group C by session_id;
E = foreach D generate group as session_id, (MAX(C.timestamp) - MIN(C.timestamp)) as session_length;
F = group E all;
G = foreach F generate
  AVG(E.session_length) as avg_session_length,
  SQRT(VAR(E.session_length)) as sd_session_length,
  MEDIAN(E.session_length) as median_session_length,
  Q75(E.session_length) as session_length_75pct,
  Q90(E.session_length) as session_length_90pct,
  Q95(E.session_length) as session_length_95pct;

(In fact, this is basically the example for the Accumulator interface that was added in Pig 0.6.)

Here, we’re just computing some summary statistics on a sessionized click stream. Pig does the heavy lifting of transforming your query into MapReduce goodness, but DataFu fills in the gaps by providing the routines Pig lacks out of the box: here, Sessionize, VAR, MEDIAN, and the quantile functions Q75, Q90, and Q95.
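
To show what the pipeline computes, here is a rough Python model of the sessionize-then-summarize flow. It is a sketch, not DataFu's implementation. The post does not say how a session is delimited, so the 30-minute inactivity threshold, the sample clicks, and the session id format are all assumptions made for illustration.

```python
from itertools import groupby
from statistics import mean, median

GAP_SECONDS = 30 * 60  # assumed inactivity threshold; not taken from the post

# Hypothetical clicks: (user, timestamp in seconds).
clicks = [
    ("u1", 0), ("u1", 600), ("u1", 5000), ("u1", 5300),
    ("u2", 100), ("u2", 400),
]

def sessionize(rows, gap=GAP_SECONDS):
    """Append a session id to each click; a gap longer than `gap` starts a new session."""
    out = []
    ordered = sorted(rows)  # sort by user, then by timestamp
    for user, user_clicks in groupby(ordered, key=lambda r: r[0]):
        session_no, previous = 0, None
        for _, ts in user_clicks:
            if previous is not None and ts - previous > gap:
                session_no += 1
            out.append((user, ts, f"{user}-{session_no}"))
            previous = ts
    return out

def session_lengths(sessionized):
    """Model of MAX(timestamp) - MIN(timestamp) per session id."""
    bounds = {}
    for _, ts, session_id in sessionized:
        lo, hi = bounds.get(session_id, (ts, ts))
        bounds[session_id] = (min(lo, ts), max(hi, ts))
    return {sid: hi - lo for sid, (lo, hi) in bounds.items()}

lengths = session_lengths(sessionize(clicks))
values = list(lengths.values())
avg_session_length = mean(values)
median_session_length = median(values)
print(lengths)
print(avg_session_length, median_session_length)
```

The per-user ordering inside sessionize plays the role of the nested ORDER in the Pig script, and the final two statistics stand in for the longer list of aggregates in the last FOREACH.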

You can download DataFu at http://data.linkedin.com/opensource/datafu.

You can grab sample data and code you can run on your own for this sessionization example below.


Hortonworks Sandbox — the Fastest On Ramp to Apache Hadoop

Go from Zero to Big Data in 15 Minutes!

Today Hortonworks announced the availability of the Hortonworks Sandbox, an easy-to-use, flexible and comprehensive learning environment that will provide you with the fastest on-ramp to learning and exploring enterprise Apache Hadoop.

The Hortonworks Sandbox is:

  • A free download
  • A complete, self contained virtual machine with Apache Hadoop pre-configured
  • A personal, portable and standalone Hadoop environment
  • A set of hands-on, step-by-step tutorials that allow you to learn and explore Hadoop on your own

The Hortonworks Sandbox is designed to help close the gap between people wanting to learn and evaluate Hadoop, and the complexities of spinning up an evaluation cluster of Hadoop. The Hortonworks Sandbox provides a powerful combination of hands-on, step-by-step tutorials paired with an easy to use Web interface designed to lower the learning curve for people who just want to explore and evaluate Hadoop, as quickly as possible.

One of our key focus areas is enabling Hadoop as an enterprise-viable platform that is easy to use and consume by our customers and the broader ecosystem. Over the past year or so, we have seen the complex and disjointed experience people face trying to learn Hadoop; the Sandbox gives you the fastest on-ramp to Apache Hadoop. We want the Sandbox to deliver an integrated, easy-to-use, easily updateable learning environment. Ongoing updates to the tutorials are planned, delivering new, interesting hands-on exercises, exploring different features and use cases.

These tutorials are built based on the experience gained training thousands of people in our Hortonworks University Training classes. As we continue to build out the Sandbox, we will provide additional levels of sophistication – think of it as the Hadoop 101, 201 and 301 levels of learning. And, the process of updating the tutorials is easy through the click of the “Update” button, initiating a lightweight download of just the tutorial content.

The Sandbox is a single node implementation of the Hortonworks Data Platform (HDP) 1.2 that behaves just like a normal Hadoop environment, which allows you to add your own datasets in an isolated protected environment to evaluate the use of Hadoop in your own data architectures.

Use the Sandbox to:

  • Explore Hadoop on your own
  • Plan out the integration points of your proof of concept project
  • Prepare for a more complex pilot deployment

When you are ready, you can download and deploy the Hortonworks Data Platform with the confidence that you have thought through exactly how and where Hadoop can help.

What can you expect from us in the coming months with the Hortonworks Sandbox?

  1. Join us for a special launch webinar on February 5, “Go from Zero to Big Data in 15 Minutes”. I will be hosting this webinar with one of our awesome Solution Engineers who will give you a sneak peek at some cool use cases for the Sandbox.
  2. New tutorials released on roughly a monthly basis.
  3. Demos and exercises of the integration with the tools and applications from our ecosystem partners like Teradata, Alteryx, Datameer, and Microsoft. How cool would it be to run Excel on top of a personal Hadoop environment? Well, that’s coming, so check back often.

I’m excited that you will be able to go from Zero to Big Data in 15 Minutes in a simple, easy-to-use fashion. And, I’m eager to hear your feedback: please let me know what you think of the Sandbox and what kinds of tutorials you would like to see. I would also love to hear about your creative uses of the Sandbox. Leave your comments on this blog, Tweet out using #hwsandbox, comment in the Sandbox Forum, or email. The Hortonworks Sandbox is free and available for download here.

Apache Pig 0.10.1 Released

We are pleased to announce that Apache Pig 0.10.1 was recently released. This is primarily a maintenance release focused on stability and bug fixes. In fact, Pig 0.10.1 includes 42 new JIRA fixes since the Pig 0.10.0 release.

Some of the notable changes include:

  • Source code-only distribution

In the download section for Pig 0.10.1, you will now find a source-only tarball (pig-0.10.1-src.tar.gz) alongside the traditional full tarball, rpm and deb distributions.

  • Better support for Apache Hadoop 0.23.x/2.x

Starting with Pig 0.10.1, the Pig team will now publish Maven artifacts for Hadoop 0.23.x/2.x (PIG-2907). Note that if you are using Hadoop 0.23.x/2.x, you will need to get different Pig Maven artifacts than for Hadoop 0.20.x/1.x. Here is the information to retrieve the Pig Maven artifacts for Hadoop 0.23.x/2.x:

<dependency>
  <groupId>org.apache.pig</groupId>
  <artifactId>pig</artifactId>
  <version>0.10.1</version>
  <classifier>h2</classifier>
</dependency>

In addition, the Pig team fixed a number of bugs specific to Hadoop 0.23.x/2.x (including PIG-3035, PIG-2783, PIG-2761, PIG-2912, and PIG-2791).

  • Better support for Oracle JDK 7

All unit tests for Pig 0.10.1 now pass with Oracle JDK7 (PIG-2908).

  • End-to-End (e2e) test and unit test fixes

We continue to improve Pig e2e testing. With the latest enhancements, we are able to significantly reduce runtime for Pig e2e tests (PIG-2711). We are trying hard to make e2e tests pass on all platforms (PIG-2859, PIG-2783, PIG-2745).

We have also included some fixes for unit tests (PIG-2908, PIG-2650, PIG-3099, PIG-2960) to make sure unit tests pass on all currently supported platforms.

  • Other fixes

There are a number of other important bug fixes in the core Pig code, UDF and documentation. Details can be found in this document.

Special thanks to the Apache Pig community for doing all of this great work to make these improvements happen!

~ Daniel Dai

Big Data Security Part Three: PacketPig Finding Zero Day Attacks

Introduction

This is part three of a Big Data Security blog series. You can read the previous two posts here: Part One / Part Two.

When Russell Jurney and I first teamed up to write these posts we wanted to do something that no one had done before to demonstrate the power of Big Data, the simplicity of Pig and the kind of Big Data Security Analytics we perform at Packetloop. Packetpig was modified to support Amazon’s Elastic Map Reduce (EMR) so that we could process a 600GB set of full packet captures. All that we needed was a canonical Zero Day attack to analyse. We were in luck!

In August 2012 a vulnerability in Oracle JRE 1.7 created huge publicity when it was disclosed that a number of Zero Day attacks had been reported to Oracle in April but had still not been addressed in late August 2012. To make matters worse, Oracle’s scheduled patch for JRE was months away (October 16). This position subsequently changed, and a number of out-of-band patches for JRE were released on the 30th of August for what became known as CVE-2012-4681.

The vulnerability exposed around 1 Billion systems to exploitation and the exploit was 100% effective on Windows, Mac OSX and Linux. A number of security researchers were already seeing the exploit in the wild as it was incorporated into exploit packs for the delivery of malware.

What is a Zero Day?

Put simply, it’s any vulnerability that can be exploited without an available mitigation. The mitigation most people measure Zero Days by is a patch from the software vendor (in this case Oracle).

If we look at the timeline of this exploit, you can see how long it remained a Zero Day:

  • The Bug was introduced to JRE on July 28th 2011.
  • It was Disclosed to the public on April 2nd 2012.
  • The Exploit was available in the Metasploit Framework on August 26th 2012, with other PoCs publicly available around the same time.
  • Detection was available via Snort IDS/IPS on August 28th 2012.
  • Lastly a Patch was available from Oracle on 30th August 2012.

If you compare the date the Bug was introduced and the date of the Patch the Zero Day time is 399 days. Comparing the date of Disclosure with the Patch date is still a staggering 150 days. To put this in perspective, a software bug that affects around 1 Billion devices was able to be exploited for well over a year and certainly was being seen in the wild. Whether you take the view that the Zero Day period is around 150 days (from disclosure)  or over a year (from introduction) both are extremely scary.
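
The two Zero Day windows quoted above can be checked with a few lines of date arithmetic. This is only a sketch using the dates from the timeline; the variable names are illustrative.

```python
from datetime import date

# Dates taken from the timeline above.
bug_introduced = date(2011, 7, 28)
disclosed = date(2012, 4, 2)
patched = date(2012, 8, 30)

# Subtracting two dates yields a timedelta; .days is the whole-day difference.
days_since_introduction = (patched - bug_introduced).days
days_since_disclosure = (patched - disclosed).days

print(days_since_introduction)  # 399
print(days_since_disclosure)    # 150
```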

So how can you tell whether you were exploited using this JRE bug in the last 6 months or year? How can you prove your network or important systems haven’t been exploited using this vulnerability?

Finding Zero Day attacks

Packetpig provides you with the ability to search vast amounts of network packet captures for Zero Day attacks. To demonstrate this I executed the Metasploit Exploit for the JRE bug against a Windows XP workstation and recorded the packet capture. I then went and hid this 500KB capture amongst 600GB of Full Packet Captures from a system we monitor on the Internet. Every packet is captured to an S3 bucket so we can quickly scan the S3 bucket for Zero Days using Amazon’s Elastic Map Reduce.

So for the purpose of this demonstration as soon as the Snort Signatures were updated on the 28th of August I downloaded them. This allowed me to scan the 600GB of packet captures with the old signatures (in this case 2905) and then again with the new signatures (in this case 2931).

Let’s run through the Packetpig job ‘snort_comparison.pig‘ to see how this was done. The key to understanding the job is that we use the Packetpig SnortLoader() to scan the network packet captures with the old signatures and again with the new signatures. Anything in the old signature scan is removed from the new signature scan leaving only the Zero Day attacks.

In the same way as our last post, we set up a number of variables using an include.pig file. After that we define old_snort_conf and new_snort_conf:

%DEFAULT includepath pig/include.pig
RUN $includepath;
 
%DEFAULT time 60
 
-- for local mode: uncomment the next line and comment the one after that
--%DEFAULT old_snort_conf 'lib/snort-2905/etc/snort.conf'
%DEFAULT old_snort_conf '/mnt/var/lib/snort-2905/etc/snort.conf'
 
-- for local mode: uncomment the next line and comment the one after that
--%DEFAULT new_snort_conf 'lib/snort-2931/etc/snort.conf'
%DEFAULT new_snort_conf '/mnt/var/lib/snort-2931/etc/snort.conf'

The SnortLoader() is used with the old snort.conf and the new snort.conf to scan the packet captures;

snort_old_alerts =
    LOAD '$pcap'
    USING com.packetloop.packetpig.loaders.pcap.detection.SnortLoader('$old_snort_conf')
    AS (
        ts:long,
        sig:chararray,
        priority:int,
        message:chararray,
        proto:chararray,
        src:chararray,
        sport:int,
        dst:chararray,
        dport:int
);
 
snort_new_alerts =
    LOAD '$pcap'
    USING com.packetloop.packetpig.loaders.pcap.detection.SnortLoader('$new_snort_conf')
    AS (
        ts:long,
        sig:chararray,
        priority:int,
        message:chararray,
        proto:chararray,
        src:chararray,
        sport:int,
        dst:chararray,
        dport:int
);

Next we group (COGROUP) the old and the new Snort scans and filter out any signatures that appear in both:

snort_joined = COGROUP snort_old_alerts BY sig, snort_new_alerts BY sig;
new_only_filtered = FILTER snort_joined BY (COUNT(snort_old_alerts) == 0);

Lastly we re-project the data and then store it. The snort_comparison_new/part-r-00000 file is a verbose version of snort_comparison_summary/part-r-00000.

new_only_flattened = FOREACH new_only_filtered GENERATE FLATTEN(snort_new_alerts);
new_only_summary = FOREACH new_only_filtered GENERATE group, COUNT(snort_new_alerts);
 
STORE new_only_flattened INTO '$output/snort_comparison_new';
STORE new_only_summary INTO '$output/snort_comparison_summary';
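
For readers less familiar with COGROUP, the same "new signatures only" logic can be sketched in plain Python. This is an illustration of the idea and not part of Packetpig; the function name, the dictionary layout and the signature IDs below are all assumptions made for the example.

```python
from collections import defaultdict


def new_only_alerts(old_alerts, new_alerts):
    """Keep alerts whose signature never fired in the old-signature scan."""
    old_sigs = {alert["sig"] for alert in old_alerts}

    # Group the surviving new-scan alerts by signature (COGROUP + FILTER).
    by_sig = defaultdict(list)
    for alert in new_alerts:
        if alert["sig"] not in old_sigs:
            by_sig[alert["sig"]].append(alert)

    flattened = [a for alerts in by_sig.values() for a in alerts]   # FLATTEN
    summary = {sig: len(alerts) for sig, alerts in by_sig.items()}  # COUNT
    return flattened, summary


# Hypothetical alerts; the signature IDs are placeholders, not real Snort SIDs.
old_scan = [{"sig": "sig_a", "src": "10.0.0.1"}]
new_scan = [
    {"sig": "sig_a", "src": "10.0.0.1"},
    {"sig": "sig_b", "src": "10.0.0.2"},
    {"sig": "sig_b", "src": "10.0.0.3"},
]

flattened, summary = new_only_alerts(old_scan, new_scan)
print(summary)  # {'sig_b': 2}
```

Only sig_b survives because sig_a also fired under the old signatures, which mirrors what the two STORE statements write out.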

To demonstrate this in practice I test the job on a small number of packet captures on my local development laptop. Watch the video to see how to do it.

Next I take it to the cloud and use 80 x m2.4xlarge instances to process 600GB of full packet captures to find the Oracle JRE 1.7 attack. The 80 nodes spin up, install all the Packetpig software (bootstrap) and then go to work crunching the network packet captures. Check out the video to see the full process.

Big Graph Data on Hortonworks Data Platform


The Hortonworks Data Platform (HDP) conveniently integrates numerous Big Data tools in the Hadoop ecosystem. As such, it provides cluster-oriented storage, processing, monitoring, and data integration services. HDP simplifies the deployment and management of a production Hadoop-based system.

In Hadoop, data is represented as key/value pairs. In HBase, data is represented as a collection of wide rows. These atomic structures make global data processing (via MapReduce) and row-specific reading/writing (via HBase) simple. However, writing queries is nontrivial if the data has a complex, interconnected structure that needs to be analyzed (see Hadoop joins and HBase joins). Without an appropriate abstraction layer, processing highly structured data is cumbersome. Indeed, choosing the right data representation and associated tools opens up otherwise unimaginable possibilities. One such data representation that naturally captures complex relationships is a graph (or network). This post presents Aurelius’ Big Graph Data technology suite in concert with Hortonworks Data Platform. Moreover, for a real-world grounding, a GitHub clone is described in this context to help the reader understand how to use these technologies for building scalable, distributed, graph-based systems.

Aurelius Graph Cluster and Hortonworks Data Platform Integration

The Aurelius Graph Cluster can be used in concert with Hortonworks Data Platform to provide users a distributed graph storage and processing system with the management and integration benefits provided by HDP. Aurelius’ graph technologies include Titan, a highly scalable graph database optimized for serving real-time results to thousands of concurrent users, and Faunus, a distributed graph analytics engine that is optimized for batch processing graphs represented across a multi-machine cluster.

In an online social system, for example, there typically exists a user base that is creating things and various relationships amongst these things (e.g. likes, authored, references, stream). Moreover, they are creating relationships amongst themselves (e.g. friend, group member). To capture and process this structure, a graph database is useful. When the graph is large and it is under heavy transactional load, then a distributed graph database such as Titan/HBase can be used to provide real-time services such as searches, recommendations, rankings, scorings, etc. Next, periodic offline global graph statistics can be leveraged. Examples include identifying the most connected users, or tracking the relative importance of particular trends. Faunus/Hadoop serves this requirement. Graph queries/traversals in Titan and Faunus are simple, one-line commands that are optimized both semantically and computationally for graph processing. They are expressed using the Gremlin graph traversal language. The roles that Titan, Faunus, and Gremlin play in HDP are diagrammed below.

[Figure: Aurelius and HDP Integration]

A Graph Representation of GitHub

GitHub is an online source code service where over 2 million people collaborate on over 4 million projects. However, GitHub provides more than just revision control. In the last 4 years, GitHub has become a massive online community for software collaboration. Some of the biggest software projects in the world use GitHub (e.g. the Linux kernel).

GitHub is growing rapidly — 10,000 to 30,000 events occur each hour (e.g. a user contributing code to a repository). Hortonworks Data Platform is suited to storing, analyzing, and monitoring the state of GitHub. However, it lacks specific tools for processing this data from a relationship-centric perspective. Representing GitHub as a graph is natural because GitHub connects people, source code, contributions, projects, and organizations in diverse ways. Thinking purely in terms of key/value pairs and wide rows obfuscates the underlying relational structure which can be leveraged for more complex real-time and batch analytic algorithms.


GitHub provides 18 event types, which range from new commits and fork events, to opening new tickets, commenting, and adding members to a project. The activity is aggregated in hourly archives, [each of which] contains a stream of JSON encoded GitHub events. (via githubarchive.org)

The aforementioned events can be represented according to the popular property graph data model. A graph schema describing the types of “things” and relationships between them is diagrammed below. A parse of the raw data according to this schema yields a graph instance.

[Figure: GitHub Schema]

Deploying a Graph-Based GitHub

To integrate the Aurelius Graph Cluster with HDP, Whirr is used to launch a cluster of four m1.xlarge machines on Amazon EC2. Detailed instructions for this process are provided on the Aurelius Blog, with the exception that a modified Whirr properties file must be used for HDP. A complete HDP Whirr solution is currently in development. To add Aurelius technologies to an existing HDP cluster, simply download Titan and Faunus, which interface with installed components such as Hadoop and HBase without further configuration.

5830 hourly GitHub Archive files between mid-March 2012 and mid-November 2012 contain 31 million GitHub events. The archive files are parsed to generate a graph. For example, when a GitHub push event is parsed, vertices with the types user, commit, and repository are generated. An edge with label pushed links the user to the commit and an edge with label to links the commit to the repository. The user vertex has properties such as user name and email address, the commit vertex has properties such as the unique sha sum identifier for the commit and its timestamp, and the repository vertex has properties like its URL and the programming language used. In this way, the 31 million events give rise to 27 million vertices and 79 million edges (a relatively small graph). Complete instructions for parsing the data are in the githubarchive-parser documentation. Once the configuration options are reviewed, launching the automated parallel parser is simple.

$ export LC_ALL="C"
$ export JAVA_OPTIONS="-Xmx1G"
$ python AutomatedParallelParser.py batch
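
To make the push-event mapping described above concrete, here is a minimal Python sketch. It is not the githubarchive-parser code: the function name and the simplified event layout are assumptions for illustration, and real GitHub Archive JSON is structured differently.

```python
def parse_push_event(event):
    """Turn one push event into property-graph vertices and edges."""
    user = ("user", event["user"])
    repo = ("repository", event["repository_url"])
    vertices = {
        user: {"name": event["user"]},
        repo: {"url": event["repository_url"], "language": event["language"]},
    }
    edges = []
    for commit in event["commits"]:
        commit_id = ("commit", commit["sha"])
        vertices[commit_id] = {"sha": commit["sha"], "timestamp": commit["timestamp"]}
        edges.append((user, "pushed", commit_id))  # user -pushed-> commit
        edges.append((commit_id, "to", repo))      # commit -to-> repository
    return vertices, edges


# Hypothetical, simplified event used only to exercise the function.
event = {
    "user": "okram",
    "repository_url": "https://github.com/example/repo",
    "language": "Java",
    "commits": [{"sha": "abc123", "timestamp": "2012-11-01T00:00:00Z"}],
}
vertices, edges = parse_push_event(event)
print(len(vertices), len(edges))  # 3 2
```

Each commit contributes one vertex and two edges, which is consistent with the edge count outgrowing the vertex count in the figures quoted above.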

The generated vertex and edge data is imported into the Titan/HBase cluster using the BatchGraph wrapper of the Blueprints graph API (a simple, single threaded insertion tool).

$ export JAVA_OPTIONS="-Xmx12G"
$ gremlin -e ImportGitHubArchive.groovy vertices.txt edges.txt

Titan: Distributed Graph Database

Titan is a distributed graph database that leverages existing storage systems for its persistence. Currently, Titan provides out-of-the-box support for Apache HBase and Cassandra (see documentation). Graph storage and processing in a clustered environment is made possible because of numerous techniques to both efficiently represent a graph within a BigTable-style data system and to efficiently process that graph using linked-list walking and vertex-centric indices. Moreover, for the developer, Titan provides native support for the Gremlin graph traversal language. This section will demonstrate various Gremlin traversals over the parsed GitHub data.

The following Gremlin snippet determines which repositories Marko Rodriguez (okram) has committed to the most. The query first locates the vertex with name okram and then takes outgoing pushed-edges to his commits. For each of those commits, the outgoing to-edges are traversed to the repository that commit was pushed to. Next, the name of the repository is retrieved and those names are grouped and counted. The side-effect count map is outputted, sorted in decreasing order, and displayed. A graphical example demonstrating gremlins walking is diagrammed below.

gremlin> g = TitanFactory.open('bin/hbase.local')                
==>titangraph[hbase:127.0.0.1]
gremlin> g.V('name','okram').out('pushed').out('to').github_name.groupCount.cap.next().sort{-it.value}
==>blueprints=413
==>gremlin=69
==>titan=49
==>pipes=49
==>rexster=40
==>frames=26
==>faunus=23
==>furnace=9
==>tinkubator=5
==>homepage=1

[Figure: GitHub Gremlin traversal]

The above query can be taken two steps further to determine Marko’s collaborators. If two people have pushed commits to the same repository, then they are collaborators. Because many people may commit to a repository, and a collaborator has typically pushed numerous commits, a maximum of 2500 such collaborator paths are searched. One of the most important aspects of graph traversing is understanding the combinatorial path explosions that can occur when traversing multiple hops through a graph (see Loopy Lattices).

gremlin> g.V('name','okram').out('pushed').out('to').in('to').in('pushed').hasNot('name','okram')[0..2500]
   .name.groupCount.cap.next().sort{-it.value}[0..4]
==>lvca=877
==>spmallette=504
==>sgomezvillamor=424
==>mbroecheler=356
==>joshsh=137

Complex traversals are easy to formulate with the data in this representation. For example, Titan can be used to generate followship recommendations. There are numerous ways to express a recommendation (with varying semantics). A simple one is: “Recommend me people to follow based on people who watch the same repositories as me. The more repositories I watch in common with someone, the higher they should be ranked.” The traversal below starts at Marko, then traverses to all the repositories that Marko watches. It then finds who else (not Marko) watches those repositories, counts those people, and returns the top 5 names of the sorted result set. In fact, Marko and Stephen (spmallette) are long-time collaborators and thus have similar tastes in software.

gremlin> g.V('name','okram').out('watched').in('watched').hasNot('name','okram').name.groupCount
   .cap.next().sort{-it.value}[0..4]
==>spmallette=3
==>alex-wajam=3
==>crimeminister=2
==>redgetan=2
==>snicaise=2
gremlin> g.V('name','okram').out('created').has('type','Comment').count()
==>159
gremlin> g.V('name','okram').out('created').has('type','Issue').count()  
==>176
gremlin> g.V('name','okram').out('edited').count()                     
==>85
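
The co-watching recommendation above boils down to counting shared repositories. A rough Python equivalent over an in-memory dictionary is sketched below; the function name and the toy data are hypothetical, and it assumes at most one watched-edge per user and repository.

```python
from collections import Counter


def recommend(watches, me, top_n=5):
    """Rank other users by how many repositories they watch in common with `me`."""
    my_repos = watches.get(me, set())
    shared = Counter()
    for user, repos in watches.items():
        if user != me:                      # hasNot('name', me)
            overlap = len(my_repos & repos)  # one path per shared repository
            if overlap:
                shared[user] = overlap
    return shared.most_common(top_n)


# Hypothetical watch lists, not taken from the GitHub data set.
watches = {
    "me": {"r1", "r2", "r3"},
    "alice": {"r1", "r2", "r3"},
    "bob": {"r2", "r9"},
    "carol": {"r8"},
}
print(recommend(watches, "me"))  # [('alice', 3), ('bob', 1)]
```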

A few self-describing traversals are presented above that are rooted at okram. Finally, note that Titan is optimized for local/ego-centric traversals. That is, from a particular source vertex (or small set of vertices), use some path description to yield a computation based on the explicit paths walked. For doing global graph analyses (where the source vertex set is the entire graph), a batch processing framework such as Faunus is used.

Faunus: Graph Analytics Engine

Every Titan traversal begins at a small set of vertices (or edges). Titan is not designed for global analyses which involve processing the entire graph structure. The Hadoop component of Hortonworks Data Platform provides a reliable backend for global queries via Faunus. Gremlin traversals in Faunus are compiled down to MapReduce jobs, where the first job’s InputFormat is Titan/HBase. In order to not interfere with the production Titan/HBase instance, a snapshot of the live graph is typically generated and stored in Hadoop’s distributed file system HDFS as a SequenceFile available for repeated analysis. The most general SequenceFile (with all vertices, edges, and properties) is created below (i.e. a full graph dump).

faunus$ cat bin/titan-seq.properties 
faunus.graph.input.format=com.thinkaurelius.faunus.formats.titan.hbase.TitanHBaseInputFormat
hbase.zookeeper.quorum=10.68.65.161
hbase.mapreduce.inputtable=titan
hbase.mapreduce.scan.cachedrows=75
faunus.graph.output.format=org.apache.hadoop.mapreduce.lib.output.SequenceFileOutputFormat
faunus.sideeffect.output.format=org.apache.hadoop.mapreduce.lib.output.TextOutputFormat
faunus.output.location=full-seq
faunus.output.location.overwrite=true

faunus$ bin/gremlin.sh

         \,,,/
         (o o)
-----oOOo-(_)-oOOo-----
gremlin> g = FaunusFactory.open('bin/titan-seq.properties')
==>faunusgraph[titanhbaseinputformat]
gremlin> g._().toString()
==>[IdentityMap]
gremlin> g._()
12/12/13 09:19:53 INFO mapreduce.FaunusCompiler: Compiled to 1 MapReduce job(s)
12/12/13 09:19:55 INFO mapred.JobClient:  map 0% reduce 0%
12/12/13 09:21:26 INFO mapred.JobClient:  map 1% reduce 0%
12/12/13 09:21:36 INFO mapred.JobClient:  map 2% reduce 0%
12/12/13 09:21:43 INFO mapred.JobClient:  map 3% reduce 0%
...
gremlin> hdfs.ls()
==>rwx------ ubuntu supergroup 0 (D) .staging
==>rwxr-xr-x ubuntu supergroup 0 (D) full-seq
gremlin> hdfs.ls('full-seq/job-0')
==>rw-r--r-- ubuntu supergroup 0 _SUCCESS
==>rwxr-xr-x ubuntu supergroup 0 (D) _logs
==>rw-r--r-- ubuntu supergroup 243768636 part-m-00000
==>rw-r--r-- ubuntu supergroup 125250887 part-m-00001
==>rw-r--r-- ubuntu supergroup 331912876 part-m-00002
==>rw-r--r-- ubuntu supergroup 431617929 part-m-00003
...

Given the generated SequenceFile, the vertices and edges are counted by type and label, which is by definition a global operation.

gremlin> g.V.type.groupCount
==>Gist         780626
==>Issue        1298935
==>Organization 36281
==>Comment      2823507
==>Commit       20338926
==>Repository   2075934
==>User         983384
==>WikiPage     252915
gremlin> g.E.label.groupCount                                           
==>deleted        170139
==>on             7014052
==>owns           180092
==>pullRequested  930796
==>pushed         27538088
==>to             27719774
==>added          181609
==>created        10063346
==>downloaded     122157
==>edited         276609
==>forked         1015435
==>of             536816
==>appliedForkTo  1791
==>followed       753451
==>madePublic     26602
==>watched        2784640

Since GitHub is collaborative in a way similar to Wikipedia, there are a few users who contribute a lot, and many users who contribute little or none at all. To determine the distribution of contributions, Faunus can be used to compute the out degree distribution of pushed-edges, which correspond to users pushing commits to repositories. This is equivalent to Gremlin visiting each user vertex, counting all of the outgoing pushed-edges, and returning the distribution of counts.

gremlin> g.V.sideEffect('{it.degree = it.outE("pushed").count()}').degree.groupCount
==>1	57423
==>10	8856
==>100	527
==>1000	9
==>1004	5
==>1008	6
==>1011	6
==>1015	6
==>1019	3
==>1022	9
==>1026	2
==>1033	6
==>1037	4
==>104	462
==>1040	3
==>...

When the degree distribution is plotted using log-scaled axes, the results are similar to the Wikipedia contribution distribution, as expected. This is a common theme in most natural graphs — real-world graphs are not random structures and are composed of few “hubs” and numerous “satellites.”

[Figure: GitHub pushed-edge out-degree distribution]
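
The out-degree distribution described above is a two-level count: first pushes per user, then users per push count. A small Python sketch of that idea follows; the function name and sample edges are illustrative only, and unlike the Faunus job it only sees users that have at least one pushed-edge.

```python
from collections import Counter


def out_degree_distribution(pushed_edges):
    """Map each out-degree to the number of users having that degree."""
    degree = Counter(user for user, _commit in pushed_edges)  # pushes per user
    return Counter(degree.values())                           # users per degree


# Hypothetical (user, commit) pairs standing in for pushed-edges.
pushed = [("u1", "c1"), ("u1", "c2"), ("u2", "c3"), ("u3", "c4")]
dist = out_degree_distribution(pushed)
print(sorted(dist.items()))  # [(1, 2), (2, 1)]
```

Sorting the items numerically, as in the last line, avoids the text ordering seen in the Faunus output, where 10 and 100 appear before 104.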

More sophisticated queries can be performed by first extracting a slice of the original graph that only contains relevant information. These slices can be saved to HDFS for subsequent traversals. For example, to calculate the most central co-watched project on GitHub, the primary graph is stripped down to only watched-edges between users and repositories. The final traversal below walks the “co-watched” graph 2 times and counts the number of paths that have gone through each repository. The repositories are sorted by their path counts in order to express which repositories are most central/important/respected according to the watches subgraph.

gremlin> g.E.has('label','watched').keep.V.has('type','Repository','User').keep
...
12/12/13 11:08:13 INFO mapred.JobClient:   com.thinkaurelius.faunus.mapreduce.sideeffect.CommitVerticesMapReduce$Counters
12/12/13 11:08:13 INFO mapred.JobClient:     VERTICES_DROPPED=19377850
12/12/13 11:08:13 INFO mapred.JobClient:     VERTICES_KEPT=2074099
12/12/13 11:08:13 INFO mapred.JobClient:   com.thinkaurelius.faunus.mapreduce.sideeffect.CommitEdgesMap$Counters
12/12/13 11:08:13 INFO mapred.JobClient:     OUT_EDGES_DROPPED=55971128
12/12/13 11:08:13 INFO mapred.JobClient:     OUT_EDGES_KEPT=1934706
...
gremlin> g = g.getNextGraph()
gremlin> g.V.in('watched').out('watched').in('watched').out('watched').property('_count',Long.class)
   .order(F.decr,'github_name')
==>backbone	4173578345
==>html5-boilerplate	4146508400
==>normalize.css	3255207281
==>django	3168825839
==>three.js	3078851951
==>Modernizr	2971383230
==>rails	2819031209
==>httpie	2697798869
==>phantomjs	2589138977
==>homebrew	2528483507
...
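
The path counting behind this ranking can be sketched without Hadoop by propagating counts back and forth across the watched-edges. The Python below is an approximation of the idea under simple assumptions (every repository starts with one path, and edges are given as user/repository pairs); it is not how Faunus implements it, and the names are hypothetical.

```python
from collections import Counter


def co_watched_path_counts(watched_edges, rounds=2):
    """Count paths ending at each repository after `rounds` in/out hops."""
    counts = Counter({repo: 1 for _user, repo in watched_edges})
    for _ in range(rounds):
        # in('watched'): every watcher collects the counts of the repos they watch.
        user_counts = Counter()
        for user, repo in watched_edges:
            user_counts[user] += counts[repo]
        # out('watched'): every repo collects the counts of its watchers.
        next_counts = Counter()
        for user, repo in watched_edges:
            next_counts[repo] += user_counts[user]
        counts = next_counts
    return dict(counts)


# Hypothetical watched-edges: users a and b, repositories x and y.
edges = [("a", "x"), ("a", "y"), ("b", "x")]
print(co_watched_path_counts(edges))  # {'x': 8, 'y': 5}
```

Even on three edges the counts grow quickly, which hints at why the real path counts above run into the billions.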

Conclusion

This post discussed the use of Hortonworks Data Platform in concert with the Aurelius Graph Cluster to store and process the graph data generated by the online social coding system GitHub. The example data set used throughout was provided by GitHub Archive, an ongoing record of events in GitHub. While the dataset currently afforded by GitHub Archive is relatively small, it continues to grow each day. The Aurelius Graph Cluster has been demonstrated in practice to support graphs with hundreds of billions of edges. As more organizations realize the graph structure within their Big Data, the Aurelius Graph Cluster is there to provide both real-time and batch graph analytics.

Acknowledgments

The authors wish to thank Steve Loughran for his help with Whirr and HDP. Moreover, Russell Jurney requested this post and, in a steadfast manner, ensured it was delivered.

Related Material

Hawkins, P., Aiken, A., Fisher, K., Rinard, M., Sagiv, M., “Data Representation Synthesis,” PLDI’11, June 2011.

Pham, R., Singer, L., Liskin, O., Filho, F. F., Schneider, K., “Creating a Shared Understanding of Testing Culture on a Social Coding Site,” Leibniz Universität Hannover, Software Engineering Group: Technical Report, September 2012.

Adler, B. T., de Alfaro, L., Pye, I., Raman V., “Measuring Author Contributions to the Wikipedia,” WikiSym ’08 Proceedings of the 4th International Symposium on Wikis, Article No. 15, September 2008.

Rodriguez, M.A., Mallette, S.P., Gintautas, V., Broecheler, M., “Faunus Provides Big Graph Data Analytics,” Aurelius Blog, November 2012.

Rodriguez, M.A., LaRocque, D., “Deploying the Aurelius Graph Cluster,” Aurelius Blog, October 2012.

Ho, R., “Graph Processing in Map Reduce,” Pragmatic Programming Techniques Blog, July 2010.

Authors


Vadas Gintautas Marko A. Rodriguez

Big Data Security Part Two: Introduction to PacketPig

Introduction

Packetpig is the tool behind Packetloop. In Part One of the Introduction to Packetpig I discussed the background and motivation behind the Packetpig project and problems Big Data Security Analytics can solve. In this post I want to focus on the code and teach you how to use our building blocks to start writing your own jobs.

The ‘building blocks’ are the Packetpig custom loaders that allow you to access specific information in packet captures. There are a number of them, but the two I will focus on in this post are:

  • PacketLoader() allows you to access protocol information (Layer-3 and Layer-4) from packet captures.
  • SnortLoader() inspects traffic using Snort Intrusion Detection software.

Calculating Bandwidth and Binning Time

The PacketLoader() provides access to IP, TCP and UDP headers for each packet in the capture. A great example of its use is the ‘binning.pig’ script. This script allows you to calculate the bandwidth used by TCP and UDP packets as well as total bandwidth at any period you define. You might want to calculate these totals every minute, hour, day, week or month to produce a graph.

Firstly run the binning script using the following command.

./pigrun.py -x local -r data/web.pcap -f pig/examples/binning.pig

Then open up output/binning/part-r-00000 in a text editor to see the output.

Now let’s walk through the script. Firstly let’s include all the JARs required for Packetpig and binning.pig to run:

%DEFAULT includepath pig/include.pig
RUN $includepath;

Then set the amount of time you want to bin your values into. In this case I want to output the values every minute (60 seconds), but I could easily change this to an hour (3600 seconds) by commenting and uncommenting the following lines:

%DEFAULT time 60
--%DEFAULT time 3600

Then I load the data out of the packet captures into quite a large schema using the PacketLoader():

packets = load '$pcap' using com.packetloop.packetpig.loaders.pcap.packet.PacketLoader() AS (
    ts,
    ip_version:int,
    ip_header_length:int,
    ip_tos:int,
    ip_total_length:int,
    ip_id:int,
    ip_flags:int,
    ip_frag_offset:int,
    ip_ttl:int,
    ip_proto:int,
    ip_checksum:int,
    ip_src:chararray,
    ip_dst:chararray,
    tcp_sport:int,
    tcp_dport:int,
    tcp_seq_id:long,
    tcp_ack_id:long,
    tcp_offset:int,
    tcp_ns:int,
    tcp_cwr:int,
    tcp_ece:int,
    tcp_urg:int,
    tcp_ack:int,
    tcp_psh:int,
    tcp_rst:int,
    tcp_syn:int,
    tcp_fin:int,
    tcp_window:int,
    tcp_len:int,
    udp_sport:int,
    udp_dport:int,
    udp_len:int,
    udp_checksum:chararray
);

This is a very rich data model, and by leveraging the timestamp (ts), the size of the IP packet (ip_total_length), and the TCP (tcp_len) and UDP (udp_len) lengths we can calculate total and respective bandwidths at any interval. The beauty of Pig is that I could easily home in on specific hosts by grouping on the Source IP, Destination IP and Destination Port – but let’s keep things simple in this post.

The ip_proto field allows me to filter all packets based on protocol. TCP is IP protocol 6 and UDP is IP protocol 17.

tcp = FILTER packets BY ip_proto == 6;
udp = FILTER packets BY ip_proto == 17;

Once filtered we can bin each packet into a time period and then project a summary of the data with the size of all TCP packets in that time period (bin) summed.

tcp_grouped = GROUP tcp BY (ts / $time * $time);
tcp_summary = FOREACH tcp_grouped GENERATE group, SUM(tcp.tcp_len) AS tcp_len;

And then the same for UDP.

udp_grouped = GROUP udp BY (ts / $time * $time);
udp_summary = FOREACH udp_grouped GENERATE group, SUM(udp.udp_len) AS udp_len;
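
The grouping key ts / $time * $time relies on integer division: dividing drops the remainder, and multiplying back yields the start of the time bin. A minimal Python sketch of the same trick is below; the function names and sample packets are made up for illustration.

```python
from collections import defaultdict


def time_bin(ts, period=60):
    """Round a Unix timestamp down to the start of its bin."""
    return ts // period * period


def bandwidth_per_bin(packets, period=60):
    """Sum packet lengths per time bin, like GROUP ... BY plus SUM."""
    totals = defaultdict(int)
    for ts, length in packets:
        totals[time_bin(ts, period)] += length
    return dict(totals)


# Hypothetical (timestamp, length) pairs.
packets = [(1322644985, 100), (1322645003, 200), (1322645045, 50)]
print(bandwidth_per_bin(packets))  # {1322644980: 300, 1322645040: 50}
```

Switching the period to 3600 gives hourly bins without touching the rest of the logic, which is the same effect as changing the time default in the Pig script.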

To calculate the total bandwidth of all IP packets we bin all packets using the same time period and then sum ip_total_length.

bw_grouped = GROUP packets BY (ts / $time * $time);
bw_summary = FOREACH bw_grouped GENERATE group, SUM(packets.ip_total_length) AS bw;

The output we were looking for is basically comma separated values for timestamp, tcp bandwidth, udp bandwidth and total bandwidth. This is produced by a final join and projection.

joined = JOIN tcp_summary BY group, udp_summary BY group, bw_summary BY group;
summary = FOREACH joined GENERATE tcp_summary::group, tcp_len, udp_len, bw;

It may seem a little cryptic but basically the JOIN statement is joining using the group that all the summaries share which is the time period. If you ILLUSTRATE the joined variable you will see the data is there but not in the format we are looking for.

| joined | tcp_summary::group:int | tcp_summary::tcp_len:long | udp_summary::group:int | udp_summary::udp_len:long | bw_summary::group:int | bw_summary::bw:long |
----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------
| | 1322644980 | 2080 | 1322644980 | 81 | 1322644980 | 2305 |
----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------

However, the summary projection generates the output the way we want it, and we store that in a CSV format using PigStorage(',').

STORE summary INTO '$output/binning' USING PigStorage(',');
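
One detail worth keeping in mind is that a plain JOIN in Pig is an inner join, so a time bin that is missing from any one of the three summaries will not appear in the output. The Python sketch below illustrates that behaviour with dictionaries keyed by bin; the function name and the second bin are hypothetical, while the first row reuses the values from the ILLUSTRATE output above.

```python
def join_summaries(tcp, udp, bw):
    """Inner-join three {bin: value} summaries and render CSV rows."""
    shared_bins = sorted(tcp.keys() & udp.keys() & bw.keys())
    return [f"{b},{tcp[b]},{udp[b]},{bw[b]}" for b in shared_bins]


tcp = {1322644980: 2080, 1322645040: 500}  # second bin is made up
udp = {1322644980: 81}                     # no UDP traffic in the second bin
bw = {1322644980: 2305, 1322645040: 500}

print(join_summaries(tcp, udp, bw))  # ['1322644980,2080,81,2305']
```

If bins with only TCP or only UDP traffic matter for your graph, an outer join would be the thing to look at.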

Threat Detection

The SnortLoader() can be used to replay all conversations through Snort IDS and output attacks that it finds. The SnortLoader() can also take a snort.conf as a parameter so you can scan packet captures with specific Snort versions.

Run the basic snort.pig script to get an idea of the output.

./pigrun.py -x local -r data/web.pcap -f pig/examples/snort.pig

Now let’s run through the snort.pig script. Again we include all the JARs we need for Packetpig.

%DEFAULT includepath pig/include.pig
RUN $includepath;

The script is constructed so that you can pass parameters to either scan all traffic for attacks or zero in on specific source and destination IP addresses. By leaving most of these null we inspect all traffic. Also note we are again binning time every 60 seconds. Lastly, Packetpig includes a number of versions of Snort. The default snort.conf we include ensures you use the latest one.

%DEFAULT time 60
%DEFAULT src null
%DEFAULT dst null
%DEFAULT sport null
%DEFAULT dport null
%DEFAULT snortconfig 'lib/snort/etc/snort.conf'

The SnortLoader() receives the snortconfig parameter, starts inspecting the packet capture for attacks, and provides them back to you in a defined schema.

snort_alerts =
  LOAD '$pcap'
  USING com.packetloop.packetpig.loaders.pcap.detection.SnortLoader('$snortconfig')
  AS (
    ts:long,
    sig:chararray,
    priority:int,
    message:chararray,
    proto:chararray,
    src:chararray,
    sport:int,
    dst:chararray,
    dport:int
  );

Using this schema you can access the timestamp (ts), Snort Signature ID (sig), Severity/Priority (priority), Description of the attack (message) and the Source (src), Source Port (sport), Destination (dst) and Destination port (dport) of the attack.

If you run the script and open output/snort/part-m-00000, you will see a number of attacks matching the schema output of the SnortLoader(). One thing to note is that Snort uses Priority 1 for the highest severity, Priority 2 for the next highest, and so on.

1322645240 120_3 3 (http_inspect) NO CONTENT-LENGTH OR TRANSFER-ENCODING IN HTTP RESPONSE TCP 184.84.221.18 80 192.168.0.19 34299
1322645387 139_1 2 (spp_sdf) SDF Combination Alert DIVERT 184.84.221.18 0 192.168.0.19 0
1322645603 120_3 3 (http_inspect) NO CONTENT-LENGTH OR TRANSFER-ENCODING IN HTTP RESPONSE TCP 74.125.237.27 80 192.168.0.19 41791
1322645907 120_3 3 (http_inspect) NO CONTENT-LENGTH OR TRANSFER-ENCODING IN HTTP RESPONSE TCP 199.181.254.21 80 192.168.0.19 54222
1322645689 120_3 3 (http_inspect) NO CONTENT-LENGTH OR TRANSFER-ENCODING IN HTTP RESPONSE TCP 74.125.237.123 80 192.168.0.19 42514
1322645739 138_5 2 SENSITIVE-DATA Email Addresses TCP 74.125.237.123 80 192.168.0.19 42514

The snort.pig script is our most basic example, but hopefully you are already thinking about what you could filter on (e.g. Severity) as well as re-projecting the data you access out of SnortLoader() to find the top ten attackers and top ten victims.
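
As a starting point for that kind of follow-up analysis, here is a small Python sketch that filters alerts by priority and counts them per source address. The function name and dictionary layout are assumptions; the rows are a hand-copied subset of the fields from the sample output above.

```python
from collections import Counter


def top_sources(alerts, max_priority=3, n=10):
    """Count alerts per source, keeping only priority <= max_priority."""
    counts = Counter(a["src"] for a in alerts if a["priority"] <= max_priority)
    return counts.most_common(n)


# (sig, priority, src) taken from the sample snort.pig output.
alerts = [
    {"sig": "120_3", "priority": 3, "src": "184.84.221.18"},
    {"sig": "139_1", "priority": 2, "src": "184.84.221.18"},
    {"sig": "120_3", "priority": 3, "src": "74.125.237.27"},
    {"sig": "120_3", "priority": 3, "src": "199.181.254.21"},
    {"sig": "120_3", "priority": 3, "src": "74.125.237.123"},
    {"sig": "138_5", "priority": 2, "src": "74.125.237.123"},
]

all_counts = dict(top_sources(alerts))
severe_counts = dict(top_sources(alerts, max_priority=2))
print(all_counts["184.84.221.18"], len(severe_counts))  # 2 2
```

Counting on dst instead of src would give the corresponding list of victims.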

In my next post I will show you how to find Zero Day attacks in past network packet captures.

Agile Data European Megatour, then Home to Atlanta!

Agile Data hits the road this month, crossing Europe with the good news about Hadoop and teaching Hadoop users how to build value from data by using Hadoop to build analytics applications.

We’ll be giving out discount coupons to Hadoop Summit Europe, which is March 20-21st in Amsterdam!

  1. 11/3 – Agile Data @ The Warsaw Hadoop Users Group
  2. 11/5 to 11/6 – Attending ApacheCon Europe 2012 in Sinsheim, Germany. Say hello!
  3. 11/7 – Agile Data @ The France Hadoop Users Group in Paris
  4. 11/8 – Agile Data @ Netherlands Hadoop Users Group in Utrecht
  5. 11/12 – Agile Data @ Hadoop Users Group UK in London.
  6. 11/13 – Agile Data @ HP Labs in Bristol, England.
  7. 11/15 – Agile Data @ The combined Data Science ATL / Atlanta Hadoop Users Group
  8. 11/16 – Agile Data @ The Emory Library
  9. 11/19 – Agile Data @ The Atlanta MongoDB Users Group

I’m writing this from Warsaw, the first stop on my tour. This is my first time in Poland, and I’m excited to be speaking tonight at the Warsaw HUG and look forward to hearing about Hadoop in Poland. Tomorrow I’ll be checking out the sights, so let me know if you’d like to volunteer as a tour guide in exchange for free, on-the-spot consulting!

You can view the incomplete book on O’Reilly OFPS here – I’ll be updating it daily for the next three weeks, so check Chapter 10, where I use Pig to build a graphical model in an attempt to improve my wife’s response rate to my emails :) . Code examples for the book are available here on github.

If you can’t make one of the talks, check out the slides below from my DC-HUG presentation, and help spread the good news!



Big Data Security Part One: Introducing PacketPig

Series Introduction

Packetloop CTO Michael Baker (@cloudjunky) made a big splash when he presented ‘Finding Needles in Haystacks (the Size of Countries)’ at Blackhat Europe earlier this year. The paper outlines Packetpig (@packetpig, available on github), a toolkit based on Apache Pig for doing network security monitoring and intrusion detection analysis on full packet captures using Hadoop.

In this series of posts, we’re going to introduce Big Data Security and explore using Packetpig on real full packet captures to understand and analyze networks. In this post, Michael will introduce big data security in the form of full data capture, Packetpig and Packetloop.

Introducing Packetpig

Intrusion detection is the analysis of network traffic to detect intruders on your network. Most intrusion detection systems (IDS) look for signatures of known attacks and identify them in real-time. Packetpig is different. Packetpig analyzes full packet captures – that is, logs of every single packet sent across your network – after the fact. Because it runs Hadoop over full packet captures, Packetpig can, unlike existing IDSs, detect ‘zero day’ or previously unknown exploits in historical data as new exploits are discovered. In other words, Packetpig can determine whether intruders are already in your network, how long they have been there, and what they’ve stolen or abused.

Packetpig is a Network Security Monitoring (NSM) Toolset where the ‘Big Data’ is full packet captures. Like a Tivo for your network, through its integration with Snort, p0f and custom Java loaders, Packetpig does deep packet inspection, file extraction, feature extraction, operating system detection, and other deep network analysis. Packetpig’s analysis of full packet captures focuses on providing as much context as possible to the analyst: context they have never had before. This is a ‘Big Data’ opportunity.

Full Packet Capture: A Big Data Opportunity

What makes full packet capture possible is cheap storage – the driving factor behind ‘big data.’ A standard 100Mbps internet connection can be cheaply logged for months with a 3TB disk. Apache Hadoop is optimized around cheap storage and data locality: putting spindles next to processor cores. So what better way to analyze full packet captures than with Apache Pig – a dataflow scripting interface on top of Hadoop?

In the enterprise today, there is no single location or system to provide a comprehensive view of a network in terms of threats, sessions, protocols and files. This information is generally distributed across domain-specific systems such as IDS Correlation Engines and data stores, Netflow repositories, Bandwidth optimisation systems or Data Loss Prevention tools. Security Information and Event Management (SIEM) systems offer to consolidate this information, but they operate on logs – a digest or snippet of the original information. They don’t provide full-fidelity information that can be queried using an exact copy of the original incident.

Packet captures are a standard binary format for storing network data. They are cheap to perform and the data can be stored in the cloud or on low-cost disk in the Enterprise network. The length of retention can be based on the amount of data flowing through the network each day and the window of time you want to be able to peer into the past.
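
The retention window follows from simple arithmetic. The sketch below is a back-of-the-envelope calculation in Python, not a Packetpig feature: it assumes decimal terabytes, and the 5% average utilisation figure is a made-up illustration rather than a measurement. The function names are hypothetical.

```python
SECONDS_PER_DAY = 86_400


def bytes_per_day(avg_bits_per_sec):
    """Capture volume per day for a link carrying avg_bits_per_sec on average."""
    return avg_bits_per_sec / 8 * SECONDS_PER_DAY


def retention_days(disk_bytes, avg_bits_per_sec):
    """How many days of full packet capture fit on a disk of disk_bytes."""
    return disk_bytes / bytes_per_day(avg_bits_per_sec)


DISK_BYTES = 3e12    # a 3TB disk, assuming 1 TB = 10**12 bytes
LINK_BPS = 100e6     # a 100Mbps connection

if __name__ == "__main__":
    # 1.0 = link saturated around the clock; 0.05 = assumed 5% average utilisation.
    for utilisation in (1.0, 0.05):
        days = retention_days(DISK_BYTES, LINK_BPS * utilisation)
        print(f"{utilisation:.0%} average utilisation: {days:.1f} days")
```

A fully saturated 100Mbps link would fill the disk in under three days, so logging for months on a 3TB disk relies on average traffic being a small fraction of the link's capacity.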

Pig, Packetpig and Open Source Tools

In developing Packetpig, Packetloop wanted to provide free tools for the analysis of network packet captures that span weeks, months or even years. The simple questions of capture and storage of network data had been solved, but no one had addressed the fundamental problem of analysis. Packetpig utilizes the Hadoop stack for analysis, which solves this problem.

For us, wrapping Snort and p0f was a bit of a homage to how much security professionals value and rely on open source tools. We felt that if we didn’t offer an open source way of analysing full packet captures we had missed a real opportunity to pioneer in this area. We wanted it to be simple, turnkey and easy for people to take our work and expand on it. This is why Apache Pig was selected for the project.

Understanding your Network

One of the first data sets we were given to analyse was a 3TB data set from a customer. It was every packet in and out of their 100Mbps internet connection for 6 weeks. It contained approximately 500,000 attacks. Making sense of this volume of information is incredibly difficult with current tooling. Even Network Security Monitoring (NSM) tools have difficulty with this size of data. However, it’s not just size and scale: no existing toolset provided the same level of context. Packetpig allows you to join together information related to threats, sessions, protocols (deep packet inspection) and files as well as Geolocation and Operating system detection information.

We are currently logging all packets for a website for six months. This data set is currently around 0.6TB, and because all the packet captures are stored in S3 we can quickly scan through the dataset. More importantly, we can run a job nightly or every 15 minutes to correlate attack information with other data from Packetpig and provide as much context as possible for security events.

Items of interest include:

  • Detecting anomalies and intrusion signatures
  • Learning the timeframe and identity of an attacker
  • Triaging incidents
  • “Show me packet captures I’ve never seen before.”

“Never before seen” is a powerful filter and isn’t limited to attack information. First introduced by Marcus Ranum, “never before seen” can be used to rule out normal network behaviour and only show sources, attacks, and traffic flows that are truly anomalous. For example, think in terms of the outbound communications from a Web Server. What attacks, clients and outbound communications are new or have never been seen before? In an instant you get an understanding that you don’t need to look for the normal, you are straight away looking for the abnormal or signs of misuse.
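
At its core, a “never before seen” filter is a set difference between a baseline of known-normal observations and new observations. The Python sketch below shows that idea only. The function name and the sample (destination, port) pairs are hypothetical, and a real job would build the baseline from historical packet captures.

```python
def never_before_seen(baseline, observed):
    """Return items from `observed` that are absent from `baseline`.

    Each new item is reported once, in the order it was first observed.
    """
    seen = set(baseline)
    fresh = []
    for item in observed:
        if item not in seen:
            seen.add(item)      # remember it so repeats are not reported again
            fresh.append(item)
    return fresh


# Hypothetical outbound (destination, port) pairs from a web server.
BASELINE = [("10.0.0.5", 53), ("10.0.0.9", 3306)]
TODAY = [
    ("10.0.0.5", 53),
    ("203.0.113.7", 6667),
    ("10.0.0.9", 3306),
    ("203.0.113.7", 6667),
]

if __name__ == "__main__":
    print(never_before_seen(BASELINE, TODAY))
```

Here only the connection to 203.0.113.7 on port 6667 is reported, and only once, because everything else already appears in the baseline.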

Agile Data

Packetloop uses the stack and iterative prototyping techniques outlined in the forthcoming book by Hortonworks’ own Russell Jurney, Agile Data (O’Reilly, March 2013). We use Hadoop, Pig, Mongo and Cassandra to explore datasets and help us encode important information into d3 visualisations. Currently we use all of these tools to aid in our research before we add functionality to Packetloop. These prototypes become the palette our product is built from.

Miss Piggy Takes Manhattan: Pig Meetup at Strata NYC on Wed, Oct 24th

There will be a Pig meetup at Strata NYC/Hadoop World, at 6:30PM on Wed, Oct 24th in the Bryant Room of the Hilton New York. This will also be the inaugural meeting of the NYC Pig User Group, which Doug Daniels of Pig contributor Mortar Data was good enough to organize. We look forward to future Pig meetups in NYC!

Hortonworks’ own Daniel Dai @daijy, VP of Apache Pig, will present on new features in Pig 0.11. You can view a summary of JIRA tickets for Pig 0.11 here. New features include the CUBE operator, a new RANK operator, the addition of a DateTime type, speed improvements via SchemaTuple, and many others.

More information is available on the Pig meetup page: http://www.meetup.com/PigUser/events/85047782/.

Those of you too young to understand the Miss Piggy reference should look here.

Pig Macro for TF-IDF Makes Topic Summarization 2 Lines of Pig

In a recent post we used Pig to summarize documents via the Term-Frequency, Inverse Document Frequency (TF-IDF) algorithm.

In this post, we’re going to turn that code into a Pig macro that can be called in one line of code:

import 'tfidf.macro';
my_tf_idf_scores = tf_idf(id_body, 'message_id', 'body');

Our macro, in the file tfidf.macro, looks just like our Pig script, with a couple of new lines. Note the use of macro variables for input and output, preceded by the ‘$’ character: $in_relation, $out_relation, $id_field and $text_field. These let us apply the macro to any relation with a unique identifier field and a text body field. You can get it on github here. The file which tests the macro is here. The code that the macro generates is here.

DEFINE tf_idf(in_relation, id_field, text_field) RETURNS out_relation {
  token_records = foreach $in_relation generate $id_field, FLATTEN(TOKENIZE($text_field)) as tokens;
 
  /* Calculate the term count per document */
  doc_word_totals = foreach (group token_records by ($id_field, tokens)) generate 
    FLATTEN(group) as ($id_field, token), 
    COUNT_STAR(token_records) as doc_total;
 
  /* Calculate the document size */
  pre_term_counts = foreach (group doc_word_totals by $id_field) generate
    group AS $id_field,
    FLATTEN(doc_word_totals.(token, doc_total)) as (token, doc_total), 
    SUM(doc_word_totals.doc_total) as doc_size;
 
  /* Calculate the TF */
  term_freqs = foreach pre_term_counts generate $id_field as $id_field,
    token as token,
    ((double)doc_total / (double)doc_size) AS term_freq;
 
  /* Get count of documents using each token, for idf */
  token_usages = foreach (group term_freqs by token) generate
    FLATTEN(term_freqs) as ($id_field, token, term_freq),
    COUNT_STAR(term_freqs) as num_docs_with_token;
 
  /* Get document count */
  just_ids = foreach $in_relation generate $id_field;
  ndocs = foreach (group just_ids all) generate COUNT_STAR(just_ids) as total_docs;
 
  /* Note the use of Pig Scalars to calculate idf */
  $out_relation = foreach token_usages {
    idf    = LOG((double)ndocs.total_docs/(double)num_docs_with_token);
    tf_idf = (double)term_freq * idf;
    generate $id_field as $id_field,
      token as score,
      (chararray)tf_idf as value:chararray;
  };
};
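
To check the arithmetic the macro performs, here is a small Python sketch of the same steps on an in-memory corpus. It is an illustration, not a translation of the macro: tokens are split on whitespace only (a simplification compared with Pig's TOKENIZE), scores are kept as floats rather than cast to chararray, and the function name and sample documents are made up for this example.

```python
import math
from collections import Counter


def tf_idf(docs):
    """docs maps a document id to its text; returns {(doc_id, token): score}."""
    # Term counts per document (doc_word_totals in the macro).
    counts = {doc_id: Counter(text.split()) for doc_id, text in docs.items()}

    # Number of documents that use each token (num_docs_with_token).
    docs_with_token = Counter(tok for c in counts.values() for tok in c)

    # Document count (ndocs.total_docs).
    ndocs = len(docs)

    scores = {}
    for doc_id, c in counts.items():
        doc_size = sum(c.values())                           # doc_size
        for tok, n in c.items():
            tf = n / doc_size                                # term_freq
            idf = math.log(ndocs / docs_with_token[tok])     # natural log
            scores[(doc_id, tok)] = tf * idf
    return scores


if __name__ == "__main__":
    sample = {"a": "pig pig hadoop", "b": "hadoop hive"}
    for key, score in sorted(tf_idf(sample).items()):
        print(key, round(score, 4))
```

In this sample, "hadoop" appears in both documents, so its idf is log(2/2) = 0 and it scores zero everywhere, while "pig" scores (2/3) * log(2) in document "a". That is the behaviour that makes TF-IDF useful for summarization: terms common to every document drop out.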

Note that to debug macros, we can use the -r flag, which will expand the code the macro generates into a .expanded file. It is worth pointing out that this takes us from 37 lines of Pig to 2 lines of Pig. Macros facilitate code modularization, re-use and sharing.

Are you sharing enough Hadoop code in your enterprise?

JSONize Anything in Pig with ToJson

The need for a ToJson EvalFunc

When integrating Pig with different NoSQL ‘databases,’ or when publishing data from Hadoop, it can be convenient to JSONize your data. Although Pig has JsonStorage, there hasn’t been a ToJson EvalFunc. This has been inconvenient. In our post about Pig and ElasticSearch, for example, creating JSON for ElasticSearch to index required tricks like this:

store enron_emails into '/tmp/enron_emails_elastic' using JsonStorage();
json_emails = load '/tmp/enron_emails_elastic' AS (json_record:chararray);
 
/* Now we can store our email json data to elasticsearch for indexing with message_id. */
store json_emails into 'es://enron/email?json=true&size=1000' USING
  com.infochimps.elasticsearch.pig.ElasticSearchStorage('/me/elasticsearch-0.18.6/config/elasticsearch.yml', '/me/elasticsearch-0.18.6/plugins');

Note how we store as JSON via JsonStorage, then load as a chararray to get the entire record as JSON. It would be more convenient to convert Pig bags and tuples to JSON directly. This would let us retain an ID field as key, and only JSONize our record for that key as a string.

ToJson to the Rescue!

That is precisely what the ToJson EvalFunc in pig-to-json does. It takes a bag, a tuple, or a nested combination thereof and returns a JSON string.

Our data looks like so:

emails: {message_id: chararray,
         date: chararray,
         from: (address: chararray,name: chararray),
         subject: chararray,
         body: chararray,
         tos: {ARRAY_ELEM: (address: chararray,name: chararray)},
         ccs: {ARRAY_ELEM: (address: chararray,name: chararray)},
         bccs: {ARRAY_ELEM: (address: chararray,name: chararray)}
}

We can JSONify bags:

emails = load '/me/Data/enron.avro' using AvroStorage();
emails = limit emails 10;
json_test = foreach emails generate message_id, com.hortonworks.pig.udf.ToJson(tos) as bag_json;
dump json_test;

Which gets us JSON arrays of JSON objects:

(<589.1075842593084.JavaMail.evans@thyme>,[{"address":"gerald.nemec@enron.com","name":null}])
(<614.1075847580822.JavaMail.evans@thyme>,[{"address":"jfichera@saberpartners.com","name":null},{"address":"barry.goode@gov.ca.gov","name":null},{"address":"hoffman@blackstone.com","name":null},{"address":"joseph.fichera@gov.ca.gov","name":null}])
(<735.1075840186524.JavaMail.evans@thyme>,[{"address":"kam.keiser@enron.com","name":"Kam Keiser"},{"address":"mike.grigsby@enron.com","name":"Mike Grigsby"}])
(<758.1075842602845.JavaMail.evans@thyme>,[{"address":"joan.quick@enron.com","name":null}])
(<765.1075860359973.JavaMail.evans@thyme>,[{"address":"jay_dudley@pgn.com","name":null},{"address":"michele_farrell@pgn.com","name":null}])
...

We can JSONify tuples:

emails2 = load '/me/Data/enron.avro' using AvroStorage();
emails2 = limit emails2 10;
json_test2 = foreach emails2 generate message_id, com.hortonworks.pig.udf.ToJson(from) as tuple_json;
dump json_test2;

Which gets us JSON objects:

(<28.1075842613917.JavaMail.evans@thyme>,{"address":"emmye@dracospring.com","name":"\"Emmye\""})
(<85.1075854368299.JavaMail.evans@thyme>,{"address":"darron.giron@enron.com","name":null})
(<167.1075851646300.JavaMail.evans@thyme>,{"address":"jeff.dasovich@enron.com","name":"Jeff Dasovich"})
(<185.1075857304356.JavaMail.evans@thyme>,{"address":"chris.dorland@enron.com","name":"Chris Dorland"})
(<735.1075840186524.JavaMail.evans@thyme>,{"address":"m..love@enron.com","name":"Phillip M. Love"})

And we can JSONify more complex, nested structures:

-- This works for arbitrarily complex data structures as well
a = foreach (group emails by from.address) generate group as from_address, COUNT_STAR(emails) as sent_count, FLATTEN(emails.tos) as tos;
b = group a by from_address;
c = foreach b generate group as from_address, com.hortonworks.pig.udf.ToJson(a) as json_test;
store c into '/tmp/big_test_num';

Which gets us an array of JSON objects where one field refers to an array of other objects, and one field stores a number:

bc@ori.org	[{"tos":[{"address":"klay@enron.com","name":null}],"sent_count":1,"from_address":"bc@ori.org"}]
ben@crs.hn	[{"tos":[{"address":"klay@enron.com","name":null}],"sent_count":1,"from_address":"ben@crs.hn"}]
cba@uh.edu	[{"tos":[{"address":"rod.hayslett@enron.com","name":null}],"sent_count":1,"from_address":"cba@uh.edu"}]
cei@uh.edu	[{"tos":[{"address":"ceiinfo@uh.edu","name":"Shena Cherian"}],"sent_count":1,"from_address":"cei@uh.edu"}]
nc@mmf.com	[{"tos":[{"address":"kenneth.lay@enron.com","name":null}],"sent_count":1,"from_address":"nc@mmf.com"}]
nyb@uni.de	[{"tos":[{"address":"extramoney@mailman.enron.com","name":null}],"sent_count":1,"from_address":"nyb@uni.de"}]

We are JSONizing fields like crazy. It’s madness! Woohoo!

You can download the project on github here: https://github.com/rjurney/pig-to-json. To build it, you can:

ant clean; ant dist

ToJson Internals

Setting up a Pig EvalFunc project is easy. I copied my ivy.xml from some place, to import dependencies from Maven:

<ivy-module version="2.0">
  <info organisation="org.apache" module="hello-ivy"/>
  <dependencies>
    <dependency org="com.googlecode.json-simple" name="json-simple" rev="1.1"/>
    <dependency org="org.apache.pig" name="pig" rev="0.8.0"/>
    <dependency org="org.apache.hadoop" name="hadoop-core" rev="0.20.2"/>
    <dependency org="commons-lang" name="commons-lang" rev="2.5"/>
  </dependencies>
</ivy-module>

I copied my build.xml from some place too, and edited a few values to make it work with my class. The meat of the UDF is in ToJson.java, which I copied from some place else and edited. There’s an interesting bit here:

    public Schema outputSchema(Schema inputSchema) {
 
        // We must strip the outer {} or this schema won't parse on the back-end
        // This is a trick until PIG-2910 is fixed. Thanks to Thejas for this!
        String schemaString = inputSchema.toString().substring(1, inputSchema.toString().length() - 1);
 
        // Set the input schema for processing
        UDFContext context = UDFContext.getUDFContext();
        Properties udfProp = context.getUDFProperties(this.getClass());
 
        udfProp.setProperty("horton.json.udf.schema", schemaString);
 
        // Construct our output schema which is one field, that is a chararray
        return new Schema(new FieldSchema(null, DataType.CHARARRAY));
    }

We use EvalFunc.outputSchema to pass our input’s schema from Pig’s front end to its distributed back end using the UDFContext’s properties. Our output schema is always a chararray, as this is ToJson after all. We retrieve the input schema in our EvalFunc.exec method:

// Parse the schema from the string stored in the properties object.
schema = Utils.getSchemaFromString(strSchema);
...
// Convert the first input field to JSON, guided by its field schema.
Object field = input.get(0);
Object jsonObject = fieldToJson(field, schema.getFields().get(0));
String json = jsonObject.toString();
return json;
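
The body of fieldToJson is not shown here, but the general idea of walking a value alongside its schema can be sketched in a few lines of Python. This is an illustrative analogue under assumptions, not the UDF's actual code: the schema encoding ("scalar", ("tuple", ...), ("bag", ...)) and the names `field_to_json`, `to_json`, `ADDRESS` and `TOS` are invented for the example.

```python
import json


def field_to_json(value, schema):
    """Convert a Pig-like value to JSON-ready Python data, guided by a schema.

    schema is one of:
      "scalar"                         value is emitted as-is
      ("tuple", [(name, schema), ...]) value is a tuple, emitted as an object
      ("bag", tuple_schema)            value is a list of tuples, emitted as an array
    """
    if value is None or schema == "scalar":
        return value
    kind, inner = schema
    if kind == "tuple":
        # Pair each field's name and schema with the value in the same position.
        return {name: field_to_json(v, s) for (name, s), v in zip(inner, value)}
    if kind == "bag":
        return [field_to_json(t, inner) for t in value]
    raise ValueError(f"unknown schema kind: {kind!r}")


def to_json(value, schema):
    """Serialize a value to a JSON string using its schema for field names."""
    return json.dumps(field_to_json(value, schema))


# Schemas shaped like the `from` tuple and `tos` bag in the emails relation.
ADDRESS = ("tuple", [("address", "scalar"), ("name", "scalar")])
TOS = ("bag", ADDRESS)

if __name__ == "__main__":
    print(to_json([("gerald.nemec@enron.com", None)], TOS))
    print(to_json(("kam.keiser@enron.com", "Kam Keiser"), ADDRESS))
```

The schema is needed because tuples carry only positional values. Field names such as address and name live in the schema, which is why the UDF has to ship its input schema to the back end.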

Pig UDFs are pretty easy to write, as you can see! Be lazy. Find an Apache-licensed Pig UDF on GitHub or in Piggybank and make it your own!
