

6 Key Hardware Considerations for Deploying Hadoop in Your Environment

Deploying, configuring, managing and scaling Hadoop clusters in a way that optimizes performance and resource utilization involves a lot of considerations. Here are six key things to think about as part of your planning:


  1. Operating system: Using a 64-bit operating system avoids constraining the amount of memory that can be used on worker nodes. For example, 64-bit Red Hat Enterprise Linux 6.1 or later is often preferred because of its better ecosystem support and more comprehensive functionality for components such as RAID controllers.
  2. Computation: Computational (or processing) capacity is determined by the aggregate number of Map/Reduce slots available across all nodes in a cluster. Map/Reduce slots are configured on a per-server basis. I/O performance issues can arise from sub-optimal disk-to-core ratios (too many slots and too few disks). HyperThreading improves process scheduling, allowing you to configure more Map/Reduce slots.
  3. Memory: Memory requirements vary with the application, and they differ between the management services and the worker services. For the worker services, sufficient memory is needed to run the TaskTracker and FileServer services in addition to the sum of all the memory assigned to the Map/Reduce slots. If you have a memory-bound Map/Reduce job, you may need to increase the amount of memory on all the nodes running worker services. When increasing memory, you should always populate all the available memory channels to ensure optimum performance.
  4. Storage: A Hadoop platform that’s designed to achieve performance and scalability by moving the compute activity to the data is preferable. Using this approach, jobs are distributed to nodes close to the associated data, and tasks are run against data on local disks. Data storage requirements for the worker nodes may be best met by direct attached storage (DAS) in a Just a Bunch of Disks (JBOD) configuration, rather than by DAS with RAID or by Network Attached Storage (NAS).
  5. Capacity: The number of disks and their individual capacities determine the total FileServer storage capacity of your cluster. Large Form Factor (3.5”) disks cost less and store more than Small Form Factor (2.5”) disks. HDFS keeps multiple copies of each block for redundancy (three by default), so the usable capacity is only a fraction of the raw disk capacity. The more disks you have, the less likely it is that multiple tasks will access a given disk at the same time. More tasks will be able to run against node-local data, as well.
  6. Network: Configuring only a single Top of Rack (TOR) switch per rack introduces a single point of failure for each rack. In a multi-rack system, such a failure will result in a flood of network traffic as Hadoop rebalances storage. In a single-rack system, this type of failure can bring down the whole cluster. Configuring two TOR switches per rack provides better redundancy, especially if link aggregation is configured between the switches. This way, if either switch fails, the servers will still have full network functionality. Not all switches have the ability to do link aggregation from individual servers to multiple switches. Incorporating dual power supplies for the switches can also help mitigate failures.
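The per-node arithmetic behind points 2, 3 and 5 can be pulled together into a quick sizing calculation. This is a minimal sketch under stated assumptions: the slot counts, memory figures and disk sizes are hypothetical, the `WorkerNode` and `cluster_sizing` names are made up for this example, and the replication factor of 3 is the HDFS default rather than a figure from the HP report.

```python
from dataclasses import dataclass


@dataclass
class WorkerNode:
    """Per-server configuration; every value used below is a hypothetical input."""
    map_slots: int
    reduce_slots: int
    slot_memory_gb: float    # memory assigned to each Map/Reduce slot
    daemon_memory_gb: float  # TaskTracker + FileServer (storage) daemons
    disks: int
    disk_tb: float


def cluster_sizing(node: WorkerNode, nodes: int, replication: int = 3) -> dict:
    slots_per_node = node.map_slots + node.reduce_slots
    raw_tb = nodes * node.disks * node.disk_tb
    return {
        # Computation: aggregate Map/Reduce slots across all worker nodes.
        "total_slots": nodes * slots_per_node,
        # Disk-to-slot ratio: a high value means many tasks contend for each disk.
        "slots_per_disk": slots_per_node / node.disks,
        # Memory: worker daemons plus the sum of the per-slot allocations.
        "memory_per_node_gb": node.daemon_memory_gb + slots_per_node * node.slot_memory_gb,
        # Capacity: raw disk space, then an upper bound on unique data once HDFS
        # keeps `replication` copies of every block (ignores scratch/temp space).
        "raw_tb": raw_tb,
        "usable_tb": raw_tb / replication,
    }


node = WorkerNode(map_slots=12, reduce_slots=6, slot_memory_gb=2.0,
                  daemon_memory_gb=4.0, disks=12, disk_tb=3.0)
print(cluster_sizing(node, nodes=10))
```

With these made-up inputs, ten nodes give 180 slots at 1.5 slots per disk, need about 40 GB of memory per node, and turn 360 TB of raw disk into at most 120 TB of unique data.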

Thanks to HP for pulling this information together and testing Hortonworks Data Platform on HP hardware. For the full report, download the “HP Reference Architecture for Hortonworks Data Platform” whitepaper.

Week in Review: Sandboxes, HDP 2.0 Alpha 2, Hive Performance and Summits

It’s almost time for that final drive home of the week, and what a week it has been with a few new releases, a summit, and a little bit of technical fun. Here’s what happened:

New Sandbox Release. Yes, your favorite Hadoop VM image just got even better. Cheryle took us through the new features which included Ambari integration and Russell followed up with a quick tour of Ambari. There’s still plenty of time to download Sandbox for a weekend of data crunching fun.

HDP 2.0 Alpha 2 was released. This preview release demonstrates some of the performance improvements in store for the final HDP 2.0 release via YARN, enhancements to Hive per the Stinger Initiative, and Apache Tez. Just before the release, we posted some early test results which showed a 45X (yes, that’s forty-five) performance improvement for Hive interactive queries. But that’s just the beginning as we push toward 100X, and Microsoft also talked about their contributions to the Stinger Initiative with the same aim in mind.

If you’ve downloaded Sandbox and are looking for some inspiration for a little fun, then Russell also posted a two-part series on extracting, loading, querying and analyzing your own Twitter archive with Hive. Part 1 is here, and Part 2 is here.

And finally, there was just the small matter of the Hadoop Summit in Amsterdam. We had a great time and hope you did too. Thank you for attending, contributing to the conversation and supporting Hadoop. If you’re now really excited to learn Hadoop, we posted about the training we offer in Europe and Palo Alto.

And that was the week that was. Has your Sandbox downloaded yet?

Stinger Early Results: 45X Performance Increase for Hive

Written with Vinod Kumar Vavilapalli and Gopal Vijayaraghavan

A few weeks back we blogged about the Stinger Initiative and made a promise to work within the open community to make Apache Hive 100 times faster for SQL interaction with Hadoop. We have a broad set of scenarios queued up for testing, but we are so excited about the early results of this work that we thought we’d take the time to share some of them with you.

To get a fair assessment, we styled our tests after the TPC Benchmark™ DS (TPC-DS). For this initial report, we provide detail on two of the most common use cases; as we execute more queries and make more improvements, we will provide more detail.

Performance Tests & Environment

In this report we provide results for two of our performance queries. In the first, we perform a star schema join in which we load all the small tables into memory and scan through the fact table independently on all nodes. In the second query, we perform a join between two large fact tables, both of which are too large to fit in memory.
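The first query’s strategy, commonly called a map-side or broadcast hash join, can be sketched in a few lines of Python. This is an illustrative model of the idea, not Hive’s implementation: the small tables are loaded into in-memory hash tables, and each node scans only its own slice of the fact table against them, so no fact-table data has to move between nodes. The helper names, TPC-DS-style column names and rows are hypothetical.

```python
def build_hash_table(rows, key):
    """Load a small dimension table into an in-memory dict keyed on its join column."""
    return {row[key]: row for row in rows}


def star_join_partition(fact_partition, dims):
    """Scan one node's slice of the fact table, probing every in-memory dimension.

    `dims` maps a foreign-key column of the fact table to that dimension's hash
    table. Fact rows with no match in some dimension are dropped (inner join).
    """
    for fact in fact_partition:
        joined = dict(fact)
        for fk_column, table in dims.items():
            match = table.get(fact[fk_column])
            if match is None:
                break
            joined.update(match)
        else:  # no break: every dimension matched
            yield joined


dates = [{"d_date_sk": 1, "d_year": 2001}, {"d_date_sk": 2, "d_year": 2002}]
items = [{"i_item_sk": 10, "i_category": "Books"}]
dims = {"ss_sold_date_sk": build_hash_table(dates, "d_date_sk"),
        "ss_item_sk": build_hash_table(items, "i_item_sk")}

# Each node would run this over its own partition of the fact table.
partition = [{"ss_sold_date_sk": 1, "ss_item_sk": 10, "ss_price": 5.0},
             {"ss_sold_date_sk": 2, "ss_item_sk": 99, "ss_price": 7.0}]
for row in star_join_partition(partition, dims):
    print(row)
```

The second query cannot take this shortcut, since neither table fits in memory; both sides would instead have to be partitioned on the join key and shuffled across the network before they can be joined.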

Our test environment consisted of a 10-node EC2 cluster with a total of 100 containers over 40 disks. We used it to obtain query execution times for Hive on raw data and for Hive with all the optimizations enabled on partitioned data stored in RCFile format. We used a scale factor of 200, which implies a data set of around 200 GB.

Results

For the first query, we’ve calculated as much as a 35X improvement over native Hive and have reduced query times from around 1400 seconds to 39! And for the second query we calculate a whopping 45X improvement… all in open source Apache Hive.

Figures: Stinger Query 1 and Stinger Query 2 results

This is a preliminary look at test results, but it already indicates significant improvements. We are pretty excited about the results, as the work has only just begun. The test results above do not even include the Tez improvements, nor do they include the new ORCFile format! And there are still a handful of other Hive-specific improvements coming. Among the improvements included in these tests are HIVE-3784, HIVE-3952 and HIVE-2340. A before-and-after view of the execution looks like this:

Figure: before Stinger

Figure: after Stinger

In subsequent posts we will deliver more explicit results and provide more in-depth specifications of the hardware and software used for the benchmarks. We’ll also describe a few other efforts that will complete our story of attaining the 100X gains we talked about earlier, so stay tuned.

We truly believe that the fastest path to innovation is the open community, and this is a great example of how quickly the community can prove it true. These advances are not attributable to Hortonworks alone; they were completed in partnership with the community, with resources from Yahoo!, Facebook, Twitter, SAP and Microsoft all contributing.

NOTE: We are talking about all of this and much more at Hadoop Summit Amsterdam. Attend our talk “Innovations In Apache Hadoop MapReduce, Pig and Hive for improving query performance” to learn more about this effort. “Optimizing Hive Queries” by Owen O’Malley and “What’s New and What’s Next in Apache Hive” by Gunther Hagleitner are two other talks you should attend to learn more about other threads of the Stinger Initiative.

Imperative and Declarative Hadoop: TPC-H in Pig and Hive

According to the Transaction Processing Performance Council, TPC-H is:

The TPC Benchmark™H (TPC-H) is a decision support benchmark. It consists of a suite of business oriented ad-hoc queries and concurrent data modifications. The queries and the data populating the database have been chosen to have broad industry-wide relevance. This benchmark illustrates decision support systems that examine large volumes of data, execute queries with a high degree of complexity, and give answers to critical business questions.

TPC-H was implemented for Hive in HIVE-600 and for Pig in PIG-2397 by Hortonworks intern Jie Li. In going over this work, I was struck by how it outlined differences between Pig and SQL.

There seems to be a tendency for simple SQL to provide greater clarity than Pig. At some point, as the TPC-H queries become more demanding, complex SQL seems to have less clarity than the comparable Pig. Let’s take a look.

Q1, the pricing summary report, is fairly simple, and a SQL GROUP BY is a good fit:

DROP TABLE lineitem;
DROP TABLE q1_pricing_summary_report;

-- create tables and load data
CREATE EXTERNAL TABLE lineitem (
    L_ORDERKEY INT, 
    L_PARTKEY INT, 
    L_SUPPKEY INT, 
    L_LINENUMBER INT, 
    L_QUANTITY DOUBLE, 
    L_EXTENDEDPRICE DOUBLE, 
    L_DISCOUNT DOUBLE, 
    L_TAX DOUBLE, 
    L_RETURNFLAG STRING, 
    L_LINESTATUS STRING, 
    L_SHIPDATE STRING, 
    L_COMMITDATE STRING, 
    L_RECEIPTDATE STRING, 
    L_SHIPINSTRUCT STRING, 
    L_SHIPMODE STRING, 
    L_COMMENT STRING) 
    ROW FORMAT DELIMITED FIELDS TERMINATED BY '|' STORED AS TEXTFILE LOCATION '/tpch/lineitem';

-- create the target table
CREATE TABLE q1_pricing_summary_report ( 
    L_RETURNFLAG STRING, 
    L_LINESTATUS STRING, 
    SUM_QTY DOUBLE, 
    SUM_BASE_PRICE DOUBLE, 
    SUM_DISC_PRICE DOUBLE, 
    SUM_CHARGE DOUBLE, 
    AVE_QTY DOUBLE, 
    AVE_PRICE DOUBLE, 
    AVE_DISC DOUBLE, 
    COUNT_ORDER INT);

set mapred.min.split.size=536870912;

-- the query
INSERT OVERWRITE TABLE q1_pricing_summary_report 
SELECT 
    L_RETURNFLAG, 
    L_LINESTATUS, 
    SUM(L_QUANTITY), 
    SUM(L_EXTENDEDPRICE), 
    SUM(L_EXTENDEDPRICE * (1-L_DISCOUNT)), 
    SUM(L_EXTENDEDPRICE * (1-L_DISCOUNT) * (1+L_TAX)), 
    AVG(L_QUANTITY),
    AVG(L_EXTENDEDPRICE), 
    AVG(L_DISCOUNT), 
    COUNT(1) 
FROM 
  lineitem 
WHERE 
  L_SHIPDATE<='1998-09-02' 
GROUP BY L_RETURNFLAG, L_LINESTATUS 
ORDER BY L_RETURNFLAG, L_LINESTATUS;

One thing to notice, though, is that compared to Pig, we have to specify schemas twice: once for the load, and again for the result. Compare that to Pig, where we specify the schema once upon load, and then implicitly in the Pig code itself:

SET default_parallel $reducers;

LineItems = LOAD '$input/lineitem' USING PigStorage('|') AS (
    orderkey:long, 
    partkey:long, 
    suppkey:long, 
    linenumber:long, 
    quantity:double, 
    extendedprice:double, 
    discount:double, 
    tax:double, 
    returnflag, 
    linestatus, 
    shipdate, 
    commitdate, 
    receiptdate, 
    shipinstruct, 
    shipmode, 
    comment);

SubLineItems = FILTER LineItems BY shipdate <= '1998-09-02';

SubLine = FOREACH SubLineItems GENERATE 
    returnflag, 
    linestatus, 
    quantity, 
    extendedprice, 
    extendedprice * (1-discount) AS disc_price, 
    extendedprice * (1-discount) * (1+tax) AS charge, 
    discount;

StatusGroup = GROUP SubLine BY (returnflag, linestatus);

PriceSummary = FOREACH StatusGroup GENERATE 
    group.returnflag AS returnflag, 
    group.linestatus AS linestatus, 
    SUM(SubLine.quantity) AS sum_qty, 
    SUM(SubLine.extendedprice) AS sum_base_price, 
    SUM(SubLine.disc_price) as sum_disc_price, 
    SUM(SubLine.charge) as sum_charge, 
    AVG(SubLine.quantity) as avg_qty, 
    AVG(SubLine.extendedprice) as avg_price, 
    AVG(SubLine.discount) as avg_disc, 
    COUNT(SubLine) as count_order;

SortedSummary = ORDER PriceSummary BY returnflag, linestatus;

STORE SortedSummary INTO '$output/Q1out';

Things change as the queries get more complex. With the use of temporary tables, the schema creation overhead starts to dominate, and the SQL becomes quite complex. Take a look at Q22, the Global Sales Opportunity Report:

DROP TABLE customer;
DROP TABLE orders;
DROP TABLE q22_customer_tmp;
DROP TABLE q22_customer_tmp1;
DROP TABLE q22_orders_tmp;
DROP TABLE q22_global_sales_opportunity;

-- create tables and load data
create external table customer (
    C_CUSTKEY INT, 
    C_NAME STRING, 
    C_ADDRESS STRING, 
    C_NATIONKEY INT, 
    C_PHONE STRING, 
    C_ACCTBAL DOUBLE, 
    C_MKTSEGMENT STRING, 
    C_COMMENT STRING
) ROW FORMAT DELIMITED FIELDS TERMINATED BY '|' STORED AS TEXTFILE LOCATION '/tpch/customer';

create external table orders (
    O_ORDERKEY INT, 
    O_CUSTKEY INT, 
    O_ORDERSTATUS STRING, 
    O_TOTALPRICE DOUBLE, 
    O_ORDERDATE STRING, 
    O_ORDERPRIORITY STRING, 
    O_CLERK STRING, 
    O_SHIPPRIORITY INT, 
    O_COMMENT STRING) 
ROW FORMAT DELIMITED FIELDS TERMINATED BY '|' STORED AS TEXTFILE LOCATION '/tpch/orders';

-- create target tables
create table q22_customer_tmp(c_acctbal double, c_custkey int, cntrycode string);
create table q22_customer_tmp1(avg_acctbal double);
create table q22_orders_tmp(o_custkey int);
create table q22_global_sales_opportunity(cntrycode string, numcust int, totacctbal double);

-- the query
insert overwrite table q22_customer_tmp
select 
  c_acctbal, c_custkey, substr(c_phone, 1, 2) as cntrycode
from 
  customer
where 
  substr(c_phone, 1, 2) = '13' or
  substr(c_phone, 1, 2) = '31' or
  substr(c_phone, 1, 2) = '23' or
  substr(c_phone, 1, 2) = '29' or
  substr(c_phone, 1, 2) = '30' or
  substr(c_phone, 1, 2) = '18' or
  substr(c_phone, 1, 2) = '17';

insert overwrite table q22_customer_tmp1
select
  avg(c_acctbal)
from
  q22_customer_tmp
where
  c_acctbal > 0.00;

insert overwrite table q22_orders_tmp
select 
  o_custkey 
from 
  orders
group by 
  o_custkey;

insert overwrite table q22_global_sales_opportunity
select
  cntrycode, count(1) as numcust, sum(c_acctbal) as totacctbal
from
(
  select cntrycode, c_acctbal, avg_acctbal from
  q22_customer_tmp1 ct1 join
  (
    select cntrycode, c_acctbal from
      q22_orders_tmp ot 
      right outer join q22_customer_tmp ct 
      on
        ct.c_custkey = ot.o_custkey
    where
      o_custkey is null
  ) ct2
) a
where
  c_acctbal > avg_acctbal
group by cntrycode
order by cntrycode;

The Pig version is comparatively simple:

SET default_parallel $reducers;

customer = load '$input/customer' USING PigStorage('|') as (
    c_custkey:long,
    c_name:chararray, 
    c_address:chararray, 
    c_nationkey:int, 
    c_phone:chararray, 
    c_acctbal:double, 
    c_mktsegment:chararray, 
    c_comment:chararray);
orders = load '$input/orders' USING PigStorage('|') as (
    o_orderkey:long, 
    o_custkey:long, 
    o_orderstatus:chararray, 
    o_totalprice:double, 
    o_orderdate:chararray, 
    o_orderpriority:chararray, 
    o_clerk:chararray, 
    o_shippriority:long, 
    o_comment:chararray);

customer_filter = filter customer by c_acctbal>0.00 and SUBSTRING(c_phone, 0, 2) MATCHES '13|31|23|29|30|18|17';
customer_filter_group = group customer_filter all;
avg_customer_filter = foreach customer_filter_group generate AVG(customer_filter.c_acctbal) as avg_c_acctbal;

customer_sec_filter = filter customer by c_acctbal > avg_customer_filter.avg_c_acctbal and SUBSTRING(c_phone, 0, 2) MATCHES '13|31|23|29|30|18|17';
customer_orders_left = join customer_sec_filter by c_custkey left, orders by o_custkey;

customer_trd_filter = filter customer_orders_left by o_custkey is null;
customer_rows = foreach customer_trd_filter generate SUBSTRING(c_phone, 0, 2) as cntrycode, c_acctbal;

customer_result_group = group customer_rows by cntrycode;
customer_result = foreach customer_result_group generate group, COUNT(customer_rows) as numcust, SUM(customer_rows.c_acctbal) as totacctbal;
customer_result_inorder = order customer_result by group;

store customer_result_inorder into '$output/Q22out' USING PigStorage('|');
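Both versions compute the same result, which may be easier to see with the Hadoop machinery stripped away. The following plain-Python sketch walks through the Q22 steps on a handful of in-memory rows; the tuple layout, the `q22` function and the sample data are hypothetical and are not part of the TPC-H kit or of either implementation.

```python
COUNTRY_CODES = {"13", "31", "23", "29", "30", "18", "17"}


def q22(customers, order_custkeys):
    """customers: (custkey, phone, acctbal) tuples; order_custkeys: custkey of each order."""
    # Customers whose phone number starts with one of the country codes.
    in_codes = [c for c in customers if c[1][:2] in COUNTRY_CODES]
    # Average balance over those customers with a positive balance.
    positive = [bal for _, _, bal in in_codes if bal > 0.0]
    if not positive:
        return []
    avg_bal = sum(positive) / len(positive)
    # Customers who have placed at least one order (excluded below: an anti-join).
    has_orders = set(order_custkeys)
    totals = {}
    for custkey, phone, bal in in_codes:
        if bal > avg_bal and custkey not in has_orders:
            n, total = totals.get(phone[:2], (0, 0.0))
            totals[phone[:2]] = (n + 1, total + bal)
    # One row per country code: (cntrycode, numcust, totacctbal), ordered by code.
    return [(code, n, total) for code, (n, total) in sorted(totals.items())]


customers = [(1, "13-555-0100", 100.0), (2, "13-555-0101", 400.0),
             (3, "31-555-0102", 500.0), (4, "99-555-0103", 1000.0),
             (5, "23-555-0104", -50.0)]
print(q22(customers, order_custkeys=[3]))  # [('13', 1, 400.0)]
```

Customer 3 clears the average but has an order, and customer 4’s country code is not in the list, so only customer 2 survives every filter.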

Both Pig and Hive have a place, and each has its own strengths. It is illustrative to compare these identical queries in the two systems, to see where you might want to hand off queries from Hive to Pig. HCatalog facilitates this handoff, as Pig can read directly from Hive tables via HCatLoader and write to them via HCatStorer.

Hadoop in Perspective: Systems for Scientific Computing

When the term scientific computing comes up in a conversation, it’s usually just the occasional science geek who shows signs of recognition. But although most people have little or no knowledge of the field’s existence, it has been around since the second half of the twentieth century and has played an increasingly important role in many technological and scientific developments. Internet search engines, DNA analysis, weather forecasting, seismic analysis, renewable energy, and aircraft modeling are just a few examples where scientific computing is nowadays indispensable.

Apache Hadoop is a newcomer in scientific computing, and is welcomed as a great addition to the existing systems. In this post I give an introduction to systems for scientific computing and attempt to give Hadoop a place in this picture. I start by discussing arguably the most important concept in scientific computing, parallel computing: what is it, how does it work, and what tools are available? Then I give an overview of the systems that are available for scientific computing at SURFsara, the Dutch center for academic IT and home to some of the world’s most powerful computing systems. I end with a short discussion of the questions that arise when there are many different systems to choose from.


Why not RAID-0? It’s about Time and Snowflakes

A recurrent question on the various Hadoop mailing lists is “why does Hadoop prefer a set of separate disks to the same set managed as a RAID-0 disk array?”

It’s about time and snowflakes.

JBOD and the Allure of RAID-0

In Hadoop clusters, we recommend treating each disk separately, in a configuration that is known, somewhat disparagingly, as “JBOD”: Just a Box of Disks.

RAID-0, by comparison (a bit of a misnomer, as there is no redundancy), stripes data across all the disks in the array. This promises some advantages:

  • Higher IO rates on small accesses
  • Higher bandwidth on larger accesses, especially write operations
  • Eliminates the hot spot of a single disk being overloaded because its data is more in demand

In RAID-0, data is striped across disks. When data needs to be written, it is divided into small blocks (64 KB or more), and one of these blocks is written to each disk simultaneously. When the data is read back, the blocks can again be read from all disks simultaneously. As a result, your disk bandwidth increases with the size of the array: if you had eight disks mounted as RAID-0, the theoretical maximum read and write bandwidth would be eight times that of a single disk.
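To make the striping concrete, here is a minimal sketch of how a RAID-0 array might map a logical byte offset onto its member disks, assuming a simple round-robin layout with a fixed stripe unit. Real controllers may differ in detail, and the function names are hypothetical. A large sequential access is spread over every disk, which is where the bandwidth gain comes from, while a small access lands on just one disk, leaving the others free to serve other requests.

```python
def stripe_location(offset, stripe_unit, n_disks):
    """Map a logical byte offset in a RAID-0 array to (disk index, offset on that disk)."""
    unit_index = offset // stripe_unit   # which stripe unit the byte falls in
    disk = unit_index % n_disks          # units are dealt round-robin across disks
    row = unit_index // n_disks          # full rows of units that precede this one
    return disk, row * stripe_unit + offset % stripe_unit


def disks_touched(offset, length, stripe_unit, n_disks):
    """Set of disks that a contiguous read or write of `length` bytes lands on."""
    first = offset // stripe_unit
    last = (offset + length - 1) // stripe_unit
    return {unit % n_disks for unit in range(first, last + 1)}


UNIT = 64 * 1024  # 64 KB stripe unit, the low end of the range mentioned above
print(stripe_location(9 * UNIT + 10, UNIT, 8))       # (1, 65546)
print(sorted(disks_touched(0, 8 * UNIT, UNIT, 8)))   # [0, 1, 2, 3, 4, 5, 6, 7]
print(disks_touched(0, 4 * 1024, UNIT, 8))           # {0}
```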

With the disk controllers built into modern servers, RAID-0 is an option that can be turned on: so why not?

Reliability

Reliability is one issue.

Disks can get slower as they age, as they start to get read errors and have to retry reading bits of the disk platter. A slow disk is a warning sign that maybe you should think about replacing that disk. With Hadoop in a JBOD setting, you can unmount the disk; the Datanode will notice it is missing and report to the Namenode that all the data on it needs re-replication. With a RAID-0 array, the data across all the disks is lost: you need to add a new disk to bring the array back up to size, reformat all the disks, and bring up the Datanode with empty storage. Over time it will pick up more data, from rebalancing and from jobs run on it.

You have to do that whenever any of the disks fails, and the more disks you have, the more often that happens.

Before you panic: disk failures are rare. Google’s 2007 paper, Failure Trends in a Large Disk Drive Population, reported that in their datacenters, 1.7% of disks failed in the first year of their life, while three-year-old disks were failing at a rate of 8.6%. About 9% isn’t a good number. Returning to the hypothetical eight-disk server, the probability of each disk lasting the year would be:

1 - 0.086 = 0.914

The probability that all disks make it to their next birthday becomes:

0.914^8 = 0.487

If those Google numbers matched those of the disks in your servers, and weren’t due to a really bad batch of disks, then during that third year about half the datanodes would lose all their data and need to be rebuilt. If you have one of the latest twelve-disk servers, things get even worse: 0.914^12 ≈ 0.34, so only about a third of those servers would get through the year intact.
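The same arithmetic, generalized to any number of disks, is easy to check. This short sketch assumes, as the calculation above does, that disks fail independently at the 8.6% year-three rate; the function name is made up for illustration.

```python
def p_all_disks_survive(p_fail: float, n_disks: int) -> float:
    """Probability that none of n independent disks fails during the year."""
    return (1.0 - p_fail) ** n_disks


P_YEAR3 = 0.086  # year-three annual failure rate quoted from the Google paper
for n in (1, 8, 12):
    survive = p_all_disks_survive(P_YEAR3, n)
    print(f"{n:2d} disks: array survives {survive:.3f}, array lost {1 - survive:.3f}")
```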

Hadoop copes with unreliable hardware by replicating data across servers: if one copy of an HDFS data block (64 MB or greater) is lost or corrupt, there are usually two copies elsewhere from which to recover it.

Only now, with all the data on a server lost at once, the amount of data to re-replicate after a disk failure increases linearly with the number of disks in each server, while the probability of the server failing also increases. Compare that with JBOD: each of those year-three disks has an 8.6% chance of failing, and the data to recover is just the contents of that one disk: 1-3 TB.

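That eight-disk server would have to re-replicate 8-24 TB of data instead, and the chance of that happening in year three is about 51% (1 - 0.487), roughly six times the 8.6% chance of losing any one disk. That’s going to be slightly more noticeable.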

If you do want to use RAID-0 storage, configuring an eight-disk server as four RAID-0 pairs is much less risky. Each pair’s I/O performance could be up to double that of a single drive, while the chance of a pair failing in year three is only about 16.5% (1 - 0.914^2), and the cost of recovering its data is much lower: 2-6 TB.
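To compare the three layouts discussed above, the following sketch estimates how much data a single eight-disk server would be expected to re-replicate in a year. It assumes independent failures at the 8.6% year-three rate and hypothetical 2 TB disks (the middle of the 1-3 TB range); the function name is made up for this example.

```python
def expected_rereplication_tb(p_fail, n_disks, disk_tb, disks_per_volume):
    """Expected TB that HDFS must re-replicate per server-year.

    The server's disks are grouped into RAID-0 volumes of `disks_per_volume`
    disks (1 means JBOD). Any disk failure in a volume loses the whole volume.
    """
    volumes = n_disks // disks_per_volume
    p_volume_lost = 1.0 - (1.0 - p_fail) ** disks_per_volume
    data_per_volume_tb = disks_per_volume * disk_tb
    return volumes * p_volume_lost * data_per_volume_tb


for label, per_volume in (("JBOD", 1), ("4 x RAID-0 pairs", 2), ("1 x 8-disk RAID-0", 8)):
    tb = expected_rereplication_tb(0.086, 8, 2.0, per_volume)
    print(f"{label:18s} {tb:5.2f} TB/year")
```

With these inputs the estimate comes to roughly 1.4 TB a year for JBOD, 2.6 TB for four RAID-0 pairs and 8.2 TB for one eight-disk RAID-0 array. JBOD actually sees the most individual failure events, but each one is small, which is what keeps the recovery traffic down.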

Disk failures, then, are the first reason you don’t want to use RAID-0 storage – now to the second.

Every Disk is a Unique Snowflake

Hadoop job performance depends on disk bandwidth, especially the read bandwidth.

On RAID-0 storage, disk accesses proceed at the rate of the slowest disk. It has long been assumed that disks all start out at the same speed and only degrade over time. Recent research shows things are worse than this: the current generations of hard disks vary in performance from day one.

The 2011 paper, Disks Are Like Snowflakes: No Two Are Alike, measured the performance of modern disk drives and discovered that they can vary in data I/O rates by 20%, even when they are all writing to the same part of the disk. This is because the latest manufacturing processes produce disks with different surface characteristics, altering the density at which the disks can store data. Rather than set up the disk electronics to support only the lowest measured performance, or discard the slowest disks, manufacturers now use a technique called Adaptive Zoning: newly manufactured disks are calibrated to their performance, and the controllers are configured to drive each zone of the disk at the highest rate that zone supports.

This is profound, and it’s not something that the disk manufacturers have been publicising. Modern CPUs are “binned” into parts that support different rates, but those rates are published and you get to pay more for the faster parts. Here the speed of the disk varies, and you just have to hope your parts are the fast ones. In the experiments the paper’s authors conducted, some disks could deliver 105 megabytes of data a second, while the measured range was 90-111 MB/s.

If you have eight disks, some will be faster than the others, right from day one. And your RAID-0 storage will deliver the performance of the slowest disk right from the day you unpack it from its box and switch it on.
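A simplified model shows what that means for throughput. The sketch assumes an eight-disk server with hypothetical per-disk read rates spread across the 90-111 MB/s range reported in the paper, that RAID-0 stripes every access across all disks so the array moves in lockstep, and that under JBOD enough independent tasks are running to keep every disk busy at its own rate.

```python
def raid0_mb_s(disk_speeds):
    """Simplified model: every stripe touches every disk, so the array advances
    at the pace of its slowest member."""
    return len(disk_speeds) * min(disk_speeds)


def jbod_mb_s(disk_speeds):
    """Simplified model: independent tasks keep each disk busy at its own rate."""
    return sum(disk_speeds)


# Hypothetical per-disk sequential read rates (MB/s) within the 90-111 MB/s range.
speeds = [90, 95, 100, 105, 105, 108, 110, 111]
print(raid0_mb_s(speeds), jbod_mb_s(speeds))  # 720 824
```

Under these assumptions the RAID-0 array tops out at 720 MB/s, while the same disks used as JBOD could deliver up to 824 MB/s in aggregate: the array effectively buys eight copies of its slowest disk.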

That is the other reason we don’t recommend configuring your servers’ storage as one large RAID-0 array.

Summary

To summarize: RAID-0 storage appears to increase disk I/O rates, but it delivers data at the rate of the slowest disk in the array, and disk speeds within an array can vary by up to 20%. When a single disk eventually fails, all the data on the server is lost, forcing you to reformat all the disks and wait for HDFS to repopulate the server with new data.

  1. We use JBOD storage for all our worker nodes, and recommend that our customers do too.
  2. If anyone does insist on RAID-0 storage, restrict it to pairs of disks, keeping the risk and cost of failures down.

Update: Single Drive RAID-0

Some people have asked us about RAID-0 on single drives, so we’d like to clarify: Hadoop works perfectly well if you configure each drive as a single-drive RAID-0 volume.

This configuration comes about with disk controllers that expect everything to be RAIDed, or when the controller supports either JBOD or RAID mode and you declare one pair of disks as RAID-1, leaving the remaining disks to be declared as single-drive RAID-0 volumes. Why would anyone want to declare two disks as RAID-1, that is, mirrored disks? If you put the OS on that RAID volume, then the failure of either of those disks will not stop the server. In very large clusters of 500+ nodes, people don’t do this: they have enough spare servers around, and would rather have the extra storage and bandwidth. On smaller clusters, having the OS on a mirrored pair of disks downgrades the failure of an OS disk from a serious problem (re-replication of all the server’s data, a new disk needed within a few hours) to a chore (get a replacement disk and swap it into the server when you get a chance).

The problems with RAID-0 (amplified data replication on a disk failure, and performance limited to that of the slowest disk) grow with the number of disks. With a single disk, you get exactly the same numbers as you would if the disk controller treated it as a JBOD drive.