DataFu: The WD-40 of Big Data

If Pig is the "duct tape for big data", then DataFu is the WD-40. Or something.

No, seriously, DataFu is a collection of Pig UDFs for data analysis on Hadoop. DataFu includes routines for common statistics tasks (e.g., median, variance), PageRank, set operations, and bag operations.
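
For a quick taste, here is a minimal sketch of one of these UDFs in action; the 'scores' input file and its val field are made up for illustration. As in the full example further below, the bag is ordered first because Median expects sorted input:

REGISTER datafu-0.0.6.jar;

DEFINE Median datafu.pig.stats.Median();

-- hypothetical input: one numeric value per line
data = LOAD 'scores' AS (val:double);

-- Median expects its input bag to be sorted
medians = FOREACH (GROUP data ALL) {
  ordered = ORDER data BY val;
  GENERATE Median(ordered.val) as median_val;
};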

It's helpful to understand the history of the library. Over the years, we developed several routines that were used across LinkedIn and were thrown together into an internal package we affectionately called "littlepiggy." The unfortunate part, and this is true of many such efforts, is that the UDFs were poorly documented, poorly organized, and easily broken when someone made a change. Along came PigUnit, which made it possible to unit test UDFs, so we spent the time to clean up these routines, adding documentation and rigorous tests. We figured this "datafoo" package would be useful to the community at large, and there you have DataFu.

So what can this library do for you? Let's look at one of the classic examples that showcases the power and flexibility of Pig: sessionizing a click stream.

A = load 'clicks';
B = group A by user;
C = foreach B {
  C1 = order A by timestamp;
  generate FLATTEN(Sessionize(C1)) as (timestamp, user, session_id);
};
D = group C by session_id;
E = foreach D generate group as session_id, (MAX(C.timestamp) - MIN(C.timestamp)) as session_length;
F = group E all;
G = foreach F generate
  AVG(E.session_length) as avg_session_length,
  SQRT(VAR(E.session_length)) as sd_session_length,
  MEDIAN(E.session_length) as median_session_length,
  Q75(E.session_length) as session_length_75pct,
  Q90(E.session_length) as session_length_90pct,
  Q95(E.session_length) as session_length_95pct;

(In fact, this is basically the example for the Accumulator interface that was added in Pig 0.6.)

Here, we're just computing some summary statistics on a sessionized click stream. Pig does the heavy lifting of transforming your query into MapReduce goodness, while DataFu fills in the gaps by providing the missing routines: Sessionize, VAR, MEDIAN, and the quantile functions (Q75, Q90, Q95).

You can download DataFu at http://data.linkedin.com/opensource/datafu.

Below, you can grab sample data and code for this sessionization example and run it yourself.

Sessionization Example

Suppose that we have a stream of page views from which we have extracted a member ID and UNIX timestamp. It might look something like this:

memberId timestamp      url
1        1357718725941  /
1        1357718871442  /profile
1        1357719038706  /inbox
1        1357719110742  /groups
...
2        1357752955401  /inbox
2        1357752982385  /profile
...

The full data set for this example can be found here.

Using DataFu we can assign session IDs to each of these events and group by session ID in order to compute the length of each session. From there we can complete the exercise by simply applying the statistics UDFs provided by DataFu.

REGISTER piggybank.jar;
REGISTER datafu-0.0.6.jar;
REGISTER guava-13.0.1.jar; -- needed by StreamingQuantile
 
DEFINE UnixToISO   org.apache.pig.piggybank.evaluation.datetime.convert.UnixToISO();
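-- '10m' is the inactivity timeout: events more than 10 minutes apart fall into separate sessions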
DEFINE Sessionize  datafu.pig.sessions.Sessionize('10m');
DEFINE Median      datafu.pig.stats.Median();
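-- StreamingQuantile estimates quantiles without requiring the input bag to be sorted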
DEFINE Quantile    datafu.pig.stats.StreamingQuantile('0.75','0.90','0.95');
DEFINE VAR         datafu.pig.stats.VAR();
 
pv = LOAD 'clicks.csv' USING PigStorage(',') AS (memberId:int, time:long, url:chararray);
 
pv = FOREACH pv
     -- Sessionize expects an ISO string
     GENERATE UnixToISO(time) as isoTime,
              time,
              memberId;
 
pv_sessionized = FOREACH (GROUP pv BY memberId) {
  ordered = ORDER pv BY isoTime;
  GENERATE FLATTEN(Sessionize(ordered)) AS (isoTime, time, memberId, sessionId);
};
 
pv_sessionized = FOREACH pv_sessionized GENERATE sessionId, time;
 
-- compute length of each session in minutes
session_times = FOREACH (GROUP pv_sessionized BY sessionId)
                GENERATE group as sessionId,
                         (MAX(pv_sessionized.time)-MIN(pv_sessionized.time))
                            / 1000.0 / 60.0 as session_length;
 
-- compute stats on session length
session_stats = FOREACH (GROUP session_times ALL) {
  ordered = ORDER session_times BY session_length;
  GENERATE
    AVG(ordered.session_length) as avg_session,
    SQRT(VAR(ordered.session_length)) as std_dev_session,
    Median(ordered.session_length) as median_session,
    Quantile(ordered.session_length) as quantiles_session;
};
 
DUMP session_stats;
--(15.737532575757575,31.29552045993877,(2.848041666666667),(14.648516666666666,31.88788333333333,86.69525))
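
Reading the output: the average session length is about 15.7 minutes with a standard deviation of about 31.3, the median is roughly 2.85 minutes, and the 75th, 90th, and 95th percentiles are about 14.6, 31.9, and 86.7 minutes. A handful of very long sessions pull the mean well above the median.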

This is just a taste. There’s plenty more in the library for you to peruse. Take a look here. DataFu is freely available under the Apache 2 license. We welcome contributions, so please send us your pull requests!

Matthew Hayes & Sam Shah


Comments

Vijayalakshmi Veeraraghavan | January 26, 2013 at 2:04 pm

I am so glad that you have put all the UDFs in a common place. This is going to be very useful for people who are trying to analyze the data and see what insight can be provided to the user community.

Thanks.

January 25, 2013 at 1:35 pm

Nice example! How would you add a page view number to pv_sessionized? I guess I could use the Enumerate function in DataFu, but I'm not sure how to combine that with Sessionize in an efficient way.

    rongzheyi | September 15, 2013 at 4:25 am

    Maybe this can help: http://help.mortardata.com/reference/analyzing_log_data/common_functions

    However, it is not that efficient, though I think numbering page views in each session can be combined with sessionizing?

    Matthew Hayes | January 29, 2013 at 4:30 pm

    You mean page views per session, right? Something like this:

    session_times = FOREACH (GROUP pv_sessionized BY sessionId)
                    GENERATE group as sessionId,
                             COUNT(pv_sessionized.time) as page_views,
                             (MAX(pv_sessionized.time)-MIN(pv_sessionized.time))
                                / 1000.0 / 60.0 as session_length;
