Spark读取Hbase中的数据

Posted 陈小龙

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了Spark读取Hbase中的数据相关的知识,希望对你有一定的参考价值。

大家可能都知道很熟悉Spark的两种常见的数据读取方式(存放到RDD中):(1)、调用parallelize函数直接从集合中获取数据,并存入RDD中;Java版本如下:


[python] view plain copy

  1. JavaRDD<Integer> myRDD = sc.parallelize(Arrays.asList(1,2,3));  

Scala版本如下:

[python] view plain copy

  1. val myRDD= sc.parallelize(List(1,2,3))  

这种方式很简单,很容易就可以将一个集合中的数据变成RDD的初始化值;更常见的是(2)、从文本中读取数据到RDD中,这个文本可以是纯文本文件、可以是sequence文件;可以存放在本地(file://)、可以存放在HDFS(hdfs://)上,还可以存放在S3上。其实对文件来说,Spark支持Hadoop所支持的所有文件类型和文件存放位置。Java版如下:

[python] view plain copy

  1. import org.apache.spark.SparkConf;  

  2. import org.apache.spark.api.java.JavaRDD;  

  3. import org.apache.spark.api.java.JavaSparkContext;  

  4.   

  5. SparkConf conf = new SparkConf().setAppName("Simple Application");  

  6. JavaSparkContext sc = new JavaSparkContext(conf);  

  7. sc.addFile("wyp.data");  

  8. JavaRDD<String> lines = sc.textFile(SparkFiles.get("wyp.data"));  

Scala版本如下:

[python] view plain copy

  1. import org.apache.spark.SparkContext  

  2. import org.apache.spark.SparkConf  

  3.   

  4. val conf = new SparkConf().setAppName("Simple Application")  

  5. val sc = new SparkContext(conf)  

  6. sc.addFile("spam.data")  

  7. val inFile = sc.textFile(SparkFiles.get("spam.data"))  

在实际情况下,我们需要的数据可能不是简单的存放在HDFS文本中,我们需要的数据可能就存放在Hbase中,那么我们如何用Spark来读取Hbase中的数据呢?本文的所有测试是基于Hadoop 2.2.0、Hbase 0.98.2、Spark 0.9.1,不同版本可能代码的编写有点不同。本文只是简单地用Spark来读取Hbase中的数据,如果需要对Hbase进行更强的操作,本文可能不能帮你。话不多说,Spark操作Hbase的核心的Java版本代码如下:

[python] view plain copy

  1. import org.apache.hadoop.conf.Configuration;  

  2. import org.apache.hadoop.hbase.HBaseConfiguration;  

  3. import org.apache.hadoop.hbase.client.Result;  

  4. import org.apache.hadoop.hbase.client.Scan;  

  5. import org.apache.hadoop.hbase.io.ImmutableBytesWritable;  

  6. import org.apache.hadoop.hbase.mapreduce.TableInputFormat;  

  7. import org.apache.hadoop.hbase.protobuf.ProtobufUtil;  

  8. import org.apache.hadoop.hbase.protobuf.generated.ClientProtos;  

  9. import org.apache.hadoop.hbase.util.Base64;  

  10. import org.apache.hadoop.hbase.util.Bytes;  

  11. import org.apache.spark.api.java.JavaPairRDD;  

  12. import org.apache.spark.api.java.JavaSparkContext;  

  13.   

  14. JavaSparkContext sc = new JavaSparkContext(master, "hbaseTest",  

  15.                 System.getenv("SPARK_HOME"), System.getenv("JARS"));  

  16.   

  17. Configuration conf = HBaseConfiguration.create();  

  18. Scan scan = new Scan();  

  19. scan.addFamily(Bytes.toBytes("cf"));  

  20. scan.addColumn(Bytes.toBytes("cf"), Bytes.toBytes("airName"));  

  21.   

  22. try {  

  23.         String tableName = "flight_wap_order_log";  

  24.         conf.set(TableInputFormat.INPUT_TABLE, tableName);  

  25.         ClientProtos.Scan proto = ProtobufUtil.toScan(scan);  

  26.         String ScanToString = Base64.encodeBytes(proto.toByteArray());  

  27.         conf.set(TableInputFormat.SCAN, ScanToString);  

  28.   

  29.         JavaPairRDD<ImmutableBytesWritable, Result> myRDD =   

  30.                 sc.newAPIHadoopRDD(conf,  TableInputFormat.class,   

  31.                 ImmutableBytesWritable.class, Result.class);  

  32.   

  33. catch (Exception e) {  

  34.             e.printStackTrace();  

  35. }  

这样本段代码段是从Hbase表名为flight_wap_order_log的数据库中读取cf列簇上的airName一列的数据,这样我们就可以对myRDD进行相应的操作:

[python] view plain copy

  1. System.out.println(myRDD.count());  

本段代码需要在pom.xml文件加入以下依赖:

[html] view plain copy

  1. <dependency>  

  2.         <groupId>org.apache.spark</groupId>  

  3.         <artifactId>spark-core_2.10</artifactId>  

  4.         <version>0.9.1</version>  

  5. </dependency>  

  6.   

  7. <dependency>  

  8.         <groupId>org.apache.hbase</groupId>  

  9.         <artifactId>hbase</artifactId>  

  10.         <version>0.98.2-hadoop2</version>  

  11. </dependency>  

  12.   

  13. <dependency>  

  14.         <groupId>org.apache.hbase</groupId>  

  15.         <artifactId>hbase-client</artifactId>  

  16.         <version>0.98.2-hadoop2</version>  

  17. </dependency>  

  18.   

  19. <dependency>  

  20.         <groupId>org.apache.hbase</groupId>  

  21.         <artifactId>hbase-common</artifactId>  

  22.         <version>0.98.2-hadoop2</version>  

  23. </dependency>  

  24.   

  25. <dependency>  

  26.         <groupId>org.apache.hbase</groupId>  

  27.         <artifactId>hbase-server</artifactId>  

  28.         <version>0.98.2-hadoop2</version>  

  29. </dependency>  

Scala版如下:

[python] view plain copy

  1. import org.apache.spark._  

  2. import org.apache.spark.rdd.NewHadoopRDD  

  3. import org.apache.hadoop.hbase.{HBaseConfiguration, HTableDescriptor}  

  4. import org.apache.hadoop.hbase.client.HBaseAdmin  

  5. import org.apache.hadoop.hbase.mapreduce.TableInputFormat  

  6.   

  7. object HBaseTest {  

  8.   def main(args: Array[String]) {  

  9.     val sc = new SparkContext(args(0), "HBaseTest",  

  10.       System.getenv("SPARK_HOME"), SparkContext.jarOfClass(this.getClass))  

  11.   

  12.     val conf = HBaseConfiguration.create()  

  13.     conf.set(TableInputFormat.INPUT_TABLE, args(1))  

  14.   

  15.     val hBaseRDD = sc.newAPIHadoopRDD(conf, classOf[TableInputFormat],   

  16.       classOf[org.apache.hadoop.hbase.io.ImmutableBytesWritable],  

  17.       classOf[org.apache.hadoop.hbase.client.Result])  

  18.   

  19.     hBaseRDD.count()  

  20.   

  21.     System.exit(0)  

  22.   }  

  23. }  

我们需要在加入如下依赖:

[python] view plain copy

  1. libraryDependencies ++= Seq(  

  2.         "org.apache.spark" % "spark-core_2.10" % "0.9.1",  

  3.         "org.apache.hbase" % "hbase" % "0.98.2-hadoop2",  

  4.         "org.apache.hbase" % "hbase-client" % "0.98.2-hadoop2",  

  5.         "org.apache.hbase" % "hbase-common" % "0.98.2-hadoop2",  

  6.         "org.apache.hbase" % "hbase-server" % "0.98.2-hadoop2"  

  7. )  

在测试的时候,需要配置好Hbase、Hadoop环境,否则程序会出现问题,特别是让程序找到Hbase-site.xml配置文件。


以上是关于Spark读取Hbase中的数据的主要内容,如果未能解决你的问题,请参考以下文章

Spark读取Hbase中的数据

spark将数据写入hbase以及从hbase读取数据

Spark 读取 HBase 数据

Spark连接HBase

SHC:使用 Spark SQL 高效地读写 HBase

spark 从 hbase 读取数据,worker 是不是需要从远程驱动程序中获取分区数据?