August 16, 2012

Pig as Hadoop Connector, Part One: Pig, MongoDB and Node.js

Series Introduction

Apache Pig is a dataflow-oriented scripting interface to Hadoop. Pig lets you manipulate data as tuples in simple pipelines without thinking about the complexities of MapReduce.

But Pig is more than that. Pig has emerged as the ‘duct tape’ of Big Data, enabling you to send data between distributed systems in a few lines of code. In this series, we’re going to show you how to use Hadoop and Pig to connect different distributed systems, to enable you to process data from wherever and to wherever you like.

Working code for this post as well as setup instructions for the tools we use are available at https://github.com/rjurney/enron-node-mongo and you can download the Enron emails we use in the example in Avro format at http://s3.amazonaws.com/rjurney.public/enron.avro. You can run our example Pig scripts in local mode (without Hadoop) with the -x local flag: pig -x local. This enables new Hadoop users to try out Pig without a Hadoop cluster.

Part two of this series on HBase and JRuby is available here: http://hortonworks.com/blog/pig-as-hadoop-connector-part-two-hbase-jruby-and-sinatra/.


In this post we’ll be using Hadoop, Pig, mongo-hadoop, MongoDB and Node.js to turn Avro records into a web service. We do so to illustrate Pig’s ability to act as glue between distributed systems, and to show how easy it is to publish data from Hadoop to the web.

Pig and Avro

Pig’s Avro support is solid in Pig 0.10.0. To use AvroStorage, we need only load piggybank.jar and the jars for avro and json-simple. A shortcut to AvroStorage is convenient as well. Note that all paths are relative to your Pig install path. We load Avro support into Pig like so:

/* Load Avro jars and define shortcut */
register /me/pig/build/ivy/lib/Pig/avro-1.5.3.jar
register /me/pig/build/ivy/lib/Pig/json-simple-1.1.jar
register /me/pig/contrib/piggybank/java/piggybank.jar
define AvroStorage org.apache.pig.piggybank.storage.avro.AvroStorage(); /* Shortcut */

MongoDB’s Java Driver

To connect to MongoDB, we’ll need the MongoDB Java Driver. You can download it here: https://github.com/mongodb/mongo-java-driver/downloads. We’ll load this jar in our Pig script.


The mongo-hadoop project provides integration between MongoDB and Hadoop. You can download the latest version at https://github.com/mongodb/mongo-hadoop/downloads. Once you download and unzip the project, you’ll need to build it with sbt.

./sbt package

This will produce the following jars:

$ find .|grep jar

We load these MongoDB libraries in Pig like so:

/* MongoDB libraries and configuration */
register /me/mongo-hadoop/mongo-2.7.3.jar /* MongoDB Java Driver */
register /me/mongo-hadoop/core/target/mongo-hadoop-core-1.1.0-SNAPSHOT.jar
register /me/mongo-hadoop/pig/target/mongo-hadoop-pig-1.1.0-SNAPSHOT.jar

/* Set speculative execution off so we don't have the chance of duplicate records in Mongo */
set mapred.map.tasks.speculative.execution false
set mapred.reduce.tasks.speculative.execution false
define MongoStorage com.mongodb.hadoop.pig.MongoStorage(); /* Shortcut */
set default_parallel 5 /* By default, let's have 5 reducers */
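
To see why speculative execution can leave duplicate records in Mongo, here is a toy model in JavaScript. It is not Hadoop or mongo-hadoop code: makeToyCollection and runTaskAttempt are hypothetical names, and the model simply assumes that every insert creates a new document and that a second attempt of the same task writes the same records again.

```javascript
// Toy model (not Hadoop or mongo-hadoop code): a store that, like an insert
// without an explicit _id, assigns a fresh id to every document it receives.
function makeToyCollection() {
  var docs = [];
  var nextId = 1;
  return {
    insert: function (doc) {
      docs.push({ _id: nextId++, message_id: doc.message_id });
    },
    count: function () { return docs.length; }
  };
}

// One task attempt writes every record in its input split to the store.
function runTaskAttempt(split, collection) {
  split.forEach(function (record) { collection.insert(record); });
}

var split = [{ message_id: 'a' }, { message_id: 'b' }];

// Speculation off: exactly one attempt per task.
var single = makeToyCollection();
runTaskAttempt(split, single);

// Speculation on: a second attempt of the same task may also run, and its
// writes to an external store are not undone when it loses the race.
var speculative = makeToyCollection();
runTaskAttempt(split, speculative);
runTaskAttempt(split, speculative);

console.log(single.count(), speculative.count()); // prints "2 4"
```

With one attempt per task, each record lands once. That is the behavior the two speculative execution settings above are meant to guarantee.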

Writing to MongoDB

Loading Avro data and storing records to MongoDB are one-liners in Pig.

avros = load 'enron.avro' using AvroStorage();
store avros into 'mongodb://localhost/enron.emails' using MongoStorage();

From Avro to Mongo in One Line

I’ve automated loading Avros and storing them to MongoDB in the script at https://github.com/rjurney/enron-node-mongo/blob/master/avro_to_mongo.pig, using Pig’s parameter substitution:

avros = load '$avros' using AvroStorage();
store avros into '$mongourl' using MongoStorage();

We can then call our script like this, and it will load our Avros to Mongo:

pig -l /tmp -x local -v -w -param avros=enron.avro \
   -param mongourl='mongodb://localhost/enron.emails' avro_to_mongo.pig
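
If you would rather launch the script from Node.js than from a shell, the same command line can be assembled programmatically. This is a minimal sketch: buildPigArgs is a hypothetical helper, not part of Pig or the example repository, and the flags are copied from the command above.

```javascript
// Hypothetical helper: builds the argument list for the pig command shown above.
function buildPigArgs(script, params) {
  var args = ['-l', '/tmp', '-x', 'local', '-v', '-w'];
  Object.keys(params).forEach(function (name) {
    // Each "-param name=value" pair fills in $name inside the Pig script.
    args.push('-param', name + '=' + params[name]);
  });
  args.push(script);
  return args;
}

var args = buildPigArgs('avro_to_mongo.pig', {
  avros: 'enron.avro',
  mongourl: 'mongodb://localhost/enron.emails'
});
console.log(['pig'].concat(args).join(' '));

// To actually run it (requires pig on the PATH):
// require('child_process').spawn('pig', args, { stdio: 'inherit' });
```

Passing the arguments as an array avoids shell quoting, so the Mongo URL needs no surrounding quotes here.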

We can verify our data is in MongoDB like so:

$ mongo enron

MongoDB shell version: 2.0.2
connecting to: enron

> show collections

> db.emails.findOne({message_id: "%3C3607504.1075843446517.JavaMail.evans@thyme%3E"})
{
	"_id" : ObjectId("502b4ae703643a6a49c8d180"),
	"message_id" : "",
	"date" : "2001-04-25T12:35:00.000Z",
	"from" : {
		"address" : "jeff.dasovich@enron.com",
		"name" : "Jeff Dasovich"
	},
	"subject" : null,
	"body" : "Breathitt's hanging tough, siding w/Hebert, standing for markets.  Jeff",
	"tos" : [
		{
			"address" : "7409949@skytel.com",
			"name" : null
		}
	],
	"ccs" : [ ],
	"bccs" : [ ]
}

To the Web with Node.js

We’ve come this far, so we may as well publish our data on the web via a simple web service. Let’s use Node.js to fetch a record from MongoDB by message ID and return it as JSON. To do this, we’ll use Node’s mongodb package. Installation instructions are available in our GitHub project.

Our Node application is simple enough. We listen for an HTTP request on port 1337 and use the messageId parameter to look up an email by message ID.

// Dependencies
var mongodb = require("mongodb"),
    http = require('http'),
    url = require('url');

// Set up Mongo (the callback-style Db/Server API of the mongodb driver of the time)
var Db = mongodb.Db,
    Server = mongodb.Server;

// Connect to the MongoDB 'enron' database and its 'emails' collection
var db = new Db("enron", new Server("localhost", 27017, {}));
db.open(function(err, n_db) { db = n_db });
var collection = db.collection("emails");

// Set up a simple API server returning JSON
http.createServer(function (req, res) {
  var inUrl = url.parse(req.url, true);
  var messageId = inUrl.query.messageId;

  // Given a message ID, find one record that matches in MongoDB
  collection.findOne({message_id: messageId}, function(err, item) {
    // Return 404 on error, or when no record matches
    if(err || !item) {
      if(err) { console.log("Error:" + err); }
      res.writeHead(404);
      res.end();
      return;
    }
    // Return 200/json on success
    res.writeHead(200, {'Content-Type': 'application/json'});
    res.end(JSON.stringify(item));
  });
}).listen(1337, 'localhost');

console.log('Server running at http://localhost:1337/');
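
The findOne callback has three possible outcomes: a driver error, no matching record, or a match. One way to make that logic easy to test without a running MongoDB is to pull it into a small pure function. The sketch below is an illustration, not code from the example repository: toResponse is a hypothetical name, and the choice of 500 for driver errors is my assumption (the listing's comment uses 404 for errors).

```javascript
// Hypothetical helper: map a findOne-style (err, item) result to an HTTP reply.
function toResponse(err, item) {
  if (err) {
    // Assumption: report driver failures as a server error.
    return { status: 500, body: JSON.stringify({ error: String(err) }) };
  }
  if (!item) {
    // No error, but nothing matched the message ID.
    return { status: 404, body: JSON.stringify({ error: 'not found' }) };
  }
  return { status: 200, body: JSON.stringify(item) };
}

// Exercise each branch with plain values instead of a live database.
var found = toResponse(null, { subject: null, ccs: [] });
var missing = toResponse(null, null);
var failed = toResponse(new Error('connection refused'), null);

console.log(found.status, missing.status, failed.status); // prints "200 404 500"
```

Inside the request handler, the callback would then only need to call res.writeHead with the returned status and res.end with the returned body.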

Navigating to http://localhost:1337/?messageId=%3C3607504.1075843446517.JavaMail.evans@thyme%3E returns an Enron email as JSON.
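
The %3C and %3E in that URL are the percent-encoded forms of < and >. The short sketch below shows the decoding step on its own. It uses Node's built-in URL class rather than the url.parse call in the server listing, which is a substitution on my part, and the base URL is taken from the address above.

```javascript
// The path and query string as the server would see them in req.url.
var requestPath = '/?messageId=%3C3607504.1075843446517.JavaMail.evans@thyme%3E';

// A base URL is needed only because req.url is a path, not an absolute URL.
var parsed = new URL(requestPath, 'http://localhost:1337');

// searchParams.get() returns the percent-decoded parameter value.
var messageId = parsed.searchParams.get('messageId');

console.log(messageId); // prints "<3607504.1075843446517.JavaMail.evans@thyme>"
```

The decoded string, angle brackets included, is what gets passed to MongoDB as the message_id to match.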

We’ll leave the CSS as an exercise for your web developer, or you might try Bootstrap if you don’t have one.


The Hadoop Filesystem serves as a dumping ground for aggregating events. Apache Pig is a scripting interface to Hadoop MapReduce. We can manipulate and mine data on Hadoop, and when we’re ready to publish it to an application, we use mongo-hadoop to store our records in MongoDB. From there, creating a web service takes a few lines of JavaScript with Node.js, or your favorite web framework.

MongoDB is a popular NoSQL database for web applications. Using Hadoop and Pig we can aggregate and process logs at scale and publish new data-driven features back to MongoDB – or whatever our favorite database is.

Note: we should ensure that there is sufficient I/O capacity between our Hadoop cluster and our MongoDB cluster, lest we overload Mongo with writes from Hadoop. Be careful out there! I have, however, verified that writing from an Elastic MapReduce Hadoop cluster to a replicated MongoHQ cluster (on Amazon EC2) works well.

About the Author

Russell Jurney is a data scientist and the author of the book Agile Data (O’Reilly, Dec 2012), which teaches a flexible toolset and methodology for building effective analytics applications using Apache Hadoop and cloud computing.



  • Hi,

    I ran the code, but while running the Pig script I get the following exception (possibly due to incompatible versions of MongoDB and Hadoop/Pig). Can you please specify the minimum versions required for the Pig-Mongo interaction?

    Pig Stack Trace
    ERROR 2998: Unhandled internal error. Found class org.apache.hadoop.mapreduce.JobContext, but interface was expected

    java.lang.IncompatibleClassChangeError: Found class org.apache.hadoop.mapreduce.JobContext, but interface was expected
    at com.mongodb.hadoop.MongoOutputFormat.checkOutputSpecs(MongoOutputFormat.java:43)
    at org.apache.pig.newplan.logical.rules.InputOutputFileValidator$InputOutputFileVisitor.visit(InputOutputFileValidator.java:80)
    at org.apache.pig.newplan.logical.relational.LOStore.accept(LOStore.java:66)
    at org.apache.pig.newplan.DepthFirstWalker.depthFirst(DepthFirstWalker.java:64)
    at org.apache.pig.newplan.DepthFirstWalker.depthFirst(DepthFirstWalker.java:66)
    at org.apache.pig.newplan.DepthFirstWalker.depthFirst(DepthFirstWalker.java:66)
    at org.apache.pig.newplan.DepthFirstWalker.depthFirst(DepthFirstWalker.java:66)
    at org.apache.pig.newplan.DepthFirstWalker.walk(DepthFirstWalker.java:53)
    at org.apache.pig.newplan.PlanVisitor.visit(PlanVisitor.java:52)
