November 08, 2013

How to use R and other non-Java languages in MapReduce and Hive

I teach for Hortonworks and in class just this week I was asked to provide an example of using the R statistics language with Hadoop and Hive. The good news was that it can easily be done. The even better news is that it is actually possible to use a variety of tools: Python, Ruby, shell scripts and R to perform distributed fault tolerant processing of your data on a Hadoop cluster.

In this blog post I will provide an example of using R with Hive. I will also provide an introduction to other non-Java MapReduce tools.

If you want to follow along and run these examples in the Hortonworks Sandbox, you will need to install R.

To do that, log in to the Sandbox as root (password: ‘hadoop’) and run:

yum install R

Before we get into the details of using R with Hive, let’s review some important details about Hadoop.

The processing framework for Hadoop has been MapReduce. MapReduce provides a framework in which a developer writes a Map class in Java that receives individual records and filters or transforms them. If there is a Reduce phase, the Mapper output is partitioned by key and sorted by key, and each Reducer fetches a subset of the keys and processes the groups of values with matching keys.
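A rough local illustration of that partition-and-sort step (this is not the actual Hadoop shuffle, just an analogy): between the map and reduce phases, MapReduce groups map output by key, much like sorting tab-delimited lines on their first field.

```shell
# Sorting on the first tab-delimited field groups all records for a key
# together, which is what the shuffle/sort does for each Reducer's input.
printf 'OH\tAthens\nCA\tSan Jose\nOH\tColumbus\n' | sort -t$'\t' -k1,1
```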

If you already have a collection of R scripts that you are comfortable using, or you have Python, Ruby, or shell scripts, and you want to process large amounts of data fast on your Hadoop cluster, this is the blog post for you.


The first tool I will demonstrate is Streaming. Streaming runs a MapReduce job from the command line. You specify a map script, a reduce script, an input, and an output, and Streaming takes care of the rest. Streaming handles the MapReduce details, such as making sure that your job is split into separate tasks and that the map tasks are executed where the data is stored.

The streaming jar is typically located here: /usr/lib/hadoop-mapreduce/hadoop-streamingxxxx.jar. To use the Streaming jar, you simply specify the following parameters on the command line:

  • -input – the data in HDFS that you want to process
  • -output – the directory in HDFS where you want to store the output
  • -mapper – the program, script, or command that you want to use for your mapper
  • -reducer – the program, script, or command that you want to use for your reducer

Suppose I have data that looks like this:

CA <tab> San Francisco <tab> 3,273,190
CO <tab> Denver<tab> 634,265
OH <tab> Athens<tab> 23,832
GA <tab> Atlanta <tab> 443,775
TN <tab> Chatanooga <tab> 167,674
CA <tab> San Jose <tab> 984,299
OH <tab> Columbus <tab> 797,434

If I place that file in HDFS and call it cities.txt I can process it with Streaming like this:

hadoop jar /usr/lib/hadoop-mapreduce/hadoop-streamingxxxx.jar -input cities.txt -output streamout -mapper /bin/cat -reducer /bin/cat -numReduceTasks 2

This will run a Streaming job that uses the “cat” command as the mapper, so the data passes through the mapper unchanged. After the map phase comes the shuffle and sort. In Streaming, the key is the text in the line before the first tab, and the value is the rest of the line: one key-value pair per line. In the example above, OH, CA, etc. would be the keys. Records (in this case lines of text) are partitioned and sorted by the state field and then fetched by two separate Reduce tasks. As we see below, one Reducer got OH, and the rest of the states went to the second Reducer. Notice that when a Reducer gets more than one key (in this case, more than one state), the keys arrive in sorted order.

Here is a view of the output of this command:

[train@node ~]$ hadoop fs -cat streamout/part-00000

OH "Athens" 23,832
OH "Columbus" 797,434
[train@node ~]$ hadoop fs -cat streamout/part-00001
CA "San Francisco" 3,273,190
CA "San Jose" 984,299
CO "Denver" 634,265
GA "Atlanta" 443,775
TN "Chatanooga" 167,674

So the example using /bin/cat just shows you a default MapReduce job. You should see from the above example that data is split into map tasks based upon storage location, and that data is split on the reduce side based upon the intermediate key. In this case the intermediate key was the state. Note how all of the cities in OH ended up at Reducer 1, and all of the cities in CA ended up at Reducer 2. That is the rule: all values for the same key end up at the same Reducer.

How would we do something more interesting or useful using the Streaming jar? We could replace the Reducer with a grep command.

hadoop jar /usr/lib/hadoop-mapreduce/hadoop-streamingxxxx.jar -input cities.txt -output streamout -mapper /bin/cat -reducer 'grep -i CA' 

That command would launch a MapReduce job that filters out all records that do not match CA (California). I leave a demonstration of that as an exercise for the reader.
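If you want a feel for what that grep reducer does before launching a cluster job, you can approximate it locally on a couple of sample lines (an illustration only; in the real job each Reducer runs grep over its own partition):

```shell
# The reduce-side grep keeps only lines matching CA, dropping the rest.
printf 'CA\tSan Jose\t984,299\nOH\tAthens\t23,832\n' | grep -i CA
```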

The command used in Streaming as the mapper or the reducer does not have to be a shell command; it can be anything that can execute on the worker nodes of the cluster.

My students had asked for an R example. Here is the example I provided them.

#!/usr/bin/env Rscript
f <- file("stdin") ## create a connection to stdin
state_data = read.table(f) ## read the incoming records as a table
print(summary(state_data)) ## emit a summary of each column to stdout

In order for an R script to work as a Streaming mapper or reducer it has to read from stdin and print appropriate content to stdout. If used as a mapper, the script would emit lines of text in the format key<tab>value, and the output would be partitioned by key and sorted by key. The output of the Reducer gets written into HDFS, so its format can be more freeform. The R script above reads from stdin, creates a table from it, then prints a summary of that table. Since the output emitted by this script is not in key<tab>value format, you could use it as a Reducer; as a Mapper it might not work so well.
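For contrast, here is a hypothetical mapper that does emit key<tab>value: an awk one-liner (my own example, not part of the Sandbox) that keeps the state as the key and the population as the value, so the framework can partition and sort on state:

```shell
# Hypothetical streaming mapper: emit state<tab>population so the
# shuffle/sort can partition and order records by the state key.
printf 'CA\tSan Jose\t984,299\nOH\tAthens\t23,832\n' |
  awk -F'\t' '{ print $1 "\t" $3 }'
```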

Unlike cat, grep, or awk, the R script is not present by default on the worker nodes of the cluster. In order to get the R script to the mappers and reducers, you use the -file option of the streaming jar.

The command below ships the raw data through cat; the shuffle and sort guarantees lines are aggregated and sorted by state; -numReduceTasks guarantees two Reducers; each Reducer’s data is shipped through our R script; and the output is written into HDFS.

[train@node ~]$ hadoop jar /usr/lib/hadoop-mapreduce/hadoop-streamingxxxx.jar -input cities.txt -output streamout_r -mapper /bin/cat -reducer script.r -numReduceTasks 2 -file script.r

Here is the output when run against our cities.txt file shown in the previous example.

[train@node ~]$ hadoop fs -cat streamout_r/part-00000
OH:2 Athens :1 23,832 :1
Columbus:1 797,434:1

[train@node ~]$ hadoop fs -cat streamout_r/part-00001
V1 V2 V3
CA:2 Atlanta :1 167,674 :1
CO:1 Chatanooga :1 3,273,190:1
GA:1 Denver :1 443,775 :1
TN:1 San Francisco:1 634,265 :1
San Jose :1 984,299 :1

So there you have it: a working example of running an R script across the data in the cluster. The data was partitioned by state, each Reducer sent its partition through the R script, and the output of the R script was written into HDFS.

I hope that provides a good starting point if you want to use R or other tools on your cluster instead of Java. My R skills are basic; I leave it to your Data Scientists to work some magic on slices of your data.

Hive and R

Streaming data through R using the Streaming jar is just one option available to you. Hive and Pig also provide the ability to pass data through a script. Just like Streaming, the script can be R, Python, Ruby, shell scripts, or Linux commands.

Our Data Analyst course can provide you with much more information, but here is a short summary of how Hive allows you to express your logic as a SQL query and, optionally, take that SQL-generated data set and pass it through a script.

Imagine a scenario where the total volume of your data is too large to be represented in a single R data frame. You might be able to gain some insight by using Hive to slice your data into partitions, perform some joins and aggregations, and then ship the slices of the dataset through the R script. This can easily be done.

Here is an example using hive and R.

I changed the R script so that it takes in two columns of data. For example, it might receive:

OH Cleveland
OH Columbus
CO Denver

And the script will return a count of each distinct value in the first column. The data above, when processed through this R script, will produce:

OH  2
CO  1

The reason I did this is that Hive requires a consistent number of columns per record processed, so I needed an R script that emitted a consistent number of columns.

The R script below ingests two columns and emits two columns. If the script does not return data in that format (one tab-delimited record per line), Hive will complain. This R script counts the number of occurrences of each value in the first column, returns the first column as is, and replaces the second column with the count.

#!/usr/bin/env Rscript
f <- file("stdin") ## create a connection to stdin
open(f) ## open the handle on stdin
my_data = read.table(f) ## read stdin as a table
my_data_count=table(my_data$V1) ## count the occurrences of each value in column 1
write.table(my_data_count,quote = FALSE,row.names = FALSE,col.names = FALSE,sep = "\t") ## format the output so we see column1<tab>count
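If you do not have R handy, you can check the expected shape of that output with an awk stand-in for the same count-by-first-column logic (illustrative only; the Hive job below uses the R script itself):

```shell
# awk stand-in for script.r: count occurrences of each value in column 1
# and emit value<tab>count, one record per line (sorted for readability).
printf 'OH\tCleveland\nOH\tColumbus\nCO\tDenver\n' |
  awk -F'\t' '{ c[$1]++ } END { for (k in c) print k "\t" c[k] }' | sort
```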

To use the data in Hive we would do the following.

hive> add file script.r;
Added resource: script.r
hive> set mapred.reduce.tasks=2;
hive> from (
> select state, city
> from cities distribute by state)
> t1
> insert overwrite directory 'R_example'
> reduce state,city using 'script.r';

You might ask what the cities table looks like. Actually, that is not too important here; you could use this example by selecting any two columns from any Hive table you have and allowing the R script to generate the count.

The statement is a SQL statement but in reverse: it specifies what data will be read (the select clause), what columns will be sent to the reducer (state, city), how the keys will be partitioned (distribute by state), what script the reducer will use to process the data (reduce state,city using ‘script.r’), and what directory the final output will be stored in (insert overwrite directory ‘R_example’).

A quick summary: Hive selects two columns, sends all occurrences of the same state to the same reducer, and each reducer transforms the dataset it receives with the R script.

The output looks like this. We can use Hive to slice and dice our data and then ship the output through R.

[root@sandbox ~]# hadoop fs -cat R_example/000000_0
CA 2
CO 1
GA 1
TN 1

[root@sandbox ~]# hadoop fs -cat R_example/000001_0
OH 2
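Without a cluster you can mimic that flow roughly on the command line, with sort standing in for distribute by and awk standing in for script.r (a sketch of the data flow, not the real Hive execution path):

```shell
# sort groups rows by state (like "distribute by state"); awk then plays
# the role of script.r, emitting state<tab>count for its input rows.
printf 'CA\tSan Jose\nOH\tAthens\nCA\tSan Francisco\n' |
  sort -t$'\t' -k1,1 |
  awk -F'\t' '{ c[$1]++ } END { for (k in c) print k "\t" c[k] }' | sort
```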

And that’s it. Pig provides similar functionality but I leave that as an exercise for the reader, or for a later blog post.

Happy Hadooping from the Hortonworks Education team, see you in class.

You can get started with Hadoop here, and register for training here.



srikrishna says:

Is the R program complete?

My students had asked for an R example. Here is the example I provided them.

#!/usr/bin/env Rscript
f <- file("stdin")
state_data = read.table(f)

I am getting error on the reducer step…Reduce tasks greater than allowed limit.

srikrishna says:

#!/usr/bin/env Rscript
f <- file("stdin")
state_data = read.table(f)

Is there something missing in the script after the “[”? Please clarify. I am trying it in Sandbox 1.3.

Tom Hanlon says:

The typo has been fixed above; the last “[” was a typo.

Should work now.

Thanks for the comment that alerted me to the typo.


srikrishna says:

What version of Hadoop streaming is required to run the above R code? I am getting the following after the reduce step. I am running under the Sandbox.

# of failed Reduce Tasks exceeded allowed limit. FailedCount: 1. LastFailedTask: task_201311121729_0012_r_000000

Can you suggest some ideas ?

Tom Hanlon says:

The code in this blog works on a HDP 1.3 sandbox and an HDP 2.0 sandbox.

The error might be caused by R seeing this line:
CA San Francisco 3,273,190
and parsing it into 4 fields, when we would rather it were parsed into 3 fields, keeping the string “San Francisco” as one field.

The fix would be to quote the city names.
Try it with a data file that looks like this.

CA "San Francisco" 3,273,190
CO "Denver" 634,265
OH "Athens" 23,832
GA "Atlanta" 443,775
TN "Chatanooga" 167,674
CA "San Jose" 984,299
OH "Columbus" 797,434

You can test the R script simply by:
chmod +x script.r
cat cities.txt | ./script.r

If it works then it should work when executed by hadoop.


Tom Hanlon says:

There is a typo in the first R script.

The last “[” is a typo.

Here is the complete script.
#!/usr/bin/env Rscript
f <- file("stdin")
state_data = read.table(f)

Greg says:

Unfortunately R input/output is extremely slow and can’t be used in production

Kevin says:

I tried to do the same by selecting two categorical columns in a table, the selection was done with no error, but exception was thrown on the last line: write.table(my_data_count,quote = FALSE,row.names = FALSE,col.names = FALSE,sep = ” “). Any advice? Thanks!!

Diagnostic Messages for this Task:
java.lang.RuntimeException: org.apache.hadoop.hive.ql.metadata.HiveException: Hive Runtime Error w k”,”_col1″:”debit”},”alias”:0}
at org.apache.hadoop.hive.ql.exec.ExecReducer.reduce(
at org.apache.hadoop.mapred.ReduceTask.runOldReducer(
at org.apache.hadoop.mapred.Child$
at Method)
at org.apache.hadoop.mapred.Child.main(
Caused by: org.apache.hadoop.hive.ql.metadata.HiveException: Hive Runtime Error while processing r “},”alias”:0}
at org.apache.hadoop.hive.ql.exec.ExecReducer.reduce(
… 7 more
Caused by: org.apache.hadoop.hive.ql.meta

FAILED: Execution Error, return code 2 from org.apache.hadoop.hive.ql.exec.MapRedTask
MapReduce Jobs Launched:
Job 0: Map: 7 Reduce: 1 Cumulative CPU: 114.58 sec HDFS Read: 381482305 HDFS Write: 378 SUCCE
Job 1: Map: 1 Reduce: 1 Cumulative CPU: 0.64 sec HDFS Read: 743 HDFS Write: 0 FAIL
Total MapReduce CPU Time Spent: 1 minutes 55 seconds 220 msec

Antonio says:

Why make your life so painful? Why not use rmr2? Hortonworks has been a great advocate for it in the past.

Tom Hanlon says:

We have and do support the Revolution Analytics stuff; see this blog here. The purpose of this blog post was to show the flexibility of Hive and that just about anything that takes STDIN and writes to STDOUT can be used.

KC says:

Thank you for this tutorial. I am new to HDFS and MapReduce. I learned a lot from you. What if my data are many compressed .gz files, all under a data directory in HDFS? These data are a kind of daily log of usage by multiple users throughout a month. How should I deal with this kind of stdin in R in the context of HDFS? Second question: I want to get a picture of usage by user over the entire month. I am not sure what the efficient way is to deal with this.

KC says:

Hi Tom, thank you for the tutorial. I am new to the HDFS and MapReduce concepts. I posted this question earlier but am not sure if it went through, so here it is again. I am dealing with compressed (.gz) files of daily usage logs. Each log contains more than 100k records. I want to know how to modify the R example so I can read this kind of file. Second question: I want to analyze all these files for an entire month. I envision some kind of iteration through each daily usage log, but I am not sure this is the right or efficient way to take advantage of MapReduce. Could you give me some guidance? Thanks a bunch.

Jobil says:

How do you call an R library using this approach? Every time I included a library in the R script, I got an error. I have installed the library on all the nodes in the cluster but am still getting the following error: Failure Info: # of failed Reduce Tasks exceeded allowed limit. FailedCount: 1.

But when I remove the library from the R script, everything works correctly.

Swapnil says:

Hi Tom,

Can you suggest a way in which I can partition the output from the reducer into various directories using Hadoop streaming via Python, without using external libraries like pydoop? I know it can be done using the MultipleOutputFormat class in Java.

Bruno says:


I hope someone reads this comment, as I am in the middle of an HDFS-related project and I always get the same error when I execute the Hive code that makes use of the R script:

Execution Error, return code 20003 from An error occurred when trying to close the Operator running your custom script

I am quite new to this and I haven’t been able to find the solution; I am not sure if it’s because of Java or some other setting. I am running CentOS on Oracle VirtualBox.

I would appreciate some help with this, because I am unable to continue…

Thanks in advance.

sachin says:

Were you able to fix this?

Akshay Jain says:

I am trying to implement Hadoop streaming in Python, but my data is not getting sorted in the reduce phase when I use more than one reducer. Can anybody help?
