I ran across an interesting problem in my attempt to implement random forest using Apache Pig. In random forest, each tree is trained using a bootstrap sample. That is, sample N cases at random out of a dataset of size N, with replacement.

For example, here is the input data:

(0, 1, 2, 3, 4, 5, 6, 7, 8, 9)

Here is one bootstrap sample drawn from input:

(5, 2, 3, 2, 3, 9, 7, 3, 0, 4)

Each element can appear 0 to N times.
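To make the definition concrete, here is a minimal Python sketch (outside Pig) that draws one bootstrap sample from the ten-element input above. The function name `bootstrap_sample` is hypothetical. The fixed seed only makes the run repeatable, so the exact sample will differ from the one shown above.

```python
import random
from collections import Counter

def bootstrap_sample(data, rng=random):
    """Draw len(data) items uniformly at random, with replacement."""
    n = len(data)
    return [data[rng.randrange(n)] for _ in range(n)]

data = list(range(10))
sample = bootstrap_sample(data, random.Random(42))
print(sample)

# How many times each input element was drawn (anywhere from 0 to N times).
counts = Counter(sample)
print({x: counts.get(x, 0) for x in data})
```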

How does one get it done in Pig? I explored a few options and wanted to share my findings.

Sample operator

Pig does have a sample operator. Here is an example of how to use it:

```
A = load 'input.txt';
B = sample A 0.7;
```

Unfortunately, the sample operator only does regular sampling: each tuple is either kept or dropped, so no tuple can appear more than once in the output. PIG-1713 aims to add additional sampling algorithms, but Pig is not quite there yet.
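For contrast, this Python sketch mimics what `sample A 0.7` does: each record is kept independently with probability 0.7 (Bernoulli sampling), so the output size only approximates 70% of the input and duplicates are impossible. It is an illustration of the semantics, not Pig's implementation, and `bernoulli_sample` is a hypothetical name.

```python
import random

def bernoulli_sample(records, p, rng=random):
    """Keep each record independently with probability p; never repeats a record."""
    return [r for r in records if rng.random() < p]

kept = bernoulli_sample(list(range(10)), 0.7, random.Random(7))
print(kept)
```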

UDF

Next, I looked into writing a custom UDF, which would be used like this:

```
A = load 'input.txt';
B = foreach A generate BSAMPLE(*);
```

Unfortunately, UDFs have one important restriction: a UDF called in foreach always takes one input tuple and generates one output value. There is a workaround, though: generate a bag, and then flatten it:

```
A = load 'input.txt';
B = foreach A generate flatten(BSAMPLE(*));
```

This is doable, but ugly. So I decided to look at other alternatives.
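The bag-and-flatten trick can be sketched in plain Python to show why it works. Here `bsample` stands in for the hypothetical BSAMPLE UDF: for one record it returns a list (the "bag") holding that record k times, and `flatten` un-nests the bags so each input yields zero or more output records. How BSAMPLE chooses k is not specified above. This sketch assumes a Poisson(λ=1) draw using Knuth's multiplication method, the same approximation used in the Python streaming section.

```python
import math
import random

def poisson1(rng):
    """Knuth's method: draw k ~ Poisson(lambda=1)."""
    limit = math.exp(-1.0)
    k, prod = 0, rng.random()
    while prod > limit:
        k += 1
        prod *= rng.random()
    return k

def bsample(record, rng):
    """Hypothetical BSAMPLE: return a bag with `record` repeated k times."""
    return [record] * poisson1(rng)

def flatten(bags):
    """Like Pig's flatten: un-nest each bag into separate output records."""
    return [item for bag in bags for item in bag]

rng = random.Random(0)
out = flatten(bsample(r, rng) for r in range(10))
print(out)
```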

Streaming with R

Streaming does not have the same restriction as a UDF: the streaming command can consume an arbitrary number of input tuples and produce an arbitrary number of output tuples. I also noticed that R already has a function, sample(), that can perform bootstrap sampling. So we can simply use Pig's streaming feature and let R do the job for us. Here is the R script I used:

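```r
#!/usr/bin/Rscript
# Read all of this map task's input (tab-delimited, as Pig streams it) into memory
x <- read.table(file("stdin"), sep="\t")
# Draw NROW(x) row indices with replacement and write the resampled rows to stdout
write.table(x[sample(NROW(x), NROW(x), replace=T), , drop=FALSE],
            sep="\t", quote=F, row.names=F, col.names=F)
```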

read.table reads all of the map task's input and builds a data frame from it in memory. sample() does the sampling: “replace=T” means we sample with replacement, and since we need to generate N samples, the sample size is NROW(x). Then we write all of the output tuples at once using write.table.
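For readers without R, here is a rough Python equivalent of the same in-memory approach, offered as a sketch rather than a drop-in replacement: it reads every input line into a list and then writes out N lines chosen uniformly with replacement (`random.choices` needs Python 3.6+). The function names are hypothetical, and the demo uses an in-memory stream; a streaming script would call `run()` on the real stdin and stdout. It has the same memory limitation as the R version.

```python
import io
import random
import sys

def resample_lines(lines, rng=random):
    """Return len(lines) lines drawn uniformly with replacement."""
    return rng.choices(lines, k=len(lines))

def run(stdin=sys.stdin, stdout=sys.stdout, rng=random):
    # Like read.table: hold all of this map task's input in memory.
    lines = [line.rstrip("\n") for line in stdin]
    for line in resample_lines(lines, rng):
        stdout.write(line + "\n")

# Demo on an in-memory stream instead of the real stdin.
demo_out = io.StringIO()
run(io.StringIO("0\ta\n1\tb\n2\tc\n"), demo_out, random.Random(3))
print(demo_out.getvalue(), end="")
```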

Here is the Pig script:

```
DEFINE mycmd `bootstrap.R` ship('bootstrap.R');
a = load 'input.txt' as (a0, a1);
b = STREAM a THROUGH mycmd;
dump b;
```

The biggest problem with this approach is that we need to hold all of the input data in memory, which is unrealistic in many big data use cases. Another issue is that R must be installed on every node in the cluster, which may not be possible in many environments.

Streaming with Python

The ideal solution is to decide on the fly how many times each input appears, and emit it 0, 1, or k times accordingly. Each of the N draws picks a given input with probability 1/N, so its number of occurrences follows a Binomial(N, 1/N) distribution; since N is very large, I would assume this is well approximated by a Poisson distribution with λ=1. (Correct me if I am wrong, I am not a statistician :))
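A quick numerical check of that approximation, as a Python sketch: for k = 0..4 it prints the exact probability that a given element appears k times in a bootstrap sample of size N = 1000 (Binomial(N, 1/N)) next to the Poisson(1) probability exp(-1)/k!. `math.comb` needs Python 3.8+.

```python
import math

def binomial_pmf(k, n, p):
    """P(X = k) for X ~ Binomial(n, p)."""
    return math.comb(n, k) * p**k * (1 - p) ** (n - k)

def poisson_pmf(k, lam=1.0):
    """P(X = k) for X ~ Poisson(lam)."""
    return math.exp(-lam) * lam**k / math.factorial(k)

N = 1000
for k in range(5):
    print(k, round(binomial_pmf(k, N, 1 / N), 4), round(poisson_pmf(k), 4))
```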

Here is the core of the Python script:

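```python
#!/usr/bin/env python
import bisect
import math
import random
import sys

cumulative = []  # cumulative[k] = P(X <= k) for X ~ Poisson(1)

def initialize(max_k=20):
    total = 0.0
    for k in range(max_k + 1):
        total += math.exp(-1) / math.factorial(k)
        cumulative.append(total)

for line in sys.stdin:
    line = line.rstrip('\n')
    if len(cumulative) == 0:
        initialize()
    r = random.random()
    n = bisect.bisect_left(cumulative, r)  # inverse CDF: n ~ Poisson(1)
    for j in range(n):
        print(line)
```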

Note that with this implementation, the total number of output tuples equals N only in expectation; the actual count varies from run to run. However, this is not an issue in our use case.
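To see what "equal to N only in expectation" means in practice, this Python sketch mirrors the bisect lookup from the script above (rebuilt here so the block is self-contained) and counts how many lines a run would emit for N = 10,000 inputs. Since each input contributes an independent Poisson(1) count, the total is Poisson(N): its mean is N and its standard deviation is √N, about 100 here. The helper names are hypothetical.

```python
import bisect
import math
import random

def poisson1_cdf(max_k=20):
    """cdf[k] = P(X <= k) for X ~ Poisson(1)."""
    cdf, total = [], 0.0
    for k in range(max_k + 1):
        total += math.exp(-1) / math.factorial(k)
        cdf.append(total)
    return cdf

def total_emitted(n_inputs, rng, cdf):
    """Output lines when each input is emitted bisect_left(cdf, r) times."""
    return sum(bisect.bisect_left(cdf, rng.random()) for _ in range(n_inputs))

cdf = poisson1_cdf()
rng = random.Random(2024)
runs = [total_emitted(10_000, rng, cdf) for _ in range(5)]
print(runs)
```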

BootstrapSampleLoader

Pig does ship with two sample loaders, PoissonSampleLoader and RandomSampleLoader. Although neither can resample (that is, sample with replacement), it is possible to write a LoadFunc that does bootstrap sampling. Building a LoadFunc makes the script cleaner, and it performs better than streaming. Here I wrote a BootstrapSampleLoader that extends PigStorage to do just that. The logic is similar to the streaming version:

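```java
import java.io.IOException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Random;

import org.apache.pig.builtin.PigStorage;
import org.apache.pig.data.Tuple;

public class BootstrapSampleLoader extends PigStorage {
    int remaining = 0; // the number of remaining copies of originalTuple to generate
    List<Double> cumulative = new ArrayList<Double>(); // cumulative Poisson(1) distribution
    Tuple originalTuple;
    Random random = new Random();

    // Fill cumulative[k] = P(X <= k) for X ~ Poisson(1), k = 0..19.
    private void initialize() {
        double p = Math.exp(-1); // P(X = 0)
        double sum = 0;
        for (int k = 0; k < 20; k++) {
            sum += p;
            cumulative.add(sum);
            p /= (k + 1); // P(X = k+1) = P(X = k) / (k+1)
        }
    }

    @Override
    public Tuple getNext() throws IOException {
        if (remaining > 0) {
            remaining--;
            return originalTuple;
        }
        if (cumulative.size() == 0) initialize();
        while (true) {
            originalTuple = super.getNext();
            if (originalTuple == null) return null;
            double r = random.nextDouble();
            remaining = Collections.binarySearch(cumulative, r);
            if (remaining < 0) remaining = -remaining - 1; // convert to insertion point
            if (remaining > 0) break; // 0 copies: skip this tuple
        }
        remaining--;
        return originalTuple;
    }
}
```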

In summary, I explored different ways to do bootstrap sampling in Pig and found three viable solutions: streaming with R, streaming with Python and BootstrapSampleLoader. I hope you find this information helpful.

Daniel Dai


The code link is broken. Please update it.

Thanks,

Debajyoti

I should note that an SQL engine will do lazy/streaming evaluation on the design table, enumerating it on the fly while running the trials. So actually… I take back the last line of my post. The table isn’t “big” in the sense of memory footprint. Of course the actual bootstrap work is bigger, but if that’s what you want there’s no undue overhead to this implementation (beyond the pass to count N).

You can get this effect in Hadoop too, by properly coding up the enumeration of the “design table” as an on-demand thing.
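One way to read "on demand" is to never store the design table at all: keep only a seed per trial and regenerate that trial's row IDs whenever they are needed. A minimal Python sketch of that idea follows, with hypothetical names and Python's `random.Random` standing in for whatever seeded generator a Hadoop job would use.

```python
import random
from collections import Counter

def design_rows(trial_id, n_rows, m, base_seed=12345):
    """Lazily yield one trial's m row IDs (0..n_rows-1, drawn with replacement).

    The same (base_seed, trial_id) always regenerates the same sequence,
    so the table never has to be materialized or shipped to the mappers.
    """
    rng = random.Random(base_seed * 1_000_003 + trial_id)
    for _ in range(m):
        yield rng.randrange(n_rows)

def multiplicities(trial_id, n_rows, m):
    """How many times each row ID occurs in one trial."""
    return Counter(design_rows(trial_id, n_rows, m))

first = list(design_rows(3, n_rows=100, m=10))
again = list(design_rows(3, n_rows=100, m=10))
print(first == again)  # True: regenerated identically
```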

You might consider using an m-out-of-n bootstrap, where the subsamples are much smaller than N. This is attractive for big data sets, though you have to check the literature about choosing a suitable m (but then again, you have to check the literature to decide whether the n-bootstrap is suitable). See http://www.stat.berkeley.edu/~bickel/BS2008SS.pdf

To implement it in Pig you can borrow the approach that Brian Dolan put into Section 5.4 of the MAD Skills paper (http://db.cs.berkeley.edu/jmh/papers/madskills-032009.pdf). The syntax there is SQL, but the translation to Pig should be easy.

To start, you need to know the number of items N in your input. So in principle this is a 2-pass algorithm if you don’t have that information in advance. But unlike some of the schemes above, it uses very little memory and makes no assumptions about the input distribution.

Suppose you want k subsamples, each of size m<<N. Then pre-generate the row IDs of each subsample by choosing m random numbers between 1 and N, and repeat that process k times. This gives you a smallish m*k table (Dolan calls it the "design" table) that tells you the row IDs in each sample. Then you simply write a mapper that tags each input data item with the subsamples it belongs to. Partition by subsample and run your Reduce on each to get the sampling distribution for your estimator, then do a subsequent Reduce to compute averages of the subsamples.
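These steps can be walked through in memory with a short Python sketch (all names hypothetical, toy data): build the k×m design table, invert it into a row ID → trials lookup, "map" each (row_id, value) record to one (trial_id, value) pair per time its row was drawn, partition by trial, and reduce each partition to its mean. A real job would do the lookup in the mapper and the grouping in the shuffle; the mean is just an example estimator.

```python
import random
from collections import defaultdict
from statistics import mean

def make_design(n_rows, m, k, rng):
    """k trials, each a list of m row IDs drawn from 0..n_rows-1 with replacement."""
    return {t: [rng.randrange(n_rows) for _ in range(m)] for t in range(k)}

def invert(design):
    """row_id -> list of trial_ids, one entry per time the row was drawn."""
    index = defaultdict(list)
    for trial_id, rows in design.items():
        for row_id in rows:
            index[row_id].append(trial_id)
    return index

def map_records(records, index):
    """Mapper: tag each (row_id, value) record with every trial it belongs to."""
    for row_id, value in records:
        for trial_id in index.get(row_id, []):
            yield trial_id, value

def reduce_trials(tagged):
    """Partition by trial_id, then compute the estimator (here: the mean) per trial."""
    groups = defaultdict(list)
    for trial_id, value in tagged:
        groups[trial_id].append(value)
    return {t: mean(vals) for t, vals in groups.items()}

rng = random.Random(1)
records = [(i, float(i % 7)) for i in range(1000)]  # toy data: (row_id, value)
design = make_design(n_rows=len(records), m=50, k=20, rng=rng)
trial_means = reduce_trials(map_records(records, invert(design)))
# Subsequent Reduce: average the per-trial estimates.
print(mean(trial_means.values()))
```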

The SQL that Dolan gives is this (more or less):

```sql
CREATE VIEW design AS
SELECT a.trial_id, floor($N * random()) AS row_id
FROM generate_series(1, $k) AS a (trial_id),
     generate_series(1, $m) AS b (subsample_id);

CREATE VIEW trials AS
SELECT d.trial_id, AVG(T.values) AS avg_value
FROM design d, T
WHERE d.row_id = T.row_id
GROUP BY d.trial_id;

SELECT AVG(avg_value), STDDEV(avg_value)
FROM trials;
```
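Here is a small in-memory Python rendering of those three statements, as a sketch only (toy data, hypothetical function names). A row ID drawn twice for a trial appears twice in `design`, so it is counted twice in that trial's average, which is what makes each trial a sample with replacement. STDDEV is taken as the sample standard deviation, matching PostgreSQL's `stddev`.

```python
import random
import statistics
from collections import defaultdict

def design_view(N, k, m, rng):
    """CREATE VIEW design: m rows per trial, row_id = floor(N * random())."""
    return [(t, int(N * rng.random())) for t in range(1, k + 1) for _ in range(m)]

def trials_view(design, T):
    """CREATE VIEW trials: join design to T on row_id, then AVG(values) per trial."""
    by_trial = defaultdict(list)
    for trial_id, row_id in design:  # each design row joins to exactly one T row
        by_trial[trial_id].append(T[row_id])
    return {t: statistics.mean(v) for t, v in by_trial.items()}

rng = random.Random(9)
T = [rng.gauss(10.0, 2.0) for _ in range(5000)]  # toy table: row_id -> value
avg_values = list(trials_view(design_view(len(T), k=200, m=100, rng=rng), T).values())
# Final SELECT: AVG(avg_value), STDDEV(avg_value)
print(statistics.mean(avg_values), statistics.stdev(avg_values))
```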

This works for the N-bootstrap too, but that design table becomes N*k integers big. That may be OK in many cases too, especially if you’re willing to write code to compress/decompress that table.