How to use R and other non-Java languages in MapReduce and Hive

I teach for Hortonworks, and in class just this week I was asked to provide an example of using the R statistics language with Hadoop and Hive. The good news is that it can easily be done. The even better news is that it is actually possible to use a variety of tools – Python, Ruby, shell scripts, and R – to perform distributed, fault-tolerant processing of your data on a Hadoop cluster.

In this blog post I will provide an example of using R (http://www.r-project.org) with Hive. I will also provide an introduction to other non-Java MapReduce tools.

If you want to follow along and run these examples in the Hortonworks Sandbox, you will need to install R.

To do that, log in to the sandbox as root (password: hadoop) and run:

yum install R

Before we get into the details of using R with Hive, let’s review some important details about Hadoop.

The processing framework for Hadoop has traditionally been MapReduce. With MapReduce a developer writes a Map class in Java that receives individual records and filters or transforms those records. If there is a Reduce phase, the Mapper output is partitioned by key and sorted by key, and each Reducer fetches a subset of the keys and processes the groups of values with matching keys.

If you already have a collection of R scripts that you are comfortable using – or Python, Ruby, or shell scripts – and you want to process large amounts of data quickly on your Hadoop cluster, this is the blog post for you.

Streaming

The first tool I will demonstrate is Streaming. Streaming runs a MapReduce job from the command line: you specify a map script, a reduce script, an input, and an output, and Streaming takes care of the rest. It handles the MapReduce details for you, such as splitting your job into separate tasks and executing the map tasks where the data is stored.

The streaming jar is typically located here: /usr/lib/hadoop-mapreduce/hadoop-streamingxxxx.jar. To use the streaming jar you simply specify the following parameters on the command line:

  • -input – the data in HDFS that you want to process
  • -output – the directory in HDFS where you want to store the output
  • -mapper – the program, script, or command that you want to use for your mapper
  • -reducer – the program, script, or command that you want to use for your reducer

Suppose I have data that looks like this (the city names are quoted so that, later on, R's read.table will treat multi-word names like "San Francisco" as a single field):

CA <tab> "San Francisco" <tab> 3,273,190
CO <tab> "Denver" <tab> 634,265
OH <tab> "Athens" <tab> 23,832
GA <tab> "Atlanta" <tab> 443,775
TN <tab> "Chatanooga" <tab> 167,674
CA <tab> "San Jose" <tab> 984,299
OH <tab> "Columbus" <tab> 797,434
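
To follow along, you could save those lines (using real tab characters) to a local file named cities.txt and copy it into HDFS. A minimal sketch, assuming you are working from your HDFS home directory:

hadoop fs -put cities.txt cities.txt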

If I place that file in HDFS and call it cities.txt, I can process it with Streaming like this:

hadoop jar /usr/lib/hadoop-mapreduce/hadoop-streamingxxxx.jar -input cities.txt -output streamout -mapper /bin/cat -reducer /bin/cat -numReduceTasks 2

This will run a Streaming job that has the cat command as the mapper, so the data passes through the map phase unchanged. After the map phase comes the shuffle and sort. In Streaming the key is the text in the line before the first tab, and the value is the rest of the line – one key-value pair per line. In the example above, OH, CA, and so on would be the keys. Records – in this case lines of text – are partitioned and sorted by the state field and then fetched by two separate Reduce tasks. As we see below, one Reducer got OH and the rest of the states went to the second Reducer. Notice that when a Reducer receives more than one key (in this case more than one state), it receives them in sorted order.

Here is a view of the output of this command:

[train@node ~]$ hadoop fs -cat streamout/part-00000
OH "Athens" 23,832
OH "Columbus" 797,434
[train@node ~]$ hadoop fs -cat streamout/part-00001
CA "San Francisco" 3,273,190
CA "San Jose" 984,299
CO "Denver" 634,265
GA "Atlanta" 443,775
TN "Chatanooga" 167,674

So the example using /bin/cat just shows you a default MapReduce job. You should see from it that data is split into map tasks based on storage location, and that data is split on the reduce side based on the intermediate key – in this case the state. Note how all of the cities in OH ended up at one Reducer and all of the cities in CA ended up at the other. That is the rule: all values for the same key end up at the same Reducer.

How would we do something more interesting or useful using the Streaming jar? We could replace the Reducer with a grep command.

hadoop jar /usr/lib/hadoop-mapreduce/hadoop-streamingxxxx.jar -input cities.txt -output streamout -mapper /bin/cat -reducer 'grep -i CA' 

That command would launch a MapReduce job whose Reducer filters out all records that do not contain CA (for California). I leave a demonstration of that as an exercise for the reader.

The command used in Streaming as the mapper or the reducer does not have to be a shell command; it can be anything that can execute on the worker nodes of the cluster.

My students had asked for an R example. Here is the example I provided them.

#!/usr/bin/env Rscript
f <- file("stdin")
open(f)
state_data = read.table(f)
summary(state_data)

In order for an R script to work as a Streaming mapper or reducer it has to read from stdin and print appropriate content to stdout. If used as a mapper, the script would emit lines of text in the format key<tab>value, and that output would be partitioned and sorted by key. The output of the Reducer gets written into HDFS, so its format can be more freeform. The R script above reads from stdin, creates a table from the input, and then prints a summary of that table. Since the output emitted by this script is not in key<tab>value format, you could use it as a Reducer; as a Mapper it might not work so well.
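
Because the contract is simply stdin in and stdout out, you can sanity-check a script without Hadoop at all by simulating the map, sort, and reduce steps with ordinary pipes. A minimal sketch, assuming cities.txt and script.r sit in your current directory:

chmod +x script.r
cat cities.txt | sort | ./script.r

Here cat plays the identity mapper, sort stands in for the shuffle and sort, and the R script acts as a single reducer. If that pipeline produces sensible output locally, the script is a good candidate for the -reducer option.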

Unlike cat, grep, or awk, the R script is not present on the worker nodes of the cluster by default. In order to get the R script to the mappers and reducers you use the -file option of the streaming jar, which ships the script out with the job.

In the command below, cat again passes the raw data through unchanged, the shuffle and sort guarantees that lines are grouped and sorted by state, -numReduceTasks 2 guarantees two Reducers, each Reducer's partition is piped through our R script, and the output is written into HDFS.

[train@node ~]$ hadoop jar /usr/lib/hadoop-mapreduce/hadoop-streamingxxxx.jar -input cities.txt -output streamout_r -mapper /bin/cat -reducer script.r -numReduceTasks 2 -file script.r

Here is the output when run against our cities.txt file shown in the previous example.

[train@node ~]$ hadoop fs -cat streamout_r/part-00000
OH:2 Athens :1 23,832 :1
Columbus:1 797,434:1

[train@node ~]$ hadoop fs -cat streamout_r/part-00001
V1 V2 V3
CA:2 Atlanta :1 167,674 :1
CO:1 Chatanooga :1 3,273,190:1
GA:1 Denver :1 443,775 :1
TN:1 San Francisco:1 634,265 :1
San Jose :1 984,299 :1

So there you have it: a working example of running an R script across the data in the cluster. The data was partitioned by state, each Reducer sent its partition through the R script, and the output of the R script was written into HDFS.

I hope that provides a good starting point if you want to use R or other tools on your cluster instead of Java. My R skills are basic; I leave it to your data scientists to work some magic on slices of your data.

Hive and R

Streaming data through R using the streaming jar is just one option available to you. Hive and Pig also provide the ability to pass data through a script. Just as with Streaming, the script can be R, Python, Ruby, a shell script, or a Linux command.

Our Data Analyst course can provide you with much more information, but here is a short summary of how Hive allows you to express your logic as a SQL query and, optionally, pass the resulting data set through a script.

Imagine a scenario where the total volume of your data is too large to be represented in a single R data frame. You might still be able to gain some insight by using Hive to slice your data into partitions, perform some joins and aggregations, and then ship the slices of the dataset through an R script. This can easily be done.

Here is an example using Hive and R.

I changed the R script so that it takes in two columns of data. For example, it might receive:

OH Cleveland
OH Columbus
CO Denver

The script will return a count of each distinct value in the first column. The data above, when processed through this R script, will produce:

OH  2
CO  1

The reason I did this is that Hive requires a consistent number of columns per record, so I needed an R script that emits a consistent number of columns.

The R script below ingests two columns and emits two columns. If the script does not return data in that format – one tab-delimited record per line – then Hive will complain. The script counts the number of occurrences of each value in the first column, returns the first column as is, and replaces the second column with the count.

#!/usr/bin/env Rscript
f <- file("stdin") ## read the contents of stdin
open(f) ## open the handle on stdin
my_data = read.table(f) ## read stdin as a table
my_data_count=table(my_data$V1) ## count the number of occurance of column 1
write.table(my_data_count,quote = FALSE,row.names = FALSE,col.names = FALSE,sep = " ") ## format the output so we see column1<tab>count
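
As with the Streaming example, you can check this script locally before involving Hive. A quick sketch, assuming the three-column cities.txt from earlier is in your current directory and the updated script.r is executable (cut trims the file down to the two tab-separated columns the script expects):

cut -f1,2 cities.txt | ./script.r

You should see one line per state with its count, in the column1<tab>count format described above.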

To use the script in Hive we would do the following.

hive> add file script.r;
Added resource: script.r
hive> set mapred.reduce.tasks=2;
hive> from (
> select state, city
> from cities distribute by state)
> t1
> insert overwrite directory 'R_example'
> reduce state,city using 'script.r';

You might ask what the cities table looks like. Actually, that is not too important here: you could use this example by selecting any two columns from any Hive table you have and allowing the R script to generate the count.
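
For completeness, here is one way a cities table could be created and loaded from the cities.txt file used earlier. This is just a sketch; the column names and types are my assumptions, and population is kept as a string only because the sample values contain commas:

hive> create table cities (state string, city string, population string)
> row format delimited fields terminated by '\t';
hive> load data local inpath 'cities.txt' into table cities;

With this simple delimited format the quotation marks around the city names become part of the city values, which does not matter for this counting example.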

The statement is a SQL statement, but in reverse: it specifies what data will be read – the select clause –, which columns will be sent to the reducer – state, city –, how the keys will be partitioned – distribute by state –, what script the reducer will use to process the data – reduce state,city using 'script.r' – and what directory the final output will be stored in – insert overwrite directory 'R_example'.
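
As an aside, Hive's map and reduce clauses are syntactic sugar for its more general transform clause, so roughly the same job could be written like this (an untested sketch; the as clause simply names the two output columns):

hive> insert overwrite directory 'R_example'
> select transform (state, city) using 'script.r' as (state, cnt)
> from (select state, city from cities distribute by state) t1;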

A quick summary: Hive selects two columns, sends all occurrences of the same state to the same reducer, and each reducer transforms the dataset it receives with the R script.

The output looks like this. We can use Hive to slice and dice our data and then ship the output through R.

[root@sandbox ~]# hadoop fs -cat R_example/000000_0
CA 2
CO 1
GA 1
TN 1

[root@sandbox ~]# hadoop fs -cat R_example/000001_0
OH 2

And that’s it. Pig provides similar functionality but I leave that as an exercise for the reader, or for a later blog post.

Happy Hadooping from the Hortonworks Education team, see you in class.

You can get started with Hadoop here, and register for training here.


Comments

December 14, 2013 at 11:26 am
Why make your life so painful? Why not use rmr2? Hortonworks has been a great advocate for it in the past.

Tom Hanlon | November 12, 2013 at 1:16 pm

There was a typo in the first R script: a trailing "[" that should not have been there.

Here is the complete script.
#!/usr/bin/env Rscript
f <- file("stdin")
open(f)
state_data = read.table(f)
summary(state_data)

srikrishna | November 11, 2013 at 10:56 pm

#!/usr/bin/env Rscript
f <- file("stdin")
open(f)
state_data = read.table(f)
summary(state_data)[

Is there something missing in the script after the "["? Please clarify. I am trying it in Sandbox 1.3.

    Tom Hanlon | November 12, 2013 at 3:10 pm

    The typo has been fixed above; the trailing "[" should not have been there.

    Should work now.

    Thanks for the comment that alerted me to the typo.


    Tom

      srikrishna | November 12, 2013 at 9:31 pm

      What version of Hadoop Streaming is required to run the above R code? I am getting the following error after the reduce step. I am running under the Sandbox.

      # of failed Reduce Tasks exceeded allowed limit. FailedCount: 1. LastFailedTask: task_201311121729_0012_r_000000

      Can you suggest some ideas?

        November 12, 2013 at 11:08 pm

        The code in this blog works on an HDP 1.3 sandbox and an HDP 2.0 sandbox.

        The error might be caused by R seeing this line.
        CA San Francisco 3,273,190
        And parsing that into 4 fields.
        CA|San|Francisco|3,273,190
        When we would rather it was parsed into 3 fields and keep the string “San Francisco” as one field.

        The fix would be to quote the city names.
        Try it with a data file that looks like this.

        CA "San Francisco" 3,273,190
        CO "Denver" 634,265
        OH "Athens" 23,832
        GA "Atlanta" 443,775
        TN "Chatanooga" 167,674
        CA "San Jose" 984,299
        OH "Columbus" 797,434

        You can test the R script simply by running:
        chmod +x script.r
        cat cities.txt | ./script.r

        If it works then it should work when executed by hadoop.


        Tom
