Imperative and Declarative Hadoop: TPC-H in Pig and Hive
The Transaction Processing Performance Council (TPC) describes TPC-H as follows:
The TPC Benchmark™H (TPC-H) is a decision support benchmark. It consists of a suite of business oriented ad-hoc queries and concurrent data modifications. The queries and the data populating the database have been chosen to have broad industry-wide relevance. This benchmark illustrates decision support systems that examine large volumes of data, execute queries with a high degree of complexity, and give answers to critical business questions.
Simple SQL tends to be clearer than the equivalent Pig. As the TPC-H queries become more demanding, though, the complex SQL seems to become less clear than the comparable Pig. Let's take a look.
Q1, the pricing summary report, is fairly simple, and a SQL GROUP BY is a good fit:
    DROP TABLE lineitem;
    DROP TABLE q1_pricing_summary_report;

    -- create tables and load data
    CREATE EXTERNAL TABLE lineitem (
      L_ORDERKEY INT, L_PARTKEY INT, L_SUPPKEY INT, L_LINENUMBER INT,
      L_QUANTITY DOUBLE, L_EXTENDEDPRICE DOUBLE, L_DISCOUNT DOUBLE, L_TAX DOUBLE,
      L_RETURNFLAG STRING, L_LINESTATUS STRING,
      L_SHIPDATE STRING, L_COMMITDATE STRING, L_RECEIPTDATE STRING,
      L_SHIPINSTRUCT STRING, L_SHIPMODE STRING, L_COMMENT STRING)
    ROW FORMAT DELIMITED FIELDS TERMINATED BY '|'
    STORED AS TEXTFILE
    LOCATION '/tpch/lineitem';

    -- create the target table
    CREATE TABLE q1_pricing_summary_report (
      L_RETURNFLAG STRING, L_LINESTATUS STRING,
      SUM_QTY DOUBLE, SUM_BASE_PRICE DOUBLE, SUM_DISC_PRICE DOUBLE, SUM_CHARGE DOUBLE,
      AVE_QTY DOUBLE, AVE_PRICE DOUBLE, AVE_DISC DOUBLE,
      COUNT_ORDER INT);

    set mapred.min.split.size=536870912;

    -- the query
    INSERT OVERWRITE TABLE q1_pricing_summary_report
    SELECT
      L_RETURNFLAG, L_LINESTATUS,
      SUM(L_QUANTITY),
      SUM(L_EXTENDEDPRICE),
      SUM(L_EXTENDEDPRICE * (1-L_DISCOUNT)),
      SUM(L_EXTENDEDPRICE * (1-L_DISCOUNT) * (1+L_TAX)),
      AVG(L_QUANTITY),
      AVG(L_EXTENDEDPRICE),
      AVG(L_DISCOUNT),
      COUNT(1)
    FROM lineitem
    WHERE L_SHIPDATE <= '1998-09-02'
    GROUP BY L_RETURNFLAG, L_LINESTATUS
    ORDER BY L_RETURNFLAG, L_LINESTATUS;
One thing to notice, though, is that this Hive script specifies schemas twice: once for the load, and again for the result table. Compare that to Pig, where we specify the schema once on load, and the result schema follows implicitly from the Pig code itself:
    SET default_parallel $reducers;

    LineItems = LOAD '$input/lineitem' USING PigStorage('|') AS (
        orderkey:long, partkey:long, suppkey:long, linenumber:long,
        quantity:double, extendedprice:double, discount:double, tax:double,
        returnflag, linestatus, shipdate, commitdate, receiptdate,
        shipinstruct, shipmode, comment);

    SubLineItems = FILTER LineItems BY shipdate <= '1998-09-02';

    SubLine = FOREACH SubLineItems GENERATE
        returnflag, linestatus, quantity, extendedprice,
        extendedprice * (1-discount) AS disc_price,
        extendedprice * (1-discount) * (1+tax) AS charge,
        discount;

    StatusGroup = GROUP SubLine BY (returnflag, linestatus);

    PriceSummary = FOREACH StatusGroup GENERATE
        group.returnflag AS returnflag,
        group.linestatus AS linestatus,
        SUM(SubLine.quantity) AS sum_qty,
        SUM(SubLine.extendedprice) AS sum_base_price,
        SUM(SubLine.disc_price) as sum_disc_price,
        SUM(SubLine.charge) as sum_charge,
        AVG(SubLine.quantity) as avg_qty,
        AVG(SubLine.extendedprice) as avg_price,
        AVG(SubLine.discount) as avg_disc,
        COUNT(SubLine) as count_order;

    SortedSummary = ORDER PriceSummary BY returnflag, linestatus;

    STORE SortedSummary INTO '$output/Q1out';
Things change as the queries get more complex. With the use of temporary tables, the schema creation overhead starts to dominate, and the SQL becomes quite complex. Take a look at Q22, the Global Sales Opportunity Report:
    DROP TABLE customer;
    DROP TABLE orders;
    DROP TABLE q22_customer_tmp;
    DROP TABLE q22_customer_tmp1;
    DROP TABLE q22_orders_tmp;
    DROP TABLE q22_global_sales_opportunity;

    -- create tables and load data
    create external table customer (
      C_CUSTKEY INT, C_NAME STRING, C_ADDRESS STRING, C_NATIONKEY INT,
      C_PHONE STRING, C_ACCTBAL DOUBLE, C_MKTSEGMENT STRING, C_COMMENT STRING)
    ROW FORMAT DELIMITED FIELDS TERMINATED BY '|'
    STORED AS TEXTFILE
    LOCATION '/tpch/customer';

    create external table orders (
      O_ORDERKEY INT, O_CUSTKEY INT, O_ORDERSTATUS STRING, O_TOTALPRICE DOUBLE,
      O_ORDERDATE STRING, O_ORDERPRIORITY STRING, O_CLERK STRING,
      O_SHIPPRIORITY INT, O_COMMENT STRING)
    ROW FORMAT DELIMITED FIELDS TERMINATED BY '|'
    STORED AS TEXTFILE
    LOCATION '/tpch/orders';

    -- create target tables
    create table q22_customer_tmp(c_acctbal double, c_custkey int, cntrycode string);
    create table q22_customer_tmp1(avg_acctbal double);
    create table q22_orders_tmp(o_custkey int);
    create table q22_global_sales_opportunity(cntrycode string, numcust int, totacctbal double);

    -- the query
    insert overwrite table q22_customer_tmp
    select
      c_acctbal, c_custkey, substr(c_phone, 1, 2) as cntrycode
    from customer
    where
      substr(c_phone, 1, 2) = '13' or
      substr(c_phone, 1, 2) = '31' or
      substr(c_phone, 1, 2) = '23' or
      substr(c_phone, 1, 2) = '29' or
      substr(c_phone, 1, 2) = '30' or
      substr(c_phone, 1, 2) = '18' or
      substr(c_phone, 1, 2) = '17';

    insert overwrite table q22_customer_tmp1
    select avg(c_acctbal)
    from q22_customer_tmp
    where c_acctbal > 0.00;

    insert overwrite table q22_orders_tmp
    select o_custkey
    from orders
    group by o_custkey;

    insert overwrite table q22_global_sales_opportunity
    select
      cntrycode, count(1) as numcust, sum(c_acctbal) as totacctbal
    from (
      select cntrycode, c_acctbal, avg_acctbal
      from q22_customer_tmp1 ct1
      join (
        select cntrycode, c_acctbal
        from q22_orders_tmp ot
        right outer join q22_customer_tmp ct
          on ct.c_custkey = ot.o_custkey
        where o_custkey is null
      ) ct2
    ) a
    where c_acctbal > avg_acctbal
    group by cntrycode
    order by cntrycode;
The Pig version is comparatively simple:
    SET default_parallel $reducers;

    customer = load '$input/customer' USING PigStorage('|') as (
        c_custkey:long, c_name:chararray, c_address:chararray, c_nationkey:int,
        c_phone:chararray, c_acctbal:double, c_mktsegment:chararray,
        c_comment:chararray);

    orders = load '$input/orders' USING PigStorage('|') as (
        o_orderkey:long, o_custkey:long, o_orderstatus:chararray,
        o_totalprice:double, o_orderdate:chararray, o_orderpriority:chararray,
        o_clerk:chararray, o_shippriority:long, o_comment:chararray);

    customer_filter = filter customer by c_acctbal>0.00
        and SUBSTRING(c_phone, 0, 2) MATCHES '13|31|23|29|30|18|17';

    customer_filter_group = group customer_filter all;

    avg_customer_filter = foreach customer_filter_group generate
        AVG(customer_filter.c_acctbal) as avg_c_acctbal;

    customer_sec_filter = filter customer by c_acctbal > avg_customer_filter.avg_c_acctbal
        and SUBSTRING(c_phone, 0, 2) MATCHES '13|31|23|29|30|18|17';

    customer_orders_left = join customer_sec_filter by c_custkey left, orders by o_custkey;

    customer_trd_filter = filter customer_orders_left by o_custkey is null;

    customer_rows = foreach customer_trd_filter generate
        SUBSTRING(c_phone, 0, 2) as cntrycode, c_acctbal;

    customer_result_group = group customer_rows by cntrycode;

    customer_result = foreach customer_result_group generate
        group, COUNT(customer_rows) as numcust, SUM(customer_rows.c_acctbal) as totacctbal;

    customer_result_inorder = order customer_result by group;

    store customer_result_inorder into '$output/Q22out' USING PigStorage('|');
Both Pig and Hive have a place, and each has its own strengths. It is instructive to compare the same queries in the two systems, to see where you might want to hand off queries from Hive to Pig. HCatalog facilitates this handoff, since Pig can read Hive tables directly via HCatLoader and write to them via HCatStorer.
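As a rough sketch of what that handoff could look like, the Q22 Pig script can take its inputs from the Hive tables defined earlier instead of from raw files. This assumes Pig is started with HCatalog support (for example `pig -useHCatalog`), that the `customer`, `orders` and `q22_global_sales_opportunity` tables already exist in the metastore's default database, and that your release ships the classes under `org.apache.hive.hcatalog.pig` (older HCatalog releases used `org.apache.hcatalog.pig`). Treat it as an illustration under those assumptions, not a tested drop-in script.

```pig
-- Assumption: run as `pig -useHCatalog`, with the Hive tables from the
-- scripts above already created in the default database.
customer = LOAD 'customer' USING org.apache.hive.hcatalog.pig.HCatLoader();
orders   = LOAD 'orders'   USING org.apache.hive.hcatalog.pig.HCatLoader();

-- Column names and types come from the Hive metastore, so no AS (...) clause.
customer_filter = FILTER customer BY c_acctbal > 0.00
    AND SUBSTRING(c_phone, 0, 2) MATCHES '13|31|23|29|30|18|17';
customer_filter_group = GROUP customer_filter ALL;
avg_customer_filter = FOREACH customer_filter_group GENERATE
    AVG(customer_filter.c_acctbal) AS avg_c_acctbal;

customer_sec_filter = FILTER customer BY c_acctbal > avg_customer_filter.avg_c_acctbal
    AND SUBSTRING(c_phone, 0, 2) MATCHES '13|31|23|29|30|18|17';
customer_orders_left = JOIN customer_sec_filter BY c_custkey LEFT, orders BY o_custkey;
customer_trd_filter = FILTER customer_orders_left BY o_custkey IS NULL;
customer_rows = FOREACH customer_trd_filter GENERATE
    SUBSTRING(c_phone, 0, 2) AS cntrycode, c_acctbal;
customer_result_group = GROUP customer_rows BY cntrycode;

-- Field names and types are chosen to match the Hive target table
-- (cntrycode string, numcust int, totacctbal double); COUNT returns a long,
-- hence the cast.
customer_result = FOREACH customer_result_group GENERATE
    group AS cntrycode,
    (int) COUNT(customer_rows) AS numcust,
    SUM(customer_rows.c_acctbal) AS totacctbal;

STORE customer_result INTO 'q22_global_sales_opportunity'
    USING org.apache.hive.hcatalog.pig.HCatStorer();
```

With this arrangement the table schemas are declared once, in Hive, and the Pig script needs no temporary tables. The final ORDER step is dropped here because rows stored into a table carry no guaranteed order; sort when querying the result instead.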