Spark HBase Connector – A Year in Review

This blog post was published on Hortonworks.com before the merger with Cloudera. Some links, resources, or references may no longer be accurate.

In 2016, we published the second version (v1.0.1) of the Spark HBase Connector (SHC). In this blog, we go through the major features we implemented this year.

Support Phoenix coder

SHC can be used to write data out to an HBase cluster for further downstream processing. It supports Avro serialization for input and output data and has defaulted to a custom serialization using a simple native encoding mechanism. When reading input data, SHC pushes down filters to HBase for efficient scans of data. Given the popularity of Phoenix data in HBase, it seems natural to support Phoenix data as input to HBase in addition to Avro data. Also, defaulting to the simple native binary encoding seems susceptible to future changes and is a risk for users who write data from SHC into HBase: any later change to that encoding would require backward compatibility to be handled properly. So the default in SHC needs to change to a more standard and well-tested format like Phoenix.

For composite key support, prior to this feature, the value length of each dimension was required to be fixed, with the exception of the last dimension of the composite key. The Phoenix coder removes this limitation. Currently, if users choose Phoenix as the data coder, they do not need to specify the length of each part of the composite key in the catalog.
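
As a rough illustration, a composite-key catalog under the Phoenix coder might look like the sketch below. The table and column names are hypothetical, the two-part key "key1:key2" is just an example, and the coder name "Phoenix" is an assumption about how the catalog spells it. The point is only that no "length" entry appears on either part of the key.

```scala
// A minimal sketch, not taken from the SHC sources.
// Assumptions: the coder is selected with "tableCoder":"Phoenix", and the
// names table1, key1, key2, col1 are made up for illustration.
object CompositeKeyExample {
  val catalog: String =
    s"""{
       |"table":{"namespace":"default", "name":"table1", "tableCoder":"Phoenix"},
       |"rowkey":"key1:key2",
       |"columns":{
       |"col00":{"cf":"rowkey", "col":"key1", "type":"string"},
       |"col01":{"cf":"rowkey", "col":"key2", "type":"int"},
       |"col1":{"cf":"cf1", "col":"col1", "type":"boolean"}
       |}
       |}""".stripMargin
}
```

With a coder that needs fixed-length dimensions, the first key part ("key1" here) would additionally have to declare its length in the catalog.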

As Phoenix is the default coder, the only change for users is that if they want to use PrimitiveType as the data coder, they need to specify "tableCoder":"PrimitiveType" in their catalogs to tell SHC to use PrimitiveType instead of Phoenix as the "tableCoder".

def catalog = s"""{
|"table":{"namespace":"default", "name":"table1", "tableCoder":"PrimitiveType"},
|"rowkey":"key",
|"columns":{
|"col0":{"cf":"rowkey", "col":"key", "type":"string"},
|"col1":{"cf":"cf1", "col":"col1", "type":"boolean"},
|"col2":{"cf":"cf2", "col":"col2", "type":"double"},
|"col3":{"cf":"cf3", "col":"col3", "type":"float"},
|"col4":{"cf":"cf4", "col":"col4", "type":"int"},
|"col5":{"cf":"cf5", "col":"col5", "type":"bigint"},
|"col6":{"cf":"cf6", "col":"col6", "type":"smallint"},
|"col7":{"cf":"cf7", "col":"col7", "type":"string"},
|"col8":{"cf":"cf8", "col":"col8", "type":"tinyint"}
|}
|}""".stripMargin

Cache Spark HBase Connections

SHC did not previously cache connection objects to HBase. Specifically, ‘ConnectionFactory.createConnection’ was called each time SHC needed to visit HBase tables and regions. Users could see this simply by looking at the executor logs and observing ZooKeeper connections being established for each request. The documentation of the Connection interface says that connection creation is a heavyweight operation and that connection implementations are thread-safe. Hence, for long-lived processes, it is very useful for SHC to keep a connection cached. With this feature, SHC drastically decreases the number of connections created and greatly improves its performance in the process.
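
The idea can be sketched in plain Scala. This is not SHC's actual implementation: FakeConnection is a hypothetical stand-in for an HBase Connection, the cache key (a ZooKeeper quorum string) is an assumption, and a real cache would also need to close connections that are no longer in use.

```scala
import java.util.concurrent.ConcurrentHashMap
import java.util.concurrent.atomic.AtomicInteger
import java.util.function.{Function => JFunction}

// Hypothetical stand-in for an expensive, thread-safe HBase Connection.
final class FakeConnection(val quorum: String)

object ConnectionCache {
  // Counts how many times the expensive factory actually ran.
  val created = new AtomicInteger(0)

  private val cache = new ConcurrentHashMap[String, FakeConnection]()

  // Stand-in for the heavyweight ConnectionFactory.createConnection call.
  private def create(quorum: String): FakeConnection = {
    created.incrementAndGet()
    new FakeConnection(quorum)
  }

  // Return the cached connection for this key, creating it at most once.
  // computeIfAbsent is atomic, so concurrent callers share one instance.
  def getOrCreate(quorum: String): FakeConnection =
    cache.computeIfAbsent(quorum, new JFunction[String, FakeConnection] {
      override def apply(key: String): FakeConnection = create(key)
    })
}
```

Repeated calls to ConnectionCache.getOrCreate with the same key return the same object, so the expensive creation step runs once per distinct key rather than once per request.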

Support Duplicated Column Families

SHC now supports duplicated column families, so several columns can map to the same column family. Users can define their catalogs like this:

def catalog = s"""{
|"table":{"namespace":"default", "name":"table1", "tableCoder":"PrimitiveType"},
|"rowkey":"key",
|"columns":{
|"col0":{"cf":"rowkey", "col":"key", "type":"string"},
|"col1":{"cf":"cf1", "col":"col1", "type":"boolean"},
|"col2":{"cf":"cf1", "col":"col2", "type":"double"},
|"col3":{"cf":"cf1", "col":"col3", "type":"float"},
|"col4":{"cf":"cf2", "col":"col4", "type":"int"},
|"col5":{"cf":"cf2", "col":"col5", "type":"bigint"},
|"col6":{"cf":"cf3", "col":"col6", "type":"smallint"},
|"col7":{"cf":"cf3", "col":"col7", "type":"string"},
|"col8":{"cf":"cf3", "col":"col8", "type":"tinyint"}
|}
|}""".stripMargin

In the catalog definition above, columns ‘col1’, ‘col2’ and ‘col3’ share the column family ‘cf1’. Likewise, ‘col4’ and ‘col5’ share ‘cf2’, and ‘col6’, ‘col7’ and ‘col8’ share ‘cf3’.

Use Spark UnhandledFilters API

SHC has also implemented the Spark unhandledFilters API, which is an effective optimization. This API tells Spark which filters SHC does not implement, as opposed to returning all the filters. Previously, Spark re-applied all the filters once the data was pulled into Spark. Re-applying a filter is idempotent, so it doesn’t change any data, but it can be expensive if the filters are complicated.
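
In Spark, this hook is the method unhandledFilters(filters: Array[Filter]): Array[Filter] on BaseRelation. The sketch below mirrors its shape with simplified stand-in filter types so that it runs without Spark on the classpath. The rule for which filters can be pushed down (canPushDown) is an assumption for illustration, not SHC's actual logic.

```scala
// Simplified stand-ins for Spark's org.apache.spark.sql.sources.Filter types.
sealed trait Filter
final case class EqualTo(attribute: String, value: Any) extends Filter
final case class GreaterThan(attribute: String, value: Any) extends Filter
final case class StringContains(attribute: String, value: String) extends Filter

object PushdownSketch {
  // Hypothetical rule: pretend the data source can evaluate equality and
  // range comparisons itself, but not substring matches.
  def canPushDown(filter: Filter): Boolean = filter match {
    case EqualTo(_, _) | GreaterThan(_, _) => true
    case _                                 => false
  }

  // Same shape as BaseRelation.unhandledFilters: return only the filters
  // the source cannot evaluate, so Spark re-applies just those.
  def unhandledFilters(filters: Array[Filter]): Array[Filter] =
    filters.filterNot(canPushDown)
}
```

Given an equality filter, a range filter and a substring filter, only the substring filter is returned, so it is the only one Spark would evaluate again after the scan.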

SHC Community

The SHC community is bigger and more influential than it was a year ago. In 2016, we gave talks at Hadoop Summit and at the HBase/Spark meetup, and wrote detailed blogs. As the number of SHC users increases, we are receiving more user questions. We are very happy to see increased adoption of SHC, and if you have any thoughts about how to further improve it, please give us feedback via Hortonworks Community Connection.

ACKNOWLEDGEMENT

We want to thank the Bloomberg team for guiding us in this work and for helping us validate it. We also want to thank the HBase community for providing feedback and making this better. Finally, this work has leveraged the lessons from earlier Spark HBase integrations, and we want to thank their developers for paving the road.

REFERENCE:

SHC: https://github.com/hortonworks-spark/shc

Apache HBase: https://hbase.apache.org/

Apache Spark: http://spark.apache.org/

Apache Phoenix: https://phoenix.apache.org/

Weiqing Yang