Query Federation with Apache Hive

This blog post was published on Hortonworks.com before the merger with Cloudera. Some links, resources, or references may no longer be accurate.

Organizations commonly use a plethora of data storage and processing systems today. These different systems offer cost-effective performance for their respective use cases. Besides traditional RDBMSs such as Oracle DB, Teradata, or PostgreSQL, teams use Apache Kafka for streams and events data, Apache Druid for real-time series data, and Apache Phoenix for quick index lookups. Additionally, they may store their data in bulk using Cloud storage services or HDFS.

Platform teams deploy all these systems side-by-side to give application developers the flexibility to choose depending on the capabilities they need to satisfy their business’ analytical requirements.

Unifying access with Apache Hive 3.0 & SQL

Apache Hive plays a critical role in the environment described above. More specifically, it can access the aforementioned systems and provide efficient, unified SQL access to them – out of the box. The benefits of this are enormous:

  • Single SQL dialect and API
  • Central security controls and audit trail
  • Unified governance
  • Ability to combine data from multiple sources
  • Data independence

Setting up and using Apache Hive as a federation layer simplifies application development and data access. In particular, all data can be processed using SQL and can be accessed through the same familiar JDBC/ODBC interfaces.

On top of that, since all data access goes through a single intermediary, Hive provides unified security controls (table, row and column access restrictions), lineage tracking, and audit trails for all the interactions with the different systems.

Finally, you also get a measure of data independence: Changing storage or processing system becomes easier if all access is abstracted through SQL tables and views.

Cost-based optimizer for intelligent push-down

Assume you want to execute a Hive query that accesses data from an external RDBMS behind a JDBC connection. A possible naïve way of doing this would treat the JDBC source as a “dumb” storage system, reading all the raw data over JDBC and processing it in Hive. In this case you would ignore the query capabilities of the RDBMS and pull too much data over the JDBC link, thus ending up with poor performance and an overloaded system.

For that reason, Hive implements smart push-down to other systems by relying on its storage handler interfaces and its cost-based optimizer (CBO), powered by Apache Calcite. In particular, Calcite provides rules that match a subset of operators in the logical representation of the query and generate a new, equivalent representation in which more operations are executed in the external system. Hive includes these push-down rules in its query planner and then relies on Calcite to generate a valid query in the language that the external system supports. The storage handler implementations are responsible for sending the generated query to the external system, retrieving its results, and transforming the incoming data into Hive's internal representation so it can be processed further if needed.
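
As a hypothetical illustration (the table and column names are made up), consider a Hive query over an external table sales_pg that is backed by a PostgreSQL table through the JDBC storage handler:

-- Hypothetical query over a JDBC-backed external table; all names are illustrative.
-- With the Calcite-based optimizer, Hive can rewrite the plan so that the filter
-- and the aggregation execute in PostgreSQL, and only the much smaller aggregated
-- result travels back over the JDBC connection.
SELECT store_id,
       SUM(amount) AS total_amount
FROM sales_pg
WHERE sale_date >= DATE '2018-01-01'
GROUP BY store_id;

Conceptually, the storage handler ends up sending an equivalent SELECT ... GROUP BY statement to PostgreSQL and converting the returned rows into Hive's internal representation.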

This isn’t limited to SQL systems: Apache Hive can federate computation smartly to Apache Druid or Apache Kafka as well, for instance. As we have described in a recent blog post, Druid is very efficient for processing rollup and filtering operations on time-series data. Therefore, when a query is executed over a source stored in Druid, Hive may push filtering and aggregation to Druid, generating and sending JSON queries to the REST API exposed by the engine. On the other hand, when it comes to Kafka, Hive can push filters on partitions or offsets to read data selectively from the topics in the system.
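
For example, a Kafka-backed table can be declared with the Kafka storage handler that ships with Hive 3; filters on the metadata columns __partition and __offset then let Hive read only a slice of the topic. In the sketch below, the topic name, broker addresses, and payload columns are placeholders:

-- Sketch: Kafka-backed external table; topic, brokers, and payload columns are placeholders.
CREATE EXTERNAL TABLE page_views_kafka (
  user_id STRING,
  page STRING
)
STORED BY 'org.apache.hadoop.hive.kafka.KafkaStorageHandler'
TBLPROPERTIES (
  "kafka.topic" = "page_views",
  "kafka.bootstrap.servers" = "broker1:9092,broker2:9092"
);

-- Filters on the Kafka metadata columns can be pushed down so that only the
-- requested partition/offset range of the topic is read.
SELECT user_id, page
FROM page_views_kafka
WHERE `__partition` = 0 AND `__offset` > 100000;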

Federation to JDBC sources

The flexibility and power of the combination of storage handlers and Calcite adapters stand out especially when Hive acts as a federation layer over different SQL systems.

Apache Hive 3 includes a new implementation of the JDBC storage handler and enables JDBC adapter rules from Calcite to push computation selectively to JDBC sources such as MySQL, PostgreSQL, Oracle, or Redshift. Hive can push a large variety of operations, including projections, filters, joins, aggregations, unions, sorting, and limits.

More importantly, Calcite adapts its behavior depending on the system behind the JDBC source. For instance, it can recognize that a certain function used in a filter condition of the query is not supported by the JDBC source. In that case, it executes that function in Hive while still pushing the rest of the supported filter conditions to the JDBC system. In addition, Hive generates SQL using different dialects depending on the system behind the JDBC connection.

Another critical feature is that the JDBC storage handler can split a query into multiple subqueries that are sent to the external engine in parallel, in order to speed up reading large amounts of data from the sources.
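
As a rough sketch, the parallel read can be configured through the partitioning properties documented for the JDBC storage handler; exact availability depends on the Hive release, and the table, column, and bounds below are placeholders:

-- Sketch: request that the JDBC-backed table be read in four parallel splits,
-- range-partitioned on a numeric column. All values are placeholders.
ALTER TABLE sales_pg SET TBLPROPERTIES (
  "hive.sql.partitionColumn" = "store_id",
  "hive.sql.numPartitions" = "4",
  "hive.sql.lowerBound" = "1",
  "hive.sql.upperBound" = "4000"
);

With these properties set, each split reads a disjoint range of the partition column, so large tables do not have to be streamed through a single JDBC connection.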

The following figure depicts an example illustrating Hive's new JDBC push-down capabilities. Observe that the SQL queries generated for MySQL and PostgreSQL differ in formatting.

Still to come: Automatic metadata mapping

To query data in other systems, a user just needs to declare a Hive external table using the corresponding storage handler, possibly passing some additional information that is relevant to that system. For instance, continuing with the JDBC example, if a user wants to declare a JDBC source pointing to a table 'item' in database 'organization' in PostgreSQL, they would execute a statement like the one sketched below:
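
The storage handler class and the hive.sql.* properties in this sketch are the ones exposed by Hive 3's JdbcStorageHandler; the column list is abbreviated and the connection details (host, credentials) are placeholders.

-- Sketch: JDBC-backed external table over table 'item' in the 'organization'
-- PostgreSQL database. Column list abbreviated; connection details are placeholders.
CREATE EXTERNAL TABLE item (
  i_item_sk BIGINT,
  i_item_id STRING,
  i_item_desc STRING
)
STORED BY 'org.apache.hive.storage.jdbc.JdbcStorageHandler'
TBLPROPERTIES (
  "hive.sql.database.type" = "POSTGRES",
  "hive.sql.jdbc.driver" = "org.postgresql.Driver",
  "hive.sql.jdbc.url" = "jdbc:postgresql://postgres-host:5432/organization",
  "hive.sql.dbcp.username" = "hive_user",
  "hive.sql.dbcp.password" = "hive_password",
  "hive.sql.table" = "item"
);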

While writing the previous statement is fairly straightforward, our longer-term vision for discovering and querying external sources through Hive goes far beyond that.

As shown in the example above, the CREATE TABLE statement currently requires the schema of the JDBC table to be specified. HIVE-21060 tracks the work to automatically discover the schema of a JDBC-based external table, so that it does not have to be stated in the CREATE TABLE command.

HIVE-21059 tracks the development of external catalog support. An external catalog would allow the creation of a new catalog in the metastore that points to an external MySQL database; through this catalog, all the tables under it would automatically be available for querying from Hive.

So, what do you do with it?

This federation concept gives you a lot of flexibility, and here are some examples of what you can do with it:

  • Combining the best tools for the job: Application developers can pick multiple data processing systems and access all of them through a single interface. There is no need to learn different query dialects or interfaces, and no need to worry about security and governance either; Hive takes care of that. This simplifies integration immensely.
  • Transform & write-back: With these capabilities you can transform data residing outside of Hive using Hive SQL. For instance, you can query Kafka, transform the data and then write it back out to Kafka.
  • Simplifying data-loads: You can now directly query other systems from Hive, clean your data, enhance it and merge it into Hive tables without deploying any other tools. ETL becomes much easier.
  • Query across multiple systems: For certain ad-hoc queries or small reports it is not practical to define an ETL process and move all the data to the same place. With these new capabilities you can just write queries that combine a number of different data sources directly, as the sketch below shows.
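
As a hypothetical sketch of that last point (the native Hive table sales_hive and its columns are made up), a single query can join a native Hive table with the JDBC-backed item table declared earlier; Hive pushes the eligible work to PostgreSQL and runs the join and final aggregation itself:

-- Sketch: sales_hive is a hypothetical native Hive table; item is the JDBC-backed
-- table declared earlier. The scan of item (and any pushable filters) runs in
-- PostgreSQL, while the join and aggregation run in Hive.
SELECT i.i_item_id,
       SUM(s.quantity) AS total_quantity
FROM sales_hive s
JOIN item i ON s.item_sk = i.i_item_sk
WHERE s.sale_date >= DATE '2018-06-01'
GROUP BY i.i_item_id
ORDER BY total_quantity DESC
LIMIT 10;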

We hope that this gives you some ideas to go and try it out for yourself!

Gunther Hagleitner
VP of Engineering

1 Comment

by Sam on

Does the Apache Hive JDBC Storage Handler support Teradata? I tried adding the Teradata JDBC and Teradata config drivers and tried creating the external table in Hive using the storage handler. The external table got created without any problem, but I am facing errors while trying to query it. Any help will be greatly appreciated.

Details below:

CREATE EXTERNAL TABLE td_tab1(
id STRING,
nm STRING
)
STORED BY 'org.apache.hadoop.hive.jdbc.storagehandler.JdbcStorageHandler'
TBLPROPERTIES (
"mapred.jdbc.driver.class"="com.teradata.jdbc.TeraDriver",
"mapred.jdbc.url"="jdbc:teradata://stcprd/CLIENT_CHARSET=GBK,TMODE=TERA,CHARSET=ASCII,database=dbname,LOB_Support=OFF",
"mapred.jdbc.username"="user",
"mapred.jdbc.input.table.name"="tab1",
"mapred.jdbc.output.table.name"="tab2",
"mapred.jdbc.password"="password",
"mapred.jdbc.hive.lazy.split"="true"
);

OK

hive (dp_mstr_temp)> select * from td_tab1;
OK

Failed with exception java.io.IOException:java.io.IOException: SQLException in nextKeyValue
