Category Archives: Apache Hadoop


Seamless Reporting & Analytics for Apache Hadoop & Big Data Users

Jaspersoft, a Hortonworks certified technology partner, recently completed a survey on the early use of Apache Hadoop in the enterprise. The company found 38% of respondents require real-time or near real-time analytics for their Big Data with Hadoop. Also, within the enterprise, there is a diverse group of people who use Hadoop for such insights: 63% are application developers, 15% are BI report developers and 10% are BI admins or casual business users. Register for a free webinar to hear more.

So, for Hadoop users, the partnership between Hortonworks and Jaspersoft provides a good combination: Jaspersoft complements Hadoop-based Big Data systems with reporting and analysis through a full suite of ETL, Apache Hive, and native Apache HBase connectors for low-latency data exploration. Not only does the company have an open source model that empowers users to deploy Big Data reporting and analytics quickly and cost-effectively, but its pre-defined reports also make it easy for a wide group of users to gain and share immediate insight.

Jaspersoft joined the Hortonworks Technology Partner Program in 2012, extending advanced reporting capabilities to Hadoop users. The Hortonworks Technology Partner Program is designed to help ISVs and other solution providers integrate and extend their solutions for Hadoop, and includes a variety of technical enablement, technical support and training offerings. According to Hortonworks’ CTO Eric Baldeschwieler, “Jaspersoft’s industry-leading reporting, analysis, and dashboard products, together with the Hortonworks Data Platform, make it easy and cost-effective for customers to derive maximum insights and value from their largest data stores.”

Choosing the right analytical approach

As easy as this sounds, there are still several approaches to analyzing and reporting on Big Data and numerous use cases: web analytics, fraud detection, security monitoring and healthcare, just to name a few. Choosing the right approach depends on what insights you need and why you need them, and can make all the difference in how much value you extract from your data.

An upcoming webinar hosted by Hortonworks and Jaspersoft on March 13 will delve into the various architectural choices used in Hadoop reporting and analytics, and several use cases will be discussed. Register now.

 

Getting Ready for The Elephant Party in Europe

We are just under two weeks away from the start of the first-ever Hadoop Summit Europe, and with the final preparations being made we thought we would highlight some of the not-to-be-missed activities in and around the event. The event is filling fast, but you can still register here.

Here are 10 great reasons to attend!

1)   Great track content – there are 35 informative sessions on Apache Hadoop and related technologies for you to choose from, selected by the community and delivered by the experts themselves.

2)   Great keynotes – leading industry analyst Matt Aslett will present the opening keynote and we will also hear from open source veteran Shaun Connolly as well as Hortonworks CTO Eric Baldeschwieler.

3)   Hadoop in the Enterprise expert panel – We will have a live panel discussion from industry leaders including eBay, HSBC and Neustar discussing how and why they use Apache Hadoop.

4)   Meetups – the NLHUG and other communities will be meeting around the event.

5)   Lightning talks – we’ve got rapid-fire content coming to you in the form of community-selected lightning talks. These 5-minute sessions will give you a taste of a wide range of technologies and initiatives.

6)   It’s Amsterdam – historic, edgy and fun!

7)   Ecosystem – The conference has the support of the broader Hadoop ecosystem so you can come and discuss Hadoop and big data in the solutions showcase.

8)   Community – The Apache Hadoop community is big and getting bigger. Come meet and mingle with other community members to learn about the latest goings on and make new connections.

9)   Get Hadoop certified – Calling all Hadoop experts! We’re bringing certification to you! If you are ready, you can take the exam to become a Hortonworks Certified Apache Hadoop Developer (HCAHD) or a Hortonworks Certified Apache Hadoop Administrator (HCAHA).

10)   Get trained on Hadoop – we’ve got a host of classes available during the event to help you learn or sharpen your Hadoop skills. This includes a newly added Applying Data Science class. Check out the classes.

11)  BONUS reason – have a beer on us at the Hadoop Summit Party at the Heineken Experience, a cool venue at a historic location.

Register now and don’t miss the party. We hope to see you there!

Putting the Elephant in the Window

 

For several years now Apache Hadoop has been fueling the fast-growing big data market and has become the de facto platform for Big Data deployments and the technology foundation for an explosion of new analytic applications. Many organizations turn to Hadoop to help tame the vast amounts of new data they are collecting, but to do so they have had to use servers running the Linux operating system. That left a large number of organizations that standardize on Windows without the ability to run Hadoop natively, until today. (According to IDC, Windows Server owned 73 percent of the market in 2012: IDC, Worldwide and Regional Server 2012–2016 Forecast, Doc # 234339, May 2012.)

We are very pleased to announce the availability of Hortonworks Data Platform for Windows, providing organizations with an enterprise-grade, production-tested platform for big data deployments on Windows. HDP is the first and only Hadoop-based platform available on both Windows and Linux and provides interoperability across Windows, Linux and Windows Azure. With this release we are enabling a massive expansion of the Hadoop ecosystem: new participants in the community of developers, data scientists, data management professionals and Hadoop fans can now build and run applications for Apache Hadoop natively on Windows. This is great news for Windows-focused enterprises, service providers, software vendors and developers, and in particular they can get going today with Hadoop simply by visiting our download page.

This release would not be possible without a strong partnership and close collaboration with Microsoft. Through the process of creating this release, we have remained true to our approach of community-driven enterprise Apache Hadoop by collecting enterprise requirements, developing them in open source and applying enterprise rigor to produce a 100-percent open source enterprise-grade Hadoop platform.

One of our goals at Hortonworks is to make Hadoop an enterprise-viable data platform that is available on as many platforms as possible. In fact, it is already available today in a range of deployment options including on-premise, virtual, cloud and appliance. Organizations looking to leverage Apache Hadoop now have even more deployment choices between Linux and Windows, giving them more freedom to meet their internal policies and standards. Microsoft Windows customers have complete portability of their Apache Hadoop applications between on-premise and cloud deployments, as Hortonworks Data Platform for Windows and HDInsight Service on Windows Azure are built on exactly the same code line.

If you are in the SF Bay Area this week, you can talk to us live about the power of the Hortonworks Data Platform for Windows at booth #316 at the Strata Conference, taking place February 26-28 at the Santa Clara Convention Center in Santa Clara, Calif.

 We will also be conducting the webinar, “Unlocking the Other Half: Introduction to Hortonworks Data Platform for Windows,” on Tuesday, March 12 at 10 a.m. PST / 1 p.m. EST.

To register for the webinar, please visit http://info.hortonworks.com/Hortonworks_HDPonWindows_webcast.html.

 

Pig Eye for the SQL Guy

Cat Miller is an engineer at Mortar Data, a Hadoop-as-a-service provider, and creator of mortar, an open source framework for data processing.

Introduction

For anyone who came of programming age before cloud computing burst its way into the technology scene, data analysis has long been synonymous with SQL. A slightly awkward, declarative language whose production can more resemble logic puzzle solving than coding, SQL and the relational databases it builds on have been the pervasive standard for how to deal with data.

As the world has changed, so too has our data; an ever-increasing amount of data is now stored without a rigorous schema, or must be joined to outside data sets to be useful. Compounding this problem, often the amounts of data are so large that working with them on a traditional SQL database is so non-performant as to be impractical.

Enter Pig, a SQL-like language that gracefully tolerates inconsistent schemas, and that runs on Hadoop. (Hadoop is a massively parallel platform for processing the largest of data sets in reasonable amounts of time. Hadoop powers Facebook, Yahoo, Twitter, and LinkedIn, to name a few in a growing list.)

This then is a brief guide for the SQL developer diving into the waters of Pig Latin for the first time. Pig is similar enough to SQL to be familiar, but divergent enough to be disorienting to newcomers. The goal of this guide is to ease the friction in adding Pig to an existing SQL skillset.

What’s Similar?

The basic concepts in SQL map pretty well onto Pig. There are analogues for the major SQL keywords, and as a result you can write a query in your head as SQL and then translate it into Pig Latin without undue mental gymnastics.

WHERE → FILTER

The syntax is different, but conceptually this is still putting your data into a funnel to create a smaller dataset.

HAVING → FILTER

Because a FILTER is done in a separate step from a GROUP or an aggregation, the distinction between HAVING and WHERE doesn’t exist in Pig.

ORDER BY → ORDER

This keyword behaves pretty much the same in Pig as in SQL.

JOIN

In Pig, joins can have their execution specified, and they look a little different, but in essence these are the same joins you know from SQL, and you can think about them in the same way. There are INNER and OUTER joins, RIGHT and LEFT specifications, and even CROSS for those rare moments that you actually want a Cartesian product.

Because Pig is most appropriately used for data pipelines, there are often fewer distinct relations or tables than you would expect to see in a traditional normalized relational database.

Control over Execution

SQL performance tuning generally involves some fiddling with indexes, punctuated by the occasional yelling at an explain plan that has inexplicably decided to join the two largest tables first. It can mean getting a different plan the second time you run a query, or having the plan suddenly change after several weeks of use because the statistics have evolved, throwing your query’s performance into the proverbial toilet.

Various SQL implementations offer hints to combat this problem—you can use a hint to tell your SQL optimizer that it should use an index, or to force a given table to be first in the join order. Unfortunately, because hints are dependent on the particular SQL implementation, what you actually have at your disposal varies by platform.

Pig offers a few different ways to control the execution plan. The first is just the explicit ordering of operations. You can write your FILTER before your JOIN (the reverse of SQL’s order) and be clever about eliminating unused fields along the way, and have confidence that the executed order will not be worse.

Secondly, the philosophy of Pig is to allow users to choose implementations where multiple ones are possible. As a result, there are three specialized joins that can be used when the features of the data are known and a regular join is less appropriate. For regular joins, the order of the arguments dictates execution: the larger data set should appear last in this type of join.
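The post does not name the three specialized joins; Pig's documentation lists them as replicated, skewed and merge joins. As a rough illustration of why knowing your data helps, here is a minimal Python sketch of the idea behind a replicated join: the small relation is held in memory as a hash table and the large relation is streamed past it. The function name and sample rows are invented for this sketch, and it models the idea only, not Pig's implementation.

```python
def replicated_join(large_rows, small_rows, large_key, small_key):
    """Inner join that hashes the small relation and streams the large one."""
    # Build an in-memory lookup from join key to matching small-side rows.
    lookup = {}
    for row in small_rows:
        lookup.setdefault(row[small_key], []).append(row)
    # Stream the large side; rows without a match are dropped (inner join).
    for row in large_rows:
        for match in lookup.get(row[large_key], []):
            yield row + match

# Hypothetical sample rows: emp is (empno, ename, deptno), dept is (deptno, loc).
emp = [(7369, "SMITH", 20), (7499, "ALLEN", 30), (7839, "KING", 10)]
dept = [(10, "NEW YORK"), (20, "DALLAS")]

joined = list(replicated_join(emp, dept, large_key=2, small_key=0))
print(joined)
```

In Pig Latin the corresponding hint is written as USING 'replicated' at the end of the JOIN statement, with the relation that fits in memory listed last.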

As with SQL, in Pig you can pretty much ignore the performance tweaks until you can’t. Because of the explicit control of ordering, it can be useful to have a general sense of the “good” order to do things in, though Pig’s optimizer will also try to push up FILTERs and LIMITs, taking some of the pressure off.

What’s Different?

A Row By Any Other Name

The SQL paradigm is very straightforward—there are tables, and tables contain rows. Every select statement yields a set of rows, and each field in a row is a basic data type. Conceptually, the result of any SQL select statement can be imported into Excel without loss of information.

Pig introduces a mature nesting notion in tuples and data bags that changes the game significantly. Pig consists of data sets called relations (sometimes called aliases for the names they are given), and those contain records that are data tuples, which can in turn recursively contain data bags, data tuples, or data items. There is a distinct lack of flatness in Pig, and the best way to see it is to explore how GROUP works.

Not Your Grandmother’s GROUP

In the handling of GROUP, SQL and Pig diverge significantly. SQL’s GROUP doesn’t exist outside of the aggregation performed on it; you would never SELECT * GROUP BY field1, because it just doesn’t make sense. Because everything in SQL is a row, the grouping created isn’t persistent; only the data produced by aggregating over it remains.

Pig’s GROUP is an entirely different beast, albeit used for the same purpose. It is a persistent relation that can be used again and again, independent of what aggregations you might choose to perform on it later.

student_grades = GROUP grades by student;

This student_grades relation has two fields: one called group and populated with the value of student, the other called grades populated with a data bag containing tuples for all the entries with the same value of student.

group	grades
alyssa 	{< hacking101, 95 >, < english, 60 >}
ben	{< math, 90 >}

Now to do an aggregation, perform it on the student_grades alias.

average_grades = FOREACH student_grades GENERATE group, AVG(grades.value);
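To make the nested result concrete, here is a minimal Python sketch of what GROUP followed by FOREACH ... GENERATE AVG does, using the sample rows from the table above. It only models the semantics on in-memory lists and says nothing about how Pig actually executes the script; the helper names (pig_group, average_of) are invented for illustration, and the bag here keeps the whole input tuple, student included.

```python
from collections import OrderedDict

# Sample rows matching the table above: (student, course, value).
grades = [
    ("alyssa", "hacking101", 95),
    ("alyssa", "english", 60),
    ("ben", "math", 90),
]

def pig_group(rows, key_index):
    """Model GROUP: one record per key, holding a bag (list) of input tuples."""
    bags = OrderedDict()
    for row in rows:
        bags.setdefault(row[key_index], []).append(row)
    # Each record is (group, bag), mirroring the two fields of student_grades.
    return list(bags.items())

def average_of(grouped, value_index):
    """Model FOREACH grouped GENERATE group, AVG(bag.value)."""
    return [
        (group, sum(row[value_index] for row in bag) / float(len(bag)))
        for group, bag in grouped
    ]

student_grades = pig_group(grades, 0)
average_grades = average_of(student_grades, 2)
print(student_grades)
print(average_grades)
```

The point of the sketch is that student_grades exists as a value in its own right, so any number of later steps can read from it.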

Procedural Paradise

This is the first thing any Google search on Pig will tell you, and it is the most glaringly obvious change from SQL. After having taught your brain for years how to turn an idea inside-out and mash all of its pieces into one query, Pig makes query writing feel like writing Java or C++. In addition to the obvious potential cognitive benefits, this has some technical ones as well.

Subquery Reuse

Ever write a query with a subselect, and then realize you actually needed to use that subtable twice in the query? Did you feel absolutely awful as you cut and pasted that subtable? Did you wonder whether your SQL plan would successfully manage to not calculate it twice? (Note: the WITH clause mitigates this pain in a lot of cases, but isn’t available in all flavors of SQL.)

Because in Pig Latin every step has a declared alias, reusing “subquery” tables is natural and intuitive, and generally does not involve building them twice.

Getting multiple queries out of one pipeline

In SQL you can find yourself in a place where you want to use the data, do some manipulation on it, and then take it in a few different directions. To do this in one query requires profligate use of JOIN, and enough parens to intimidate a LISP hacker.

In a Pig pipeline, any and all aliases produced along the way can be stored, and all it takes is adding a new STORE statement to the script.

User-Defined Functions

SQL has had decades for people to figure out what analytic functions they need for arbitrary data analysis, and so when you find yourself suddenly interested in extracting the day of the week from a date, that function is ready and waiting.

Pig’s list of built-in functions is growing, but is still dwarfed by what Oracle or MySQL provides. What turns this into a tolerable constraint is that Pig allows the user to define aggregate or analytic functions in other languages (Java, Python, and others) and then apply them in Pig quickly and without fuss.

REGISTER udf.jar;

new_data = FOREACH my_data GENERATE udf.ImportantFunction(field1);
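For the Python route, a UDF file might look roughly like the sketch below, which implements the day-of-week example mentioned earlier. The function name, the date format and the schema string are assumptions made for illustration. Pig's documentation describes an outputSchema decorator that Pig supplies when it loads a Python file through its Jython engine; the fallback definition here exists only so that the file also runs as ordinary Python for local testing.

```python
from datetime import datetime

# Under Pig's Jython engine the outputSchema decorator is provided by Pig.
# This fallback is an assumption-free stand-in for plain Python runs only.
try:
    outputSchema
except NameError:
    def outputSchema(schema):
        def decorate(func):
            func.output_schema = schema
            return func
        return decorate

DAY_NAMES = ["Monday", "Tuesday", "Wednesday", "Thursday",
             "Friday", "Saturday", "Sunday"]

@outputSchema("day_of_week:chararray")
def day_of_week(date_string):
    """Return the weekday name for a 'YYYY-MM-DD' string, or None if unparseable."""
    if date_string is None:
        return None
    try:
        parsed = datetime.strptime(date_string, "%Y-%m-%d")
    except ValueError:
        return None
    return DAY_NAMES[parsed.weekday()]

if __name__ == "__main__":
    print(day_of_week("2013-02-22"))
```

In a Pig script such a file would be registered with a statement along the lines of REGISTER 'date_udfs.py' USING jython AS date_udfs; and then called as date_udfs.day_of_week(hiredate). The file name and alias are made up for this example.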

The Well-Disguised SQLer

In general, if you can think about it in SQL, you can do it in Pig. Be aware of the nested data structures, have a cheat sheet for syntax, and relish the ability to write queries the way your brain thinks them, and not the way SQL demands.

As a final thought, let’s resurrect our old friend the emp table, and take a look at some SQL to Pig Latin examples.

Average Salary by Location
SQL
SELECT loc, AVG(sal) FROM emp JOIN dept USING(deptno) WHERE sal > 3000 GROUP BY loc;
Pig Latin
filtered_emp = FILTER emp BY sal > 3000;

emp_join_dept = JOIN filtered_emp BY deptno, dept BY deptno;

grouped_by_loc = GROUP emp_join_dept BY loc;

avg_salary = FOREACH grouped_by_loc GENERATE group, AVG(emp_join_dept.sal);

Ordered Average Salary by Location

Suppose now that the following is true.

● The ‘loc’ field is a string/varchar field, and we have two pieces of software that automatically populate it. One stores values as lowercase, one as uppercase. (If this seems like a contrived example to you, you have chosen your employers and software vendors well.)

The new parts of the queries are the UPPER(loc) standardization and, in the SQL version, the subquery that wraps it.

SQL
SELECT standard_loc, AVG(sal) avg_salary FROM
(SELECT UPPER(loc) standard_loc, sal FROM emp JOIN dept USING(deptno) WHERE sal > 3000) std_table GROUP BY standard_loc;
Pig Latin
filtered_emp = FILTER emp BY sal > 3000;

emp_join_dept = JOIN filtered_emp BY deptno, dept BY deptno;

grouped_by_loc = GROUP emp_join_dept BY UPPER(loc);

avg_salary = FOREACH grouped_by_loc GENERATE group, AVG(emp_join_dept.sal);

This kind of change is friendlier to Pig because of the limitations in SQL’s GROUP BY clause; the trade-off of verboseness for clarity increases in value the more complex your query gets.

Above Average Salary for Location

Now suppose instead of arbitrarily selecting 3,000 as a threshold, which is going to overselect people living in large expensive cities, we want to select those employees who make more than twice the average for their location.

SQL

There are several ways to accomplish this, of course, but for illustration purposes the most vanilla version is shown here.

SELECT empno FROM 
(SELECT empno, UPPER(loc) standard_loc, sal FROM emp JOIN dept USING(deptno)) std_table1 JOIN
(SELECT standard_loc, AVG(sal) avg_salary FROM
(SELECT UPPER(loc) standard_loc, sal FROM emp JOIN dept USING(deptno)) std_table2 GROUP BY standard_loc) grp_table2
USING(standard_loc) WHERE sal > (2 * avg_salary);
Pig Latin
emp_join_dept = JOIN emp BY deptno, dept BY deptno;

grouped_by_loc = GROUP emp_join_dept BY UPPER(loc);

loc_avg_salary = FOREACH grouped_by_loc GENERATE AVG(emp_join_dept.sal) as avg_salary, FLATTEN(emp_join_dept);

highly_paid = FILTER loc_avg_salary BY sal > (2 * avg_salary);

Here the dreaded subquery reuse rears its unattractive head in SQL and leads to a bit of a frankenquery. In contrast, the Pig script stays the same length, because we can effectively just shift the FILTER from the beginning to the end of our data flow. FLATTEN is a new entry to our function arena, for which there is no SQL analogue. What FLATTEN does is unnest tuples and data bags; in this example it’s taking the data bag ‘emp_join_dept’ created by the GROUP function, and removing the nesting so that the fields within it will be at the same level as avg_salary.
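Since FLATTEN has no SQL counterpart, a small Python model of the last three Pig statements may help. The rows and helper names below are invented for illustration, and the sketch only mimics the semantics on in-memory tuples: each group's average is computed once and then copied onto every row that was in that group's bag.

```python
# Hypothetical joined rows: (empno, loc, sal).
emp_join_dept = [
    (1, "boston", 1000.0),
    (2, "BOSTON", 1000.0),
    (3, "Boston", 7000.0),
    (4, "DALLAS", 2000.0),
    (5, "dallas", 2200.0),
]

def group_by(rows, key_func):
    """Model GROUP ... BY expr: list of (group, bag) records."""
    bags = {}
    for row in rows:
        bags.setdefault(key_func(row), []).append(row)
    return sorted(bags.items())

def avg_and_flatten(grouped, sal_index):
    """Model GENERATE AVG(bag.sal) AS avg_salary, FLATTEN(bag)."""
    flat = []
    for _group, bag in grouped:
        avg_salary = sum(row[sal_index] for row in bag) / len(bag)
        # FLATTEN un-nests the bag: one output row per tuple that was in it.
        for row in bag:
            flat.append((avg_salary,) + row)
    return flat

grouped_by_loc = group_by(emp_join_dept, lambda row: row[1].upper())
loc_avg_salary = avg_and_flatten(grouped_by_loc, 2)
# Output rows are (avg_salary, empno, loc, sal), so sal sits at index 3.
highly_paid = [row for row in loc_avg_salary if row[3] > 2 * row[0]]
print(highly_paid)
```

With these made-up rows only employee 3 earns more than twice the Boston average, and nobody in Dallas qualifies.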

Run It Yourself for the Swine-Curious

If you want to try out some Pig examples hands-on, you can get a free Mortar account here. We’ve generated a one million row emp table data set, so to run these examples on your own all you need to do is:

    1. Sign up at app.mortardata.com for a free account.
    2. Go to Web Projects
    3. Select My Web Projects -> New Blank project
    4. To load the two data files, you need LOAD statements, and to store the resulting data you need a STORE statement, so in the end your pig script should look like this:
dept = LOAD 's3n://mortar-example-data/employee/dept.csv' USING org.apache.pig.piggybank.storage.CSVExcelStorage('SKIP_HEADER')
AS (deptno:int, dname:chararray, loc:chararray);

emp = LOAD 's3n://mortar-example-data/employee/emp.csv' USING org.apache.pig.piggybank.storage.CSVExcelStorage('SKIP_HEADER')
AS (empno:int, ename:chararray, job:chararray, mgr:int, hiredate:chararray, sal:float, deptno:int);

-- Pig example code here

STORE avg_salary INTO 's3n://mortar-example-output-data/$MORTAR_EMAIL_S3_ESCAPED/emp_example' USING PigStorage('\t');
    5. Click Run and select a 2-node cluster. Even with one million rows, it runs in under three minutes. Sit back and watch the power of modern computing tackle the problems of the 1980s.

You can also try out Pig in the Hortonworks Sandbox.

Apache Hadoop YARN Meetup II @Hortonworks

Introduction

Hortonworks hosted the second Apache Hadoop YARN meetup at the Hortonworks office in Palo Alto last Friday (22 February 2013). Following the success of the first one, this meetup again enjoyed good attendance from the YARN community. About 40 people joined the meetup in person and nearly another 30 attended via phone/WebEx.

Meetup sessions

Update from Yahoo!

The Yahoo! grid team responsible for the YARN rollout on their clusters gave an update on the current deployments and their state. Robert Evans and others from the team shared some very impressive numbers about the YARN clusters: tens of millions of jobs run on YARN so far, averaging ~100,000 jobs per day on some clusters. Please go ahead and read their recent blog on the Yahoo! Developer Network: Hadoop at Yahoo!: More Than Ever Before. They then fielded several questions from the community, such as what pain points users hit during the upgrade and which big issues surfaced only at scale. The software is deemed sufficiently stable: it churns through jobs impressively and with maximum uptime, with downtime mostly happening during upgrades.

Bikas Saha on ResourceManager restart functionality

After the update from Yahoo!, Bikas Saha from Hortonworks talked about the ResourceManager restart functionality. Most of his work is captured on the Apache JIRA issue YARN-128. The effort is divided into phases and the first phase involves:

  • Putting in place mechanisms to save application state and read it back after restart.
  • Upon restart, the NMs are asked to reboot and the previously running AMs are restarted.
  • AMs which support recovery automatically read back their own saved state on restart and try to resume work from before the RM restart.

This first phase, which restarts all the running applications on RM recovery, is done and shipped with the latest Hadoop release, 2.0.3-alpha. He discussed the overall design on a whiteboard, explaining the implementation.

YARN at LinkedIn

Chris Riccomini then talked a bit about what he continues to do with YARN (see his notes from last meetup).

Arun C Murthy on CPU scheduling in YARN

Arun talked about the enhancements to YARN resource scheduling to also account for CPU cores in addition to the memory-based scheduling we already have. This effort is captured on Apache JIRA at YARN-2. Arun walked us through the DRF (Dominant Resource Fairness) algorithm on which this work is based, described various scheduling scenarios and summed it up with possible future directions.
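DRF stands for Dominant Resource Fairness. For readers who have not met it, the following is a minimal Python sketch of its progressive-filling idea: repeatedly hand one task's worth of resources to the user whose dominant share (the largest fraction of any single resource they hold) is currently lowest. This is a toy model with illustrative numbers and invented names, not the YARN scheduler's code, and it assumes every user demands a positive amount of at least one resource.

```python
from fractions import Fraction

def drf_allocate(capacity, demands):
    """Return the number of tasks each user gets under progressive filling."""
    resources = list(capacity)
    used = {name: 0 for name in resources}
    tasks = {user: 0 for user in demands}

    def dominant_share(user):
        # Largest fraction of any one resource currently held by this user.
        return max(
            Fraction(tasks[user] * demands[user][name], capacity[name])
            for name in resources
        )

    while True:
        # Pick the user with the lowest dominant share (ties go by name).
        user = min(sorted(demands), key=dominant_share)
        need = demands[user]
        # Stop once that user's next task no longer fits in the cluster.
        if any(used[name] + need[name] > capacity[name] for name in resources):
            return tasks
        for name in resources:
            used[name] += need[name]
        tasks[user] += 1

capacity = {"cpu": 9, "memory_gb": 18}
demands = {
    "A": {"cpu": 1, "memory_gb": 4},   # memory-heavy tasks
    "B": {"cpu": 3, "memory_gb": 1},   # CPU-heavy tasks
}
allocation = drf_allocate(capacity, demands)
print(allocation)
```

In this example user A ends up with three tasks and user B with two, which leaves each of them holding two thirds of their dominant resource (memory for A, CPU for B).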

Alejandro also gave a brief summary about adding support for CPU isolation/monitoring of containers. YARN-3 enhances YARN to use cgroups to control the CPU usage of containers. There is still a little work left to expose this feature to end users.

CPU scheduling and support for isolation via cgroups in YARN are both available in the most recent Hadoop release, 2.0.3-alpha. Both these features are big steps for YARN in realizing its goal of becoming the foremost generic resource management layer and making Hadoop the ‘distributed operating system’ on which the rest of the data systems build.

YARN progress and Roadmap

I did a quick recap of what we discussed in the last YARN meetup and what we’ve achieved so far. The community has delivered on several of its promises from last time:

  • Libraries for helping application writers: YARN-418 is the umbrella ticket for tracking this, and we have made considerable progress. YARN-29 helps with application submission, and YARN-103 helps simplify the usage of the AM RM protocol.
  • CPU isolation and scheduling: YARN-2 and YARN-3 are checked in, as noted above.
  • RM restart: The first phase, which just restarts running AMs and NMs, is already in as part of YARN-128.

I then summed it up with our roadmap. The goal of the YARN community for the next version of Hadoop is to address some rough corners in YARN that are thwarting its march beyond alpha use. Some areas of focus include:

  • RM restart: Finish testing RM restart at scale and progress toward the next phases
  • YARN usability: Address the minor and major usability issues with YARN client and web interfaces.
  • YARN API cleanup: Clean up YARN APIs now to make them future-proof and help us support backwards compatibility of stable APIs into the future.
  • Security: YARN has mostly been secure from the get-go; a couple of minor things need to be addressed to close this loop.

Conclusion

Thanks to everyone for making YARN meetups a continued success story. All help is welcome from the community to focus on solidifying our next release. Looking forward to meeting you all again at the next meetup!

Apache Pig 0.11 Released!

Apache Pig version 0.11 was released last week. An Apache Pig blog post summarized the release. New features include:

  • A DateTime datatype, documentation here.
  • A RANK function, documentation here.
  • A CUBE operator, documentation here.
  • Groovy UDFs, documentation here.

There are many other improvements as well. Oink it up for Pig 0.11! Hortonworks’ Daniel Dai also gave a talk on Pig 0.11 at Strata NY.

Apache HBase 0.94.5 is out!

Last week, the HBase community released 0.94.5, which is the most stable release of HBase so far. The release includes 76 JIRA issues resolved, with 61 bug fixes, 8 improvements, and 2 new features.

Most of the bug fixes went against the REST server, replication, region assignment, secure client, flaky unit tests, 0.92 compatibility and various stability improvements. Some of the interesting patches in this release are:
[HBASE-3996] – Support multiple tables and scanners as input to the mapper in map/reduce jobs
[HBASE-5416] – Improve performance of scans with some kind of filters.
[HBASE-7757] – Add web UI to REST server and Thrift server
[HBASE-7748] – Add DelimitedKeyPrefixRegionSplitPolicy
[HBASE-6669] – Add BigDecimalColumnInterpreter for doing aggregations using AggregationClient
[HBASE-7728] – Deadlock occurs between hlog roller and hlog syncer

The release candidate has been extensively tested by Hortonworks and many others in the community. You can roll out the 0.94.5 bits using rolling upgrade on top of 0.92 or 0.94 releases. In addition, Apache HBase 0.94.5 will be incorporated into an upcoming update to HDP 1.2.

You can download the new release from here, and find full release notes here.

Last, but not least, we would like to thank Lars Hofhansl, the release manager of the 0.94 branch, for driving the release train, and all 30 individuals who have contributed to this release.

The Fastest Path to Innovation: Community Driven Open Source

 

Last week, we outlined our approach for delivering an enterprise-viable Apache Hadoop distribution in the open. Simply put: we believe the fastest way to innovate is to do our work within the open source community, introduce enterprise feature requirements into that public domain, and work diligently to progress existing open source projects and incubate new projects to meet those needs.

In support of our approach, this week we’ve announced the submission of two new incubation projects to the Apache Software Foundation together with the launch of the “Stinger Initiative”, all aimed at enhancing the security and performance of Hadoop applications. These efforts focus on enterprise requirements that are essential to enable broad adoption across the Hadoop ecosystem.

  • The Stinger initiative aims to dramatically speed up Apache Hive in support of interactive query use cases.
  • The Knox Gateway addresses the need for a single point of authentication and secure access for Apache Hadoop services in a cluster.
  • The Tez framework provides an alternative next-generation runtime built on Hadoop YARN that significantly improves latency and throughput of Hadoop applications.

We feel these efforts are strong examples of our commitment to driving innovation from within the open source community, and as stated in our approach blog, we do this by:

  • identifying and articulating the enterprise requirements within the community,
  • taking an active role in addressing those requirements within the community, and
  • applying enterprise rigor to the build, test and release process to ensure that the open source projects as well as the larger product distribution we provide are enterprise grade and interoperable with other elements in the enterprise.

Since it takes a community to build enterprise-class platforms like Hadoop, if you have interest in helping with Knox, Tez, or Stinger, we encourage you to work with us and the others in the Apache community!

Introducing… Tez: Accelerating processing of data stored in HDFS

 

MapReduce has served us well. For years it has been THE processing engine for Hadoop and has been the backbone upon which a huge amount of value has been created. While it is here to stay, new paradigms are also needed in order to enable Hadoop to serve an even greater number of usage patterns. A key and emerging example is the need for interactive query, which today is challenged by the batch-oriented nature of MapReduce. A key step to enabling this new world was Apache YARN, and today the community proposes the next step: Tez.

What is Tez?

Tez – Hindi for “speed” – (currently under incubation vote within Apache) provides a general-purpose, highly customizable framework that simplifies data-processing tasks across both small-scale (low-latency) and large-scale (high-throughput) workloads in Hadoop. It generalizes the MapReduce paradigm to a more powerful framework by providing the ability to execute a complex DAG (directed acyclic graph) of tasks for a single job so that projects in the Apache Hadoop ecosystem such as Apache Hive, Apache Pig and Cascading can meet requirements for human-interactive response times and extreme throughput at petabyte scale (clearly MapReduce has been a key driver in achieving this).

With the emergence of Apache Hadoop YARN as the basis of next generation data-processing architectures, there is a strong need for an application that can execute a complex DAG of tasks and that can be shared by Apache Pig, Apache Hive, Cascading and others. The constrained DAG expressible in MapReduce (one set of maps followed by one set of reduces) often results in multiple MapReduce jobs, which harm latency for short queries (the overhead of launching multiple jobs) and throughput for large-scale queries (too much overhead for materializing intermediate job outputs to the filesystem). With Tez, we introduce a more expressive DAG of tasks, within a single application or job, that is better aligned with the required processing task. Thus, for example, any given SQL query can be expressed as a single job using Tez.
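As a loose illustration of what "a DAG of tasks within a single job" means, here is a toy Python sketch that runs a join-then-aggregate query as four dependent tasks, handing intermediate results from task to task in memory instead of writing them out between separate jobs. Everything in it (the task names, the sample rows, the run_dag helper) is invented for illustration; it is not the Tez API and says nothing about how Tez schedules work on a cluster.

```python
from collections import deque

def run_dag(tasks, dependencies):
    """Run each task once its upstream tasks finish; return (results, order)."""
    remaining = {name: len(dependencies.get(name, [])) for name in tasks}
    downstream = {name: [] for name in tasks}
    for name, deps in dependencies.items():
        for dep in deps:
            downstream[dep].append(name)
    ready = deque(sorted(name for name, count in remaining.items() if count == 0))
    results, order = {}, []
    while ready:
        name = ready.popleft()
        # Upstream outputs are passed directly as arguments, in dependency order.
        inputs = [results[dep] for dep in dependencies.get(name, [])]
        results[name] = tasks[name](*inputs)
        order.append(name)
        for nxt in downstream[name]:
            remaining[nxt] -= 1
            if remaining[nxt] == 0:
                ready.append(nxt)
    if len(order) != len(tasks):
        raise ValueError("dependency cycle detected")
    return results, order

def average_by_loc(rows):
    """Average salary per location from (loc, sal) rows."""
    totals = {}
    for loc, sal in rows:
        total, count = totals.get(loc, (0.0, 0))
        totals[loc] = (total + sal, count + 1)
    return {loc: total / count for loc, (total, count) in totals.items()}

tasks = {
    "scan_emp": lambda: [(1, 10, 4000.0), (2, 10, 5000.0), (3, 20, 3500.0)],
    "scan_dept": lambda: {10: "NEW YORK", 20: "DALLAS"},
    "join": lambda emp, dept: [(dept[deptno], sal) for _, deptno, sal in emp],
    "aggregate": average_by_loc,
}
dependencies = {"join": ["scan_emp", "scan_dept"], "aggregate": ["join"]}

results, order = run_dag(tasks, dependencies)
print(order)
print(results["aggregate"])
```

The same query expressed as chained MapReduce jobs would have to write the join output to the filesystem before the aggregation job could start, which is the overhead the paragraph above describes.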

[Figure: the advantages provided by Tez for complex SQL queries in Apache Hive or complex Apache Pig scripts.]

Tez is critical to the Stinger Initiative and goes a long way in helping Hive support both interactive queries and batch queries. Tez provides a single underlying framework to support both latency and throughput sensitive applications, there-by obviating the need for multiple frameworks and systems to be installed, maintained and supported, a key advantage to enterprises looking to rationalize their data architectures. .

Essentially, Tez is the logical next step for Apache Hadoop after Apache Hadoop YARN. With YARN, the community generalized Hadoop MapReduce to provide a general-purpose resource management framework in which MapReduce became merely one of the applications that could process data in your Hadoop cluster. With Tez, we build on YARN and our experience with MapReduce to provide a more general data-processing application to the benefit of the entire ecosystem, i.e. Apache Hive, Apache Pig, etc.

What has been completed? Where can Tez go?

An early version of the project has been donated to the ASF as part of the initial code grant to establish the Incubation project. Through the work done in the Stinger initiative, it is already clear that Tez enables an order of magnitude increase in the performance of Apache Hive.

The community is also designing a re-usable set of libraries of data-processing primitives such as sorting, merging, data-shuffling, intermediate data management etc. which are necessary for Tez and may be used directly by other projects.  This is just the beginning.  It is an extensible architecture that will undoubtedly be contributed to widely.

For the community, by the community

At Hortonworks we believe that innovation happens fastest by working with a community of like-minded individuals to address the requirements for Hadoop without being constrained by artificial boundaries such as employment. As such, even though the Hortonworks MapReduce/Hive/Pig team seeded the project, we’ve had the benefit of both positive feedback and constructive criticism from several leading contributors and committers across the Apache Hadoop MapReduce, Apache Hive and Apache Pig projects. These inventors and peers are employed at Hortonworks, Yahoo, Facebook, Microsoft, Twitter and many others. The initial committer list has 22 participants with deep domain expertise in these unique challenges and comprises a who’s who in the Hadoop world. Of course, now that we have proposed Tez as an Incubator project and are nearly in a position to co-develop via the Apache Software Foundation, we expect a very quick acceleration of project development.

When will it be available?

We plan to donate the code from our internal repository to the ASF as part of the Incubator proposal.  Also, Hortonworks will ship Tez in the next alpha release of Hortonworks Data Platform 2 (HDP2), based on Apache Hadoop 2.0, very soon to showcase some of the very exciting advances we have made for Apache Hive via Project Stinger.

We are very excited by the reception Tez has received so far, and we do hope you can join us in this initiative via the Apache Software Foundation project to make Hadoop better!

The Stinger Initiative: Making Apache Hive 100 Times Faster

 

Introduced by Facebook in 2007, Apache Hive and its HiveQL interface have become the de facto SQL interface for Hadoop. Today, companies of all types and sizes use Hive to access Hadoop data in a familiar way and to extend value to their organization or customers either directly or through a broad ecosystem of existing BI tools that rely on this key proven interface. The who’s who of business analytics have already adopted Hive.

Apache Hive was originally built for large-scale operational batch processing and it is very effective with reporting, data mining and data preparation use cases.  These usage patterns remain very important but with widespread adoption of Hadoop, the enterprise requirement for Hadoop to become more real time or interactive has increased in importance as well. At Hortonworks, we believe in the power of the open source community to innovate faster than any proprietary offering and the Stinger initiative is proof of this once again as we collaborate with others to improve Hive performance.

So, What is Stinger?

Enabling Hive to answer human-time use cases (i.e. queries in the 5-30 second range) such as big data exploration, visualization, and parameterized reports without needing to resort to yet another tool to install, maintain and learn can deliver a lot of value to the large community of users with existing Hive skills and investments.

To this end, we have launched the Stinger Initiative, with input and participation from the broader community, to enhance Hive with more SQL and better performance for these human-time use cases. All the while, HiveQL remains the same before and after these advancements so it just gets better. And in keeping with the ecosystem of existing tools, it is complementary to best-of-breed data warehouses and analytic platforms.

  • First, we are making Hive a more suitable tool for the decision support queries people want to perform on Hadoop. This includes adding analytics features like the OVER clause, support for subqueries in WHERE, and aligning Hive’s type system more with the standard SQL model.
  • Second, we are optimizing Hive’s query execution plans and based on our initial changes, we have already seen query time drop by 90% on some of our test queries. We are also looking at additional changes inside Hive’s execution engine that we believe will significantly increase the number of records per second that a Hive task can process.
  • Third, we have introduced a new columnar file format (i.e. ORCFile) within the Hive community to provide a more modern, efficient, and high performing way to store Hive data.
  • And lastly, we’ve introduced a new runtime framework, called Tez, which aims to eliminate Hive’s latency and throughput constraints that result from its reliance on MapReduce. Tez optimizes Hive job execution by eliminating unnecessary tasks, synchronization barriers, and reads from and writes to HDFS. This optimizes the execution chain within Hadoop and drastically speeds Hive’s workload processing.

All of these modifications to Hive are underway in the open and an initial preview will be available in advance of Hadoop Summit Amsterdam in March.

Embrace the community, Embrace Hive…

A diverse group of individuals within the Hive community is collaborating on these efforts, including resources from SAP, Microsoft, Facebook and Hortonworks.

Harish Butani from SAP has led an effort to add analytics and windowing functions to Hive.  This will add the OVER clause for use with existing aggregate functions as well as adding analytics functions like RANK and NTILE and windowing functions like LEAD and LAG; you can see this work at HIVE-896.  Namit Jain from Facebook has been spending a lot of time lately optimizing Hive’s query execution planning so that it performs joins and other operations more efficiently and with less need for hints from the user.  Hortonworks engineers have been collaborating on these and other community efforts to improve Hive.
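As a rough illustration of what these functions compute, the following Python sketch mimics standard SQL RANK and LAG semantics over an already ordered list of values. It is a toy written for this post's readers, not Hive's implementation; the helper names `rank` and `lag` are made up for the example.

```python
def rank(sorted_values):
    """SQL-style RANK over values already in window order.

    Equal neighbours share a rank, and the next distinct value
    skips ahead by the number of tied rows.
    """
    ranks = []
    for i, value in enumerate(sorted_values):
        if i > 0 and value == sorted_values[i - 1]:
            ranks.append(ranks[-1])   # tie: reuse the previous rank
        else:
            ranks.append(i + 1)       # position in the window, 1-based
    return ranks


def lag(values, offset=1, default=None):
    """SQL-style LAG: the value `offset` rows earlier, or `default` if none."""
    return [values[i - offset] if i >= offset else default
            for i in range(len(values))]


print(rank([10, 20, 20, 30]))
print(lag([1, 2, 3]))
```

LEAD is the mirror image of LAG (it looks forward instead of backward), and the OVER clause is what defines the ordered window that these helpers take as a plain list.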

Owen O’Malley, a Hortonworks co-founder and early Hadoop developer, has been working with Facebook on the new ORCFile in order to greatly improve performance when Hive is reading, writing, and processing data; you can see this work at HIVE-3874. We are also working on further-reaching changes and optimizations, such as reworking Hive’s operators to process records in blocks of a thousand or more and thus be much more efficient than they are today.

We believe the performance changes we are making today, along with the work being done in Tez will transform Hive into a single tool that Hadoop users can use to do report generation, ad hoc queries, and large batch jobs spanning 10s or 100s of terabytes.

Why reinvent the wheel?

Securing Hadoop with Knox Gateway

 

Back in the day, in order to secure a Hadoop cluster all you needed was a firewall that restricted network access to only authorized users. This eventually evolved into a more robust security layer in Hadoop… a layer that could augment firewall access with strong authentication. Enter Kerberos.  Around 2008, Owen O’Malley and a team of committers led this first foray into security and today, Kerberos is still the primary way to secure a Hadoop cluster.

Fast-forward to today… Widespread adoption of Hadoop is upon us.  The enterprise has placed requirements on the platform to not only provide perimeter security, but to also integrate with all types of authentication mechanisms. Oh yeah, and all the while, be easy to manage and to integrate with the rest of the secured corporate infrastructure. Kerberos can still be a great provider of the core security technology but with all the touch-points that a user will have with Hadoop, something more is needed.

The time has come for Knox.

The only path to security in Hadoop is the community

The Knox Gateway aims to provide perimeter security that will integrate easily into existing security infrastructure.  Delivering this key component of the Apache Hadoop ecosystem is a critical community project.  Security is not an afterthought.  It needs to be woven into the very fabric of Hadoop in order to be effective. Being a part of the community will allow Knox to accomplish just that.

Already the community has rallied around the project and the vote has been positive thus far.  Tomorrow we should see community approval of a new incubation project in the Apache Software Foundation for Knox, a security layer for the Hadoop ecosystem.  The initial mentor list contains resources from Hortonworks, Microsoft and NASA among others.

What comprises the Knox Gateway?

The Knox Gateway (“Gateway” or “Knox”) is a system that provides a single point of authentication and access for Apache Hadoop services in a cluster. The goal is to simplify Hadoop security for both users (i.e. those who access the cluster data and execute jobs) and operators (i.e. those who control access and manage the cluster). The Gateway runs as a server (or cluster of servers) that serves one or more Hadoop clusters. It has a few key functions:

  • Provide perimeter security to make Hadoop security setup easier
  • Support authentication and token verification security scenarios
  • Deliver users a single cluster end-point that aggregates capabilities for data and jobs
  • Enable integration with enterprise and cloud identity management environments
  • Manage security across multiple clusters and multiple versions of Hadoop

Knox will be able to provide a security layer for multiple clusters and multiple versions of Hadoop simultaneously and will deliver a simple, intuitive management interface. Playing nice with others is always a security imperative, so Knox will integrate with the existing frameworks for Active Directory/LDAP and will allow extensions for custom authentication mechanisms.

Availability

The short term plan for the Knox team is to deliver a solid, working release in late March so that early adopters can begin to evaluate and provide valuable feedback.  This critical step will ensure that the gateway fits nicely into customers’ infrastructure and makes Hadoop easier to use… and more secure.

Pig, ToJson, and Redis to publish data with Flask


Pig can easily stuff Redis full of data. To do so, we’ll need to convert our data to JSON. We’ve previously talked about pig-to-json in JSONize anything in Pig with ToJson. Once we convert our data to JSON, we can use the pig-redis project to load Redis.

Build the pig to json project:

git clone git@github.com:rjurney/pig-to-json.git
cd pig-to-json
ant

Then run our Pig code:

/* Load Avro jars and define shortcut */
register /me/Software/pig/build/ivy/lib/Pig/avro-1.5.3.jar
register /me/Software/pig/build/ivy/lib/Pig/json-simple-1.1.jar
register /me/Software/pig/contrib/piggybank/java/piggybank.jar
define AvroStorage org.apache.pig.piggybank.storage.avro.AvroStorage();

register /me/Software/pig-to-json/dist/lib/pig-to-json.jar
register /me/Software/pig-redis/dist/pig-redis.jar

-- Enron emails are available at https://s3.amazonaws.com/rjurney_public_web/hadoop/enron.avro
emails = load '/me/Data/enron.avro' using AvroStorage();

json_test = foreach emails generate message_id, com.hortonworks.pig.udf.ToJson(tos) as bag_json;

store json_test into 'dummy-name' using com.hackdiary.pig.RedisStorer('kv', 'localhost');

Now run our Flask web server:

python server.py

Code for this post is available here: https://github.com/rjurney/enron-pig-tojson-redis-node.

Imperative and Declarative Hadoop: TPC-H in Pig and Hive

According to the Transaction Processing Performance Council (TPC), TPC-H is:

The TPC Benchmark™H (TPC-H) is a decision support benchmark. It consists of a suite of business oriented ad-hoc queries and concurrent data modifications. The queries and the data populating the database have been chosen to have broad industry-wide relevance. This benchmark illustrates decision support systems that examine large volumes of data, execute queries with a high degree of complexity, and give answers to critical business questions.

TPC-H was implemented for Hive in HIVE-600 and for Pig in PIG-2397 by Hortonworks intern Jie Li. In going over this work, I was struck by how it outlined differences between Pig and SQL.

There seems to be a tendency for simple SQL to provide greater clarity than Pig. At some point, as the TPC-H queries become more demanding, complex SQL seems to have less clarity than the comparable Pig. Let’s take a look.

Q1, the pricing summary report, is fairly simple, and a SQL GROUP BY is a good fit:

DROP TABLE lineitem;
DROP TABLE q1_pricing_summary_report;

-- create tables and load data
CREATE EXTERNAL TABLE lineitem (
    L_ORDERKEY INT, 
    L_PARTKEY INT, 
    L_SUPPKEY INT, 
    L_LINENUMBER INT, 
    L_QUANTITY DOUBLE, 
    L_EXTENDEDPRICE DOUBLE, 
    L_DISCOUNT DOUBLE, 
    L_TAX DOUBLE, 
    L_RETURNFLAG STRING, 
    L_LINESTATUS STRING, 
    L_SHIPDATE STRING, 
    L_COMMITDATE STRING, 
    L_RECEIPTDATE STRING, 
    L_SHIPINSTRUCT STRING, 
    L_SHIPMODE STRING, 
    L_COMMENT STRING) 
    ROW FORMAT DELIMITED FIELDS TERMINATED BY '|' STORED AS TEXTFILE LOCATION '/tpch/lineitem';

-- create the target table
CREATE TABLE q1_pricing_summary_report ( 
    L_RETURNFLAG STRING, 
    L_LINESTATUS STRING, 
    SUM_QTY DOUBLE, 
    SUM_BASE_PRICE DOUBLE, 
    SUM_DISC_PRICE DOUBLE, 
    SUM_CHARGE DOUBLE, 
    AVE_QTY DOUBLE, 
    AVE_PRICE DOUBLE, 
    AVE_DISC DOUBLE, 
    COUNT_ORDER INT);

set mapred.min.split.size=536870912;

-- the query
INSERT OVERWRITE TABLE q1_pricing_summary_report 
SELECT 
    L_RETURNFLAG, 
    L_LINESTATUS, 
    SUM(L_QUANTITY), 
    SUM(L_EXTENDEDPRICE), 
    SUM(L_EXTENDEDPRICE * (1-L_DISCOUNT)), 
    SUM(L_EXTENDEDPRICE * (1-L_DISCOUNT) * (1+L_TAX)), 
    AVG(L_QUANTITY),
    AVG(L_EXTENDEDPRICE), 
    AVG(L_DISCOUNT), 
    COUNT(1) 
FROM 
  lineitem 
WHERE 
  L_SHIPDATE<='1998-09-02' 
GROUP BY L_RETURNFLAG, L_LINESTATUS 
ORDER BY L_RETURNFLAG, L_LINESTATUS;

One thing to notice, though, is that in Hive we have to specify schemas twice: once for the load, and again for the result. Compare that to Pig, where we specify the schema once upon load, and then implicitly in the Pig code itself:

SET default_parallel $reducers;

LineItems = LOAD '$input/lineitem' USING PigStorage('|') AS (
    orderkey:long, 
    partkey:long, 
    suppkey:long, 
    linenumber:long, 
    quantity:double, 
    extendedprice:double, 
    discount:double, 
    tax:double, 
    returnflag, 
    linestatus, 
    shipdate, 
    commitdate, 
    receiptdate, 
    shipinstruct, 
    shipmode, 
    comment);

SubLineItems = FILTER LineItems BY shipdate <= '1998-09-02';

SubLine = FOREACH SubLineItems GENERATE 
    returnflag, 
    linestatus, 
    quantity, 
    extendedprice, 
    extendedprice * (1-discount) AS disc_price, 
    extendedprice * (1-discount) * (1+tax) AS charge, 
    discount;

StatusGroup = GROUP SubLine BY (returnflag, linestatus);

PriceSummary = FOREACH StatusGroup GENERATE 
    group.returnflag AS returnflag, 
    group.linestatus AS linestatus, 
    SUM(SubLine.quantity) AS sum_qty, 
    SUM(SubLine.extendedprice) AS sum_base_price, 
    SUM(SubLine.disc_price) as sum_disc_price, 
    SUM(SubLine.charge) as sum_charge, 
    AVG(SubLine.quantity) as avg_qty, 
    AVG(SubLine.extendedprice) as avg_price, 
    AVG(SubLine.discount) as avg_disc, 
    COUNT(SubLine) as count_order;

SortedSummary = ORDER PriceSummary BY returnflag, linestatus;

STORE SortedSummary INTO '$output/Q1out';

Things change as the queries get more complex. With the use of temporary tables, the schema creation overhead starts to dominate, and the SQL becomes quite complex. Take a look at Q22, the Global Sales Opportunity Report:

DROP TABLE customer;
DROP TABLE orders;
DROP TABLE q22_customer_tmp;
DROP TABLE q22_customer_tmp1;
DROP TABLE q22_orders_tmp;
DROP TABLE q22_global_sales_opportunity;

-- create tables and load data
create external table customer (
    C_CUSTKEY INT, 
    C_NAME STRING, 
    C_ADDRESS STRING, 
    C_NATIONKEY INT, 
    C_PHONE STRING, 
    C_ACCTBAL DOUBLE, 
    C_MKTSEGMENT STRING, 
    C_COMMENT STRING
) ROW FORMAT DELIMITED FIELDS TERMINATED BY '|' STORED AS TEXTFILE LOCATION '/tpch/customer';

create external table orders (
    O_ORDERKEY INT, 
    O_CUSTKEY INT, 
    O_ORDERSTATUS STRING, 
    O_TOTALPRICE DOUBLE, 
    O_ORDERDATE STRING, 
    O_ORDERPRIORITY STRING, 
    O_CLERK STRING, 
    O_SHIPPRIORITY INT, 
    O_COMMENT STRING) 
ROW FORMAT DELIMITED FIELDS TERMINATED BY '|' STORED AS TEXTFILE LOCATION '/tpch/orders';

-- create target tables
create table q22_customer_tmp(c_acctbal double, c_custkey int, cntrycode string);
create table q22_customer_tmp1(avg_acctbal double);
create table q22_orders_tmp(o_custkey int);
create table q22_global_sales_opportunity(cntrycode string, numcust int, totacctbal double);

-- the query
insert overwrite table q22_customer_tmp
select 
  c_acctbal, c_custkey, substr(c_phone, 1, 2) as cntrycode
from 
  customer
where 
  substr(c_phone, 1, 2) = '13' or
  substr(c_phone, 1, 2) = '31' or
  substr(c_phone, 1, 2) = '23' or
  substr(c_phone, 1, 2) = '29' or
  substr(c_phone, 1, 2) = '30' or
  substr(c_phone, 1, 2) = '18' or
  substr(c_phone, 1, 2) = '17';

insert overwrite table q22_customer_tmp1
select
  avg(c_acctbal)
from
  q22_customer_tmp
where
  c_acctbal > 0.00;

insert overwrite table q22_orders_tmp
select 
  o_custkey 
from 
  orders
group by 
  o_custkey;

insert overwrite table q22_global_sales_opportunity
select
  cntrycode, count(1) as numcust, sum(c_acctbal) as totacctbal
from
(
  select cntrycode, c_acctbal, avg_acctbal from
  q22_customer_tmp1 ct1 join
  (
    select cntrycode, c_acctbal from
      q22_orders_tmp ot 
      right outer join q22_customer_tmp ct 
      on
        ct.c_custkey = ot.o_custkey
    where
      o_custkey is null
  ) ct2
) a
where
  c_acctbal > avg_acctbal
group by cntrycode
order by cntrycode;

The Pig is comparatively simple:

SET default_parallel $reducers;

customer = load '$input/customer' USING PigStorage('|') as (
    c_custkey:long,
    c_name:chararray, 
    c_address:chararray, 
    c_nationkey:int, 
    c_phone:chararray, 
    c_acctbal:double, 
    c_mktsegment:chararray, 
    c_comment:chararray);
orders = load '$input/orders' USING PigStorage('|') as (
    o_orderkey:long, 
    o_custkey:long, 
    o_orderstatus:chararray, 
    o_totalprice:double, 
    o_orderdate:chararray, 
    o_orderpriority:chararray, 
    o_clerk:chararray, 
    o_shippriority:long, 
    o_comment:chararray);

customer_filter = filter customer by c_acctbal>0.00 and SUBSTRING(c_phone, 0, 2) MATCHES '13|31|23|29|30|18|17';
customer_filter_group = group customer_filter all;
avg_customer_filter = foreach customer_filter_group generate AVG(customer_filter.c_acctbal) as avg_c_acctbal;

customer_sec_filter = filter customer by c_acctbal > avg_customer_filter.avg_c_acctbal and SUBSTRING(c_phone, 0, 2) MATCHES '13|31|23|29|30|18|17';
customer_orders_left = join customer_sec_filter by c_custkey left, orders by o_custkey;

customer_trd_filter = filter customer_orders_left by o_custkey is null;
customer_rows = foreach customer_trd_filter generate SUBSTRING(c_phone, 0, 2) as cntrycode, c_acctbal;

customer_result_group = group customer_rows by cntrycode;
customer_result = foreach customer_result_group generate group, COUNT(customer_rows) as numcust, SUM(customer_rows.c_acctbal) as totacctbal;
customer_result_inorder = order customer_result by group;

store customer_result_inorder into '$output/Q22out' USING PigStorage('|');

Both Pig and Hive have a place, and their own strengths. It is illustrative to compare these identical queries in the two systems, to see where you might want to hand off queries from Hive to Pig. HCatalog facilitates this handoff, as Pig can read from and write to Hive tables directly via HCatLoader and HCatStorer.

Apache HBase Region Splitting and Merging

For this post, we take a technical deep-dive into one of the core areas of HBase. Specifically, we will look at how Apache HBase distributes load through regions, and manages region splitting. HBase stores rows of data in tables. Tables are split into chunks of rows called “regions”. Those regions are distributed across the cluster, hosted and made available to client processes by the RegionServer process. A region is a contiguous range within the key space, meaning all rows in the table that sort between the region’s start key and end key are stored in the same region. Regions are non-overlapping, i.e. a single row key belongs to exactly one region at any point in time. A region is only served by a single region server at any point in time, which is how HBase guarantees strong consistency within a single row. Together with the -ROOT- and .META. regions, a table’s regions effectively form a three-level B-tree for the purposes of locating a row within a table.
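Because regions are sorted, non-overlapping key ranges, finding the region for a row amounts to a search over region start keys. The Python sketch below shows the idea with a hypothetical four-region table; the start keys and the `locate_region` helper are invented for illustration and do not reflect how the HBase client actually walks -ROOT- and .META..

```python
from bisect import bisect_right

# Hypothetical table with four regions, each identified by its start key.
# The first region starts at the empty key; the last one has no upper bound.
region_start_keys = [b"", b"g", b"n", b"t"]


def locate_region(row_key):
    """Return the index of the region whose [start, end) range holds row_key."""
    # bisect_right counts the start keys <= row_key; the last of those
    # start keys belongs to the region that contains the row.
    return bisect_right(region_start_keys, row_key) - 1


print(locate_region(b"apple"))   # a key in the first region
print(locate_region(b"g"))       # a start key belongs to the region it opens
print(locate_region(b"zebra"))   # a key in the last region
```

Each row key maps to exactly one index, which mirrors the rule that a row belongs to exactly one region at any point in time.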

A region, in turn, consists of many “Stores”, which correspond to column families. A store contains one memstore and zero or more store files. The data for each column family is stored and accessed separately.

A table typically consists of many regions, which are in turn hosted by many region servers. Thus, regions are the physical mechanism used to distribute the write and query load across region servers. When a table is first created, HBase, by default, will allocate only one region for the table. This means that initially, all requests will go to a single region server, regardless of the number of region servers. This is the primary reason why initial phases of loading data into an empty table cannot utilize the whole capacity of the cluster.

Pre-splitting

The reason HBase creates only one region for the table is that it cannot possibly know how to create the split points within the row key space. Such decisions depend heavily on the distribution of the keys in your data. Rather than taking a guess and leaving you to deal with the consequences, HBase provides you with tools to manage this from the client. With a process called pre-splitting, you can create a table with many regions by supplying the split points at table creation time. Since pre-splitting will ensure that the initial load is more evenly distributed throughout the cluster, you should always consider using it if you know your key distribution beforehand. However, pre-splitting also carries the risk of creating regions that do not truly distribute the load evenly because of data skew, or in the presence of very hot or large rows. If the initial set of region split points is chosen poorly, you may end up with heterogeneous load distribution, which will in turn limit your cluster’s performance.

There is no short answer for the optimal number of regions for a given load, but you can start with a low multiple of the number of region servers as the number of splits, then let automated splitting take care of the rest.

One issue with pre-splitting is calculating the split points for the table. You can use the RegionSplitter utility. RegionSplitter creates the split points by using a pluggable SplitAlgorithm. HexStringSplit and UniformSplit are two predefined algorithms. The former can be used if the row keys have a prefix of hexadecimal strings (for example, if you are using hashes as prefixes). The latter divides up the key space evenly, assuming the keys are random byte arrays. You can also implement your own custom SplitAlgorithm and use it from the RegionSplitter utility.

$ hbase org.apache.hadoop.hbase.util.RegionSplitter test_table HexStringSplit -c 10 -f f1

where -c 10 specifies the requested number of regions as 10, and -f specifies the column families you want in the table, separated by “:”. The tool will create a table named “test_table” with 10 regions:

13/01/18 18:49:32 DEBUG hbase.HRegionInfo: Current INFO from scan results = {NAME => 'test_table,,1358563771069.acc1ad1b7962564fc3a43e5907e8db33.', STARTKEY => '', ENDKEY => '19999999', ENCODED => acc1ad1b7962564fc3a43e5907e8db33,}
13/01/18 18:49:32 DEBUG hbase.HRegionInfo: Current INFO from scan results = {NAME => 'test_table,19999999,1358563771096.37ec12df6bd0078f5573565af415c91b.', STARTKEY => '19999999', ENDKEY => '33333332', ENCODED => 37ec12df6bd0078f5573565af415c91b,}
...
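The region boundaries in the log output above are consistent with dividing a 32-bit hexadecimal key space into ten equal steps. The following Python sketch reproduces the first two boundaries under that assumption; `hex_string_splits` is a hypothetical helper written for this illustration, not the RegionSplitter source.

```python
def hex_string_splits(num_regions, first=0x00000000, last=0xFFFFFFFF):
    """Return the num_regions - 1 split points as 8-digit hex strings.

    Assumes keys are hex strings spread uniformly between `first` and `last`.
    """
    step = (last - first) // num_regions
    return [format(first + step * i, "08x") for i in range(1, num_regions)]


splits = hex_string_splits(10)
print(splits[0], splits[1])   # boundaries of the first two regions
```

Ten regions need nine split points; the first region runs from the empty start key to the first split point, and the last region runs from the ninth split point to the end of the key space.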

If you have split points at hand, you can also use the HBase shell to create the table with the desired split points.

hbase(main):015:0> create 'test_table', 'f1', SPLITS=> ['a', 'b', 'c']

or

$ echo -e  "a\nb\nc" >/tmp/splits
hbase(main):015:0> create 'test_table', 'f1', SPLITSFILE=>'/tmp/splits'

For optimum load distribution, you should think about your data model and key distribution when choosing the correct split algorithm or split points. Regardless of the method you choose to create the table with a predetermined number of regions, you can now start loading the data into the table, and see that the load is distributed throughout your cluster. You can let automated splitting take over once data ingest starts, and continuously monitor the total number of regions for the table.

Auto splitting

Regardless of whether pre-splitting is used or not, once a region gets to a certain limit, it is automatically split into two regions. If you are using HBase 0.94 (which comes with HDP-1.2), you can configure when HBase decides to split a region, and how it calculates the split points via the pluggable RegionSplitPolicy API. There are a couple predefined region split policies: ConstantSizeRegionSplitPolicy, IncreasingToUpperBoundRegionSplitPolicy, and KeyPrefixRegionSplitPolicy.

The first one is the default and only split policy for HBase versions before 0.94. It splits the regions when the total data size for one of the stores (corresponding to a column family) in the region gets bigger than the configured “hbase.hregion.max.filesize”, which has a default value of 10GB. This split policy is ideal in cases where you have done pre-splitting and are interested in getting a lower number of regions per region server.

The default split policy for HBase 0.94 and trunk is IncreasingToUpperBoundRegionSplitPolicy, which does more aggressive splitting based on the number of regions hosted in the same region server. The split policy uses a max store file size of min(R^2 * “hbase.hregion.memstore.flush.size”, “hbase.hregion.max.filesize”), where R is the number of regions of the same table hosted on the same region server. So for example, with the default memstore flush size of 128MB and the default max store size of 10GB, the first region on the region server will be split just after the first flush at 128MB. As the number of regions hosted in the region server increases, it will use increasing split sizes: 512MB, 1152MB, 2GB, 3.2GB, 4.6GB, 6.2GB, etc. After reaching 9 regions, the split size will go beyond the configured “hbase.hregion.max.filesize”, at which point a 10GB split size will be used from then on. For both of these algorithms, regardless of when splitting occurs, the split point used is the rowkey that corresponds to the midpoint in the “block index” for the largest store file in the largest store.
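The formula is easy to check by hand. This short Python sketch evaluates it for the default values quoted above; `split_size` is a hypothetical helper name, and the defaults are simply the 128MB and 10GB figures from the text.

```python
MB = 1024 * 1024
GB = 1024 * MB


def split_size(regions_on_server, flush_size=128 * MB, max_file_size=10 * GB):
    """min(R^2 * memstore flush size, max file size), as described above."""
    return min(regions_on_server ** 2 * flush_size, max_file_size)


# Print the split threshold, in MB, for one to nine regions on a server.
for r in range(1, 10):
    print(r, split_size(r) // MB, "MB")
```

With nine regions the uncapped value would be 81 * 128MB = 10368MB, which exceeds 10GB (10240MB), so the cap applies from that point on.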

KeyPrefixRegionSplitPolicy is a curious addition to the HBase arsenal. You can configure the length of the prefix for your row keys for grouping them, and this split policy ensures that the regions are not split in the middle of a group of rows having the same prefix. If you have set prefixes for your keys, then you can use this split policy to ensure that rows having the same rowkey prefix always end up in the same region. This grouping of records is sometimes referred to as “Entity Groups” or “Row Groups”. This is a key feature when considering use of the “local transactions” feature in your application design.
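One way to get this guarantee is to truncate a candidate split point to the prefix length, so that the boundary always falls between two prefix groups. The Python sketch below illustrates that idea; it is an assumption about the mechanism rather than a copy of HBase's code, and both helper names and the sample keys are hypothetical.

```python
def prefix_aligned_split_point(candidate, prefix_length):
    """Cut a candidate split key down to its grouping prefix."""
    return candidate[:prefix_length]


def daughter_for(row_key, split_point):
    """Rows at or above the split point go to the upper daughter region."""
    return "upper" if row_key >= split_point else "lower"


# Three rows share the 8-byte prefix b"user0042"; the midpoint of the region
# happens to land on the second of them.
rows = [b"user0041|z", b"user0042|a", b"user0042|b", b"user0042|c"]
split_point = prefix_aligned_split_point(b"user0042|b", 8)

print(split_point)
print([daughter_for(row, split_point) for row in rows])
```

Splitting at the raw midpoint b"user0042|b" would have separated b"user0042|a" from the other two rows of its group; the truncated split point keeps the three together.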

You can configure the default split policy to be used by setting the configuration “hbase.regionserver.region.split.policy”, or by configuring the table descriptor. For you brave souls, you can also implement your own custom split policy, and plug that in at table creation time, or by modifying an existing table:

HTableDescriptor tableDesc = new HTableDescriptor("example-table");
tableDesc.setValue(HTableDescriptor.SPLIT_POLICY, AwesomeSplitPolicy.class.getName());
//add columns etc
admin.createTable(tableDesc);

If you are doing pre-splitting and want to manage region splits manually, you can also disable region splits by setting “hbase.hregion.max.filesize” to a high number and setting the split policy to ConstantSizeRegionSplitPolicy. However, you should use a safeguard value such as 100GB, so that regions do not grow beyond a region server’s capabilities. You can consider disabling automated splitting and relying on the initial set of regions from pre-splitting if, for example, you are using uniform hashes for your key prefixes and you can ensure that the read/write load to each region, as well as its size, is uniform across the regions in the table.

Forced Splits

HBase also enables clients to force split an online table from the client side. For example, the HBase shell can be used to split all regions of the table, or split a region, optionally by supplying a split point.

hbase(main):024:0> split 'b07d0034cbe72cb040ae9cf66300a10c', 'b'
0 row(s) in 0.1620 seconds

With careful monitoring of your HBase load distribution, if you see that some regions are getting uneven loads, you may consider manually splitting those regions to even out the load and improve throughput. Another reason why you might want to do manual splits is when you see that the initial splits for the region turn out to be suboptimal, and you have disabled automated splits. That might happen, for example, if the data distribution changes over time.

How Region Splits are implemented

As write requests are handled by the region server, they accumulate in an in-memory storage system called the “memstore”. Once the memstore fills, its contents are written to disk as additional store files. This event is called a “memstore flush”. As store files accumulate, the RegionServer will “compact” them into combined, larger files. After each flush or compaction finishes, a region split request is enqueued if the RegionSplitPolicy decides that the region should be split into two. Since all data files in HBase are immutable, when a split happens, the newly created daughter regions will not rewrite all the data into new files. Instead, they will create small symlink-like files, named Reference files, which point to either the top or the bottom part of the parent store file according to the split point. A reference file is used just like a regular data file, but exposes only half of the records. The region can only be split if there are no more references to the immutable data files of the parent region. Those reference files are cleaned up gradually by compactions, so that the region will stop referring to its parent’s files, and can be split further.
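The reference-file idea can be sketched in a few lines of Python. The `Reference` class below is hypothetical and models a store file as an in-memory tuple of sorted key/value pairs; it also assumes that "top" means the keys at or above the split key and "bottom" means the keys below it.

```python
from dataclasses import dataclass


@dataclass(frozen=True)
class Reference:
    """A lightweight pointer to one half of an immutable parent store file."""
    parent_file: tuple   # sorted (key, value) pairs; never copied or modified
    split_key: bytes
    top: bool            # True: keys >= split_key, False: keys < split_key

    def scan(self):
        """Yield only the records on this reference's side of the split key."""
        for key, value in self.parent_file:
            if (key >= self.split_key) == self.top:
                yield key, value


parent = ((b"a", 1), (b"b", 2), (b"c", 3), (b"d", 4))
bottom = Reference(parent, b"c", top=False)
top = Reference(parent, b"c", top=True)

print(list(bottom.scan()))
print(list(top.scan()))
```

Both daughters read from the same parent data without rewriting it, which is why the parent's files must stay in place until compactions have written the daughters' own files.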

Although splitting the region is a local decision made at the RegionServer, the split process itself must coordinate with many actors. The RegionServer notifies the Master before and after the split, updates the .META. table so that clients can discover the new daughter regions, and rearranges the directory structure and data files in HDFS. A split is a multi-task process. To enable rollback in case of an error, the RegionServer keeps an in-memory journal about the execution state. The steps taken by the RegionServer to execute the split are illustrated by Figure 1. Each step is labeled with its step number. Actions from RegionServers or Master are shown in red, while actions from the clients are shown in green.

1. RegionServer decides locally to split the region, and prepares the split. As a first step, it creates a znode in ZooKeeper under /hbase/region-in-transition/region-name in SPLITTING state.
2. The Master learns about this znode, since it has a watcher for the parent region-in-transition znode.
3. RegionServer creates a sub-directory named “.splits” under the parent’s region directory in HDFS.
4. RegionServer closes the parent region, forces a flush of the cache and marks the region as offline in its local data structures. At this point, client requests coming to the parent region will throw NotServingRegionException. The client will retry with some backoff.
5. RegionServer creates the region directories under the .splits directory for daughter regions A and B, and creates the necessary data structures. Then it splits the store files, in the sense that it creates two Reference files per store file in the parent region. Those reference files will point to the parent region's files.
6. RegionServer creates the actual region directory in HDFS, and moves the reference files for each daughter.
7. RegionServer sends a Put request to the .META. table, and sets the parent as offline in the .META. table and adds information about daughter regions. At this point, there won’t be individual entries in .META. for the daughters. Clients will see the parent region is split if they scan .META., but won’t know about the daughters until they appear in .META.. Also, if this Put to .META. succeeds, the parent will be effectively split. If the RegionServer fails before this RPC succeeds, Master and the next region server opening the region will clean dirty state about the region split. After the .META. update, though, the region split will be rolled-forward by Master.
8. RegionServer opens daughters in parallel to accept writes.
9. RegionServer adds the daughters A and B to .META. together with information that it hosts the regions. After this point, clients can discover the new regions, and issue requests to the new region. Clients cache the .META. entries locally, but when they make requests to the region server or .META., their caches will be invalidated, and they will learn about the new regions from .META..
10. RegionServer updates the znode /hbase/region-in-transition/region-name in ZooKeeper to state SPLIT, so that the master can learn about it. The balancer can freely re-assign the daughter regions to other region servers if it so chooses.
11. After the split, .META. and HDFS will still contain references to the parent region. Those references will be removed when compactions in the daughter regions rewrite the data files. Garbage collection tasks in the master periodically check whether the daughter regions still refer to the parent's files. If not, the parent region will be removed.
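
The journal-based rollback described above can be sketched as a small simulation. The step names below are hypothetical labels loosely following the numbered steps, not HBase identifiers; the only behavior taken from the text is that a failure before the .META. update is undone, while a failure after it is rolled forward by the Master.

```python
from enum import Enum, auto
from typing import List, Optional, Tuple


class Step(Enum):
    # Hypothetical labels for the steps listed above.
    SET_SPLITTING_ZNODE = auto()
    CREATE_SPLITS_DIR = auto()
    CLOSE_PARENT = auto()
    CREATE_REFERENCES = auto()
    MOVE_DAUGHTER_DIRS = auto()
    UPDATE_META = auto()  # point of no return
    OPEN_DAUGHTERS = auto()


class SplitFailed(Exception):
    pass


def run_split(fail_at: Optional[Step] = None) -> Tuple[str, List[Step]]:
    """Simulate a split; `fail_at` injects a failure before that step completes."""
    journal: List[Step] = []  # in-memory record of the steps completed so far
    try:
        for step in Step:
            if step is fail_at:
                raise SplitFailed(step.name)
            journal.append(step)
    except SplitFailed:
        if Step.UPDATE_META in journal:
            # The parent is already marked as split: finish instead of undoing.
            return "roll-forward", journal
        # Otherwise return the completed steps in the order they must be undone.
        return "rolled-back", list(reversed(journal))
    return "split", journal


outcome, undone = run_split(fail_at=Step.CREATE_REFERENCES)
print(outcome, [s.name for s in undone])
```

Keeping the journal in memory is enough for the rollback case because, as described in step 7, a RegionServer that dies before the .META. update leaves cleanup to the Master and to the next server that opens the region.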

Region Merges

Unlike region splitting, HBase at this point does not provide usable tools for merging regions. Although the HMerge and Merge tools exist, they are not well suited for general usage. There is currently no support for merging regions of online tables, and no auto-merging functionality. However, with issues like OnlineMerge, Master-initiated automatic region merges, and ZK-based read/write locks for table operations, we are working to stabilize region splits and enable better support for region merges. Stay tuned!

Conclusion

As you can see, under the hood HBase does a lot of housekeeping to manage region splits and do automated sharding through regions. However, HBase also provides the necessary tools around region management, so that you can manage the splitting process. You can also control precisely when and how region splits happen via a RegionSplitPolicy.

The number of regions in a table, and how those regions are split, are crucial factors in understanding and tuning your HBase cluster load. If you can estimate your key distribution, you should create the table with pre-splitting to get the optimum initial load performance. You can start with a low multiple of the number of region servers as the initial number of regions, and let automated splitting take over. If you cannot correctly estimate the initial split points, it is better to create the table with one region, start some initial load with automated splitting, and use IncreasingToUpperBoundRegionSplitPolicy. However, keep in mind that the total number of regions will stabilize over time, and the current set of region split points will be determined from the data that the table has received so far. You may want to monitor the load distribution across the regions at all times, and if the load distribution changes over time, use manual splitting or set more aggressive region split sizes. Lastly, you can try out the upcoming online merge feature and contribute your use case.
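
If your row keys are roughly uniform, for instance fixed-width hexadecimal hashes, the pre-split points can be computed up front. The sketch below makes exactly that assumption; the function name and the default key width are hypothetical, and the resulting keys would still have to be passed to whatever table-creation mechanism you use.

```python
from typing import List


def hex_split_points(num_regions: int, key_width: int = 8) -> List[str]:
    """Return num_regions - 1 evenly spaced split keys over a hex key space.

    Assumes row keys are lowercase, zero-padded hex strings of `key_width`
    characters that are uniformly distributed (e.g. hashed keys).
    """
    if num_regions < 1:
        raise ValueError("num_regions must be at least 1")
    key_space = 16 ** key_width
    return [
        format(i * key_space // num_regions, f"0{key_width}x")
        for i in range(1, num_regions)
    ]


# Four regions need three split points.
print(hex_split_points(4))
```

A table with N regions needs N - 1 split keys, and for keys that are not uniformly distributed, evenly spaced points like these would recreate the very hot spots that pre-splitting is meant to avoid.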

DataFu: The WD-40 of Big Data

If Pig is the “duct tape for big data”, then DataFu is the WD-40. Or something.

No, seriously, DataFu is a collection of Pig UDFs for data analysis on Hadoop. DataFu includes routines for common statistics tasks (e.g., median, variance), PageRank, set operations, and bag operations.

It’s helpful to understand the history of the library. Over the years, we developed several routines that were used across LinkedIn and were thrown together into an internal package we affectionately called “littlepiggy.” The unfortunate part, and this is true of many such efforts, is that the UDFs were ill-documented, ill-organized, and easily broken when someone made a change. Along came PigUnit, which allowed UDF testing, so we spent the time to clean up these routines by adding documentation and rigorous unit tests. We thought the resulting “datafoo” package would help the community at large, and there you have DataFu.

So what can this library do for you? Let’s look at one of the classical examples that showcase the power and flexibility of Pig: sessionizing a click stream.

A = load 'clicks';
B = group A by user;
C = foreach B {
  C1 = order A by timestamp;
  generate user, Sessionize(C1);
}
D = group C by session_id;
E = foreach D generate group as session_id, (MAX(C.timestamp) - MIN(C.timestamp)) as session_length;
F = group E all;
G = foreach F generate
  AVG(E.session_length) as avg_session_length,
  SQRT(VAR(E.session_length)) as sd_session_length,
  MEDIAN(E.session_length) as median_session_length,
  Q75(E.session_length) as session_length_75pct,
  Q90(E.session_length) as session_length_90pct,
  Q95(E.session_length) as session_length_95pct;

(In fact, this is basically the example for the Accumulator interface that was added in Pig 0.6.)

Here, we’re just computing some summary statistics on a sessionized click stream. Pig does the heavy lifting of transforming your query into MapReduce goodness, but DataFu fills in the gaps by providing the routines that are not built into Pig: Sessionize, VAR, MEDIAN, and the quantile functions Q75, Q90 and Q95.
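
For readers who want to see what the script computes without a cluster, here is a small single-machine sketch of the same idea in plain Python. It is not DataFu and makes several assumptions: clicks are (user, timestamp-in-seconds) pairs, a session ends after 30 minutes of inactivity, the standard deviation is the population one, and the quantiles use Python's "inclusive" interpolation, which may differ from DataFu's definitions.

```python
import statistics
from collections import defaultdict
from typing import Dict, Iterable, List, Tuple

SESSION_TIMEOUT = 30 * 60  # seconds of inactivity that end a session (assumed)


def session_lengths(clicks: Iterable[Tuple[str, int]],
                    timeout: int = SESSION_TIMEOUT) -> List[int]:
    """Group clicks by user, cut sessions at gaps > timeout, return their lengths."""
    by_user: Dict[str, List[int]] = defaultdict(list)
    for user, ts in clicks:
        by_user[user].append(ts)
    lengths = []
    for timestamps in by_user.values():
        timestamps.sort()
        start = prev = timestamps[0]
        for ts in timestamps[1:]:
            if ts - prev > timeout:  # gap too long: close the current session
                lengths.append(prev - start)
                start = ts
            prev = ts
        lengths.append(prev - start)  # close the user's last session
    return lengths


def summarize(lengths: List[int]) -> Dict[str, float]:
    """Summary statistics; needs at least two sessions for the quantiles."""
    cuts = statistics.quantiles(lengths, n=20, method="inclusive")  # 5% steps
    return {
        "avg": statistics.mean(lengths),
        "sd": statistics.pstdev(lengths),
        "median": statistics.median(lengths),
        "q75": cuts[14],
        "q90": cuts[17],
        "q95": cuts[18],
    }


# Made-up clicks, for illustration only.
clicks = [("alice", 0), ("alice", 600), ("alice", 5000), ("alice", 5300),
          ("bob", 100), ("bob", 400)]
lengths = session_lengths(clicks)
summary = summarize(lengths)
```

The Pig version expresses the same pipeline declaratively and lets the cluster do the grouping and sorting, which is the part that does not fit on one machine for real click streams.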

You can download DataFu at http://data.linkedin.com/opensource/datafu.

You can grab sample data and code you can run on your own for this sessionization example below.

