January 24, 2013

DataFu: The WD-40 of Big Data

If Pig is the “duct tape for big data”, then DataFu is the WD-40. Or something.

No, seriously, DataFu is a collection of Pig UDFs for data analysis on Hadoop. DataFu includes routines for common statistics tasks (e.g., median, variance), PageRank, set operations, and bag operations.

It’s helpful to understand the history of the library. Over the years, we developed several routines that were used across LinkedIn and thrown together into an internal package we affectionately called “littlepiggy.” The unfortunate part, and this is true of many such efforts, is that the UDFs were ill-documented, ill-organized, and easily broken when someone made a change. Along came PigUnit, which allowed UDF testing, so we spent the time to clean up these routines by adding documentation and rigorous unit tests. We thought the resulting “datafoo” package would help the community at large, and there you have DataFu.

So what can this library do for you? Let’s look at one of the classic examples that showcase the power and flexibility of Pig: sessionizing a click stream.

A = load 'clicks';
B = group A by user;
-- order each user's clicks by time, then assign session IDs
C = foreach B {
  C1 = order A by timestamp;
  generate user, Sessionize(C1);
}
D = group C by session_id;
E = foreach D generate group as session_id, (MAX(C.timestamp) - MIN(C.timestamp)) as session_length;
-- summary statistics over all session lengths
F = group E all;
G = foreach F generate
  AVG(E.session_length) as avg_session_length,
  SQRT(VAR(E.session_length)) as sd_session_length,
  MEDIAN(E.session_length) as median_session_length,
  Q75(E.session_length) as session_length_75pct,
  Q90(E.session_length) as session_length_90pct,
  Q95(E.session_length) as session_length_95pct;

(In fact, this is basically the example for the Accumulator interface that was added in Pig 0.6.)

Here, we’re just computing some summary statistics on a sessionized click stream. Pig does the heavy lifting of transforming your query into MapReduce goodness, but DataFu fills in the gaps by providing the routines that Pig doesn’t ship with: Sessionize, VAR, MEDIAN, and the quantile functions (Q75, Q90, Q95).

You can download DataFu at

Below you can grab sample data and code to run this sessionization example on your own.

Sessionization Example

Suppose that we have a stream of page views from which we have extracted a member ID, a UNIX timestamp in milliseconds, and a URL. It might look something like this:

memberId timestamp      url
1        1357718725941  /
1        1357718871442  /profile
1        1357719038706  /inbox
1        1357719110742  /groups
2        1357752955401  /inbox
2        1357752982385  /profile

The full data set for this example can be found here.
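If it helps to see the idea outside of Pig, here is a rough Python sketch of gap-based sessionization on the six sample rows above. It is not DataFu's implementation: the function names, the "<memberId>-<n>" session labels, and the choice to start a new session only when a gap is strictly greater than the timeout are all assumptions made for illustration.

```python
from itertools import groupby
from operator import itemgetter

# Sample rows from the table above: (memberId, timestamp in ms, url)
CLICKS = [
    (1, 1357718725941, "/"),
    (1, 1357718871442, "/profile"),
    (1, 1357719038706, "/inbox"),
    (1, 1357719110742, "/groups"),
    (2, 1357752955401, "/inbox"),
    (2, 1357752982385, "/profile"),
]

TIMEOUT_MS = 10 * 60 * 1000  # ten minutes, an assumed session timeout


def sessionize(clicks, timeout_ms=TIMEOUT_MS):
    """Return rows as (memberId, timestamp, url, session_id).

    session_id is a hypothetical "<memberId>-<n>" label chosen for
    readability; it is an assumption of this sketch.
    """
    out = []
    # Sort by member, then by time, so each member's clicks are contiguous.
    ordered = sorted(clicks, key=itemgetter(0, 1))
    for member, rows in groupby(ordered, key=itemgetter(0)):
        n = 0
        prev_ts = None
        for _, ts, url in rows:
            # Assumed rule: a gap strictly longer than the timeout starts a new session.
            if prev_ts is not None and ts - prev_ts > timeout_ms:
                n += 1
            out.append((member, ts, url, f"{member}-{n}"))
            prev_ts = ts
    return out


def session_lengths_minutes(sessionized):
    """Map session_id -> (max timestamp - min timestamp), in minutes."""
    bounds = {}
    for _, ts, _, sid in sessionized:
        lo, hi = bounds.get(sid, (ts, ts))
        bounds[sid] = (min(lo, ts), max(hi, ts))
    return {sid: (hi - lo) / 1000.0 / 60.0 for sid, (lo, hi) in bounds.items()}
```

On the sample rows no gap exceeds ten minutes, so each member ends up with a single session, roughly 6.4 and 0.45 minutes long.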

Using DataFu we can assign session IDs to each of these events and group by session ID in order to compute the length of each session. From there we can complete the exercise by simply applying the statistics UDFs provided by DataFu.

REGISTER piggybank.jar;
REGISTER datafu-0.0.6.jar;
REGISTER guava-13.0.1.jar; -- needed by StreamingQuantile

DEFINE UnixToISO   org.apache.pig.piggybank.evaluation.datetime.convert.UnixToISO();
DEFINE Sessionize  datafu.pig.sessions.Sessionize('10m');
DEFINE Median      datafu.pig.stats.Median();
DEFINE Quantile    datafu.pig.stats.StreamingQuantile('0.75','0.90','0.95');
DEFINE VAR         datafu.pig.stats.VAR();

pv = LOAD 'clicks.csv' USING PigStorage(',') AS (memberId:int, time:long, url:chararray);

pv = FOREACH pv
     -- Sessionize expects an ISO string
     GENERATE UnixToISO(time) as isoTime,
              time,
              memberId;

pv_sessionized = FOREACH (GROUP pv BY memberId) {
  ordered = ORDER pv BY isoTime;
  GENERATE FLATTEN(Sessionize(ordered)) AS (isoTime, time, memberId, sessionId);
}

pv_sessionized = FOREACH pv_sessionized GENERATE sessionId, time;

-- compute length of each session in minutes
session_times = FOREACH (GROUP pv_sessionized BY sessionId)
                GENERATE group as sessionId,
                         (MAX(pv_sessionized.time) - MIN(pv_sessionized.time))
                            / 1000.0 / 60.0 as session_length;

-- compute stats on session length
session_stats = FOREACH (GROUP session_times ALL) {
  ordered = ORDER session_times BY session_length;
  GENERATE
    AVG(ordered.session_length) as avg_session,
    SQRT(VAR(ordered.session_length)) as std_dev_session,
    Median(ordered.session_length) as median_session,
    Quantile(ordered.session_length) as quantiles_session;
}

DUMP session_stats;
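
For readers who want to sanity-check the final statistics step outside of Pig, here is a small Python sketch of the same summary. Treat it as an approximation under stated assumptions: the session lengths are made-up values, the variance is taken as the population variance (we have not confirmed which definition VAR uses), and the quantiles are computed exactly with one particular interpolation method, whereas StreamingQuantile estimates them, so small differences are to be expected.

```python
import math
import statistics

# Hypothetical session lengths in minutes; not taken from the example data set.
session_lengths = [0.5, 1.0, 2.0, 4.0, 6.5, 12.0, 30.0]


def summarize(lengths):
    """Mean, standard deviation, median, and 75/90/95th percentiles."""
    ordered = sorted(lengths)
    # n=100 yields 99 cut points; index p-1 holds the p-th percentile.
    cuts = statistics.quantiles(ordered, n=100, method="inclusive")
    return {
        "avg_session": statistics.fmean(ordered),
        # Assumption: population variance, then square root for the std dev.
        "std_dev_session": math.sqrt(statistics.pvariance(ordered)),
        "median_session": statistics.median(ordered),
        "quantiles_session": (cuts[74], cuts[89], cuts[94]),
    }


stats = summarize(session_lengths)
```

The keys mirror the aliases in the Pig script, so the two outputs can be compared field by field.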

This is just a taste. There’s plenty more in the library for you to peruse. Take a look here. DataFu is freely available under the Apache 2 license. We welcome contributions, so please send us your pull requests!

Matthew Hayes & Sam Shah



Robert Sahlin says:

Nice example. How would you add a page view number to pv_sessionized? I guess I could use the Enumerate function in DataFu, but I’m not sure how to combine that with Sessionize in an efficient way.

Matthew Hayes says:

You mean page views per session, right? Something like this:

session_times = FOREACH (GROUP pv_sessionized BY sessionId)
                GENERATE group as sessionId,
                         COUNT(pv_sessionized.time) as page_views,
                         (MAX(pv_sessionized.time) - MIN(pv_sessionized.time))
                            / 1000.0 / 60.0 as session_length;

rongzheyi says:

Maybe this can help:

However, it is not that efficient. I think numbering page views within each session could be combined with sessionizing?

Vijaya Veeraraghavan says:

I am so glad that you have put all the UDFs in a common place. This is going to be very useful for people who are trying to analyze the data and see what insight can be provided to the user community.

Vijayalakshmi Veeraraghavan

Yanusha Avula says:


I was trying to call the Sessionize method without using a nested FOREACH in Pig, but had no luck. All of the Sessionize examples I have found use nested FOREACH statements.

So the question is: can we call Sessionize without using nested FOREACH statements?

Can anyone help with this?
