One of the best features of embedding is how it simplifies writing UDFs and using them right away in the same script, without superfluous declarations. Computing a transitive closure is a good example of an algorithm that requires iteration, a few simple UDFs, and an end condition to decide when to stop iterating. Embedding is available in Pig 0.9. Knowledge of both Pig and Python is required to follow. Examples are available on GitHub.
2. The problem
Before diving into the solution, I will give a brief overview of the problem. We want to find the connected components of a graph by performing a transitive closure. For example, given the edges (A,B), (B,C), and (D,E), the connected components are {A,B,C} and {D,E}. Of course, I will assume that the graph does not fit in main memory; otherwise, what follows is not very useful.
A picture is worth a thousand words; here is a visual statement of the problem:
[figure omitted: a graph with several connected components; the diameter of one component is highlighted in red]
I highlighted the diameter in red, as it plays an important role in the complexity of the algorithm. The diameter of a connected component is the longest shortest path between two nodes of the component.
The algorithm will require log2(max diameter of a component) iterations, because each iteration doubles the number of relations that have been followed. This is not the overall complexity, but it ensures that the number of iterations stays low: with just 10 iterations you can handle components with a diameter of 1000+. In most real-life use cases the diameter is very low and a few iterations will suffice.
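To make the iteration bound concrete, here is a minimal sketch (not part of the original script) computing how many doubling iterations a given diameter requires:

def iterations_needed(diameter):
    # each iteration doubles the path length already followed (1, 2, 4, 8, ...)
    n, reach = 0, 1
    while reach < diameter:
        reach *= 2
        n += 1
    return max(1, n)

print(iterations_needed(1024))  # 10
print(iterations_needed(3))     # 2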
3. The algorithm
I wrote the following implementation to illustrate the embedding feature. It may not be optimal, but it is a working solution with a reasonable number of iterations. The algorithm does a graph traversal from all the nodes in parallel. It does one hop per iteration (pardon the scientific jargon) and then merges the results per node (using follow() and OR()). As a consequence, the distance followed from each node doubles at every iteration. In this case, the edges of the graph are considered bidirectional but are stored one way (see normalize() and bidirectional()) to save space. The edges are stored in two files: followed (empty at the beginning) and to_follow; the iteration stops when to_follow becomes empty, which tells us that all the edges have been followed. The last part of the script does some formatting to turn a list of edges into groups of nodes (using sort() to make sure they are always represented the same way).
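The bodies of normalize() and bidirectional() are not shown in the listing below, so here is a minimal sketch of what they could look like; the bodies are my assumption, and only the names and the one-way storage convention come from the description above. In an embedded Pig script the @outputSchema decorator is provided by the Pig Jython runtime, so no import is needed.

@outputSchema("rels:{t:(id1: chararray, id2: chararray)}")
def normalize(id1, id2):
    # assumption: store each undirected edge one way only (smaller id first) to save space
    if id2 < id1:
        return [(id2, id1)]
    return [(id1, id2)]

@outputSchema("rels:{t:(id1: chararray, id2: chararray)}")
def bidirectional(id1, id2):
    # expand a stored edge back into both directions for the traversal
    if id1 != id2:
        return [(id1, id2), (id2, id1)]
    return [(id1, id2)]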
4. The code
Here is an example of how to solve this problem by embedding Pig in Python.
# The Pig query run at each iteration is compiled once and bound inside the loop.
# The beginning of the query, which builds new_links by joining to_follow with the
# existing links and merging results per node with follow() and OR(), is elided here.
Q = Pig.compile("""
...
SPLIT new_links INTO new_followed IF followed != 0, new_to_follow IF followed == 0;
new_followed = FOREACH new_followed GENERATE id1, id2;
new_to_follow = FOREACH new_to_follow GENERATE id1, id2;
STORE new_followed INTO '$new_followed';
STORE new_to_follow INTO '$new_to_follow';
""")

to_follow = "data/tc_data_simple"
followed = "out/tc/followed_0"
# create empty dataset for first iteration
Pig.fs("mkdir " + followed)
Pig.fs("touchz " + followed + "/part-m-00000")

for i in range(10):
    new_to_follow = "out/tc/to_follow_" + str(i + 1)
    new_followed = "out/tc/followed_" + str(i + 1)
    # bind() picks up the $-parameters from the local Python variables
    job = Q.bind().runSingle()
    if not job.isSuccessful():
        raise Exception('failed')
    to_follow = new_to_follow
    followed = new_followed
    # detect if we are done: stop when there is nothing left to follow
    if not job.result("new_to_follow").iterator().hasNext():
        break

# turn the final list of edges into groups of nodes
# (the GENERATE clause below is reconstructed from the description above:
# sort() canonicalizes each group of node ids)
Pig.compile("""
links = LOAD '$followed' AS (id1: chararray, id2: chararray);
links = FOREACH links GENERATE FLATTEN( bidirectional(id1, id2) );
result = DISTINCT (
    FOREACH (GROUP links BY id1)
    GENERATE sort(links.id2));
STORE result INTO 'out/tc/groups';
""").bind().runSingle()
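Assuming the script is saved as tc.py (a hypothetical file name), an embedded Pig script is launched by passing it straight to the pig command, which detects the Python extension and runs it through the Jython interpreter:

pig tc.py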
5. Notables
There are a few things to notice in this implementation:
- UDFs are defined simply as functions in the same script.
- The output schema of a UDF is defined using the @outputSchema decorator. Pig needs the schema of the output to interpret the statements following the UDF call.
- Native Python structures (dictionary, tuple, list) are used; the conversion to the corresponding Pig types (map, tuple, bag) is automatic. This makes the UDF code much more compact (see the sketch after this list).
- UDFs are directly available in embedded Pig scripts using their function names. No other declaration is required.
- We iterate a maximum of 10 times (max diameter = 2^10 = 1024) and check whether the "to follow" relation is empty to decide when to stop iterating. This is done using the JobStats object returned by runSingle().
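As a minimal sketch of that automatic conversion (a hypothetical UDF, not part of the script above): returning a Python dictionary yields a Pig map, a tuple a Pig tuple, and a list a Pig bag.

@outputSchema("edge_info:map[]")
def describe(id1, id2):
    # the returned dict is converted to a Pig map automatically
    return {'from': id1, 'to': id2, 'self_loop': id1 == id2}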