使用 SparkSQL 从 HBase 获取所有记录
Posted
技术标签:
【中文标题】使用 SparkSQL 从 HBase 获取所有记录【英文标题】:Fetch all records from HBase using SparkSQL 【发布时间】:2015-07-06 13:30:03 【问题描述】:我正在尝试从 Hbase 表中读取所有记录。下面是代码sn-p。
SparkContext sparkContext = new SparkContext(conf);
SQLContext sqlContext = new SQLContext(sparkContext);
Configuration hbaseConf = HBaseConfiguration.create();
hbaseConf.set("hbase.master", "localhost:60000");
hbaseConf.setInt("timeout", 120000);
hbaseConf.set("hbase.zookeeper.quorum", "localhost");
hbaseConf.set("zookeeper.znode.parent", "/hbase-unsecure");
hbaseConf.set(TableInputFormat.INPUT_TABLE, "Test");
DataFrame df = sqlContext.createDataFrame(sparkContext.newAPIHadoopRDD(hbaseConf, TableInputFormat.class, ImmutableBytesWritable.class, Result.class), TestBean.class);
df.registerTempTable("TempTest");
df.show();
在df.show()
,我遇到了错误
java.lang.IllegalArgumentException: object is not an instance of declaring class
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
任何指针,为什么我会遇到这个问题。
【问题讨论】:
堆栈跟踪会有所帮助,但我怀疑您的库版本不匹配。 【参考方案1】:您正在尝试从由对组成的 RDD 创建一个 DataFrame:
org.apache.hadoop.hbase.io.ImmutableBytesWritable
org.apache.hadoop.hbase.client.Result
您需要阅读您的 hBaseRDD:
val hBaseRDD = sc.newAPIHadoopRDD(hbaseConf, classOf[TableInputFormat],
classOf[org.apache.hadoop.hbase.io.ImmutableBytesWritable],
classOf[org.apache.hadoop.hbase.client.Result]);
然后将 (ImmutableBytesWritable, Result) 元组转换为 Result 的 RDD:
val resultRDD = hBaseRDD.map(tuple => tuple._2)
然后转换成 RDD 的 Rows 可以转换成 DataFrame。
作为示例,我们假设您有一个 HBase 表,其键包含两个值“value1_value2”,您可以使用以下方法解析键(由“_”分隔):
val keyValueRDD = resultRDD.map(result => (Bytes.toString(result.getRow()).split("_")(0), Bytes.toString(result.getRow()).split("_")(1), Bytes.toFloat(result.value())))
现在您可以使用“_”分隔键中的值创建一个数据框:
import sqlContext.implicits._
val df = keyValueRDD.toDF("value1", "value2");
df.registerTempTable("Table")
sqlContext.sql("SELECT * FROM Table Limit 5").show()
为了将您的 HBase 表完全映射到您需要的 DataFrame:
创建案例类:(在您的对象之外)
case class TestRow(rowkey: String, value1: String, value2: String, value3: Float, value4: Float)
将列族定义为字节:
final val cfTest = "te"
final val cfTestBytes = Bytes.toBytes(cfTest)
解析结果:
object TestRow
def parseTestRow(result: Result): TestRow =
val rowkey = Bytes.toString(result.getRow())
val p0 = rowkey
val p1 = Bytes.toString(result.getValue(cfTestBytes, Bytes.toBytes("currency")))
val p2 = Bytes.toString(result.getValue(cfTestBytes, Bytes.toBytes("asat")))
val p3 = Bytes.toFloat(result.getValue(cfTestBytes, Bytes.toBytes("m_aed")))
val p4 = Bytes.toFloat(result.getValue(cfTestBytes, Bytes.toBytes("m_usd")))
TestRow(p0, p1, p2, p3, p4)
创建数据框
val hBaseRDD = sc.newAPIHadoopRDD(hbaseConf, classOf[TableInputFormat],
classOf[org.apache.hadoop.hbase.io.ImmutableBytesWritable],
classOf[org.apache.hadoop.hbase.client.Result]);
val resultRDD = hBaseRDD.map(tuple => tuple._2)
val testRDD = resultRDD.map(TestRow.parseTestRow)
import sqlContext.implicits._
val testDF = testRDD.toDF()
testDF.registerTempTable("Test")
sqlContext.sql("SELECT count(*) FROM Test").show()
【讨论】:
以上是关于使用 SparkSQL 从 HBase 获取所有记录的主要内容,如果未能解决你的问题,请参考以下文章