使用 SparkSQL 从 HBase 获取所有记录

Posted

技术标签:

【中文标题】使用 SparkSQL 从 HBase 获取所有记录【英文标题】:Fetch all records from HBase using SparkSQL 【发布时间】:2015-07-06 13:30:03 【问题描述】:

我正在尝试从 Hbase 表中读取所有记录。下面是代码sn-p。

    SparkContext sparkContext = new SparkContext(conf);

    SQLContext sqlContext = new SQLContext(sparkContext);

    Configuration hbaseConf = HBaseConfiguration.create();

    hbaseConf.set("hbase.master", "localhost:60000");
    hbaseConf.setInt("timeout", 120000);
    hbaseConf.set("hbase.zookeeper.quorum", "localhost");
    hbaseConf.set("zookeeper.znode.parent", "/hbase-unsecure");
    hbaseConf.set(TableInputFormat.INPUT_TABLE, "Test");

    DataFrame df = sqlContext.createDataFrame(sparkContext.newAPIHadoopRDD(hbaseConf, TableInputFormat.class, ImmutableBytesWritable.class, Result.class), TestBean.class);

    df.registerTempTable("TempTest");
    df.show();

df.show(),我遇到了错误

java.lang.IllegalArgumentException: object is not an instance of declaring class at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)

任何指针,为什么我会遇到这个问题。

【问题讨论】:

堆栈跟踪会有所帮助,但我怀疑您的库版本不匹配。 【参考方案1】:

您正在尝试从由对组成的 RDD 创建一个 DataFrame:

org.apache.hadoop.hbase.io.ImmutableBytesWritable     
org.apache.hadoop.hbase.client.Result

您需要阅读您的 hBaseRDD:

val hBaseRDD = sc.newAPIHadoopRDD(hbaseConf, classOf[TableInputFormat], 
  classOf[org.apache.hadoop.hbase.io.ImmutableBytesWritable],
  classOf[org.apache.hadoop.hbase.client.Result]);

然后将 (ImmutableBytesWritable, Result) 元组转换为 Result 的 RDD:

val resultRDD = hBaseRDD.map(tuple => tuple._2) 

然后转换成 RDD 的 Rows 可以转换成 DataFrame。

作为示例,我们假设您有一个 HBase 表,其键包含两个值“value1_value2”,您可以使用以下方法解析键(由“_”分隔):

val keyValueRDD = resultRDD.map(result =>     (Bytes.toString(result.getRow()).split("_")(0),   Bytes.toString(result.getRow()).split("_")(1), Bytes.toFloat(result.value())))

现在您可以使用“_”分隔键中的值创建一个数据框:

   import sqlContext.implicits._
   val df = keyValueRDD.toDF("value1", "value2");
   df.registerTempTable("Table")
   sqlContext.sql("SELECT * FROM Table Limit 5").show()

为了将您的 HBase 表完全映射到您需要的 DataFrame:

    创建案例类:(在您的对象之外)

     case class TestRow(rowkey: String, value1: String, value2: String, value3: Float, value4: Float)
    

    将列族定义为字节

    final val cfTest = "te"
    final val cfTestBytes = Bytes.toBytes(cfTest)
    

    解析结果

      object TestRow 
        def parseTestRow(result: Result): TestRow = 
          val rowkey = Bytes.toString(result.getRow())
    
          val p0 = rowkey
          val p1 = Bytes.toString(result.getValue(cfTestBytes, Bytes.toBytes("currency")))
          val p2 = Bytes.toString(result.getValue(cfTestBytes, Bytes.toBytes("asat")))
          val p3 = Bytes.toFloat(result.getValue(cfTestBytes, Bytes.toBytes("m_aed")))
          val p4 = Bytes.toFloat(result.getValue(cfTestBytes, Bytes.toBytes("m_usd")))
          TestRow(p0, p1, p2, p3, p4)
        
      
    

    创建数据框

    val hBaseRDD = sc.newAPIHadoopRDD(hbaseConf, classOf[TableInputFormat], 
    classOf[org.apache.hadoop.hbase.io.ImmutableBytesWritable],
    classOf[org.apache.hadoop.hbase.client.Result]);  
    
    val resultRDD = hBaseRDD.map(tuple => tuple._2)
    val testRDD = resultRDD.map(TestRow.parseTestRow)
    import sqlContext.implicits._
    val testDF = testRDD.toDF()
    testDF.registerTempTable("Test")
    sqlContext.sql("SELECT count(*) FROM Test").show()
    

【讨论】:

以上是关于使用 SparkSQL 从 HBase 获取所有记录的主要内容,如果未能解决你的问题,请参考以下文章

sparkSQL自定义数据源

sparkSQL自定义数据源查询HBase数据

SparkSQL+Hive+Hbase+HbaseIntegration 不起作用

SparkSQL同步Hbase数据到Hive表

我们可以从 HBase 表中获取所有列名吗?

关于自定义sparkSQL数据源(Hbase)操作中遇到的坑