Lighting a Spark with HBase
Neil Chaudhuri
January 25, 2014
Hadoop | Scala | MapReduce | Open Source | HDFS | Apache Spark
Although most developers and users are still feeling their way through Hadoop (and more specifically MapReduce), the truth is Google wrote that paper in 2004. That's ten years ago! Million Dollar Baby won Best Picture that year. "Yeah!" by Usher, Lil Jon, and Ludacris topped the charts in the United States. And Facebook had only just started to kill work productivity and violate your privacy.
As long ago as that feels, it is an eternity in technology. Google has of course moved on way past MapReduce to things like Dremel. The rest of us aren’t moving so quickly, but that doesn’t mean we still need to plod along writing low-level MapReduce code.
Several abstractions have come along to make cloud-scale analytics easier. One of them is Apache Spark. Spark was originally created by AMPLab at UC Berkeley and is now in incubation at Apache. It utilizes cluster memory to minimize the disk reads and writes that slow down MapReduce. Spark is written in Scala and exposes both a Scala and Java API. Though as I’ve said before, once you get past the learning curve, I don’t see how anyone would prefer Java to Scala for analytics.
Please take a look at the Spark documentation to learn about the RDD abstraction. RDDs are basically fancy arrays. You load data into them and perform whatever combination of operations you want: maps, filters, reduces, and so on. RDDs make analytics so much more fun to write than canonical MapReduce. In the end, Spark doesn't just run faster; it lets us write faster.
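If you have not touched the API yet, here is a minimal, purely illustrative sketch of what an RDD pipeline feels like in Scala (the numbers and names are made up, and "local" just means run everything on this machine):

import org.apache.spark.SparkContext

// Purely illustrative: load a local collection into an RDD and chain a few
// transformations before reducing down to a single number.
val sc = new SparkContext("local", "RDD Basics")
val numbers = sc.parallelize(1 to 100)
val sumOfEvenSquares = numbers
  .filter(n => n % 2 == 0) // keep the even numbers
  .map(n => n * n)         // square each one
  .reduce(_ + _)           // add them all up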
The web has a bunch of examples of using Spark with Hadoop components like HDFS and Hive (via Shark, also made by AMPLab), but there is surprisingly little on using Spark to create RDDs from HBase, the Hadoop database. If you don't know HBase, check out this excellent presentation by Ian Varley. It's just a cloud-scale key-value store.
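To get a feel for that, here is a hedged sketch of reading a single value with the plain HBase client API; the row key and column names are hypothetical, and this assumes hbase-site.xml is on the classpath:

import org.apache.hadoop.hbase.HBaseConfiguration
import org.apache.hadoop.hbase.client.{Get, HTable}
import org.apache.hadoop.hbase.util.Bytes

// Hypothetical table, row key, and column names, just to show the
// key-value flavor: look up one row and pull a single cell out of it.
val table = new HTable(HBaseConfiguration.create(), "table-with-data")
val result = table.get(new Get(Bytes.toBytes("some-row-key")))
val value = result.getValue(Bytes.toBytes("columnFamily"), Bytes.toBytes("columnQualifier"))
table.close()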
The Spark Quick Start documentation says, “RDDs can be created from Hadoop InputFormats.” You may know that InputFormat is the Hadoop abstraction for anything that can be processed in a MapReduce job. As it turns out, HBase uses a TableInputFormat, so it should be possible to use Spark with HBase.
It turns out that it is.
As the Scaladoc for RDD shows, there are numerous concrete RDD implementations, each best suited to different situations. The NewHadoopRDD is great for reading data through the newer Hadoop InputFormat API found in later versions of Hadoop, which is exactly what we need here. Check out this code.
import org.apache.hadoop.hbase.HBaseConfiguration
import org.apache.hadoop.hbase.client.Result
import org.apache.hadoop.hbase.io.ImmutableBytesWritable
import org.apache.hadoop.hbase.mapreduce.TableInputFormat
import org.apache.spark.SparkContext

import scala.collection.JavaConverters._

val sparkContext = new SparkContext("local", "Simple App")

// An anonymous function that builds an HBase configuration pointed at the table we want to read
val hbaseConfiguration = (hbaseConfigFileName: String, tableName: String) => {
  val hbaseConfiguration = HBaseConfiguration.create()
  hbaseConfiguration.addResource(hbaseConfigFileName)
  hbaseConfiguration.set(TableInputFormat.INPUT_TABLE, tableName)
  hbaseConfiguration
}

// Create an RDD of (ImmutableBytesWritable, Result) pairs from the HBase table
val rdd = sparkContext.newAPIHadoopRDD(
  hbaseConfiguration("/path/to/hbase-site.xml", "table-with-data"),
  classOf[TableInputFormat],
  classOf[ImmutableBytesWritable],
  classOf[Result]
)

rdd
  .map(tuple => tuple._2) // keep only the Result from each (key, value) pair
  .map(result => result.getColumn("columnFamily".getBytes(), "columnQualifier".getBytes()))
  .map(keyValues => {
    // Keep the KeyValue with the latest timestamp and extract its raw value
    keyValues.asScala.reduceLeft {
      (a, b) => if (a.getTimestamp > b.getTimestamp) a else b
    }.getValue
  })
Once we instantiate the SparkContext for the local machine, we write an anonymous function to create an HBaseConfiguration, which lets us add the HBase configuration file to whatever Hadoop configuration we already have, along with the name of the table we want to read. That's an HBase thing, not a Spark thing.
Then we create an instance of NewHadoopRDD with the SparkContext instance and three Java Class objects related to HBase:
- One for the InputFormat, TableInputFormat, as mentioned earlier
- One for the key type, which on a table scan is always ImmutableBytesWritable
- One for the value type, which on a table scan is always Result
Then we call our anonymous function with the location of hbase-site.xml and the name of the table we want to read, table-with-data, to provide Spark with the necessary HBase configuration to construct the RDD. Once we have the RDD, we can perform all the usual operations we see with more conventional usage of RDDs in Spark.
The code example does feature some HBase-specific operations though. When the RDD is constructed, it loads all the data in table-with-data as (ImmutableBytesWritable, Result) tuples (key-value pairs, as you would expect). For each one of these pairs, we grab the second item in the tuple (the Result) and get the column from it marked with a specific column family and column qualifier.
In the spirit of immutability advocated by big data overlord Nathan Marz, HBase doesn't overwrite values in place; new versions are simply appended with newer timestamps. Consequently, each column is really a collection of versions, and after converting the Java collection returned by the HBase API into a Scala collection, the last map call extracts the latest (and therefore the "right") KeyValue. We now have a collection of the current data in each row.
If that is unclear, let’s summarize the mapping operations this way:
- Load an RDD of (ImmutableBytesWritable, Result) tuples from the table
- Transform the previous into an RDD of Results
- Transform the previous into an RDD of collections of KeyValues (where the latest one is what we want). Collection of collections? Oh no!
- Transform the previous into an RDD of byte[] (binary representations of the current data)
So using Spark, we can read the contents of an entire HBase table and perform one transformation after another—maybe along with some filters, aggregations, and other operations—to do ALL THE ANALYTICS.
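As a hedged sketch of where that might go, suppose currentValues is a name for the RDD of byte[] produced by the chain above (the original snippet never assigns it to a val); decoding, filtering, and counting is then just a few more calls:

import org.apache.hadoop.hbase.util.Bytes

// currentValues is a hypothetical handle on the RDD of byte[] built above.
// Decode each value as a String, keep the non-empty ones, and count them.
val interestingRowCount = currentValues
  .map(bytes => Bytes.toString(bytes))
  .filter(value => value.nonEmpty)
  .count()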
Astute or experienced readers might have noticed that the getColumn call in the second map operation is actually deprecated in favor of getColumnCells, which returns a collection of Cells ordered by timestamp. That's the superior approach. I just used getColumn as an excuse to demonstrate Spark's ability to apply anonymous functions within RDD operations in the next map call.
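For completeness, a sketch of the non-deprecated version might look something like this, assuming an HBase client new enough to expose Cell and CellUtil:

import org.apache.hadoop.hbase.CellUtil

import scala.collection.JavaConverters._

// The same idea as before, but with getColumnCells and the Cell API instead
// of the deprecated getColumn/KeyValue combination.
rdd
  .map(tuple => tuple._2)
  .map(result => result.getColumnCells("columnFamily".getBytes(), "columnQualifier".getBytes()))
  .map(cells => CellUtil.cloneValue(cells.asScala.maxBy(_.getTimestamp)))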
The next time some analytics are in order, don't bother with old-school MapReduce. The higher level of abstraction over anything with an InputFormat, including HBase, makes Spark a great choice for Hadoop analytics.