June 07, 2016

Spark-on-HBase: DataFrame based HBase connector

We are proud to announce the technical preview of the Spark-HBase Connector, developed by Hortonworks in collaboration with Bloomberg.

The Spark-HBase connector leverages the Data Source API (SPARK-3247) introduced in Spark 1.2.0. It bridges the gap between the simple HBase key-value store and complex relational SQL queries, and enables users to perform complex data analytics on top of HBase using Spark. An HBase DataFrame is a standard Spark DataFrame, and is able to interact with any other data source, such as Hive, ORC, Parquet, or JSON.

Background

There are several open source Spark HBase connectors available, either as Spark packages, as independent projects, or in the HBase trunk.

Spark has moved to the Dataset/DataFrame APIs, which provide built-in query plan optimization, and end users now prefer DataFrame/Dataset-based interfaces.

The connector in the HBase trunk has rich support at the RDD level (e.g. BulkPut), but its DataFrame support is not as rich. It relies on the standard HadoopRDD with HBase's built-in TableInputFormat, which has some performance limitations. In addition, BulkGet is performed in the driver, which may be a single point of failure.

There are some other alternative implementations. Take Spark-SQL-on-HBase as an example. It applies very advanced custom optimization techniques by embedding its own query optimization plan inside the standard Spark Catalyst engine, ships the RDD to HBase, and performs complicated tasks, such as partial aggregation, inside the HBase coprocessor. This approach is able to achieve high performance, but it is difficult to maintain due to its complexity and the rapid evolution of Spark. Also, allowing arbitrary code to run inside a coprocessor may pose security risks.

The Spark-on-HBase Connector (SHC) has been developed to overcome these potential bottlenecks and weaknesses. It implements the standard Spark Data Source API and leverages the Spark Catalyst engine for query optimization. In parallel, the RDD is constructed from scratch instead of using TableInputFormat in order to achieve high performance. With this customized RDD, all critical techniques can be applied and fully implemented, such as partition pruning, column pruning, predicate pushdown, and data locality. This design makes maintenance very easy while achieving a good tradeoff between performance and simplicity.

Architecture

We assume Spark and HBase are deployed in the same cluster, and Spark executors are co-located with region servers, as illustrated in the figure below.


Figure 1. Spark-on-HBase Connector Architecture

At a high level, the connector treats both Scan and Get in a similar way, and both actions are performed in the executors. The driver processes the query, aggregates Scans/Gets based on the region metadata, and generates one task per region. The tasks are sent to the preferred executors co-located with the region servers and are performed in parallel in the executors to achieve better data locality and concurrency. If a region does not hold the requested data, its region server is not assigned any task. A task may consist of multiple Scans and BulkGets, and the data requested by a task is retrieved from only one region server, which is also the locality preference for the task. Note that the driver is not involved in the real job execution except for scheduling tasks; this avoids the driver becoming a bottleneck.

Table Catalog

To bring the HBase table into Spark as a relational table, we define a mapping between HBase and Spark tables, called the Table Catalog. There are two critical parts of this catalog: one is the rowkey definition, and the other is the mapping between a table column in Spark and the column family and column qualifier in HBase. Please refer to the Usage section for details.

Native Avro support

The connector supports the Avro format natively, as it is a very common practice to persist structured data into HBase as a byte array. Users can persist Avro records into HBase directly. Internally, the Avro schema is converted to a native Spark Catalyst data type automatically. Note that both the key and value parts of an HBase table can be defined in Avro format. Please refer to the examples/test cases in the repo for exact usage.
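As a rough sketch of what this looks like (modeled on the Avro example shipped in the SHC repo; the “avro” coder key and the “avroSchema” option name follow that example and may differ between versions), an Avro-encoded column is declared in the catalog and the schema string is passed alongside it as an option:

// Sketch only: the record fields and table name are illustrative.
val avroSchemaString = s"""{"type": "record", "name": "ExampleRecord", "namespace": "example.avro",
     | "fields": [{"name": "name", "type": "string"},
     |            {"name": "favorite_number", "type": "int"}]}""".stripMargin

def avroCatalog = s"""{
     |"table":{"namespace":"default", "name":"avrotable"},
     |"rowkey":"key",
     |"columns":{
       |"col0":{"cf":"rowkey", "col":"key", "type":"string"},
       |"col1":{"cf":"cf1", "col":"col1", "avro":"avroSchema"}
     |}
   |}""".stripMargin

// "avroSchema" in the catalog refers back to the option key carrying the schema definition.
val avroDF = sqlContext.read
  .options(Map("avroSchema" -> avroSchemaString, HBaseTableCatalog.tableCatalog -> avroCatalog))
  .format("org.apache.spark.sql.execution.datasources.hbase")
  .load()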

Predicate Pushdown

The connector retrieves only the required columns from the region servers to reduce network overhead and avoid redundant processing in the Spark Catalyst engine. Existing standard HBase filters are used to perform predicate pushdown without relying on the coprocessor capability. Because HBase is not aware of data types other than byte arrays, and because the ordering of Java primitive types is not always consistent with byte-array ordering, we have to preprocess the filter condition before setting the filter in the Scan operation to avoid any data loss. Inside the region server, records not matching the query condition are filtered out.
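As an illustration (using the df and the column names from the Usage section below; the filter values are arbitrary), the predicates below are translated into HBase filters evaluated on the region servers, and the select() prunes the fetch to the referenced column families:

// Illustrative only: df and the catalog come from the Usage section below.
// The range predicate on col2 and the equality on col1 are pushed down as HBase filters;
// select() limits the fetch to the rowkey (col0) plus cf1 and cf2.
val pushedDown = df
  .filter($"col2" > 100.0 && $"col1" === true)
  .select("col0", "col1", "col2")

pushedDown.explain(true)   // the physical plan lists the pushed filters
pushedDown.show()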

Partition Pruning

By extracting the row key from the predicates, we split the Scan/BulkGet into multiple non-overlapping ranges, and only the region servers that hold the requested data perform the Scan/BulkGet. Currently, partition pruning is performed on the first dimension of the row key. For example, if a row key is “key1:key2:key3”, partition pruning is based on “key1” only. Note that the WHERE conditions need to be defined carefully; otherwise, partition pruning may not take effect. For example, WHERE rowkey1 > “abc” OR column = “xyz” (where rowkey1 is the first dimension of the row key and column is a regular HBase column) results in a full scan, as we have to cover all ranges because of the OR logic.
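To make the contrast concrete, here is a hypothetical pair of queries using the col0 (rowkey) and col4 columns from the Usage catalog below:

// Prunable: the predicate constrains the leading dimension of the row key, so the Scan
// is restricted to the regions whose key ranges overlap ["row010", "row050").
val prunedScan = df.filter($"col0" >= "row010" && $"col0" < "row050")

// Not prunable: the OR with a predicate on a regular column (col4) means any region
// could hold matching rows, so the connector falls back to a full table scan.
val fullScan = df.filter($"col0" >= "row010" || $"col4" === 42)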

Data Locality

When Spark executors are co-located with HBase region servers, data locality is achieved by identifying the region server location and making a best effort to co-locate each task with its region server. Each executor then performs its Scan/BulkGet on the part of the data co-located on the same host.

Scan and BulkGet

These two operators are exposed to users by specifying a WHERE clause, e.g., WHERE column > x AND column < y for a scan and WHERE column = x for a get. The operations are performed in the executors; the driver only constructs them. Internally they are converted to Scans and/or Gets, and an Iterator[Row] is returned to the Catalyst engine for upper-layer processing.
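For example, with the df defined in the Usage section below, a range predicate on the rowkey column is executed as a Scan, while equality predicates on the rowkey are executed as Gets (batched as BulkGets) in the executors:

// Illustrative only: df and col0 (the rowkey column) come from the Usage section below.
val scanResult = df.filter($"col0" > "row010" && $"col0" < "row020")     // range -> Scan
val getResult  = df.filter($"col0" === "row015" || $"col0" === "row150") // equality -> BulkGet

scanResult.show()
getResult.show()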

Usage

The following illustrates the basic procedure for using the connector. For more details and advanced use cases, such as Avro and composite key support, please refer to the examples in the repository.

1) Define the catalog for the schema mapping:

def catalog = s"""{
         |"table":{"namespace":"default", "name":"table1"},
         |"rowkey":"key",
         |"columns":{
           |"col0":{"cf":"rowkey", "col":"key", "type":"string"},
           |"col1":{"cf":"cf1", "col":"col1", "type":"boolean"},
           |"col2":{"cf":"cf2", "col":"col2", "type":"double"},
           |"col3":{"cf":"cf3", "col":"col3", "type":"float"},
           |"col4":{"cf":"cf4", "col":"col4", "type":"int"},
           |"col5":{"cf":"cf5", "col":"col5", "type":"bigint"},
           |"col6":{"cf":"cf6", "col":"col6", "type":"smallint"},
           |"col7":{"cf":"cf7", "col":"col7", "type":"string"},
           |"col8":{"cf":"cf8", "col":"col8", "type":"tinyint"}
         |}
       |}""".stripMargin

 

2) Prepare the data and populate the HBase table:

case class HBaseRecord(col0: String, col1: Boolean, col2: Double, col3: Float,
  col4: Int, col5: Long, col6: Short, col7: String, col8: Byte)

object HBaseRecord {
  def apply(i: Int, t: String): HBaseRecord = {
    val s = s"""row${"%03d".format(i)}"""
    HBaseRecord(s, i % 2 == 0, i.toDouble, i.toFloat, i, i.toLong, i.toShort, s"String$i: $t", i.toByte)
  }
}

val data = (0 to 255).map { i => HBaseRecord(i, "extra") }

sc.parallelize(data).toDF.write
  .options(Map(HBaseTableCatalog.tableCatalog -> catalog, HBaseTableCatalog.newTable -> "5"))
  .format("org.apache.spark.sql.execution.datasources.hbase")
  .save()
 
3) Load the DataFrame:
def withCatalog(cat: String): DataFrame = {
  sqlContext
    .read
    .options(Map(HBaseTableCatalog.tableCatalog -> cat))
    .format("org.apache.spark.sql.execution.datasources.hbase")
    .load()
}

val df = withCatalog(catalog)

4) Language integrated query:
val s = df.filter((($"col0" <= "row050" && $"col0" > "row040") ||
    $"col0" === "row005" ||
    $"col0" === "row020" ||
    $"col0" === "r20" ||
    $"col0" <= "row005") &&
  ($"col4" === 1 ||
   $"col4" === 42))
  .select("col0", "col1", "col4")
s.show

5) SQL query:
df.registerTempTable("table")
sqlContext.sql("select count(col1) from table").show

Configuring Spark-Package

Users can use the Spark-on-HBase connector as a standard Spark package. To include the package in your Spark application, use:

spark-shell, pyspark, or spark-submit

> $SPARK_HOME/bin/spark-shell --packages zhzhan:shc:0.0.11-1.6.1-s_2.10

Users can also include the package as a dependency in their SBT file. The format is spark-package-name:version.

spDependencies += "zhzhan/shc:0.0.11-1.6.1-s_2.10"
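A minimal build.sbt sketch, assuming the sbt-spark-package plugin is registered in project/plugins.sbt (the plugin coordinates and version numbers below are illustrative, not taken from this post):

// build.sbt -- sketch only; verify the plugin coordinates against the sbt-spark-package docs.
// project/plugins.sbt would contain something like:
//   addSbtPlugin("org.spark-packages" % "sbt-spark-package" % "0.2.5")

name := "shc-example"

scalaVersion := "2.10.6"

sparkVersion := "1.6.1"

spDependencies += "zhzhan/shc:0.0.11-1.6.1-s_2.10"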

Running in a Secure Cluster

To run in a Kerberos-enabled cluster, the user has to include the HBase-related jars in the classpath, as HBase token retrieval and renewal is done by Spark and is independent of the connector. In other words, the user needs to initialize the environment in the normal way, either through kinit or by providing a principal/keytab. The following examples show how to run in a secure cluster in both yarn-client and yarn-cluster mode. Note that SPARK_CLASSPATH has to be set for both modes, and the example jar is just a placeholder for your Spark application.

export SPARK_CLASSPATH=/usr/hdp/current/hbase-client/lib/hbase-common.jar:/usr/hdp/current/hbase-client/lib/hbase-client.jar:/usr/hdp/current/hbase-client/lib/hbase-server.jar:/usr/hdp/current/hbase-client/lib/hbase-protocol.jar:/usr/hdp/current/hbase-client/lib/guava-12.0.1.jar

Suppose hrt_qa is a headless account; the user can use the following command for kinit:

kinit -k -t /tmp/hrt_qa.headless.keytab hrt_qa

/usr/hdp/current/spark-client/bin/spark-submit --class org.apache.spark.sql.execution.datasources.hbase.examples.HBaseSource --master yarn-client --packages zhzhan:shc:0.0.11-1.6.1-s_2.10 --num-executors 4 --driver-memory 512m --executor-memory 512m --executor-cores 1 /usr/hdp/current/spark-client/lib/spark-examples-1.6.1.2.4.2.0-106-hadoop2.7.1.2.4.2.0-106.jar

/usr/hdp/current/spark-client/bin/spark-submit --class org.apache.spark.sql.execution.datasources.hbase.examples.HBaseSource --master yarn-cluster --files /etc/hbase/conf/hbase-site.xml --packages zhzhan:shc:0.0.11-1.6.1-s_2.10 --num-executors 4 --driver-memory 512m --executor-memory 512m --executor-cores 1 /usr/hdp/current/spark-client/lib/spark-examples-1.6.1.2.4.2.0-106-hadoop2.7.1.2.4.2.0-106.jar

Putting It All Together

We’ve just given a quick overview of how HBase supports Spark at the DataFrame level. With the DataFrame API, Spark applications can work with data stored in an HBase table as easily as data stored in any other data source. With this new feature, data in HBase tables can be easily consumed by Spark applications and other interactive tools; for example, users can run a complex SQL query on top of an HBase table inside Spark, perform a table join against a DataFrame, or integrate with Spark Streaming to implement a more complicated system.
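As a quick sketch of that last point, the HBase-backed DataFrame behaves like any other DataFrame, so it can be joined with data from a different source and queried further (the Parquet path and the rowId column below are illustrative):

// Illustrative only: joins the HBase-backed df from the Usage section with a
// hypothetical Parquet dataset that has a "rowId" column matching the HBase rowkey.
val events = sqlContext.read.parquet("/data/events")

val joined = df.join(events, df("col0") === events("rowId"))
  .groupBy("col7")
  .count()

joined.show()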

What’s Next?

Currently, the connector is hosted in the Hortonworks repo and published as a Spark package. It is in the process of being migrated to the Apache HBase trunk. During the migration, we identified some critical bugs in the HBase trunk, and they will be fixed along with the merge. The community work is tracked by the umbrella HBase JIRA HBASE-14789, including HBASE-14795 and HBASE-14796 to optimize the underlying computing architecture for Scan and BulkGet, HBASE-14801 to provide a JSON user interface for ease of use, HBASE-15336 for the DataFrame writing path, HBASE-15334 for Avro support, HBASE-15333 to support Java primitive types such as short, int, long, float, and double, HBASE-15335 to support composite row keys, and HBASE-15572 to add optional timestamp semantics. We look forward to producing a future version of the connector that is even easier to work with.

Acknowledgement

We want to thank Hamel Kothari, Sudarshan Kadambi, and the Bloomberg team for guiding us in this work and helping us validate it. We also want to thank the HBase community for providing feedback and making this work better. Finally, this work has leveraged lessons from earlier Spark HBase integrations, and we want to thank their developers for paving the road.

References:

SHC: https://github.com/hortonworks/shc

Spark-package: http://spark-packages.org/package/zhzhan/shc

Apache HBase:  https://hbase.apache.org/

Apache Spark: http://spark.apache.org/

Comments

  • Why don’t you use Hive + HBaseSerDe to get a DataFrame from HBase? The CatalogSchema is very similar to an HBase external table definition (with the HBase SerDe) in Hive. Don’t you think a better way would be to improve the existing Spark SQL functionality?

    • In this implementation, we construct the HBaseRDD from scratch. The reason is scalability, and it fits better into the Spark Catalyst engine. Please refer to the architecture part of the blog.

    • The SparkOnHBase project from Cloudera provides support for Spark-HBase integration at the RDD level. This one is mainly trying to support the integration at the DataFrame level. The majority of that code is also being merged into the Apache HBase trunk.

    • We didn’t test it with PySpark, but I assume it should work following the existing PySpark data source examples. If you hit any issues, please let me know.

      • Hi Zhan,

        I would love to use this connector in a project. I too am using PySpark. I’m not sure how to find the “existing pyspark data source examples” though.

        I’ve looked at the examples at http://spark.apache.org/docs/latest/sql-programming-guide.html but it’s not clear from the short examples given for the ‘spark.read.load()’ function how to use a custom data source. Are you able to give any pointers?

        With kind regards,

        Will

        • I’m part of the way there. I launch spark-submit with "--packages zhzhan:shc:0.0.11-1.6.1-s_2.10"

          Then include the following code:

          tbl_map = '''{"table": {"namespace": "default", "name": "TABLE_NAME"},
          "rowkey": "KEY_NAME",
          "columns": {"KEY_NAME": {"cf": "rowkey", "col": "key", "type": "string"},
          "COL_NAME": {"cf": "COL_FAMILY", "col": "COL_NAME", "type": "string"}}}'''

          df = sqlContext.read \
          .format("org.apache.spark.sql.execution.datasources.hbase") \
          .option("catalog", tbl_map) \
          .option("zkUrl", "master-1.local:2181:/hbase-unsecure") \
          .load()

          But I get the error:

          File "/usr/hdp/2.3.4.0-3485/spark/python/lib/pyspark.zip/pyspark/sql/readwriter.py", line 123, in load
          File "/usr/hdp/2.3.4.0-3485/spark/python/lib/py4j-0.8.2.1-src.zip/py4j/java_gateway.py", line 538, in __call__
          File "/usr/hdp/2.3.4.0-3485/spark/python/lib/pyspark.zip/pyspark/sql/utils.py", line 36, in deco
          File "/usr/hdp/2.3.4.0-3485/spark/python/lib/py4j-0.8.2.1-src.zip/py4j/protocol.py", line 300, in get_return_value
          py4j.protocol.Py4JJavaError: An error occurred while calling o54.load.
          : java.lang.NoClassDefFoundError: org/apache/spark/sql/catalyst/util/DataTypeParser$

          Unfortunately PySpark 1.5.2 API does not contain the “catalyst” library so I can’t import it and resolve the issue. Is there a way around this?

          With thanks,

          Will

  • The examples throw the exception "scala.reflect.api.JavaUniverse.runtimeMirror"! The solution I found is to change the Scala version to 2.10.6; the official Scala version is 2.11.

  • Any plans to support it on Scala 2.11? I’m trying to choose a Spark-HBase connector for my project, which uses Scala 2.11. A reply would help me choose.

  • I can use this just fine to read a table from HBase that has multiple columns in a column family:
    def empcatalog = s"""{
    |"table":{"namespace":"default", "name":"emp"},
    |"rowkey":"key",
    |"columns":{
    |"empNumber":{"cf":"rowkey", "col":"key", "type":"string"},
    |"city":{"cf":"personal data", "col":"city", "type":"string"},
    |"empName":{"cf":"personal data", "col":"name", "type":"string"},
    |"jobDesignation":{"cf":"professional data", "col":"designation", "type":"string"},
    |"salary":{"cf":"professional data", "col":"salary", "type":"string"}
    |}
    |}""".stripMargin

    If I try to write out a new set using the DataFrame, I get java.lang.IllegalArgumentException: Family 'personal data' already exists so cannot be added
    at org.apache.hadoop.hbase.HTableDescriptor.addFamily(HTableDescriptor.java:829)

    • I figured out that by changing the save mode to "append" I can save to an existing table. But if you are trying to write to HBase initially with a catalog that assigns more than one column to a column family, you will get the Family "<column family>" already exists so cannot be added error.

  • Hi, I am using Spark 2.0 and Scala 2.11, but the above code is not working. Please give guidance on how I can get example code for Spark 2.0 and HBase.

  • The SHC connector is developed on the basis of Spark 1.4.1 and will not run on the latest versions.

    Is it planned to add support at least for the current version of Spark on HDP (1.6.2)?

    When could such an update be released?

  • I don’t have responses for recent questions, but I recommend posting your question on the Hortonworks Community Connection, community.hortonworks.com. Lots of good Spark discussion over there.

  • First, HBase has the ability to hold unstructured data; I mean, at declaration time we define column families and can define columns at runtime. However, does your current implementation support only structured data?

    Second, if I have an HBase table Person, with person_id as the rowkey and 2 column families personal_data and professional_data, then is the following catalog schema valid:

    def catalog = s"""{
    |"table":{"namespace":"default", "name":"person"},
    |"rowkey":"person_id",
    |"columns":{
    |"col0":{"cf":"rowkey", "col":"person_id", "type":"int"},
    |"personalname":{"cf":"personal_data", "col":"name", "type":"string"},
    |"personalage":{"cf":"personal_data", "col":"age", "type":"int"},
    |"personalweight":{"cf":"personal_data", "col":"weight", "type":"double"},
    |"professionalname":{"cf":"professional_data", "col":"name", "type":"string"},
    |"professionalDesig":{"cf":"professional_data", "col":"designation", "type":"string"}
    |}
    |}""".stripMargin

  • Currently we use this connector in our project. We assumed that connections are opened and closed automatically by the connector. But when we ran our job in production, we saw that the connections are not being closed. Is there any way or method to close the connection once we are done with our task?

    Requesting immediate help.

  • I’m using Apache Spark 2.0 and have started to use your SHC connector to get data from HBase. After some hours I got an example running. To improve performance I would like to use HBase filters like FuzzyRowFilter, which will be executed on the HBase side. Since I cannot find any SHC examples for these HBase filter definitions, could you please provide an example of how to define and use an HBase filter in SHC? Thanks!

    • Waras,
      Would you mind sharing a sample of your code? I tried, but keep getting this error: java.lang.NullPointerException

  • I have two HDP clusters: the first, named A, has HBase on it, and the second, named B, has Spark on it. Can I use Spark on B to access HBase on A? I don’t find a hard-coded HBase config in the example.

  • Does this connector support HBase lookups like Get operations on a particular rowkey, or does it only support Scan operations on the whole table?

  • I need a detailed example of connecting HBase with Spark. When a table is already in HBase and we want to write a SQL query on that HBase table, what do we have to do? I am new to Spark and HBase, so can you please provide an explanation with an example?

    Thanks

scala> val results = sql("SELECT * FROM tablename");
    results: org.apache.spark.sql.DataFrame = [hbid: string, matrix_col: string, matrix_value_col: double, country_col: string]
    scala> results.show();
    org.apache.hadoop.hbase.client.RetriesExhaustedException: Failed after attempts=36, exceptions:
    Tue Jul 25 10:03:27 SGT 2017, null, java.net.SocketTimeoutException: callTimeout=60000, callDuration=68549: row 'nrd_app_spt:capacity_new,,00000000000000' on table 'hbase:meta' at region=hbase:meta,,1.1588230740, hostname=x01shdpeapp3a.sgp.dbs.com,60020,1500273862255, seqNum=0
    Caused by: java.net.SocketTimeoutException: callTimeout=60000, callDuration=68549: row 'nrd_app_spt:capacity_new,,00000000000000' on table 'hbase:meta' at region=hbase:meta,,1.1588230740, hostname=x01shdpeapp3a.sgp.dbs.com,60020,1500273862255, seqNum=0
    Caused by: org.apache.hadoop.hbase.exceptions.ConnectionClosingException: Call to x01shdpeapp3a.sgp.dbs.com/10.92.139.145:60020 failed on local exception: org.apache.hadoop.hbase.exceptions.ConnectionClosingException: Connection to x01shdpeapp3a.sgp.dbs.com/10.92.139.145:60020 is closing. Call id=9, waitTime=3

  • Hi, thank you for this post. I want to know: if we have column qualifiers which are dynamic, how do we deal with this situation?
