One of the best features of embedding is how it simplifies writing UDFs and using them right away in the same script, without superfluous declarations. Computing a transitive closure is a good example of an algorithm that requires iteration, a few simple UDFs, and an end condition to decide when to stop iterating. Embedding is available starting in Pig 0.9. Some knowledge of both Pig and Python is required to follow along. The examples are available on GitHub.
2. The problem

Before diving into the solution, here is a brief overview of the problem. We want to find the connected components of a graph by performing a transitive closure. Of course, I will assume that the graph does not fit in main memory; otherwise, what follows is not very useful.
A picture is worth a thousand words; here is a visual statement of the problem:
I highlighted the diameter in red as it plays an important role in the complexity of the algorithm. The diameter of a connected component is the longest shortest path between two nodes of the component.
The algorithm requires log2(max(diameter of a component)) iterations, because each iteration doubles the number of relations that have been followed. This is not the overall complexity, but it ensures that the number of iterations stays low: with just 10 iterations you can handle components with a diameter of 1000+ (2^10 = 1024). In most real-life use cases the diameter is very low and a few iterations will suffice.
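To make the iteration count concrete, here is a small standalone sketch (plain Python, not part of the Pig script; the helper names are mine) that measures a component's diameter with a BFS from every node and derives the number of iterations needed:

```python
from collections import deque
from math import ceil, log2

def diameter(adj):
    """Longest shortest path between two nodes of a connected component."""
    def eccentricity(start):
        # BFS from one node; the farthest distance reached is its eccentricity.
        dist = {start: 0}
        queue = deque([start])
        while queue:
            node = queue.popleft()
            for neighbor in adj[node]:
                if neighbor not in dist:
                    dist[neighbor] = dist[node] + 1
                    queue.append(neighbor)
        return max(dist.values())
    return max(eccentricity(node) for node in adj)

def iterations_needed(d):
    # Followed paths double in length each iteration, so ceil(log2(d)) suffices.
    return max(1, ceil(log2(d)))

# A path a-b-c-d-e has diameter 4, so 2 iterations cover it;
# 10 iterations cover any diameter up to 2^10 = 1024.
path = {"a": ["b"], "b": ["a", "c"], "c": ["b", "d"], "d": ["c", "e"], "e": ["d"]}
```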
3. The algorithm

I wrote the following implementation to illustrate the embedding feature. It may not be optimal, but it is a working solution with a reasonable number of iterations. The algorithm does a graph traversal from all the nodes in parallel. It does one hop per iteration (pardon the scientific jargon) and then merges the results per node (using follow() and OR()). As a consequence, the distance covered from each node doubles every iteration. In this case, the edges of the graph are considered bidirectional and are stored one way (see normalize() and bidirectional()) to save space. The edges are stored in two files: followed (empty at the beginning) and to_follow, which tells us when all the edges have been followed (to_follow is empty). The last part of the script does some formatting to turn a list of edges into groups of nodes (using sort() to make sure they are always represented the same way).
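As a rough sketch of the edge representation described above, normalize() and bidirectional() could look like the following. This runs outside Pig, so the @outputSchema decorator is stubbed out, and the exact schema strings and signatures are my assumptions, not the original code:

```python
# Stand-in for the decorator Pig provides to embedded scripts; inside
# an embedded Pig script no such stub is needed.
def outputSchema(schema):
    def wrap(func):
        return func
    return wrap

@outputSchema("edge:(id1:chararray, id2:chararray)")
def normalize(id1, id2):
    # Store each undirected edge only one way (smaller id first) to save space.
    return (id1, id2) if id1 < id2 else (id2, id1)

@outputSchema("edges:{t:(id1:chararray, id2:chararray)}")
def bidirectional(id1, id2):
    # Expand a stored edge back into both directions for the traversal.
    return [(id1, id2), (id2, id1)]
```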
4. The code

Here is an example of how to solve this problem by embedding Pig in Python.
SPLIT new_links INTO new_followed IF followed != 0, new_to_follow IF followed == 0;
new_followed = FOREACH new_followed GENERATE id1, id2;
new_to_follow = FOREACH new_to_follow GENERATE id1, id2;
STORE new_followed INTO '$new_followed';
STORE new_to_follow INTO '$new_to_follow';
to_follow = "data/tc_data_simple"
followed = "out/tc/followed_0"
# create empty dataset for first iteration
Pig.fs("mkdir " + followed)
Pig.fs("touchz " + followed + "/part-m-00000")
for i in range(10):
    new_to_follow = "out/tc/to_follow_" + str(i + 1)
    new_followed = "out/tc/followed_" + str(i + 1)
    job = Q.bind().runSingle()
    if not job.isSuccessful():
        raise Exception("failed")
    to_follow = new_to_follow
    followed = new_followed
    # detect if we are done
    if not job.result("new_to_follow").iterator().hasNext():
        break
links = LOAD '$followed' AS (id1: chararray, id2: chararray);
links = FOREACH links GENERATE FLATTEN( bidirectional(id1, id2) );
result = DISTINCT (
    FOREACH (GROUP links BY id1)
    -- GENERATE clause reconstructed; the original line was lost here
    GENERATE sort(links.id2)
);
STORE result INTO 'out/tc/groups';
5. Notables

There are a few things to notice in this implementation:
UDFs are just defined by functions in the same script
The output schema of the UDF is defined by using the @outputSchema decorator. Pig needs the schema of the output to interpret the statements following the UDF call.
The native Python structures (dictionary, tuple, list) are used; the conversion to Pig types (map, tuple, bag) is automatic. This makes the UDF code much more compact.
UDFs are directly available in embedded Pig scripts using the function names. No other declaration is required.
We iterate a maximum of 10 times (max diameter = 2^10 = 1024) and check whether the "to follow" relation is empty to decide if we need to stop iterating. This is done using the JobStats object returned by runSingle().
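For instance, a UDF like the sort() mentioned in section 3 needs nothing beyond a decorated function returning a native Python structure. The schema string below is an assumption, and the decorator is stubbed so the sketch runs outside Pig:

```python
def outputSchema(schema):  # stub for Pig's decorator so the sketch runs standalone
    def wrap(func):
        return func
    return wrap

@outputSchema("group:{t:(id:chararray)}")
def sort(bag):
    # Pig hands the bag over as a Python list of tuples; returning a plain
    # sorted list is converted back to a Pig bag automatically, so equal
    # groups of nodes always get the same representation.
    return sorted(bag)
```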