Community Blog: PageRank Implementation in Pig

In this post I’m going to give a very simple example of how to use Pig embedded in Python to implement the PageRank algorithm. It goes into a little more detail on the same example given in the presentation I gave at the Pig user meetup. On the same topic, Daniel published a nice K-Means implementation using the same embedding feature. This was originally published on my blog; I’m re-posting it here at the request of the fine people at Hortonworks.

1. What is Pig embedded?

Pig 0.9 supports running Python scripts with Pig statements embedded. Such a script is run with the regular pig command:

pig myscript.py

The implementation uses Jython to execute the Python code, not CPython. As a consequence, this is Python 2.5 (not 3.0) and you cannot use native extensions. On the other hand, you can use Java-based extensions and the Java APIs (for example, java.util.regex.Matcher).
I find embedding especially useful for simplifying the implementation of iterative algorithms. It makes a difference in the following areas:

  • Loop and exit criteria
  • User Defined Functions
  • Parameter passing

In this post I will not cover UDFs just yet; stay tuned for more.

2. A very simple example: The PageRank implementation

Input format:

www.A.com     1     { (www.B.com), (www.C.com), (www.D.com), (www.E.com) }
www.B.com     1     { (www.D.com), (www.E.com) }
www.C.com     1     { (www.D.com) }
www.D.com     1     { (www.B.com) }
www.E.com     1     { (www.A.com) }
www.F.com     1     { (www.B.com), (www.C.com) }
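As a side note, the format is simple enough to read by hand. Here is a small, hypothetical Python sketch (my own illustration, not part of the original post) that parses one line of this tab-separated format into a (url, pagerank, links) triple:

```python
# Illustrative parser for the tab-separated input format shown above.
# Each line is: url <TAB> pagerank <TAB> { (link1), (link2), ... }
def parse_line(line):
    url, pagerank, links = line.rstrip("\n").split("\t")
    # "{ (www.B.com), (www.C.com) }" -> ["www.B.com", "www.C.com"]
    inner = links.strip().lstrip("{").rstrip("}").strip()
    targets = [t.strip().strip("()") for t in inner.split(",")] if inner else []
    return url, float(pagerank), targets

line = "www.A.com\t1\t{ (www.B.com), (www.C.com), (www.D.com), (www.E.com) }"
print(parse_line(line))
```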


#!/usr/bin/python
from org.apache.pig.scripting import *

P = Pig.compile("""
-- PR(A) = (1-d) + d (PR(T1)/C(T1) + ... + PR(Tn)/C(Tn))
previous_pagerank =
    LOAD '$docs_in'
    USING PigStorage('\\t')
    AS ( url: chararray, pagerank: float, links:{ link: ( url: chararray ) } );

outbound_pagerank =
    FOREACH previous_pagerank
    GENERATE
        pagerank / COUNT ( links ) AS pagerank,
        FLATTEN ( links ) AS to_url;

new_pagerank =
    FOREACH
        ( COGROUP outbound_pagerank BY to_url, previous_pagerank BY url INNER )
    GENERATE
        group AS url,
        ( 1 - $d ) + $d * SUM ( outbound_pagerank.pagerank ) AS pagerank,
        FLATTEN ( previous_pagerank.links ) AS links;

STORE new_pagerank
    INTO '$docs_out'
    USING PigStorage('\\t');
""")

params = { 'd': '0.5', 'docs_in': 'data/pagerank_data_simple' }

for i in range(10):
    out = "out/pagerank_data_" + str(i + 1)
    params["docs_out"] = out
    Pig.fs("rmr " + out)
    stats = P.bind(params).runSingle()
    if not stats.isSuccessful():
        raise Exception("failed")
    params["docs_in"] = out


Things to notice here: we first compile() the Pig script, then pass parameters using bind(params) and run each iteration with runSingle(). The output of each iteration becomes the input of the next one.
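To make the update rule concrete, here is a plain-Python sketch (my own illustration, not from the post) of one iteration of the same computation over the sample graph above. The real job of course runs as MapReduce through Pig; this just mirrors the dataflow:

```python
# Plain-Python rendition of one PageRank iteration, mirroring the Pig script:
# outbound_pagerank distributes each page's rank over its out-links, then
# new_pagerank applies PR(A) = (1 - d) + d * sum of inbound contributions.
links = {
    "www.A.com": ["www.B.com", "www.C.com", "www.D.com", "www.E.com"],
    "www.B.com": ["www.D.com", "www.E.com"],
    "www.C.com": ["www.D.com"],
    "www.D.com": ["www.B.com"],
    "www.E.com": ["www.A.com"],
    "www.F.com": ["www.B.com", "www.C.com"],
}
d = 0.5
ranks = {url: 1.0 for url in links}

def iterate(ranks):
    # each page contributes rank / out-degree to every page it links to
    contrib = {url: 0.0 for url in links}
    for url, outs in links.items():
        for to_url in outs:
            contrib[to_url] += ranks[url] / len(outs)
    return {url: (1 - d) + d * contrib[url] for url in links}

ranks = iterate(ranks)
print(ranks["www.B.com"])  # -> 1.375
```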

3. Application

Download the DBpedia page links dataset from http://downloads.dbpedia.org/3.6/en/page_links_en.nt.bz2

Convert to the expected input format:

A = LOAD 'page_links_en.nt.bz2' USING PigStorage(' ')
    AS (url:chararray, p:chararray, link:chararray);
B = GROUP A BY url;
C = FOREACH B GENERATE group AS url, 1 AS pagerank, A.link AS links;
STORE C INTO 'input';
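For clarity, here is a hypothetical plain-Python rendition of the same grouping step. The predicate URL in the sample triples is made up, and real N-Triples parsing is more involved than a whitespace split; this only illustrates the shape of the transformation:

```python
# Group N-Triples-style lines "<subject> <predicate> <object> ." by subject,
# emitting the url <TAB> 1 <TAB> { (link), ... } format the PageRank job expects.
from collections import OrderedDict

triples = [
    "<http://dbpedia.org/resource/A> <http://example.org/wikilink> <http://dbpedia.org/resource/B> .",
    "<http://dbpedia.org/resource/A> <http://example.org/wikilink> <http://dbpedia.org/resource/C> .",
]

grouped = OrderedDict()
for line in triples:
    url, _predicate, link = line.split(" ")[:3]
    grouped.setdefault(url, []).append(link)

for url, outs in grouped.items():
    bag = "{ " + ", ".join("(" + l + ")" for l in outs) + " }"
    print(url + "\t1\t" + bag)
```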

Then run the PageRank script from the command line:

pig -Dpython.cachedir=/home/{myuser}/tmp pagerank.py

Get Max and Min from the resulting dataset:

A = LOAD 'pagerank_data_10'
    AS ( url: chararray, pagerank: float, links:{ link: ( url: chararray ) } );
B = GROUP A ALL;
C = FOREACH B GENERATE FLATTEN(TOBAG(MAX(A.pagerank), MIN(A.pagerank))) AS pagerank;
D = JOIN C BY pagerank, A BY pagerank;
E = FOREACH D GENERATE C::pagerank, A::url;
DUMP E;

And the winner is … the United States! (according to en.wikipedia.org)

(6890.17,<http://dbpedia.org/resource/United_States>)
(0.50003946,<http://dbpedia.org/resource/Ficus_macrophylla_f._columnaris>)
(0.50003946,<http://dbpedia.org/resource/Melaleuca_trichostachya>)
(0.50003946,<http://dbpedia.org/resource/Anetholea_anisata>)
(0.50003946,<http://dbpedia.org/resource/Fieldia_australis>)
(0.50003946,<http://dbpedia.org/resource/Coprosma_nitida>)
(0.50003946,<http://dbpedia.org/resource/Heritiera_actinophylla>)
(0.50003946,<http://dbpedia.org/resource/Calochilus_robertsonii>)
(0.50003946,<http://dbpedia.org/resource/Euphorbia_eremophila>)
(0.50003946,<http://dbpedia.org/resource/Macrotyloma_uniflorum>)
(0.50003946,<http://dbpedia.org/resource/Cryptocarya_bidwillii>)
(0.50003946,<http://dbpedia.org/resource/Phebalium_squamulosum_subsp._squamulosum>)
(0.50003946,<http://dbpedia.org/resource/Cryptocarya_nova-anglica>)
(0.50003946,<http://dbpedia.org/resource/Pellaea_nana>)
(0.50003946,<http://dbpedia.org/resource/Derwentia_nivea>)
(0.50003946,<http://dbpedia.org/resource/Poa_meionectes>)
(0.50003946,<http://dbpedia.org/resource/Akania_bidwillii>)
(0.50003946,<http://dbpedia.org/resource/Anthocarapa_nitidula>)
(0.50003946,<http://dbpedia.org/resource/Jasminum_volubile>)
(0.50003946,<http://dbpedia.org/resource/Seringia_arborescens>)
(0.50003946,<http://dbpedia.org/resource/Grammitis_billardierei>)
(0.50003946,<http://dbpedia.org/resource/Utricularia_monanthos>)
(0.50003946,<http://dbpedia.org/resource/Acacia_mitchellii>)
(0.50003946,<http://dbpedia.org/resource/Halosarcia_halocnemoides>)
(0.50003946,<http://dbpedia.org/resource/Calomeria_amaranthoides>)
(0.50003946,<http://dbpedia.org/resource/Tripladenia_cunninghamii>)
(0.50003946,<http://dbpedia.org/resource/Gaultheria_appressa>)
(0.50003946,<http://dbpedia.org/resource/Arytera_distylis>)
(0.50003946,<http://dbpedia.org/resource/Premna_lignum-vitae>)
(0.50003946,<http://dbpedia.org/resource/Drosera_burmanni>)
(0.50003946,<http://dbpedia.org/resource/Derwentia_derwentiana>)
(0.50003946,<http://dbpedia.org/resource/Atalaya_multiflora>)
(0.50003946,<http://dbpedia.org/resource/Callitris_gracilis>)
(0.50003946,<http://dbpedia.org/resource/Salix_x_sepulcralis>)


4. A little more interesting: Stop iterating when a threshold is reached

#!/usr/bin/python
from org.apache.pig.scripting import *

P = Pig.compile("""
previous_pagerank =
    LOAD '$docs_in'
    AS ( url: chararray, pagerank: float, links:{ link: ( url: chararray ) } );

outbound_pagerank =
    FOREACH previous_pagerank
    GENERATE
        pagerank / COUNT ( links ) AS pagerank,
        FLATTEN ( links ) AS to_url;

new_pagerank =
    FOREACH
        ( COGROUP outbound_pagerank BY to_url, previous_pagerank BY url INNER )
    GENERATE
        group AS url,
        ( 1 - $d ) + $d * SUM ( outbound_pagerank.pagerank ) AS pagerank,
        FLATTEN ( previous_pagerank.links ) AS links,
        FLATTEN ( previous_pagerank.pagerank ) AS previous_pagerank;

pagerank_diff = FOREACH new_pagerank GENERATE ABS ( previous_pagerank - pagerank );

max_diff =
    FOREACH
        ( GROUP pagerank_diff ALL )
    GENERATE
        MAX ( pagerank_diff );

STORE new_pagerank
    INTO '$docs_out';

STORE max_diff
    INTO '$max_diff';
""")

d = 0.5
docs_in = "data/simple"

for i in range(10):
    docs_out = "out/pagerank_data_" + str(i + 1)
    max_diff = "out/max_diff_" + str(i + 1)
    Pig.fs("rmr " + docs_out)
    Pig.fs("rmr " + max_diff)
    stats = P.bind().runSingle()
    if not stats.isSuccessful():
        raise Exception("failed")
    max_diff_value = float(str(stats.result("max_diff").iterator().next().get(0)))
    print " max_diff_value = " + str(max_diff_value)
    if max_diff_value < 0.01:
        print "done at iteration " + str(i)
        break
    docs_in = docs_out

Main differences:

  • We compute an extra max_diff relation that contains a single tuple.
  • Variables can also be bound from the current scope when using the parameterless bind().
  • We can easily check whether we have reached the expected threshold using the stats object returned by runSingle() and stop iterating if needed.
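The convergence logic of the driver loop can be mimicked in plain Python to see it in isolation. This sketch is my own illustration; the two-page-to-hub graph and the 0.01 threshold are made up for the example:

```python
# Iterate PageRank until the largest per-page change (max_diff) drops below
# a threshold, mirroring the stop condition of the embedded Pig driver loop.
links = {
    "www.A.com": ["www.B.com", "www.C.com"],
    "www.B.com": ["www.A.com"],
    "www.C.com": ["www.A.com"],
}
d = 0.5
ranks = {u: 1.0 for u in links}

for i in range(10):
    contrib = {u: 0.0 for u in links}
    for u, outs in links.items():
        for v in outs:
            contrib[v] += ranks[u] / len(outs)
    new_ranks = {u: (1 - d) + d * contrib[u] for u in links}
    # max_diff plays the role of the extra relation stored by the Pig script
    max_diff = max(abs(new_ranks[u] - ranks[u]) for u in links)
    ranks = new_ranks
    if max_diff < 0.01:
        print("done at iteration " + str(i))
        break
```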

Next…
The examples are available on github.
Next time I’ll go into the details of a transitive closure implementation using UDFs.
