Category Archives: Hive


Hive 0.11, Stinger and SQL-Compatibility

The release of Hive 0.11 is exciting and represents a big step toward delivering Project Stinger and SQL-IN-Hadoop.  There is still some work to be done, however.  We look forward to Hadoop 2 with YARN and the Apache Tez project delivering huge increases in Hive performance, but performance is not the only goal of Stinger.

SQL-In-Hadoop simply can’t be SQL without SQL compatibility

Today, HiveQL provides a fairly good set of SQL data types and semantics and while this (or a subset thereof) may be good enough for some of the “on” Hadoop solutions, we feel there needs to be more, especially if Hadoop and Hive are to meet the stringent requirements of enterprise class business analytics. To this end, we have set a goal of compatibility with most of SQL-92 and beyond with some SQL-2003 extensions.

The release of Apache Hive 0.11 pushes us further towards SQL-compatibility with the decimal data type becoming more usable (JIRA HIVE-4271) and the addition of analytic functions for windowing and aggregates.  It also vastly improves joins and all the while improves performance.  Awesome.

What else?

There is a lot more work to be done, however, and we’ll work with the community to get it done.  Hive 0.11 had contributions from over 50 community members to close over 380 Jira tickets.  That is astounding and a huge proof point of the open community and its unrivaled capability to innovate faster than any proprietary solution.

We will reach our goal soon.  Here is what’s left to be done:

[Image: sqlcompat]

We look forward to providing updates to Hive all summer long!

Apache Hive 0.11: Stinger Phase 1 Delivered

In February, we announced the Stinger Initiative, which outlined an approach to bring interactive SQL-query into Hadoop.  Simply put, our choice was to double down on Hive to extend it so that it could address human-time use cases (i.e. queries in the 5-30 second range). So, with input and participation from the broader community we established a fairly audacious goal of 100X performance improvement and SQL compatibility.

Introducing Apache Hive 0.11 – 386 JIRA tickets closed

As representatives of this open, community-led effort we are very proud to announce the first release of the new and improved Apache Hive, version 0.11.  This substantial release embodies the work of a wide group of people from Microsoft, Facebook, Yahoo, SAP and others.  Together we have addressed 386 JIRA tickets, including 28 new features and 276 bug fixes. There were FIFTY-FIVE developers involved in this and I would like to thank every one of them.  See below for a full list.

Delivering on the promise of Stinger Phase 1

As promised, we have delivered phase 1 of the Stinger Initiative in late spring.  This release is another proof point that the open community can innovate at a rate unequaled by any proprietary vendor.  As part of phase 1 we promised windowing, new data types, the optimized RC (ORC) file and base optimizations to the Hive query engine, and the community has delivered these key features.

Key features in Hive 0.11

  • ORCFile.  It’s Optimized.
    The ORC File (Optimized RC File) presents key new features that speed access to data in Apache Hive. It adds metadata at the file and block level so that queries can be more intelligent and use that metadata to optimize access.  Further, with the ORC file, only the bytes from the required columns are read from HDFS, which minimizes I/O and speeds the query chain.  These are major advances for improved performance in Hive.
  • Improved Data Types
    As Apache Hive marches towards full SQL-compatibility, the decimal data type was updated to make it more usable.
  • Analytic Functions
    Hive 0.11 introduces windowing functions for RANK, LEAD/LAG, ROW_NUMBER, FIRST_VALUE, LAST_VALUE and more. It also introduces aggregate OVER functions with PARTITION BY and ORDER BY.
  • Joins improved in Hive 0.11
    Both the broadcast join and the SMB join were improved considerably in Hive 0.11.  Both joins work without user hints, so that the Hive optimizer now picks the correct join rather than depending on the user to do so. More broadcast joins are now packed into a single MapReduce job, making star join queries much more efficient.
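
To make the analytic functions listed above a little more concrete, here is a minimal sketch of RANK, ROW_NUMBER, LAG and an aggregate OVER clause with PARTITION BY and ORDER BY. It is an illustration only: it runs the SQL against an in-memory SQLite database from Python (an assumption made so the example is self-contained; SQLite 3.25 or later is needed for window functions), and the `sales` table and its columns are hypothetical. The OVER clauses are the part that should carry over to HiveQL, though we have not run this exact query on Hive 0.11.

```python
import sqlite3

# In-memory SQLite stands in for Hive here (an assumption for illustration);
# the table and column names are hypothetical.
conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE sales (region TEXT, rep TEXT, amount INTEGER)")
conn.executemany(
    "INSERT INTO sales VALUES (?, ?, ?)",
    [
        ("east", "ann", 300),
        ("east", "bob", 200),
        ("east", "cat", 200),
        ("west", "dan", 500),
        ("west", "eve", 100),
    ],
)

QUERY = """
SELECT region, rep, amount,
       RANK()       OVER (PARTITION BY region ORDER BY amount DESC)      AS rnk,
       ROW_NUMBER() OVER (PARTITION BY region ORDER BY amount DESC, rep) AS rn,
       LAG(amount)  OVER (PARTITION BY region ORDER BY amount DESC, rep) AS prev_amount,
       SUM(amount)  OVER (PARTITION BY region)                           AS region_total
FROM sales
ORDER BY region, rn
"""

# Each row keeps its own detail columns and gains per-partition values.
rows = conn.execute(QUERY).fetchall()
for row in rows:
    print(row)
```

Note how bob and cat tie on amount and so share a RANK of 2, while ROW_NUMBER still numbers them 2 and 3.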

Towards YARN and the Power of SQL-IN-Hadoop

Hadoop 2.0, and specifically YARN, turns Hadoop from a single-application system into a multi-application operating system.  The next generation of Apache Hive, built on YARN, becomes part of the platform itself and can be managed by YARN to ensure that multiple use cases can be addressed beyond interactive query.  It is the delivery of a multi-application data system.  In this new world, Hive is a first-class citizen alongside a variety of workloads within a cluster, and resources can be managed more discretely.

Ultimately, this leads to further performance enhancements for Hive and, with the inclusion of Tez, we will be able to demonstrate even more significant improvements as service startup times are removed and a newly optimized execution chain within core Hadoop is delivered.  The near future is exciting!

Apache Hive is empowering an ecosystem of SQL Based Applications

This release represents significant enhancements to Hive that will improve direct SQL interaction with Hive and light up the hundreds of applications that already rely on Hive as the de facto SQL interface for Hadoop.  If you are one of the hundreds of software companies using Hive already, we hope you test out this new release and are happy with the results.  We look forward to supporting it in HDP 1.3 in the very near future.  ;)

Thank You to the Community

Thanks to 55 developers who contributed time and effort on this release: Alan Gates, Amareshwari Sriramadasu, Andrew Chalfant, Arup Malakar, Ashish Singh, Ashish Vaidya, Ashutosh Chauhan, Bennie Schut, Bhushan Mandhani, Billie Rinaldi, Brock Noland, Carl Steinbach, Chen Chun, Chris Drome, Dilip Joseph, Edward Capriolo, Gang Tim Liu, Gopal V, Gunther Hagleitner, Harish Butani, Ivan Gorbachev, Jarek Jarcec Cecho, Jean Xu, Jingwei Lu, Johnny Zhang, Jonathan Chang, Kevin Wilfong, Lars Francke, Li Yang, Mark Grover, Mayank Garg, Mikhail Bautin, Namit Jain, Navis, Nick Collins, Owen O’Malley, Pamela Vagata, Prajakta Kalmegh, Prasad Mujumdar, Roshan Naik, Sam Tunnicliffe, Samuel Yuan, Sean Busbey, Shreepadma Venugopalan, Sushanth Sowmyan, Teddy Choi, Thejas M Nair, Thiruvel Thirumoolan, Travis Crawford, Vikram Dixit K, Vinod Kumar Vavilapalli, Wonho Kim, Xiao Jiang, Zhenxiao Luo

Hadoop SDK and Tutorials for Microsoft .NET Developers

Microsoft has begun to treat its developer community to a number of Hadoop-y releases related to its HDInsight (Hadoop in the cloud) service, and it’s worth rounding up the material. It’s all Alpha and Preview so YMMV but looks like fun:

  • Microsoft .NET SDK for Hadoop. This kit provides .NET API access to aspects of HDInsight including HDFS, HCatalog, Oozie and Ambari, and also some PowerShell scripts for cluster management. There are also libraries for MapReduce and LINQ to Hive. The latter is really interesting as it builds on the established technology .NET developers use to access most data sources, and delivers the capabilities of the de facto standard for Hadoop data query.
  • HDInsight Labs Preview. Up on GitHub, there is a series of 5 labs covering C#, JavaScript and F# coding for MapReduce jobs, using Hive, and then bringing that data into Excel. It also covers some use of Mahout to build a recommendation engine.
  • Microsoft Hive ODBC Driver. The examples above use this preview driver to enable the connection from Hive to Excel.

If all of the above excites you, our Hadoop on Windows for Developers training course also covers similar content in much greater depth.

You can read more about the partnership between Hortonworks and Microsoft here, and you can download a preview of HDP for Windows here, or sign up for HDInsight over here. And if you’re hungry for more Hadoop tutorials, grab our own Hortonworks Sandbox here.

Hive/HCatalog – Data Geeks & Big Data Glue

Unstructured data, semi-structured data, structured data… it is all very interesting and we are in conversations about big and small versions of each of these data types every day. We love it…  we are data geeks at Hortonworks. We passionately understand that if you want to use any piece of data for some computation, there needs to be some layer of metadata and structure to interact with it.  Within Hadoop, this critical metadata service is provided by HCatalog.

As a key component of Apache Hive, HCatalog is a metadata and table management system for the broader Hadoop platform. It enables the storage of data in any format regardless of structure. Hadoop can then process both structured and unstructured data and it can store and share information about data’s structure in HCatalog. This capability combined with the ‘schema on read’ nature of Hadoop versus traditional EDW ‘schema on write’ reduces cycle time for data scientists seeking insight as it encourages exploration and discovery on a continuous basis.

Similarly, Hive/HCatalog also enables sharing of data structure with external systems including traditional data management tools. It is the glue that enables these systems to interact effectively and efficiently and is a key component in helping Hadoop fit into the enterprise.

SQL Interface for Hadoop? HCatalog as enabler…

Since 2008, Hive has reigned as the de facto SQL interface for Hadoop, as it provides a relational view of data within Hadoop through a SQL-like language. HCatalog publishes this same interface but abstracts it for data beyond Hive.  It also publishes a REST interface for external use so that your existing tools can interact with Hadoop in the way you expect… via ODBC and JDBC into SQL!

Good for the ecosystem is good for you

HCatalog aims to give the ecosystem more general SQL interaction with Hadoop. Our partners are building dedicated interfaces on top of this key interaction point to drive a Hadoop strategy within their products.  For instance, Teradata has created SQL-H on top of HCatalog as their default interface to Hadoop, enabling their users to query across this big data resource from existing tools. So now, as the performance enhancements to Hive progress through the Stinger Initiative, their tools get better and better.

Hadoop Developer productivity and HCatalog

HCatalog also allows developers to share data and metadata across internal Hadoop tools such as Hive, Pig, and MapReduce. It allows them to create applications without being concerned how or where the data is stored, and insulates users from schema and storage format changes.  It is a repository for schema that can be referred to in these programming models so that you don’t have to explicitly type your structures in each program. It provides a command line tool for users who do not use Hive to operate on the metastore with Hive DDL statements.  It also provides a notification service so that workflow tools, such as Oozie, can be notified when new data becomes available in the warehouse.

HCatalog in Use

So how might you use HCatalog? Organizations today are using HCatalog in a variety of ways; however, the key uses can be summarized as follows:

  • Enabling the Right Tool for the Right Job
    The majority of heavy Hadoop users do not use a single tool for data processing.  Often users and teams will begin with a single tool:  Hive, Pig, MapReduce, or another tool.  As their use of Hadoop deepens, they will discover that the tool they chose is not optimal for the new tasks they are taking on.  Users who start with analytics queries using Hive discover they would like to use Pig for ETL processing or constructing their data models.  Users who start with Pig discover they would like to use Hive for analytics-type queries.  While tools such as Pig and MapReduce do not require metadata, they can benefit from it when it is present.  Sharing a metadata store also enables users across tools to share data more easily.  A workflow where data is loaded and normalized using MapReduce or Pig and then analyzed via Hive is very common.  When all these tools share one metastore, users of each tool have immediate access to data created with another tool.  No loading or transfer steps are required.
  • Capture Processing States to Enable Sharing
    When Hadoop is used for analytics, users will discover information with it.  Again, they will often use Hive, Pig and MapReduce to uncover information.  The information is valuable, but typically only in the context of a larger analysis.  With HCatalog you can publish results so they can be accessed by your analytics platform via REST.  In this case, the schema defines the discovery. These discoveries are also useful to other data scientists.  Often they will want to build on what others have created or use results as input into a subsequent discovery.
  • Integrate Hadoop with everything
    Hadoop as a processing and storage environment opens up a lot of opportunity for the enterprise; however, to fuel adoption it must work with and augment existing tools.  Hadoop should serve as input into your analytics platform or integrate with your operational data stores and web applications.  The organization should enjoy the value of Hadoop without having to learn an entirely new toolset.  REST services open up the platform to the enterprise with a familiar API and SQL-like language.  Enterprise data management systems use HCatalog to integrate more deeply with the Hadoop platform. By tying in more closely they can hide complexity from users and create a better experience. A great example of this is the SQL-H integration from Teradata Aster. SQL-H queries the structure of data stored in HCatalog and exposes that back through to Aster, enabling Aster to access just the relevant data stored within the Hortonworks Data Platform.
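
As a rough illustration of the REST access mentioned above, the sketch below builds the URLs an external tool might request to list the tables in a database or describe a single table. It assumes the HCatalog REST server (WebHCat, formerly Templeton) is running on its default port 50111 under the /templeton/v1 path; the host, database, table and user names are hypothetical placeholders, and the code only constructs the URLs rather than calling a live cluster.

```python
from urllib.parse import quote, urlencode

# Assumptions: WebHCat (the HCatalog REST server, formerly Templeton) is
# listening on its default port 50111; the host, database, table and user
# used below are hypothetical placeholders.


def table_url(host, database, table=None, user="hive", port=50111):
    """Build the WebHCat DDL URL that lists tables or describes one table."""
    path = "/templeton/v1/ddl/database/%s/table" % quote(database, safe="")
    if table is not None:
        path += "/" + quote(table, safe="")
    query = urlencode({"user.name": user})
    return "http://%s:%d%s?%s" % (host, port, path, query)


# A GET on the first URL lists tables; on the second it describes one table.
list_url = table_url("hadoop.example.com", "default")
describe_url = table_url("hadoop.example.com", "default", "tweets")
print(list_url)
print(describe_url)
```

An HTTP GET on either URL should return JSON, which is what makes the metadata easy to consume from tools that know nothing about Hadoop.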

HCatalog is just one of many components of Apache Hadoop and the Hortonworks Data Platform. You can find out more here, including further integration points, and how Hortonworks provides the enterprise rigor to Apache Hadoop.

Week in Review: Sandboxes, HDP 2.0 Alpha 2, Hive Performance and Summits

It’s almost time for that final drive home of the week, and what a week it has been with a few new releases, a summit, and a little bit of technical fun. Here’s what happened:

New Sandbox Release. Yes, your favorite Hadoop VM image just got even better. Cheryle took us through the new features which included Ambari integration and Russell followed up with a quick tour of Ambari. There’s still plenty of time to download Sandbox for a weekend of data crunching fun.

HDP 2.0 Alpha 2 was released. This preview release demonstrates some of the performance improvements in store for the final HDP 2.0 release via YARN, enhancements to Hive per the Stinger Initiative, and Apache Tez. Just before the release, we posted some early test results which showed a 45X (yes, that’s forty five) performance improvement for Hive interactive queries. But that’s just the beginning as we push to 100X, and Microsoft also talked about their contributions to the Stinger Initiative with the same aim in mind.

If you’ve downloaded Sandbox and are looking for some inspiration for a little fun, then Russell also posted a two part series on extracting, loading, querying and analyzing your own Twitter archive with Hive. Part 1 is here, and Part 2 is here.

And finally, there was just the small matter of the Hadoop Summit in Amsterdam. We had a great time and hope you did too. Thank you for attending, contributing to the conversation and supporting Hadoop. If you’re now really excited to learn Hadoop, we posted about available training we have in Europe and Palo Alto.

And that was the week that was. Has your Sandbox downloaded yet?

Microsoft’s Contributions to the Stinger Initiative and Apache Hive

Guest blog post from Eric Hanson, Principal Program Manager, Microsoft

Hadoop had a crazy and collaborative beginning as an OSS project, and that legacy continues. There have been over 1,200 contributors across 80 companies since its beginning. Microsoft has been contributing to Hadoop since October 2011, and we’re committed to giving back and keeping it open.

Our first wave of contributions, in collaboration with Hortonworks, has been to port Hadoop to Windows, to enable it both for our HDInsight service on Windows Azure and for on-premises Big Data installations on Windows. Now, we’re starting to contribute to the Stinger initiative to dramatically speed up Hive and make it more enterprise-ready.

Contribution to the core of Apache Hadoop through Stinger

Our main activity in Stinger right now is around Tez, and vectorized query execution. One of our developers, Mike Liddell, has experience with DAG-based computations in Microsoft’s internal Dryad-LINQ effort, and has just joined Tez as a founding committer. I kick-started and helped guide our project to introduce columnstore data formats and vectorized (a.k.a. “batch mode”) query execution into SQL Server 2012.  After moving to the SQL Server Big Data team, I’ve been collaborating with Hortonworks developers since late last fall regarding how to make Hive faster. We heard about the ORC project, led by Owen O’Malley of Hortonworks, to improve the RCFile columnstore format. I’ve had several productive design discussions with Owen about ORC, and we really like the way it’s shaping up.

Based on our experience, we knew that a great columnstore format is only part of the story about making data warehouse-style queries run really fast. Good process and communication architecture is another part – Tez is a great step there. A third is fast query execution (QE), and vectorized query execution research and field experience have shown it can speed up queries on the order of 10X-100X.

Some people were saying that fast QE required a total-rewrite in C++. I didn’t buy that, and I prototyped vectorized scan and filter operators in Java and shared this with Hortonworks. For simple conditions like column = constant, we’ve seen the ability to filter about 150 million rows per second on one thread in Java. We now have a two-company team introducing vectorized QE to Hive, consisting of two Hortonworks folks (Jitendra Pandey and Owen) and several Microsoft engineers. We’re going to take it in small steps, adding vectorized scans over ORC, and basic filter operations first. Then we’ll move on to vectorized aggregates and joins.
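
To give a feel for what a vectorized filter looks like, here is a minimal sketch of a `column = constant` filter that works on a batch of values at a time and records the qualifying row positions in a reusable selection vector. This is an illustration of the general technique only, not the Hive code: the batch size, the function names and the selection-vector layout are assumptions, and Python will come nowhere near the Java throughput quoted above.

```python
from array import array

BATCH_SIZE = 1024  # rows per batch; an assumed size for illustration


def filter_equals(column, size, constant, selected):
    """Write the positions where column[i] == constant into `selected`.

    Returns how many rows qualified. The loop touches one primitive column
    and allocates nothing per row, which is the point of the technique.
    """
    out = 0
    for i in range(size):
        if column[i] == constant:
            selected[out] = i
            out += 1
    return out


def count_matches(values, constant):
    """Scan `values` batch by batch and count rows equal to `constant`."""
    selected = array("i", [0] * BATCH_SIZE)  # selection vector, reused per batch
    total = 0
    for start in range(0, len(values), BATCH_SIZE):
        batch = values[start:start + BATCH_SIZE]
        total += filter_equals(batch, len(batch), constant, selected)
    return total


# A column of 5000 64-bit integers cycling through 0..6
column = array("q", [i % 7 for i in range(5000)])
matches = count_matches(column, 3)
print(matches)
```

Later operators in such a pipeline would read only the positions listed in the selection vector, so filtered-out rows are never copied or revisited.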

We think that the functional surface area of Hive, including its SQL query language, the open, extensible storage model over HDFS, and its easy programmer extensibility with Java UDFs, is quite compelling. It gives non-procedural access to Big Data, with the ability for programmers to create custom Java add-ins that let them do complex calculations more easily than they can with MapReduce programs. Hive also has a strong community of OSS developers and users. It works on ultra-scale clusters on data sets vastly bigger than total cluster memory. Stinger aims to boost the speed of Hive to complement its rich functionality in a way that users will love.

An active participant in the open community

We’ve been part of the OSS Big Data world for about a year and a half now. Through the combined efforts of the overall Hadoop community, Microsoft, and Hortonworks, Hadoop is now accessible on Windows Server and Windows Azure. We’ve gained so much from the community. Now we’re helping return the favor by contributing to Stinger, with our eye on 100X performance gains.

HOWTO use Hive to SQLize your own Tweets – Part Two: Loading Hive, SQL Queries

In part one of this series, we covered how to download your tweet archive from Twitter, ETL it into json/newline format, and to extract a Hive schema. In this post, we will load our tweets into Hive and query them to learn about our little world.

To load our tweet-JSON into Hive, we’ll use the rcongiu Hive-JSON-Serde. Download and build it via:

wget http://www.datanucleus.org/downloads/maven2/javax/jdo/jdo2-api/2.3-ec/jdo2-api-2.3-ec.jar
mvn install:install-file -DgroupId=javax.jdo -DartifactId=jdo2-api \
  -Dversion=2.3-ec -Dpackaging=jar -Dfile=jdo2-api-2.3-ec.jar
mvn package

Find the jar it generated via:

find .|grep jar
./target/json-serde-1.1.4-jar-with-dependencies.jar
./target/json-serde-1.1.4.jar

Run hive, and create our table with the following commands:

add jar /path/to/my/Hive-Json-Serde/target/json-serde-1.1.4-jar-with-dependencies.jar;

create table tweets (
   created_at string,
   entities struct <
      hashtags: array <struct <
            text: string>>,
      media: array <struct <
            media_url: string,
            media_url_https: string,
            sizes: array <struct <
                  h: int,
                  resize: string,
                  w: int>>,
            url: string>>,
      urls: array <struct <
            url: string>>,
      user_mentions: array <struct <
            name: string,
            screen_name: string>>>,
   geo struct <
      coordinates: array <double>,
      type: string>,
   id bigint,
   id_str string,
   in_reply_to_screen_name string,
   in_reply_to_status_id bigint,
   in_reply_to_status_id_str string,
   in_reply_to_user_id int,
   in_reply_to_user_id_str string,
   retweeted_status struct <
      created_at: string,
      entities: struct <
         hashtags: array <struct <
               text: string>>,
         media: array <struct <
               media_url: string,
               media_url_https: string,
               sizes: array <struct <
                     h: int,
                     resize: string,
                     w: int>>,
               url: string>>,
         urls: array <struct <
               url: string>>,
         user_mentions: array <struct <
               name: string,
               screen_name: string>>>,
      geo: struct <
         coordinates: array <double>,
         type: string>,
      id: bigint,
      id_str: string,
      in_reply_to_screen_name: string,
      in_reply_to_status_id: bigint,
      in_reply_to_status_id_str: string,
      in_reply_to_user_id: int,
      in_reply_to_user_id_str: string,
      source: string,
      text: string,
      user: struct <
         id: int,
         id_str: string,
         name: string,
         profile_image_url_https: string,
         protected: boolean,
         screen_name: string,
         verified: boolean>>,
   source string,
   text string,
   user struct <
      id: int,
      id_str: binary,
      name: string,
      profile_image_url_https: string,
      protected: boolean,
      screen_name: string,
      verified: boolean>
)
ROW FORMAT SERDE 'org.openx.data.jsonserde.JsonSerDe'
STORED AS TEXTFILE;

Load it full of data from the tweet JSON file we created last tutorial:

LOAD DATA LOCAL INPATH '/path/to/all_tweets.json' OVERWRITE INTO TABLE tweets;

Verify our data loaded with a count:

SELECT COUNT(*) from tweets;
OK
24655

Our tweets are loaded! Some fun queries to run:

    • Sample some tweets:
SELECT text from tweets limit 5;

Which gets us:

OK
Paddled out, tried to psyche myself into wave for 30 minutes...
Waves twice as tall as me are scary
No waves here yet, nap time
Doin 80 on i10w
Gustav and panama city beach here I come
    • Top people we reply to:
SELECT in_reply_to_screen_name, 
  COUNT(*) as total from tweets 
  GROUP BY in_reply_to_screen_name 
  ORDER BY total DESC 
  LIMIT 30;

Which gets us the top N people I reply to:

OK
NULL	13447
sanjay	356
Urvaksh	282
ChrisDiehl	268
pfreet	230
mikeschinkel	222
mmealling	193
keithmcgreggor	191
peteskomoroch	183
semil	183
...

Hive has some built-in n-gram analysis utilities, documented here, that we can use. For example:

SELECT sentences(lower(text)) FROM tweets;
[["dear","twitter","send","me","my","tweets","plz","you","promised","me"]]
[["pig","eye","for","the","sql","guy","http","t.co","vjx4rcugix","via","sharethis"]]
[["rt","hortonworks","pig","eye","for","the","sql","guy","with","mortardata","http","t.co","vnkwsswnkv","hadoop"]]

We can use these to do n-gram analysis:

SELECT ngrams(sentences(lower(text)), 3, 10) FROM tweets;

Which is kind of amusing:

[{"ngram":["http","instagr.am","p"],"estfrequency":136.0},
{"ngram":["i","want","to"],"estfrequency":100.0},
{"ngram":["on","hacker","news"],"estfrequency":92.0},
{"ngram":["you","have","to"],"estfrequency":66.0},
{"ngram":["a","lot","of"],"estfrequency":65.0},
{"ngram":["i","need","to"],"estfrequency":63.0},
{"ngram":["is","looking","for"],"estfrequency":59.0},
{"ngram":["hortonworks","is","looking"],"estfrequency":59.0},
{"ngram":["there","is","no"],"estfrequency":56.0},{"ngram":["is","there","a"],"estfrequency":53.0}]

You can see common phrases, as well as hortonworks job offerings that are auto-tweeted, and of course – ‘on hacker news’ – talking about Hacker News :)
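
If you are curious what the n-gram query above is computing, here is a minimal Python sketch of the same idea: split each text into sentences of lowercase words, slide a window of n words across each sentence, and keep the k most common windows. It is an approximation made for illustration. The tokenizer is a simple regular expression of our own and the counts are exact, whereas Hive reports an estimated frequency; the sample texts are made up.

```python
import re
from collections import Counter


def sentences(text):
    """Split text into sentences, each a list of lowercase words."""
    result = []
    for sentence in re.split(r"[.!?]+", text.lower()):
        words = re.findall(r"[a-z0-9']+", sentence)
        if words:
            result.append(words)
    return result


def top_ngrams(texts, n, k):
    """Return the k most common n-grams across all texts, with exact counts."""
    counts = Counter()
    for text in texts:
        for words in sentences(text):
            # Slide a window of n words across the sentence
            for i in range(len(words) - n + 1):
                counts[tuple(words[i:i + n])] += 1
    return counts.most_common(k)


# Made-up sample texts standing in for the tweets table
sample = [
    "I want to surf. I want to nap!",
    "You have to see this",
    "I want to go home",
]
result = top_ngrams(sample, 3, 2)
print(result)
```

N-grams never cross a sentence boundary here, which is why the text is split into sentences before the windows are counted.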

We can also check out our tweets that are RTs.

SELECT retweeted_status.user.screen_name, COUNT(*) as total 
  FROM tweets 
  WHERE retweeted_status.user is not null 
  GROUP BY retweeted_status.user.screen_name 
  ORDER BY total desc 
  LIMIT 20;

This gets me:

OK
peteskomoroch	99
hortonworks	97
ChrisDiehl	56
newsycombinator	55
newsyc20	38
adamnash	31
bradfordcross	29
kjurney	29

Once we have our tweets in Hive, there’s no limit to what we can do to them! This is what Hive excels at.

Hortonworks Data Platform 2.0 Alpha 2 now available: focus on performance

We are very pleased to announce that the Alpha 2 release of the Hortonworks Data Platform 2.0 (HDP 2.0 Alpha 2) is now available for download!

A key focus in HDP 2.0 Alpha 2 is on performance as announced in the Stinger initiative, and includes a series of enhancements to the performance of Apache Hive for interactive SQL queries.  In fact HDP 2.0 Alpha 2 was used to perform the tests announced yesterday, showing a 45X performance increase using Hive.  There is much more to come but we are pleased with the early results, and encourage Hive users to take a look and continue to give us feedback.

Consistent with HDP 2.0 Alpha 1, this version is built from the developmental Apache Hadoop 2.0 line and includes Apache YARN, a next-generation resource-management and application framework that enables Hadoop to support an ever-expanding range of use cases.  We are extremely excited about the opportunities that YARN enables – for background, check out Arun Murthy’s blog post series where he provides a YARN overview.

Notable new components over Alpha 1 include:

  • Apache Tez: A new Apache project that provides an optimized data processing framework on top of YARN. Tez is a general-purpose, highly customizable framework that simplifies data processing tasks across both small-scale, low-latency and large-scale, high-throughput workloads in Hadoop. Tez can provide an order of magnitude performance boost for the broader ecosystem of data processing tools such as Apache Hive and Apache Pig.
  • Apache Hive Interactive Query: Beyond the speedups made possible by Apache Tez, several new features were added to speed Hive queries. A new file format called the ORCFile (optimized RC file) optimizes how data is stored and accessed in Hive, and significant query optimizations reduce latency and improve performance.

Note that Tez is not enabled by default.  Instructions for doing so, and allowing Hive to use Tez, are in the installation guide.

Learn More
Please take a look at the Hortonworks Documentation to learn more about installing and using HDP 2.0 Alpha 2.

Download It
You can download HDP 2.0 Alpha 2 from the Hortonworks Download site.

Tell Us About It
Please visit the HDP 2.0 Alpha Forum to ask questions, get help, provide feedback and hear what others are doing with HDP. 

We are excited about the opportunities that Hadoop 2 provides for the future of Hadoop and large-scale data processing. HDP 2.0 Alpha 2 is a key milestone that provides organizations with a packaged release to evaluate and gain experience with the upcoming Apache Hadoop 2 technology stack. We look forward to your feedback on HDP 2.0 Alpha 2 while we work with the community to make Hadoop 2 a stable reality. Enjoy!

Note: This Alpha release is a technology preview to gather feedback from outside of Hortonworks. Some features are missing or incomplete. Some APIs may change. Do not use Alpha 2 for production use. Keep away from open flame. Support is only available via Forums.

HOWTO use Hive to SQLize your own Tweets – Part One: ETL and Schema Discovery

Note: Continued in part two

Your Twitter Archive

Twitter has a new feature, Your Twitter Archive, that enables any user to download their tweets as an archive. To find this feature, look at the bottom of your account settings page. There should be an option for ‘Your Twitter archive,’ which will generate your tweets as a json/javascript web application and send them to you by email as a zip file.

Be patient: this process can take several days, in particular if you’ve lots of tweets (I personally have 24K tweets, and it took 4-5 days to get my tweets).

After a few hours or days, you’ll receive an email with a download link. Download your tweets, and unzip them to reveal their contents.

Digging In: ETL

There is a file called tweets.csv, but that is not the file we are interested in. It has very little detail. The files we are interested in, which contain all the tweet data, are in the data/js/tweets directory. There is one file per month, laid out like this:

2008_08.js	2009_03.js	2009_10.js	2010_05.js	2010_12.js	2011_07.js	2012_02.js	2012_09.js
2008_09.js	2009_04.js	2009_11.js	2010_06.js	2011_01.js	2011_08.js	2012_03.js	2012_10.js
2008_10.js	2009_05.js	2009_12.js	2010_07.js	2011_02.js	2011_09.js	2012_04.js	2012_11.js
2008_11.js	2009_06.js	2010_01.js	2010_08.js	2011_03.js	2011_10.js	2012_05.js	2012_12.js
2008_12.js	2009_07.js	2010_02.js	2010_09.js	2011_04.js	2011_11.js	2012_06.js	2013_01.js
2009_01.js	2009_08.js	2010_03.js	2010_10.js	2011_05.js	2011_12.js	2012_07.js	2013_02.js
2009_02.js	2009_09.js	2010_04.js	2010_11.js	2011_06.js	2012_01.js	2012_08.js	2013_03.js

Inconveniently, the first line of each file is javascript:

Grailbird.data.tweets_2008_08 =

The first thing we’ll need to do is remove that line. Once we do so, the file is a large json array. Once we have an array, we can easily convert to the JSON format that Hive expects: one json object per line.

I’ve created a python script that removes the first line of text, and converts and prints a one-json-object-per-newline format here: convert.py.

#!/usr/bin/env python

import glob
import io
import json
import os
import sys

# Root of the unzipped Twitter archive, passed as the first argument
tweet_base = sys.argv[1]

# Open tweets: one .js file per month, processed in filename (date) order
os.chdir(os.path.join(tweet_base, "data", "js", "tweets"))
for filename in sorted(glob.glob("*.js")):
  with io.open(filename, encoding="utf-8") as f:
    lines = f.readlines()

  # Chop the first line (it's javascript), parse the resulting array of tweets
  tweets = json.loads("".join(lines[1:]))

  # Now print, one json object per line
  for tweet in tweets:
    print(json.dumps(tweet))

# Done - we've printed the tweets, one json record per line

To run the code:

python convert.py ~/Downloads/tweets > all_tweets.json

Amusingly, my first tweet is about being terrified surfing Hurricane Gustav:

{
    "entities": {
        "user_mentions": [],
        "media": [],
        "hashtags": [],
        "urls": []
    },
    "text": "Paddled out, tried to psyche myself into wave for 30 minutes, then was afraid to come in for 30 more. \"I jusy want to go home.\"",
    "created_at": "Mon Sep 01 01:16:47 +0000 2008",
    "source": "Twinkle",
    "id_str": "905266904",
    "geo": {},
    "id": 905266904,
    "user": {
        "verified": false,
        "name": "Russell Jurney",
        "profile_image_url_https": "http://si0.twimg.com/profile_images/2964060639/9a98c1eb08f57472743caa4a5ae3260b_normal.jpeg",
        "protected": false,
        "id_str": "15831927",
        "id": 15831927,
        "screen_name": "rjurney"
    }
}

Schema Discovery

To load our tweets into Hive, we need a schema. There is no explicit schema for json data, so we must infer it. Along these lines, Hortonworks co-founder Owen O’Malley created a tool, available on github as hive-json, that extracts a Hive schema from a collection of JSON documents. Given a collection of documents, the output schema will be the superset of all their individual schemas, which creates a reasonable SQL schema: optional fields are often null.
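
To show what "the superset of them all" means in practice, here is a minimal Python sketch of schema inference over one-JSON-object-per-line input. It is not the hive-json tool and makes no claim about how that tool works internally: the type names, the merge rules (for example, widening a mix of integers and floats to double, and falling back to string on conflicts) and the two sample documents are all assumptions chosen for illustration.

```python
import json


def infer(value):
    """Map one parsed JSON value to a Hive-style type description."""
    if isinstance(value, bool):  # check bool first: bool is a subclass of int
        return "boolean"
    if isinstance(value, int):
        return "bigint"
    if isinstance(value, float):
        return "double"
    if isinstance(value, str):
        return "string"
    if isinstance(value, list):
        element = None
        for item in value:
            element = merge(element, infer(item))
        return ("array", element)
    if isinstance(value, dict):
        return ("struct", {key: infer(val) for key, val in value.items()})
    return None  # JSON null carries no type information


def merge(left, right):
    """Combine two inferred types into one that accepts both."""
    if left is None:
        return right
    if right is None or left == right:
        return left
    if isinstance(left, tuple) and isinstance(right, tuple) and left[0] == right[0]:
        if left[0] == "array":
            return ("array", merge(left[1], right[1]))
        fields = dict(left[1])  # struct: keep the union of both field sets
        for key, kind in right[1].items():
            fields[key] = merge(fields.get(key), kind)
        return ("struct", fields)
    if isinstance(left, str) and isinstance(right, str):
        if {left, right} == {"bigint", "double"}:
            return "double"
    return "string"  # assumed fallback when types conflict


def render(kind):
    """Render an inferred type as Hive DDL type text."""
    if kind is None:
        return "string"  # nothing observed; assume string
    if isinstance(kind, str):
        return kind
    if kind[0] == "array":
        return "array<%s>" % render(kind[1])
    fields = ", ".join("%s: %s" % (k, render(v)) for k, v in sorted(kind[1].items()))
    return "struct<%s>" % fields


def infer_schema(lines):
    """Fold every JSON document into a single superset schema."""
    schema = None
    for line in lines:
        schema = merge(schema, infer(json.loads(line)))
    return schema


# Two made-up documents: the second fills in fields the first leaves empty
docs = [
    '{"id": 1, "text": "hi", "geo": {}}',
    '{"id": 2, "text": "yo", "geo": {"type": "Point", "coordinates": [1.5, 2]}}',
]
schema = infer_schema(docs)
print(render(schema))
```

The first document has an empty geo object, yet the merged schema still carries geo's fields from the second one. A row that lacks them would simply read as null.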

bin/find-json-schema ~/Downloads/all_tweets.json

Reading /Users/rjurney/Software/hive_tweets/tweets.json
24655 records read

create table tbl (
   created_at string,
   entities struct <
      hashtags: array ,
            text: string>>,
      media: array ,
            media_url: string,
            media_url_https: string,
            sizes: array >,
            url: string>>,
      urls: array ,
            url: string>>,
      user_mentions: array ,
            name: string,
            screen_name: string>>>,
   geo struct <
      coordinates: array ,
      type: string>,
   id bigint,
   id_str string,
   in_reply_to_screen_name string,
   in_reply_to_status_id bigint,
   in_reply_to_status_id_str string,
   in_reply_to_user_id int,
   in_reply_to_user_id_str string,
   retweeted_status struct <
      created_at: string,
      entities: struct <
         hashtags: array ,
               text: string>>,
         media: array ,
               media_url: string,
               media_url_https: string,
               sizes: array >,
               url: string>>,
         urls: array ,
               url: string>>,
         user_mentions: array ,
               name: string,
               screen_name: string>>>,
      geo: struct <
         coordinates: array ,
         type: string>,
      id: bigint,
      id_str: string,
      in_reply_to_screen_name: string,
      in_reply_to_status_id: bigint,
      in_reply_to_status_id_str: string,
      in_reply_to_user_id: int,
      in_reply_to_user_id_str: string,
      source: string,
      text: string,
      user: struct <
         id: int,
         id_str: string,
         name: string,
         profile_image_url_https: string,
         protected: boolean,
         screen_name: string,
         verified: boolean>>,
   source string,
   text string,
   user struct <
      id: int,
      id_str: binary,
      name: string,
      profile_image_url_https: string,
      protected: boolean,
      screen_name: string,
      verified: boolean>
)
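
The superset idea behind the schema above can be sketched in a few lines of Python. This is only an illustration and not how hive-json is implemented: the function names infer_type, merge_types and infer_schema, the type mapping, and the two sample records are all assumptions made for the example.

```python
import json

def infer_type(value):
    """Map one parsed JSON value to a rough Hive type description (illustrative)."""
    if isinstance(value, bool):          # check bool before int: True is an int in Python
        return "boolean"
    if isinstance(value, int):
        return "bigint"
    if isinstance(value, float):
        return "double"
    if isinstance(value, str):
        return "string"
    if isinstance(value, dict):
        return {key: infer_type(val) for key, val in value.items()}
    if isinstance(value, list):
        merged = None
        for item in value:
            merged = merge_types(merged, infer_type(item))
        return [merged]                  # a one-element list stands for array<merged>
    return None                          # JSON null: type unknown so far

def merge_types(left, right):
    """Return the superset of two inferred types."""
    if left is None:
        return right
    if right is None:
        return left
    if isinstance(left, dict) and isinstance(right, dict):
        return {key: merge_types(left.get(key), right.get(key))
                for key in sorted(set(left) | set(right))}
    if isinstance(left, list) and isinstance(right, list):
        return [merge_types(left[0], right[0])]
    if left == right:
        return left
    if (isinstance(left, str) and isinstance(right, str)
            and {left, right} == {"bigint", "double"}):
        return "double"
    return "string"                      # conflicting types: fall back to string

def infer_schema(lines):
    """Fold the per-document types of newline-delimited JSON into one schema."""
    schema = None
    for line in lines:
        schema = merge_types(schema, infer_type(json.loads(line)))
    return schema

# Hypothetical records: the second has a field the first lacks.
records = [
    '{"id": 1, "text": "first", "geo": {}}',
    '{"id": 2, "text": "second", "geo": {"type": "Point"}, "in_reply_to_user_id": 7}',
]
schema = infer_schema(records)
```

A field that appears in only some documents, such as in_reply_to_user_id here, still ends up in the merged schema, which is why optional fields come back as null for most rows.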

So we’ve got a one-JSON-object-per-line version of our tweets, and a Hive schema for them. In our next post, we’ll use the Hive-JSON-Serde to load the tweets and begin our analysis!

Continued in part two

Stinger Early Results: 45X Performance Increase for Hive

Written with Vinod Kumar Vavilapalli and Gopal Vijayaraghavan

A few weeks back we blogged about the Stinger Initiative and made a promise to work within the open community to make Apache Hive 100 times faster for SQL interaction with Hadoop. We have a broad set of scenarios queued up for testing but are so excited about the early results of this work that we thought we’d take the time to share some of them with you.

In order to get a fair assessment we styled our tests after the TPC Benchmark™ DS (TPC-DS). For this initial report, we provide detail around two of the most common use cases and as we execute more queries and make more improvements we will provide more detail.

Performance Tests & Environment

In this report we provide results for two of our performance queries.  In the first, we perform a star schema join where we load all the small tables in memory and do a scan through the fact table independently on all nodes. In the second query, we perform a join between two large fact tables, both of which are too large to fit in memory.
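
The strategy in the first query, loading the small tables into memory and scanning the fact table once, can be sketched as a hash join in Python. This is a toy illustration, not Hive's implementation; the table contents and the names build_lookup and star_join are invented for the example.

```python
def build_lookup(dimension_rows, key):
    """Load a small dimension table into an in-memory dict keyed by its join key."""
    return {row[key]: row for row in dimension_rows}

def star_join(fact_rows, lookups):
    """Scan the fact table once, probing each in-memory dimension table.

    lookups maps a fact-table foreign-key column to the dict for that dimension.
    Rows with a missing dimension match are dropped (inner-join semantics).
    """
    for fact in fact_rows:
        joined = dict(fact)
        for foreign_key, lookup in lookups.items():
            match = lookup.get(fact[foreign_key])
            if match is None:
                break                     # no match in this dimension: skip the row
            joined.update(match)
        else:
            yield joined                  # every dimension matched

# Hypothetical data: two small dimension tables and one fact table.
stores = [{"store_id": 1, "city": "Austin"}, {"store_id": 2, "city": "Boston"}]
items = [{"item_id": 10, "category": "books"}]
sales = [
    {"store_id": 1, "item_id": 10, "amount": 5.0},
    {"store_id": 2, "item_id": 10, "amount": 7.5},
    {"store_id": 3, "item_id": 10, "amount": 1.0},   # no matching store, dropped
]

lookups = {"store_id": build_lookup(stores, "store_id"),
           "item_id": build_lookup(items, "item_id")}
joined_rows = list(star_join(sales, lookups))
```

Because every node can hold its own copy of the small tables, each one can scan its slice of the fact table independently, which is the property the first query relies on.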

Our test environment comprised a 10-node EC2 cluster with a total of 100 containers over 40 disks. On it we obtained query execution times with Hive on raw data and with Hive with all the optimizations enabled on partitioned data stored in RCFile format. We also use a scale factor of 200, which implies a data set of around 200GB.

Results

For the first query, we’ve calculated as much as a 35X improvement over native Hive and have reduced query times from around 1400 seconds to 39! And for the second query we calculate a whopping 45X improvement… all in open source Apache Hive.

[Figures: StingerQuery1, StingerQuery2]

This is a preliminary look at test results but it indicates significant improvements already. We are pretty excited about the results, as the work has only just begun.  The test results above do not even include the Tez improvements, nor do they include the new ORCFile format!  And there are still a handful of other Hive-specific improvements coming.  The improvements included in these tests include HIVE-3784, HIVE-3952 and HIVE-2340.  A before and after of the execution looks like this:

[Figure: BeforeStinger]

[Figure: AfterStinger]

In subsequent posts we will deliver more explicit results and provide more in-depth specifications of the hardware and software used for the benchmarks. We’ll also describe a few other efforts that will complete our story of attaining the 100x gains we talked about earlier, so stay tuned.

We truly believe that the fastest path to innovation is the open community and this is a great example of how quickly the community can prove this true.  These advances are not attributable to Hortonworks alone; they were completed in partnership with the community, with resources from Yahoo!, Facebook, Twitter, SAP and Microsoft all contributing.

NOTE: We are talking about all of this and much more at Hadoop Summit Amsterdam. Attend our talk “Innovations In Apache Hadoop MapReduce, Pig and Hive for improving query performance” to learn more about this effort. “Optimizing Hive Queries” by Owen O’Malley and “What’s New and What’s Next in Apache Hive” by Gunther Hagleitner are two other talks that you should attend to learn more about other threads in our Stinger Initiative.

Pig Eye for the SQL Guy

Cat Miller is an engineer at Mortar Data, a Hadoop-as-a-service provider, and creator of mortar, an open source framework for data processing.

Introduction

For anyone who came of programming age before cloud computing burst its way into the technology scene, data analysis has long been synonymous with SQL. A slightly awkward, declarative language whose production can more resemble logic puzzle solving than coding, SQL and the relational databases it builds on have been the pervasive standard for how to deal with data.

As the world has changed, so too has our data; an ever-increasing amount of data is now stored without a rigorous schema, or must be joined to outside data sets to be useful. Compounding this problem, often the amounts of data are so large that working with them on a traditional SQL database is so non-performant as to be impractical.

Enter Pig, a SQL-like language that gracefully tolerates inconsistent schemas, and that runs on Hadoop. (Hadoop is a massively parallel platform for processing the largest of data sets in reasonable amounts of time. Hadoop powers Facebook, Yahoo, Twitter, and LinkedIn, to name a few in a growing list.)

This then is a brief guide for the SQL developer diving into the waters of Pig Latin for the first time. Pig is similar enough to SQL to be familiar, but divergent enough to be disorienting to newcomers. The goal of this guide is to ease the friction in adding Pig to an existing SQL skillset.

What’s Similar?

The basic concepts in SQL map pretty well onto Pig. There are analogues for the major SQL keywords, and as a result you can write a query in your head as SQL and then translate it into Pig Latin without undue mental gymnastics.

WHERE → FILTER

The syntax is different, but conceptually this is still putting your data into a funnel to create a smaller dataset.

HAVING → FILTER

Because a FILTER is done in a separate step from a GROUP or an aggregation, the distinction between HAVING and WHERE doesn’t exist in Pig.

ORDER BY → ORDER

This keyword behaves pretty much the same in Pig as in SQL.

JOIN

In Pig, joins can have their execution specified, and they look a little different, but in essence these are the same joins you know from SQL, and you can think about them in the same way. There are INNER and OUTER joins, RIGHT and LEFT specifications, and even CROSS for those rare moments that you actually want a Cartesian product.

Because Pig is most appropriately used for data pipelines, there are often fewer distinct relations or tables than you would expect to see in a traditional normalized relational database.

Control over Execution

SQL performance tuning generally involves some fiddling with indexes, punctuated by the occasional yelling at an explain plan that has inexplicably decided to join the two largest tables first. It can mean getting a different plan the second time you run a query, or having the plan suddenly change after several weeks of use because the statistics have evolved, throwing your query’s performance into the proverbial toilet.

Various SQL implementations offer hints to combat this problem—you can use a hint to tell your SQL optimizer that it should use an index, or to force a given table to be first in the join order. Unfortunately, because hints are dependent on the particular SQL implementation, what you actually have at your disposal varies by platform.

Pig offers a few different ways to control the execution plan. The first is just the explicit ordering of operations. You can write your FILTER before your JOIN (the reverse of SQL’s order) and be clever about eliminating unused fields along the way, and have confidence that the executed order will not be worse.

Secondly, the philosophy of Pig is to allow users to choose implementations where multiple ones are possible. As a result, there are three specialized joins that can be used when the features of the data are known and make a regular join less appropriate. For regular joins, the order of the arguments dictates execution: the larger data set should appear last in this type of join.

As with SQL, in Pig you can pretty much ignore the performance tweaks until you can’t. Because of the explicit control of ordering, it can be useful to have a general sense of the “good” order to do things in, though Pig’s optimizer will also try to push up FILTERs and LIMITs, taking some of the pressure off.

What’s Different?

A Row By Any Other Name

The SQL paradigm is very straightforward—there are tables, and tables contain rows. Every select statement yields a set of rows, and each field in a row is a basic data type. Conceptually, the result of any SQL select statement can be imported into Excel without loss of information.

Pig introduces a mature nesting notion in tuples and data bags that changes the game significantly. Pig consists of data sets called relations (sometimes called aliases for the names they are given), and those contain records that are data tuples, which can in turn recursively contain data bags, data tuples, or data items. There is a distinct lack of flatness in Pig, and the best way to see it is to explore how GROUP works.

Not Your Grandmother’s GROUP

In the handling of GROUP, SQL and Pig diverge significantly. SQL’s GROUP doesn’t exist outside of the aggregation performed on it; you would never SELECT * GROUP BY field1, because it just doesn’t make sense. Because everything in SQL is a row, the grouping created isn’t persistent; only the data produced by aggregating over it remains.

Pig’s GROUP is an entirely different beast, albeit used for the same purpose. It is a persistent relation that can be used again and again, independent of what aggregations you might choose to perform on it later.

student_grades = GROUP grades by student;

This student_grades relation has two fields: one called group and populated with the value of student, the other called grades populated with a data bag containing tuples for all the entries with the same value of student.

group	grades
alyssa 	{< hacking101, 95 >, < english, 60 >}
ben	{< math, 90 >}

Now to do an aggregation, perform it on the student_grades alias.

average_grades = FOREACH student_grades GENERATE group, AVG(grades.value);
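
For readers who think more easily in a general-purpose language, the nested result of GROUP can be mimicked in Python. This is a rough analogy and not how Pig works internally; the sample rows mirror the table above, and the field order (student, course, value) is an assumption.

```python
from collections import defaultdict

# Each tuple is (student, course, value), mirroring the grades relation.
grades = [
    ("alyssa", "hacking101", 95),
    ("alyssa", "english", 60),
    ("ben", "math", 90),
]

# GROUP grades BY student: each key keeps a bag of the full original tuples.
student_grades = defaultdict(list)
for row in grades:
    student_grades[row[0]].append(row)

# FOREACH student_grades GENERATE group, AVG(grades.value)
average_grades = {
    student: sum(value for _, _, value in bag) / len(bag)
    for student, bag in student_grades.items()
}
```

The point of the analogy is that student_grades survives as a value in its own right, so any number of later aggregations can be run over the same bags.
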
Procedural Paradise

This is the first thing any Google search on Pig will tell you, and it is the most glaringly obvious change from SQL. After having taught your brain for years how to turn an idea inside-out and mash all of its pieces into one query, Pig makes query writing feel like writing Java or C++. In addition to the obvious potential cognitive benefits, this has some technical ones as well.

Subquery Reuse

Ever write a query with a subselect, and then realize you actually needed to use that subtable twice in the query? Did you feel absolutely awful as you cut and pasted that subtable? Did you wonder whether your SQL plan would successfully manage to not calculate it twice? (Note: the WITH clause mitigates this pain in a lot of cases, but isn’t available in all flavors of SQL.)

Because in Pig Latin every step has a declared alias, reusing “subquery” tables is natural and intuitive, and generally does not involve building them twice.

Getting multiple queries out of one pipeline

In SQL you can find yourself in a place where you want to use the data, do some manipulation on it, and then take it in a few different directions. To do this in one query requires profligate use of JOIN, and enough parens to intimidate a LISP hacker.

In a Pig pipeline, any and all aliases produced along the way can be stored, and all it takes is adding a new STORE statement to the script.

User-Defined Functions

SQL has had decades for people to figure out what analytic functions they need for arbitrary data analysis, and so when you find yourself suddenly interested in extracting the day of the week from a date, that function is ready and waiting.

Pig’s list of built-in functions is growing, but is still dwarfed by what Oracle or MySQL provides. What turns this into a tolerable constraint is that Pig allows the user to define aggregate or analytic functions in other languages (Java, Python, and others) and then apply them in Pig quickly and without fuss.

REGISTER udf.jar;

new_data = FOREACH my_data GENERATE udf.ImportantFunction(field1);

The Well-Disguised SQLer

In general, if you can think about it in SQL, you can do it in Pig. Be aware of the nested data structures, have a cheat sheet for syntax, and relish the ability to write queries the way your brain thinks them, and not the way SQL demands.

As a final thought, let’s resurrect our old friend the emp table, and take a look at some SQL to Pig Latin examples.

Average Salary by Location
SQL
SELECT loc, AVG(sal) FROM emp JOIN dept USING(deptno) WHERE sal > 3000 GROUP BY loc;
Pig Latin
filtered_emp = FILTER emp BY sal > 3000;

emp_join_dept = JOIN filtered_emp BY deptno, dept BY deptno;

grouped_by_loc = GROUP emp_join_dept BY loc;

avg_salary = FOREACH grouped_by_loc GENERATE group, AVG(emp_join_dept.sal);
Ordered Average Salary by Location

Suppose now that the following is true.

● The ‘loc’ field is a string/varchar field, and we have two pieces of software that automatically populate it. One stores values as lowercase, one as uppercase. (If this seems like a contrived example to you, you have chosen your employers and software vendors well.)

The new parts of the queries are the UPPER(loc) calls, plus the wrapping subselect in the SQL version.

SQL
SELECT standard_loc, AVG(sal) avg_salary FROM
(SELECT UPPER(loc) standard_loc, sal FROM emp JOIN dept USING(deptno) WHERE sal > 3000) std_table GROUP BY standard_loc;
Pig Latin
filtered_emp = FILTER emp BY sal > 3000;

emp_join_dept = JOIN filtered_emp BY deptno, dept BY deptno;

grouped_by_loc = GROUP emp_join_dept BY UPPER(loc);

avg_salary = FOREACH grouped_by_loc GENERATE group, AVG(emp_join_dept.sal);

This kind of change is friendlier to Pig because of the limitations in SQL’s GROUP BY clause; the trade-off of verboseness for clarity increases in value the more complex your query gets.

Above Average Salary for Location

Now suppose instead of arbitrarily selecting 3,000 as a threshold, which is going to overselect people living in large expensive cities, we want to select those employees who make more than twice the average for their location.

SQL

There are several ways to accomplish this, of course, but for illustration purposes the most vanilla version is shown here.

SELECT empno FROM 
(SELECT empno, UPPER(loc) standard_loc, sal FROM emp JOIN dept USING(deptno)) std_table1 JOIN
(SELECT standard_loc, AVG(sal) avg_salary FROM
(SELECT UPPER(loc) standard_loc, sal FROM emp JOIN dept USING(deptno)) std_table2 GROUP BY standard_loc) grp_table2
USING(standard_loc) WHERE sal > (2 * avg_salary);
Pig Latin
emp_join_dept = JOIN emp BY deptno, dept by deptno;

grouped_by_loc = GROUP emp_join_dept BY UPPER(loc);

loc_avg_salary = FOREACH grouped_by_loc GENERATE AVG(emp_join_dept.sal) as avg_salary, FLATTEN(emp_join_dept);

highly_paid = FILTER loc_avg_salary BY sal > (2 * avg_salary);

Here the dreaded subquery reuse rears its unattractive head in SQL and leads to a bit of a frankenquery. In contrast, the Pig script stays the same length, because we can effectively just shift the FILTER from the beginning to the end of our data flow. FLATTEN is a new entry to our function arena, for which there is no SQL analogue. What FLATTEN does is unnest tuples and data bags; in this example it’s taking the data bag ‘emp_join_dept’ created by the GROUP function, and removing the nesting so that the fields within it will be at the same level as avg_salary.
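
The effect of FLATTEN in this script can be imitated in Python to make the un-nesting concrete. This is a sketch under assumptions rather than a description of Pig internals: the rows and the simplified (empno, loc, sal) layout are made up for illustration.

```python
from collections import defaultdict

# Hypothetical joined rows: (empno, loc, sal)
emp_join_dept = [
    (1, "dallas", 1000.0),
    (2, "DALLAS", 1000.0),
    (3, "Dallas", 7000.0),
    (4, "boston", 4000.0),
]

# GROUP emp_join_dept BY UPPER(loc)
grouped_by_loc = defaultdict(list)
for row in emp_join_dept:
    grouped_by_loc[row[1].upper()].append(row)

# FOREACH ... GENERATE AVG(sal) AS avg_salary, FLATTEN(emp_join_dept):
# one output row per tuple in the bag, each carrying the group's average.
loc_avg_salary = []
for bag in grouped_by_loc.values():
    avg_salary = sum(sal for _, _, sal in bag) / len(bag)
    for empno, loc, sal in bag:
        loc_avg_salary.append((avg_salary, empno, loc, sal))

# FILTER loc_avg_salary BY sal > (2 * avg_salary)
highly_paid = [row for row in loc_avg_salary if row[3] > 2 * row[0]]
```

After the flatten step there are again as many rows as there were employees, and each row sits next to its location's average, which is what lets the final FILTER compare the two directly.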

Run It Yourself for the Swine-Curious

If you want to try out some Pig examples hands-on, you can get a free Mortar account here. We’ve generated a one million row emp table data set, so to run these examples on your own all you need to do is:

    1. Sign up at app.mortardata.com for a free account.
    2. Go to Web Projects
    3. Select My Web Projects -> New Blank project
    4. To load the two data files, you need LOAD statements, and to store the resulting data you need a STORE statement, so in the end your Pig script should look like this:
dept = LOAD 's3n://mortar-example-data/employee/dept.csv' USING org.apache.pig.piggybank.storage.CSVExcelStorage('SKIP_HEADER')
AS (deptno:int, dname:chararray, loc:chararray);

emp = LOAD 's3n://mortar-example-data/employee/emp.csv' USING org.apache.pig.piggybank.storage.CSVExcelStorage('SKIP_HEADER')
AS (empno:int, ename:chararray, job:chararray, mgr:int, hiredate:chararray, sal:float, deptno:int);

-- Pig example code here

STORE avg_salary INTO 's3n://mortar-example-output-data/$MORTAR_EMAIL_S3_ESCAPED/emp_example' USING PigStorage('\t');
    5. Click Run and select a 2-node cluster. Even with one million rows, it runs in under three minutes. Sit back and watch the power of modern computing tackle the problems of the 1980s.

You can also try out Pig in the Hortonworks Sandbox.

Introducing… Tez: Accelerating processing of data stored in HDFS

 

MapReduce has served us well.  For years it has been THE processing engine for Hadoop and has been the backbone upon which a huge amount of value has been created.  While it is here to stay, new paradigms are also needed in order to enable Hadoop to serve an even greater number of usage patterns.  A key and emerging example is the need for interactive query, which today is challenged by the batch-oriented nature of MapReduce.  A key step to enabling this new world was Apache YARN and today the community proposes the next step… Tez

What is Tez?

Tez – Hindi for “speed” – (currently under incubation vote within Apache) provides a general-purpose, highly customizable framework that simplifies data-processing tasks across both small-scale (low-latency) and large-scale (high-throughput) workloads in Hadoop. It generalizes the MapReduce paradigm to a more powerful framework by providing the ability to execute a complex DAG (directed acyclic graph) of tasks for a single job, so that projects in the Apache Hadoop ecosystem such as Apache Hive, Apache Pig and Cascading can meet requirements for human-interactive response times and extreme throughput at petabyte scale (clearly MapReduce has been a key driver in achieving this).

With the emergence of Apache Hadoop YARN as the basis of next generation data-processing architectures, there is a strong need for an application that can execute a complex DAG of tasks and be shared by Apache Pig, Apache Hive, Cascading and others.  The constrained DAG expressible in MapReduce (one set of maps followed by one set of reduces) often results in multiple MapReduce jobs, which harms latency for short queries (the overhead of launching multiple jobs) and throughput for large-scale queries (the overhead of materializing intermediate job outputs to the filesystem). With Tez, we introduce a more expressive DAG of tasks, within a single application or job, that is better aligned with the required processing task. For example, any given SQL query can be expressed as a single job using Tez.
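
To make the idea of a DAG of tasks inside a single job a little more concrete, here is a small Python sketch. It is emphatically not the Tez API: run_dag, the task names and the toy data are all hypothetical, and the only library call used is graphlib.TopologicalSorter from the Python standard library (3.9 or later). Each task runs once its inputs are ready, and intermediate results are handed on in memory rather than written out between separate jobs.

```python
from graphlib import TopologicalSorter  # standard library, Python 3.9+

def run_dag(tasks, dependencies):
    """Run each task once, after all of its inputs, keeping results in memory.

    tasks: name -> function taking the list of its inputs' results
    dependencies: name -> list of upstream task names
    """
    results = {}
    for name in TopologicalSorter(dependencies).static_order():
        inputs = [results[dep] for dep in dependencies.get(name, [])]
        results[name] = tasks[name](inputs)
    return results

# Hypothetical plan: two scans feed a join, which feeds an aggregation.
tasks = {
    "scan_orders": lambda _: [(1, 20.0), (2, 5.0), (1, 10.0)],   # (custkey, price)
    "scan_customers": lambda _: {1: "alice", 2: "bob"},          # custkey -> name
    "join": lambda inputs: [(inputs[1][key], price) for key, price in inputs[0]],
    "aggregate": lambda inputs: {
        name: sum(p for n, p in inputs[0] if n == name)
        for name in {n for n, _ in inputs[0]}
    },
}
dependencies = {
    "scan_orders": [],
    "scan_customers": [],
    "join": ["scan_orders", "scan_customers"],
    "aggregate": ["join"],
}
results = run_dag(tasks, dependencies)
```

Expressed as MapReduce, the same plan would need at least two chained jobs (one for the join, one for the aggregation), with the join output materialized to the filesystem in between.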

The graphic below illustrates the advantages provided by Tez for complex SQL queries in Apache Hive or complex Apache Pig scripts.

[Figure: pighivetez]

Tez is critical to the Stinger Initiative and goes a long way in helping Hive support both interactive queries and batch queries. Tez provides a single underlying framework to support both latency- and throughput-sensitive applications, thereby obviating the need for multiple frameworks and systems to be installed, maintained and supported, a key advantage to enterprises looking to rationalize their data architectures.

Essentially, Tez is the logical next step for Apache Hadoop after Apache Hadoop YARN. With YARN the community generalized Hadoop MapReduce to provide a general-purpose resource management framework (YARN) wherein MapReduce became merely one of the applications that could process data in your Hadoop cluster. With Tez, we build on YARN and our experience with MapReduce to provide a more general data-processing application to the benefit of the entire ecosystem, e.g. Apache Hive, Apache Pig etc.

What has been completed? Where can Tez go?

An early version of the project has been donated to the ASF as part of the initial code grant to establish the Incubation project.   Through the work done in the Stinger initiative, it is already clear that Tez enables an order of magnitude increase in the performance of Apache Hive.

The community is also designing a re-usable set of libraries of data-processing primitives such as sorting, merging, data-shuffling, intermediate data management etc. which are necessary for Tez and may be used directly by other projects.  This is just the beginning.  It is an extensible architecture that will undoubtedly be contributed to widely.

For the community, by the community

At Hortonworks we believe that innovation happens fastest by working with a community of like-minded individuals to address the requirements for Hadoop without being bounded by artificial boundaries such as employment. As such, even though the Hortonworks MapReduce/Hive/Pig team seeded the project, we’ve had the benefit of both positive feedback and constructive criticism from several leading contributors and committers across the Apache Hadoop MapReduce, Apache Hive & Apache Pig projects.  These inventors and peers are employed at Hortonworks, Yahoo, Facebook, Microsoft, Twitter and many others.  The initial committer list has 22 participants with deep domain expertise in these unique challenges and comprises a who’s who in the Hadoop world.  Of course, now that we have proposed Tez as an Incubator project and are nearly in a position to co-develop via the Apache Software Foundation, we expect a very quick acceleration of project development.

When will it be available?

We plan to donate the code from our internal repository to the ASF as part of the Incubator proposal.  Also, Hortonworks will ship Tez in the next alpha release of Hortonworks Data Platform 2 (HDP2), based on Apache Hadoop 2.0, very soon to showcase some of the very exciting advances we have made for Apache Hive via Project Stinger.

We are very excited by the reception Tez has received so far, and we do hope you can join us in this initiative via the Apache Software Foundation project to make Hadoop better!

The Stinger Initiative: Making Apache Hive 100 Times Faster

 

Introduced by Facebook in 2007, Apache Hive and its HiveQL interface have become the de facto SQL interface for Hadoop.  Today, companies of all types and sizes use Hive to access Hadoop data in a familiar way and to extend value to their organization or customers either directly or through a broad ecosystem of existing BI tools that rely on this key proven interface.  The who’s who of business analytics have already adopted Hive.

Apache Hive was originally built for large-scale operational batch processing and it is very effective with reporting, data mining and data preparation use cases.  These usage patterns remain very important but with widespread adoption of Hadoop, the enterprise requirement for Hadoop to become more real time or interactive has increased in importance as well. At Hortonworks, we believe in the power of the open source community to innovate faster than any proprietary offering and the Stinger initiative is proof of this once again as we collaborate with others to improve Hive performance.

So, What is Stinger?

Enabling Hive to answer human-time use cases (i.e. queries in the 5-30 second range) such as big data exploration, visualization, and parameterized reports without needing to resort to yet another tool to install, maintain and learn can deliver a lot of value to the large community of users with existing Hive skills and investments.

To this end, we have launched the Stinger Initiative, with input and participation from the broader community, to enhance Hive with more SQL and better performance for these human-time use cases. All the while, HiveQL remains the same before and after these advancements so it just gets better. And in keeping with the ecosystem of existing tools, it is complementary to best-of-breed data warehouses and analytic platforms.

  • First, we are making Hive a more suitable tool for the decision support queries people want to perform on Hadoop.  This includes adding analytics features like the OVER clause, support for subqueries in WHERE, and aligning Hive’s type system more with the standard SQL model.
  • Second, we are optimizing Hive’s query execution plans and based on our initial changes, we have already seen query time drop by 90% on some of our test queries. We are also looking at additional changes inside Hive’s execution engine that we believe will significantly increase the number of records per second that a Hive task can process.
  • Third, we have introduced a new columnar file format (i.e. ORCFile) within the Hive community to provide a more modern, efficient, and high performing way to store Hive data.
  • And lastly, we’ve introduced a new runtime framework, called Tez, which aims to eliminate Hive’s latency and throughput constraints that result from its reliance on MapReduce. Tez optimizes Hive job execution by eliminating unnecessary tasks, synchronization barriers, and reads from and writes to HDFS.  This optimizes the execution chain within Hadoop and drastically speeds Hive’s workload processing.

All of these modifications to Hive are underway in the open and an initial preview will be available in advance of Hadoop Summit Amsterdam in March.

Embrace the community, Embrace Hive…

A diverse group of individuals within the Hive community is collaborating on these efforts. As part of the community, a wide group of people contributed to this effort, including resources from SAP, Microsoft, Facebook and Hortonworks.

Harish Butani from SAP has led an effort to add analytics and windowing functions to Hive.  This will add the OVER clause for use with existing aggregate functions as well as adding analytics functions like RANK and NTILE and windowing functions like LEAD and LAG; you can see this work at HIVE-896.  Namit Jain from Facebook has been spending a lot of time lately optimizing Hive’s query execution planning so that it performs joins and other operations more efficiently and with less need for hints from the user.  Hortonworks engineers have been collaborating on these and other community efforts to improve Hive.

Owen O’Malley, a Hortonworks co-founder and early Hadoop developer, has been working with Facebook on the new ORCFile in order to greatly improve performance when Hive is reading, writing, and processing data; you can see this work at HIVE-3874. We are also working on further-reaching changes and optimizations such as reworking Hive’s operators to process records in blocks of a thousand or more and thus be much more efficient than they are today.
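
The block-at-a-time idea can be illustrated with a short Python sketch. It says nothing about how Hive's operators are actually written; the helper names blocks and filter_block, the block size default and the sample data are assumptions made for the example.

```python
from itertools import islice

def blocks(records, size=1000):
    """Yield successive lists of up to `size` records."""
    iterator = iter(records)
    while True:
        block = list(islice(iterator, size))
        if not block:
            return
        yield block

def filter_block(block, threshold):
    """An operator that handles a whole block per call instead of one record."""
    return [value for value in block if value > threshold]

# Hypothetical input: 2,500 integer records.
records = range(2500)
block_sizes = []
kept = 0
for block in blocks(records):
    block_sizes.append(len(block))
    kept += len(filter_block(block, threshold=1999))
```

The operator is invoked three times here rather than 2,500 times, so the per-call overhead is paid once per block instead of once per record.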

We believe the performance changes we are making today, along with the work being done in Tez, will transform Hive into a single tool that Hadoop users can use to do report generation, ad hoc queries, and large batch jobs spanning 10s or 100s of terabytes.

Why reinvent the wheel?

Imperative and Declarative Hadoop: TPC-H in Pig and Hive

According to the Transaction Processing Performance Council, TPC-H is:

The TPC Benchmark™H (TPC-H) is a decision support benchmark. It consists of a suite of business oriented ad-hoc queries and concurrent data modifications. The queries and the data populating the database have been chosen to have broad industry-wide relevance. This benchmark illustrates decision support systems that examine large volumes of data, execute queries with a high degree of complexity, and give answers to critical business questions.

TPC-H was implemented for Hive in HIVE-600 and for Pig in PIG-2397 by Hortonworks intern Jie Li. In going over this work, I was struck by how it outlined differences between Pig and SQL.

There seems to be a tendency for simple SQL to provide greater clarity than Pig. At some point as the TPC-H queries become more demanding, complex SQL seems to have less clarity than the comparable Pig. Let’s take a look.

Q1, the pricing summary report, is fairly simple, and a SQL GROUP BY is a good fit:

DROP TABLE lineitem;
DROP TABLE q1_pricing_summary_report;

-- create tables and load data
CREATE EXTERNAL TABLE lineitem (
    L_ORDERKEY INT,
    L_PARTKEY INT,
    L_SUPPKEY INT,
    L_LINENUMBER INT, 
    L_QUANTITY DOUBLE, 
    L_EXTENDEDPRICE DOUBLE, 
    L_DISCOUNT DOUBLE, 
    L_TAX DOUBLE, 
    L_RETURNFLAG STRING, 
    L_LINESTATUS STRING, 
    L_SHIPDATE STRING, 
    L_COMMITDATE STRING, 
    L_RECEIPTDATE STRING, 
    L_SHIPINSTRUCT STRING, 
    L_SHIPMODE STRING, 
    L_COMMENT STRING) 
    ROW FORMAT DELIMITED FIELDS TERMINATED BY '|' STORED AS TEXTFILE LOCATION '/tpch/lineitem';

-- create the target table
CREATE TABLE q1_pricing_summary_report ( 
    L_RETURNFLAG STRING, 
    L_LINESTATUS STRING, 
    SUM_QTY DOUBLE, 
    SUM_BASE_PRICE DOUBLE, 
    SUM_DISC_PRICE DOUBLE, 
    SUM_CHARGE DOUBLE, 
    AVE_QTY DOUBLE, 
    AVE_PRICE DOUBLE, 
    AVE_DISC DOUBLE, 
    COUNT_ORDER INT);

set mapred.min.split.size=536870912;

-- the query
INSERT OVERWRITE TABLE q1_pricing_summary_report 
SELECT 
    L_RETURNFLAG, 
    L_LINESTATUS, 
    SUM(L_QUANTITY), 
    SUM(L_EXTENDEDPRICE), 
    SUM(L_EXTENDEDPRICE * (1-L_DISCOUNT)), 
    SUM(L_EXTENDEDPRICE * (1-L_DISCOUNT) * (1+L_TAX)), 
    AVG(L_QUANTITY),
    AVG(L_EXTENDEDPRICE), 
    AVG(L_DISCOUNT), 
    COUNT(1) 
FROM 
  lineitem 
WHERE 
  L_SHIPDATE<='1998-09-02' 
GROUP BY L_RETURNFLAG, L_LINESTATUS 
ORDER BY L_RETURNFLAG, L_LINESTATUS;

One thing to notice, though, is that compared to Pig we have to specify schemas twice: once for the load, and again for the result. In Pig, we specify the schema once upon load, and then implicitly in the Pig code itself:

SET default_parallel $reducers;

LineItems = LOAD '$input/lineitem' USING PigStorage('|') AS (
    orderkey:long, 
    partkey:long, 
    suppkey:long, 
    linenumber:long, 
    quantity:double, 
    extendedprice:double, 
    discount:double, 
    tax:double, 
    returnflag, 
    linestatus, 
    shipdate, 
    commitdate, 
    receiptdate, 
    shipinstruct, 
    shipmode, 
    comment);

SubLineItems = FILTER LineItems BY shipdate <= '1998-09-02';

SubLine = FOREACH SubLineItems GENERATE 
    returnflag, 
    linestatus, 
    quantity, 
    extendedprice, 
    extendedprice * (1-discount) AS disc_price, 
    extendedprice * (1-discount) * (1+tax) AS charge, 
    discount;

StatusGroup = GROUP SubLine BY (returnflag, linestatus);

PriceSummary = FOREACH StatusGroup GENERATE 
    group.returnflag AS returnflag, 
    group.linestatus AS linestatus, 
    SUM(SubLine.quantity) AS sum_qty, 
    SUM(SubLine.extendedprice) AS sum_base_price, 
    SUM(SubLine.disc_price) AS sum_disc_price, 
    SUM(SubLine.charge) AS sum_charge, 
    AVG(SubLine.quantity) AS avg_qty, 
    AVG(SubLine.extendedprice) AS avg_price, 
    AVG(SubLine.discount) AS avg_disc, 
    COUNT(SubLine) AS count_order;

SortedSummary = ORDER PriceSummary BY returnflag, linestatus;

STORE SortedSummary INTO '$output/Q1out';

Things change as the queries get more complex. With the use of temporary tables, the schema creation overhead starts to dominate, and the SQL becomes quite complex. Take a look at Q22, the Global Sales Opportunity Report:

DROP TABLE customer;
DROP TABLE orders;
DROP TABLE q22_customer_tmp;
DROP TABLE q22_customer_tmp1;
DROP TABLE q22_orders_tmp;
DROP TABLE q22_global_sales_opportunity;

-- create tables and load data
create external table customer (
    C_CUSTKEY INT, 
    C_NAME STRING, 
    C_ADDRESS STRING, 
    C_NATIONKEY INT, 
    C_PHONE STRING, 
    C_ACCTBAL DOUBLE, 
    C_MKTSEGMENT STRING, 
    C_COMMENT STRING
) ROW FORMAT DELIMITED FIELDS TERMINATED BY '|' STORED AS TEXTFILE LOCATION '/tpch/customer';

create external table orders (
    O_ORDERKEY INT, 
    O_CUSTKEY INT, 
    O_ORDERSTATUS STRING, 
    O_TOTALPRICE DOUBLE, 
    O_ORDERDATE STRING, 
    O_ORDERPRIORITY STRING, 
    O_CLERK STRING, 
    O_SHIPPRIORITY INT, 
    O_COMMENT STRING) 
ROW FORMAT DELIMITED FIELDS TERMINATED BY '|' STORED AS TEXTFILE LOCATION '/tpch/orders';

-- create target tables
create table q22_customer_tmp(c_acctbal double, c_custkey int, cntrycode string);
create table q22_customer_tmp1(avg_acctbal double);
create table q22_orders_tmp(o_custkey int);
create table q22_global_sales_opportunity(cntrycode string, numcust int, totacctbal double);

-- the query
insert overwrite table q22_customer_tmp
select 
  c_acctbal, c_custkey, substr(c_phone, 1, 2) as cntrycode
from 
  customer
where 
  substr(c_phone, 1, 2) = '13' or
  substr(c_phone, 1, 2) = '31' or
  substr(c_phone, 1, 2) = '23' or
  substr(c_phone, 1, 2) = '29' or
  substr(c_phone, 1, 2) = '30' or
  substr(c_phone, 1, 2) = '18' or
  substr(c_phone, 1, 2) = '17';

insert overwrite table q22_customer_tmp1
select
  avg(c_acctbal)
from
  q22_customer_tmp
where
  c_acctbal > 0.00;

insert overwrite table q22_orders_tmp
select 
  o_custkey 
from 
  orders
group by 
  o_custkey;

insert overwrite table q22_global_sales_opportunity
select
  cntrycode, count(1) as numcust, sum(c_acctbal) as totacctbal
from
(
  select cntrycode, c_acctbal, avg_acctbal from
  q22_customer_tmp1 ct1 join
  (
    select cntrycode, c_acctbal from
      q22_orders_tmp ot 
      right outer join q22_customer_tmp ct 
      on
        ct.c_custkey = ot.o_custkey
    where
      o_custkey is null
  ) ct2
) a
where
  c_acctbal > avg_acctbal
group by cntrycode
order by cntrycode;

The Pig version is comparatively simple:

SET default_parallel $reducers;

customer = load '$input/customer' USING PigStorage('|') as (
    c_custkey:long,
    c_name:chararray, 
    c_address:chararray, 
    c_nationkey:int, 
    c_phone:chararray, 
    c_acctbal:double, 
    c_mktsegment:chararray, 
    c_comment:chararray);
orders = load '$input/orders' USING PigStorage('|') as (
    o_orderkey:long, 
    o_custkey:long, 
    o_orderstatus:chararray, 
    o_totalprice:double, 
    o_orderdate:chararray, 
    o_orderpriority:chararray, 
    o_clerk:chararray, 
    o_shippriority:long, 
    o_comment:chararray);

customer_filter = filter customer by c_acctbal>0.00 and SUBSTRING(c_phone, 0, 2) MATCHES '13|31|23|29|30|18|17';
customer_filter_group = group customer_filter all;
avg_customer_filter = foreach customer_filter_group generate AVG(customer_filter.c_acctbal) as avg_c_acctbal;

customer_sec_filter = filter customer by c_acctbal > avg_customer_filter.avg_c_acctbal and SUBSTRING(c_phone, 0, 2) MATCHES '13|31|23|29|30|18|17';
customer_orders_left = join customer_sec_filter by c_custkey left, orders by o_custkey;

customer_trd_filter = filter customer_orders_left by o_custkey is null;
customer_rows = foreach customer_trd_filter generate SUBSTRING(c_phone, 0, 2) as cntrycode, c_acctbal;

customer_result_group = group customer_rows by cntrycode;
customer_result = foreach customer_result_group generate group, COUNT(customer_rows) as numcust, SUM(customer_rows.c_acctbal) as totacctbal;
customer_result_inorder = order customer_result by group;

store customer_result_inorder into '$output/Q22out' USING PigStorage('|');

Both Pig and Hive have their place and their own strengths. Comparing identical queries in the two systems shows where you might want to hand off work from Hive to Pig. HCatalog facilitates this handoff: Pig can read from and write to Hive tables directly via HCatLoader and HCatStorer.
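
As a rough sketch of such a handoff, the Pig script below reads the Hive result table from the Q1 example through HCatalog and writes a filtered copy back to Hive. It assumes Pig is started with the -useHCatalog flag, that q1_pricing_summary_report already exists in the Hive metastore, and that a target table with the same columns has been created in Hive beforehand. The target table name q1_large_groups and the threshold are hypothetical, chosen only for illustration. The package name shown is the one used by the standalone HCatalog releases; later Hive releases ship the same classes under org.apache.hive.hcatalog.pig, so adjust it to your installation.

```pig
-- Sketch only. Run with: pig -useHCatalog handoff.pig
-- Assumption: the Hive table q1_pricing_summary_report exists in the metastore.
-- No AS clause is needed: HCatLoader takes the schema from the metastore.
summary = LOAD 'q1_pricing_summary_report'
          USING org.apache.hcatalog.pig.HCatLoader();

-- Hive column names arrive in lower case; the threshold is arbitrary.
large_groups = FILTER summary BY sum_qty > 1000000.0;

-- Assumption: q1_large_groups (hypothetical) was created in Hive beforehand
-- with the same columns as q1_pricing_summary_report.
STORE large_groups INTO 'q1_large_groups'
      USING org.apache.hcatalog.pig.HCatStorer();
```

Because the schema comes from the metastore, the Pig side does not repeat the column list that the Hive DDL already declared.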

Hortonworks Sandbox — the Fastest On Ramp to Apache Hadoop

Go from Zero to Big Data in 15 Minutes!

Today Hortonworks announced the availability of the Hortonworks Sandbox, an easy-to-use, flexible and comprehensive learning environment that will provide you with the fastest on-ramp to learning and exploring enterprise Apache Hadoop.

The Hortonworks Sandbox is:

  • A free download
  • A complete, self-contained virtual machine with Apache Hadoop pre-configured
  • A personal, portable and standalone Hadoop environment
  • A set of hands-on, step-by-step tutorials that allow you to learn and explore Hadoop on your own

The Hortonworks Sandbox is designed to close the gap between wanting to learn and evaluate Hadoop and the complexity of spinning up an evaluation cluster. It pairs hands-on, step-by-step tutorials with an easy-to-use web interface, designed to lower the learning curve for people who just want to explore and evaluate Hadoop as quickly as possible.

One of our key focus areas is making Hadoop an enterprise-viable platform that is easy for our customers and the broader ecosystem to use and consume. Over the past year or so, we have seen the complex and disjointed experience people face when trying to learn Hadoop; the Sandbox is meant to give you the fastest on-ramp to Apache Hadoop. We want the Sandbox to deliver an integrated, easy-to-use, easily updateable learning environment. Ongoing updates to the tutorials are planned, delivering new, interesting hands-on exercises that explore different features and use cases.

These tutorials are based on the experience gained training thousands of people in our Hortonworks University Training classes. As we continue to build out the Sandbox, we will provide additional levels of sophistication; think of it as the Hadoop 101, 201 and 301 levels of learning. Updating the tutorials is easy: clicking the “Update” button initiates a lightweight download of just the tutorial content.

The Sandbox is a single-node implementation of the Hortonworks Data Platform (HDP) 1.2 that behaves just like a normal Hadoop environment. You can add your own datasets in an isolated, protected environment to evaluate the use of Hadoop in your own data architectures.

Use the Sandbox to:

  • Explore Hadoop on your own
  • Plan out the integration points of your proof of concept project
  • Prepare for a more complex pilot deployment

When you are ready, you can download and deploy the Hortonworks Data Platform with the confidence that you have thought through exactly how and where Hadoop can help.

What can you expect from us in the coming months with the Hortonworks Sandbox?

  1. Join us for a special launch webinar on February 5, “Go from Zero to Big Data in 15 Minutes”. I will be hosting this webinar with one of our awesome Solution Engineers, who will give you a sneak peek at some cool use cases for the Sandbox.
  2. New tutorials released on roughly a monthly basis.
  3. Demos and exercises showing integration with tools and applications from our ecosystem partners such as Teradata, Alteryx, Datameer, and Microsoft. How cool would it be to run Excel on top of a personal Hadoop environment? Well, that’s coming, so check back often.

I’m excited that you will be able to go from Zero to Big Data in 15 Minutes in a simple, easy-to-use fashion. I’m also eager to hear your feedback: please let me know what you think of the Sandbox and what kinds of tutorials you would like to see. I would love to hear about your creative uses of the Sandbox, too. Leave your comments on this blog, tweet using #hwsandbox, comment in the Sandbox Forum, or email. The Hortonworks Sandbox is free and available for download here.
