Lighting a Spark with HBase
January 25, 2014
Neil Chaudhuri (He/Him)


Although most developers and users are still feeling their way through Hadoop (and more specifically MapReduce), the truth is Google wrote that paper in 2004. That’s ten years ago! Million Dollar Baby won Best Picture that year. Yeah! by Usher, Lil Jon, and Ludacris topped the charts in the United States. And Facebook had only just started to kill work productivity and violate your privacy.

If that feels like a long time ago, in technology it is an eternity. Google has of course moved on way past MapReduce to things like Dremel. The rest of us aren’t moving so quickly, but that doesn’t mean we still need to plod along writing low-level MapReduce code.

Several abstractions have come along to make cloud-scale analytics easier. One of them is Apache Spark. Spark was originally created by AMPLab at UC Berkeley and is now in incubation at Apache. It uses cluster memory to minimize the disk reads and writes that slow down MapReduce. Spark is written in Scala and exposes both Scala and Java APIs. Though as I’ve said before, once you get past the learning curve, I don’t see how anyone would prefer Java to Scala for analytics.

Please take a look at the Spark documentation to learn about the RDD (Resilient Distributed Dataset) abstraction. RDDs are basically fancy arrays whose contents are spread across the cluster. You load data into them and perform whatever combination of operations (maps, filters, reduces, etc.) you want. RDDs make analytics so much more fun to write than canonical MapReduce. In the end, Spark doesn’t just run faster; it lets us write faster.
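To make that concrete, here is a minimal sketch of loading data into an RDD and chaining a few operations. The object and method names (RddBasics, sumOfEvenSquares) are made up for illustration, and it assumes Spark is on the classpath and runs in local mode.

```scala
import org.apache.spark.SparkContext

object RddBasics {
  // Sum of the squares of the even numbers in `numbers`, computed through an RDD.
  def sumOfEvenSquares(sc: SparkContext, numbers: Seq[Int]): Int =
    sc.parallelize(numbers)  // load a local collection into an RDD
      .filter(_ % 2 == 0)    // transformation: keep the even numbers (lazy)
      .map(n => n * n)       // transformation: square each one (lazy)
      .fold(0)(_ + _)        // action: runs the job and adds everything up

  def main(args: Array[String]): Unit = {
    val sc = new SparkContext("local", "RDD basics")
    try println(sumOfEvenSquares(sc, 1 to 10)) // prints 220
    finally sc.stop()
  }
}
```

Nothing is computed until the fold at the end; the filter and map merely describe the work to be done.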

The web has a bunch of examples of using Spark with Hadoop components like HDFS and Hive (via Shark, also made by AMPLab), but there is surprisingly little on using Spark to create RDDs from HBase, the Hadoop database. If you don’t know HBase, check out this excellent presentation by Ian Varley. It’s just a cloud-scale key-value store.

The Spark Quick Start documentation says, “RDDs can be created from Hadoop InputFormats.” You may know that InputFormat is the Hadoop abstraction for anything that can be processed in a MapReduce job. As it turns out, HBase provides one called TableInputFormat, so it should be possible to use Spark with HBase.

It turns out that it is.

As the Scaladoc for RDD shows, there are numerous concrete RDD implementations, each best suited for different situations. The NewHadoopRDD is great for reading data through Hadoop’s newer MapReduce API (the org.apache.hadoop.mapreduce package), which is the API TableInputFormat implements and exactly what we need here.

Check out this code.

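import org.apache.hadoop.fs.Path
import org.apache.hadoop.hbase.HBaseConfiguration
import org.apache.hadoop.hbase.client.Result
import org.apache.hadoop.hbase.io.ImmutableBytesWritable
import org.apache.hadoop.hbase.mapreduce.TableInputFormat
import org.apache.hadoop.hbase.util.Bytes
import org.apache.spark.SparkContext
import scala.collection.JavaConverters._

val sparkContext = new SparkContext("local", "Simple App")

// Builds a Hadoop Configuration that points at one HBase table
val hbaseConfiguration = (hbaseConfigFileName: String, tableName: String) => {
  val configuration = HBaseConfiguration.create()
  // Wrap the file name in a Path; a bare String is looked up on the classpath instead
  configuration.addResource(new Path(hbaseConfigFileName))
  configuration.set(TableInputFormat.INPUT_TABLE, tableName)
  configuration
}

val rdd = sparkContext.newAPIHadoopRDD(
  hbaseConfiguration("/path/to/hbase-site.xml", "table-with-data"),
  classOf[TableInputFormat],
  classOf[ImmutableBytesWritable],
  classOf[Result]
)

val currentValues = rdd
  .map(tuple => tuple._2)  // keep the Result, drop the row key
  .map(result => result.getColumn(Bytes.toBytes("columnFamily"), Bytes.toBytes("columnQualifier")))
  .filter(keyValues => !keyValues.isEmpty)  // skip rows without this column; reduceLeft fails on an empty list
  .map(keyValues => {
    keyValues.asScala.reduceLeft {
      (a, b) => if (a.getTimestamp > b.getTimestamp) a else b
    }.getValue
  })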

Once we instantiate the SparkContext for the local machine, we write an anonymous function that builds the configuration for the read. HBaseConfiguration.create() returns a Hadoop Configuration preloaded with the HBase defaults, to which we add our own HBase configuration file and the name of the table we want to read. That’s an HBase thing, not a Spark thing.

Then we create an instance of NewHadoopRDD by calling newAPIHadoopRDD on the SparkContext instance with three Java Class objects related to HBase:

  • One for the InputFormat, TableInputFormat, as mentioned earlier
  • One for the key type, which on a table scan is always ImmutableBytesWritable
  • One for the value type, which on a table scan is always Result

Then we call our anonymous function with the location of hbase-site.xml and the name of the table we want to read, table-with-data, to provide Spark with the necessary HBase configuration to construct the RDD. Once we have the RDD, we can perform on HBase data all the operations we see in more conventional uses of RDDs in Spark.
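The same configuration object can also narrow what the scan returns. The sketch below is one way that might look, not part of the original example: the object name HBaseScanConfig and its parameters are hypothetical, while SCAN_COLUMN_FAMILY and SCAN_MAXVERSIONS are constants on TableInputFormat. It assumes a recent enough HBase on the classpath, and that a scan returns only one version per column unless told otherwise, which is my understanding of the default.

```scala
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.Path
import org.apache.hadoop.hbase.HBaseConfiguration
import org.apache.hadoop.hbase.mapreduce.TableInputFormat

object HBaseScanConfig {
  // Hypothetical helper: configuration for scanning a single column family.
  def forColumnFamily(siteXml: String,
                      table: String,
                      family: String,
                      maxVersions: Int = 1): Configuration = {
    val conf = HBaseConfiguration.create()
    conf.addResource(new Path(siteXml))                     // read from the file system
    conf.set(TableInputFormat.INPUT_TABLE, table)           // which table to scan
    conf.set(TableInputFormat.SCAN_COLUMN_FAMILY, family)   // only this column family
    conf.set(TableInputFormat.SCAN_MAXVERSIONS, maxVersions.toString) // versions per column
    conf
  }
}
```

Passing HBaseScanConfig.forColumnFamily("/path/to/hbase-site.xml", "table-with-data", "columnFamily", 3) as the first argument to newAPIHadoopRDD should then hand back up to three versions of each column in that family.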

The code example does feature some HBase-specific operations though. The RDD presents all the data in table-with-data as (ImmutableBytesWritable, Result) tuples, key-value pairs as you would expect, although Spark is lazy and does not actually scan the table until an action asks for results. For each one of these pairs, we grab the second item in the tuple (the Result) and get the column from it marked with a specific column family and column qualifier.

In keeping with the immutability advocated by big data overlord Nathan Marz, HBase does not overwrite values in place; each write is appended as a new timestamped version (even a delete is recorded as a marker rather than an immediate erasure). Consequently, each column is really a collection of versions, and after converting the Java collection returned by the HBase API into a Scala collection, the last map call extracts the latest, and therefore the “right,” KeyValue and takes its value. We now have the current value of that column for each row.
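The "keep the latest version" step is easier to see without any HBase types in the way. In this sketch, Version is a hypothetical stand-in for a KeyValue, and reduceLeftOption is used instead of reduceLeft so that an empty column gives None rather than an exception.

```scala
object LatestVersion {
  // Hypothetical stand-in for an HBase KeyValue: a timestamp plus the stored value.
  final case class Version(timestamp: Long, value: String)

  // Same comparison as the reduceLeft in the example; None when there are no versions.
  def latest(versions: Seq[Version]): Option[Version] =
    versions.reduceLeftOption((a, b) => if (a.timestamp > b.timestamp) a else b)
}
```

For example, LatestVersion.latest(Seq(Version(1L, "draft"), Version(3L, "final"), Version(2L, "review"))) returns Some(Version(3, "final")), whatever order the versions arrive in.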

If that is unclear, let’s summarize the mapping operations this way:

  • Load an RDD of (ImmutableBytesWritable, Result) tuples from the table
  • Transform previous into an RDD of Results
  • Transform previous into an RDD of collections of KeyValues (where the latest one is what we want). Collection of collections? Oh no!
  • Transform previous into an RDD of byte[] (binary representations of the current data)

So using Spark, we can read the contents of an entire HBase table and perform one transformation after another—maybe along with some filters, aggregations, and other operations—to do ALL THE ANALYTICS.
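As one small example of what might come next, the sketch below decodes the byte arrays and counts how often each value occurs. It assumes the cells hold UTF-8 strings, which depends entirely on how the table was written; the names ValueCounts and countNonEmpty are made up for illustration.

```scala
import java.nio.charset.StandardCharsets.UTF_8
import org.apache.spark.rdd.RDD

object ValueCounts {
  // Decode each cell value as a UTF-8 string (an assumption about the data),
  // drop empty strings, and count the occurrences of each distinct value.
  def countNonEmpty(values: RDD[Array[Byte]]): Map[String, Long] =
    values
      .map(bytes => new String(bytes, UTF_8)) // transformation: bytes to String
      .filter(_.nonEmpty)                     // transformation: drop empty values
      .countByValue()                         // action: returns the counts to the driver
      .toMap
}
```

Because countByValue brings its result back to the driver, it only suits columns with a manageable number of distinct values.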

Astute or experienced readers might have noticed that the getColumn call in the second map operation is actually deprecated in favor of getColumnCells, which returns a collection of Cells ordered by timestamp. That’s the superior approach. I just used getColumn as an excuse to demonstrate Spark’s ability to apply anonymous functions within RDD operations in the next map call.
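For comparison, here is a rough sketch of the same extraction with getColumnCells. It assumes an HBase version that has the Cell API and that the returned list puts the newest cell first, so taking the head is enough; LatestCells and its methods are hypothetical names, not anything from HBase or Spark.

```scala
import org.apache.hadoop.hbase.CellUtil
import org.apache.hadoop.hbase.client.Result
import org.apache.hadoop.hbase.io.ImmutableBytesWritable
import org.apache.spark.rdd.RDD
import scala.collection.JavaConverters._

object LatestCells {
  // Value of the newest cell for one column of a row, or None if the row lacks the column.
  // Assumes getColumnCells returns cells newest first.
  def latestValue(result: Result, family: Array[Byte], qualifier: Array[Byte]): Option[Array[Byte]] =
    result.getColumnCells(family, qualifier).asScala.headOption.map(cell => CellUtil.cloneValue(cell))

  // Apply latestValue to every row, silently skipping rows without the column.
  def latestValues(rows: RDD[(ImmutableBytesWritable, Result)],
                   family: Array[Byte],
                   qualifier: Array[Byte]): RDD[Array[Byte]] =
    rows.flatMap { case (_, result) => latestValue(result, family, qualifier) }
}
```

This folds the last two steps of the example into one and needs no reduceLeft, at the cost of the excuse to show off an anonymous function.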

The next time some analytics are in order, don’t bother with old-school MapReduce. The higher level of abstraction over anything with an InputFormat, including HBase, makes Spark a great choice for Hadoop analytics.