Pig as Hadoop Connector, Part Two: HBase, JRuby and Sinatra

Series Introduction

Apache Pig is a dataflow-oriented scripting interface to Hadoop. Pig enables you to manipulate data as tuples in simple pipelines without thinking about the complexities of MapReduce.

But Pig is more than that. Pig has emerged as the ‘duct tape’ of Big Data, letting you move data between distributed systems in a few lines of code. In this series, we’re going to show you how to use Hadoop and Pig to connect different distributed systems, so you can process data from wherever and to wherever you like.

Working code for this post as well as setup instructions for the tools we use are available at https://github.com/rjurney/enron-jruby-sinatra-hbase-pig and you can download the Enron emails we use in the example in Avro format at http://s3.amazonaws.com/rjurney.public/enron.avro. You can run our example Pig scripts in local mode (without Hadoop) with the -x local flag: pig -x local. This enables new Hadoop users to try out Pig without a Hadoop cluster.

Part one of this series on MongoDB is available here: http://hortonworks.com/blog/pig-as-connector-part-one-pig-mongodb-and-node-js/.

Introduction

Hadoop is about freedom as much as scale: it gives you disk spindles and processor cores together, so you can process your data with whatever tool you choose. Unleash your creativity. Pig as duct tape facilitates this freedom, enabling you to connect distributed systems at scale in minutes, not hours. In this post we’ll demonstrate how you can turn raw data into a web service using Hadoop, Pig, HBase, JRuby and Sinatra. In doing so we will demonstrate yet another way to use Pig as a connector to publish data you’ve processed on Hadoop.

Apache HBase is the Hadoop database. It has emerged as the dominant database used with Hadoop, supporting billions of rows and millions of columns. HBase is a highly available, fast column store that supports realtime workloads while giving Hadoop easy batch access to your data via scans. A really great introduction to HBase’s data model is here: http://jimbojw.com/wiki/index.php?title=Understanding_Hbase_and_BigTable.
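A rough way to picture the data model: an HBase table behaves like a sparse, sorted map of maps, keyed by row key, then column family, then column qualifier, then timestamp. A conceptual sketch in Ruby terms (this is how to think about it, not how HBase stores bytes on disk):

# Conceptual sketch only: row key => column family => qualifier => {timestamp => value}
enron_table = {
  'row1' => {
    'email' => {
      'address' => { 1345691847565 => 'bob@enron.com' }  # reads return the newest version by default
    }
  }
}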

JRuby is Ruby implemented in Java. JRuby has emerged as an invaluable tool in helping enterprises with aging Java infrastructures bring new life into their codebase by wrapping complex Java with simple Ruby to provide more productive interfaces for application developers. Using JRuby in this example also helps us to learn the HBase Java APIs, as we’ll be calling them directly from JRuby. We’ll be using JRuby to serve data from HBase as a web service, with a lightweight, ‘no frills’ web framework called Sinatra.
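If you haven’t seen JRuby’s Java integration before, there is very little ceremony involved. A minimal sketch (any JDK class works the same way):

# Call JDK classes directly from Ruby via JRuby's Java integration
require 'java'

puts java.lang.System.getProperty('java.version')
puts java.util.UUID.randomUUID.to_s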

Booting HBase

An excellent HBase quickstart tutorial is available here: http://hbase.apache.org/book/quickstart.html. For more detailed information, check out the HBase Reference Guide – a full blown book on Apache HBase. We’ll be booting HBase in local mode for testing. HBase runs on top of Hadoop and Zookeeper in production, but local mode takes care of that for us for experimentation.

We’ll be using the latest stable version of HBase, version 0.94.1.

$ wget http://archive.apache.org/dist/hbase/hbase-0.94.1/hbase-0.94.1.tar.gz
$ tar -xvzf hbase-0.94.1.tar.gz
$ sudo mkdir /var/hbase

Now edit hbase-0.94.1/conf/hbase-site.xml and add this property inside the <configuration> element, pointing at our HBase working directory:

<property>
  <name>hbase.rootdir</name>
  <value>file:///var/hbase</value>
</property>

Launch HBase in local mode:

$ cd hbase-0.94.1
$ bin/start-hbase.sh

Now check out the “Shell Exercises” section of the HBase Book: http://hbase.apache.org/book/quickstart.html#shell_exercises. Let’s create a new table for testing. Start the HBase shell (really JIRB under the covers); the help command is there to guide us.

$ bin/hbase shell
...
1.8.7-p352 :002 > help

Let’s create our first HBase table, ‘enron’, with a single column family, ‘email’. We might add another column family later, ‘people’, for an organizational chart or extracted entities. Column families are groups of columns.

create 'enron', 'email'
0 row(s) in 1.7900 seconds

Verify that it’s there with list.

list 'enron'
TABLE                                                                                                                                
enron                                                                                                                                
1 row(s) in 0.0690 seconds

We can put, get and scan records easily. The beauty of HBase is that we can update records in realtime from our application, and then scan them in batch using Hadoop without worrying about stale data.

> put 'enron', 'row1', 'email:address', 'bob@enron.com'
0 row(s) in 0.0190 seconds
> put 'enron', 'row2', 'email:address', 'stevo@enron.com'
0 row(s) in 0.0190 seconds
 
> get 'enron', 'row2'
COLUMN                             CELL                                                                                              
 email:address                     timestamp=1345691920800, value=stevo@enron.com                                                    
1 row(s) in 0.0110 seconds
 
> scan 'enron'
ROW                                COLUMN+CELL                                                                                       
 row1                              column=email:address, timestamp=1345691847565, value=bob@enron.com                                
 row2                              column=email:address, timestamp=1345691920800, value=stevo@enron.com                              
2 row(s) in 0.1190 seconds

Note that HBase doesn’t care what kind of data we store in it, and that it returns a timestamp with each cell. HBase can hold a history of values for each cell, and we can even use this feature in our applications to store historical data!
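For example, if we put a second value into the same cell, we can ask the shell for several versions back; each column family keeps a limited number of versions (three by default in this HBase release). The get then returns both the new and the old value, each with its own timestamp:

> put 'enron', 'row1', 'email:address', 'robert@enron.com'
> get 'enron', 'row1', {COLUMN => 'email:address', VERSIONS => 3}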

Storing Data in HBase with Pig

Pig supports HBase via HBaseStorage. An excellent guide is here, and here is a good presentation on Pig and HBase at Twitter from 2010.

We need to tell Pig where to find HBase via the HBASE_HOME environment variable.

$ echo 'export HBASE_HOME=/me/hbase-0.94.1' >> ~/.bash_profile
$ source ~/.bash_profile

We also need to replace the HBase jar distributed with Pig with 0.94.1.

$ rm /me/pig/build/ivy/lib/Pig/hbase-0.90.0.jar
$ cp target/hbase-0.94.1.jar /me/pig/build/ivy/lib/Pig/

Now we can load records in Avro, process them, and store the results in HBase. Note that HBaseStorage won’t create the table or column family for us, so the target table needs an ‘address.pairs’ column family before the store runs.

/* Load Avro jars and define shortcut */
register /me/pig/build/ivy/lib/Pig/avro-1.5.3.jar
register /me/pig/build/ivy/lib/Pig/json-simple-1.1.jar
register /me/pig/contrib/piggybank/java/piggybank.jar
define AvroStorage org.apache.pig.piggybank.storage.avro.AvroStorage();
 
/* HBaseStorage libraries */
register /me/pig/build/ivy/lib/Pig/hbase-0.94.1.jar
register /me/pig/build/ivy/lib/Pig/zookeeper-3.3.3.jar
register /me/pig/build/ivy/lib/Pig/guava-11.0.jar
 
/* Load JRuby UDFs to validate emails and create UUIDs for HBase row IDs */
register 'udf.rb' using jruby as udfs;
 
emails = load '/me/tmp/enron.avro' using AvroStorage();
/* Project one from/to address pair per recipient of each email */
from_to = foreach emails generate from.address as from_address, FLATTEN(tos.(address)) as to_address;
 
/* Group by email from/to pairs and count emails between those addresses.
   Also, generate a UUID for storing rows in HBase. */
by_pair = group from_to by (from_address, to_address);
sent_counts = foreach by_pair generate udfs.uuid() as id, 
                                       FLATTEN(group) as (from_address, to_address), 
                                       COUNT_STAR(from_to) as total_sent;
 
/* Store to the HBase table 'enron'; the first field (our UUID) becomes the row key. */
store sent_counts into 'enron' using 
     org.apache.pig.backend.hadoop.hbase.HBaseStorage('address.pairs:from_address address.pairs:to_address address.pairs:total_sent', 'loadKey true');

ILLUSTRATE shows us our dataflow:

-----------------------------------------------------------------------
| from_to     | from_address:chararray     | to_address:chararray     | 
-----------------------------------------------------------------------
|             | jane.mcbride@enron.com     | tana.jones@enron.com     | 
|             | jane.mcbride@enron.com     | tana.jones@enron.com     | 
-----------------------------------------------------------------------
-----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------
| by_pair     | group:tuple(from_address:chararray,to_address:chararray)             | from_to:bag{:tuple(from_address:chararray,to_address:chararray)}                                 | 
-----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------
|             | (jane.mcbride@enron.com, tana.jones@enron.com)                       | {(jane.mcbride@enron.com, tana.jones@enron.com), (jane.mcbride@enron.com, tana.jones@enron.com)} | 
-----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------
----------------------------------------------------------------------------------------------------------------------------------------
| sent_counts     | id:chararray                         | from_address:chararray     | to_address:chararray     | total_sent:long     | 
----------------------------------------------------------------------------------------------------------------------------------------
|                 | 8d0434e4-b50d-47ac-8278-fc1bb16ad8e4 | jane.mcbride@enron.com     | tana.jones@enron.com     | 2                   | 
----------------------------------------------------------------------------------------------------------------------------------------

Loading Data from HBase with Pig

Loading data from HBase in Pig is easy, and you can pick individual columns or filter the data. Let’s look at the top 100 most prolific email relationships, which could serve as a rough measure of how strong a relationship is.

/* HBaseStorage shortcut */
register /me/pig/build/ivy/lib/Pig/hbase-0.94.1.jar
register /me/pig/build/ivy/lib/Pig/zookeeper-3.3.3.jar
register /me/pig/build/ivy/lib/Pig/guava-11.0.jar
 
/* Grab the top 100 most prolific email relationships from HBase and dump them. */
address_pairs = LOAD 'hbase://enron' using 
  org.apache.pig.backend.hadoop.hbase.HBaseStorage('address.pairs:from_address address.pairs:to_address address.pairs:total_sent')
  as (from_address:chararray, to_address:chararray, total_sent:long);
sorted = order address_pairs by total_sent DESC;
top_100 = limit sorted 100;
dump top_100;
(pete.davis@enron.com,pete.davis@enron.com,4489)
(vince.kaminski@enron.com,vkaminski@aol.com,1143)
(jeff.dasovich@enron.com,susan.mara@enron.com,935)
(jeff.dasovich@enron.com,paul.kaufman@enron.com,879)
...

Pig UDFs in JRuby

Check out the id:chararray field in the example above. We created that with a JRuby UDF.

Pig added JRuby UDFs in version 0.10.0. Writing UDFs in JRuby is much simpler than in Java. Our UDF class in udf.rb looks like this:

require 'pigudf'
require 'lib/data_utils'
 
# Refer to our Utils class to share JRuby code between Pig and Sinatra
class Udfs < PigUdf  
  outputSchema "uuid:chararray"
  def uuid()
    DataUtils.uuid()
  end
end

Notice how we employ an external utility class, which in turn calls Java’s java.util.UUID.randomUUID().toString() method. We put our code in a utility class called DataUtils so that it can be shared with other applications, like our Sinatra web app. Sharing JRuby code this way keeps our Hadoop jobs and our web services running exactly the same logic.

Our utility class looks like this:

# The 'magic line': enable JRuby's Java integration so we can call JDK classes directly
require 'java'
 
import java.util.UUID
 
class DataUtils
  # Create Unique IDs - code adapted from https://github.com/jdamick/uuid/blob/master/lib/uuid.rb
  def self.uuid()
    self.generate()
  end
 
  def self.generate()
    java.util.UUID.randomUUID().toString()
  end
end
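You can sanity-check DataUtils from jirb before wiring it into Pig; run it from the project root so the require resolves (the UUID will differ on every call):

$ jirb
> require 'lib/data_utils'
> DataUtils.uuid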

Using ILLUSTRATE lets us see our UDF code run on real data, without waiting on Hadoop jobs to finish. This is great for development!

HBase and JRuby

You can download JRuby at http://jruby.org/download or, better yet, install it with rvm, following the instructions at https://rvm.io/rvm/install/.

$ rvm install jruby

JRuby can use the native Java HBase client, which is fast and efficient (the HBase shell is actually a modified JRuby Interactive Ruby Shell). Thrift and JSON APIs are provided for other languages. Details about JRuby and HBase are available at http://wiki.apache.org/hadoop/Hbase/JRuby, although that page uses old APIs that we’ve updated for this post.

A great resource to see JRuby in action against HBase is in the HBase shell itself: https://github.com/apache/hbase/tree/trunk/hbase-server/src/main/ruby.

To connect to HBase from JRuby, we’ll need to set up our CLASSPATH so JRuby can find the HBase jars.

$ cd $HBASE_HOME
$ wget http://central.maven.org/maven2/org/jruby/jruby-complete/1.6.7.2/jruby-complete-1.6.7.2.jar
$ export CLASSPATH=$CLASSPATH:`java -jar $HBASE_HOME/jruby-complete-1.6.7.2.jar -e "puts Dir.glob('$HBASE_HOME/{.,build,lib}/*.jar').join(':')"`

Now let’s import the Java HBase client classes in JRuby and connect to HBase. table.rb from the HBase shell is helpful: https://github.com/apache/hbase/blob/trunk/hbase-server/src/main/ruby/hbase/table.rb.

$ jirb

We begin by importing the relevant Java classes into JRuby.

# Adapted from obsolete example at http://wiki.apache.org/hadoop/Hbase/JRuby
 
include Java
import org.apache.hadoop.hbase.HBaseConfiguration
import org.apache.hadoop.hbase.HColumnDescriptor
import org.apache.hadoop.hbase.HConstants
import org.apache.hadoop.hbase.HTableDescriptor
import org.apache.hadoop.hbase.client.HBaseAdmin
import org.apache.hadoop.hbase.client.HTable
import org.apache.hadoop.hbase.client.Get
import org.apache.hadoop.hbase.client.Put
import org.apache.hadoop.hbase.client.ResultScanner
import org.apache.hadoop.hbase.util.Writables
import org.apache.hadoop.hbase.util.Bytes
import org.apache.hadoop.io.Text

Next, we take a cue from the HBase shell and create a to_string utility method.

# Make a String of the passed kv
def to_string(column, kv, maxlength = -1)
  if kv.isDelete
    val = "timestamp=#{kv.getTimestamp}, type=#{org.apache.hadoop.hbase.KeyValue::Type::codeToType(kv.getType)}"
  else
    val = "timestamp=#{kv.getTimestamp}, value=#{org.apache.hadoop.hbase.util.Bytes::toStringBinary(kv.getValue)}"
  end
  val
end

Connecting to HBase is easy. Let’s wrap the code in a connect method.

# Connect to HBase and our table
def connect(table_name)
  @conf = HBaseConfiguration.create
  admin = HBaseAdmin.new(@conf)
  @table = HTable.new(@conf, table_name)
end

Fetching a record is fairly easy. Let’s make a get method.

def get_key(key)
  my_get = Get.new(key.to_java_bytes)
  result = @table.get(my_get)
  result_ary = []
  for kv in result.list
    family = String.from_java_bytes(kv.get_family)
    qualifier = org.apache.hadoop.hbase.util.Bytes::toStringBinary(kv.get_qualifier)
    column = "#{family}:#{qualifier}"
    value = to_string(column, kv, -1)
    timestamp = kv.get_timestamp
    str_value = org.apache.hadoop.hbase.util.Bytes::toStringBinary(kv.get_value)
    result_ary << str_value.to_s
  end
  result_ary
end

I’ve wrapped connect and get in a simple JRuby HBase Client: https://github.com/rjurney/enron-jruby-sinatra-hbase-pig/blob/master/lib/hbase_client.rb.
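For reference, here is a minimal sketch of what that wrapper might look like, reusing the connect and get logic above (the class in the repository may differ in detail):

# lib/hbase_client.rb -- minimal sketch wrapping the HBase Java client
require 'java'

import org.apache.hadoop.hbase.HBaseConfiguration
import org.apache.hadoop.hbase.client.HTable
import org.apache.hadoop.hbase.client.Get

class HBaseClient
  # Connect to HBase and the named table
  def connect(table_name)
    @conf = HBaseConfiguration.create
    @table = HTable.new(@conf, table_name)
  end

  # Fetch a row and return each cell's timestamp and value as strings
  def get(key)
    result = @table.get(Get.new(key.to_java_bytes))
    return [] if result.isEmpty
    cells = []
    result.list.each do |kv|
      cells << kv.getTimestamp.to_s
      cells << org.apache.hadoop.hbase.util.Bytes::toStringBinary(kv.getValue)
    end
    cells
  end
end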

A simple test, test/hbase_client.rb, verifies that things work:

require 'lib/hbase_client'
 
hclient = HBaseClient.new
hclient.connect('enron')
puts hclient.get('row1')

Running it prints the timestamp and value we stored earlier:

$ jruby test/hbase_client.rb
1345691847565
bob@enron.com

JRuby and Sinatra

Sinatra is a simple Ruby framework for web applications. It is summarized nicely in its README. Installing Sinatra is easy.

jgem install sinatra

Our Sinatra app, sinatra.rb, is simple enough:

require 'rubygems'
require 'sinatra'
require 'json'
require 'lib/hbase_client'
require 'lib/data_utils'
 
hclient = HBaseClient.new
hclient.connect('enron')
 
# Define the literal route first, so the /:message_id pattern doesn't shadow it
get '/create_uuid' do
  "This web service shares code with a Pig JRuby UDF to produce this UUID: " + DataUtils.uuid
end
 
get '/:message_id' do |message_id|
  JSON hclient.get(message_id)
end

Run our app with jruby ./sinatra.rb and navigate to our UUID web service at http://localhost:4567/create_uuid (4567 is Sinatra’s default port).

Now pick out a row key from a scan in the HBase shell (these are the UUIDs we generated in Pig) and verify the record is there:

> scan 'enron'
...
 ffffe8b6-2abb-43ba-aad8-6ee8c45262b1 column=address.pairs:from_address, timestamp=1345904219080, value=stanley.horton@enron.com
 ffffe8b6-2abb-43ba-aad8-6ee8c45262b1 column=address.pairs:to_address, timestamp=1345904219080, value=maria.pavlou@enron.com
 ffffe8b6-2abb-43ba-aad8-6ee8c45262b1 column=address.pairs:total_sent, timestamp=1345904219080, value=1
310192 row(s) in 246.2150 seconds
> get 'enron', 'ffffe8b6-2abb-43ba-aad8-6ee8c45262b1'
COLUMN                             CELL                                                                                              
 address.pairs:from_address        timestamp=1345904219080, value=stanley.horton@enron.com                                           
 address.pairs:to_address          timestamp=1345904219080, value=maria.pavlou@enron.com                                             
 address.pairs:total_sent          timestamp=1345904219080, value=1                                                                  
3 row(s) in 0.0300 seconds

Our web service publishes stats on messages between email addresses from HBase.
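To check the whole pipeline end to end, hit the service with any HTTP client. A quick sketch in Ruby, using the row key from the scan above and Sinatra’s default port:

# Fetch one address-pair record from the running Sinatra service as JSON
require 'net/http'
require 'json'

uri = URI.parse('http://localhost:4567/ffffe8b6-2abb-43ba-aad8-6ee8c45262b1')
response = Net::HTTP.get_response(uri)
puts JSON.parse(response.body).inspect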

Conclusion

Starting with emails in Avro format, we have processed our data using Pig and published it to HBase, where a simple JRuby Sinatra app serves it as JSON. We’ve also managed to share UUID code between our JRuby Pig UDF and our Sinatra web application.

About the Author

Russell Jurney is Hortonworks Hadoop Evangelist and the author of the book Agile Data (O’Reilly, Dec 2012), which teaches a flexible toolset and methodology for building effective analytics applications using Apache Hadoop and cloud computing.
