Spark 读取 HBase 数据
Posted lyh1997
tags:
篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了Spark 读取 HBase 数据相关的知识,希望对你有一定的参考价值。
1、pom.xml
版本号
<properties> <hbase.version>2.2.2</hbase.version> <hadoop.version>2.10.0</hadoop.version> <spark.version>2.4.2</spark.version> </properties>
依赖包
<dependencies> <dependency> <groupId>org.apache.spark</groupId> <artifactId>spark-core_2.11</artifactId> <version>${spark.version}</version> <exclusions> <exclusion> <groupId>commons-beanutils</groupId> <artifactId>commons-beanutils-core</artifactId> </exclusion> </exclusions> </dependency> <!-- Spark SQL--> <dependency> <groupId>org.apache.spark</groupId> <artifactId>spark-sql_2.11</artifactId> <version>${spark.version}</version> </dependency> <!-- Hadoop--> <dependency> <groupId>org.apache.hadoop</groupId> <artifactId>hadoop-client</artifactId> <version>${hadoop.version}</version> </dependency> <dependency> <groupId>org.apache.hadoop</groupId> <artifactId>hadoop-common</artifactId> <version>${hadoop.version}</version> </dependency> <dependency> <groupId>org.apache.hadoop</groupId> <artifactId>hadoop-hdfs</artifactId> <version>${hadoop.version}</version> </dependency> <!-- HBase --> <dependency> <groupId>org.apache.hbase</groupId> <artifactId>hbase-client</artifactId> <version>${hbase.version}</version> </dependency> <dependency> <groupId>org.apache.hbase</groupId> <artifactId>hbase-hadoop2-compat</artifactId> <version>${hbase.version}</version> </dependency> <dependency> <groupId>org.apache.hbase</groupId> <artifactId>hbase-server</artifactId> <version>${hbase.version}</version> </dependency> <dependency> <groupId>org.apache.hbase</groupId> <artifactId>hbase-mapreduce</artifactId> <version>2.2.2</version> </dependency> <dependency> <groupId>org.apache.commons</groupId> <artifactId>commons-lang3</artifactId> <version>3.8.1</version> </dependency> <dependency> <groupId>org.scalaj</groupId> <artifactId>scalaj-http_2.11</artifactId> <version>2.2.2</version> </dependency> <dependency> <groupId>com.alibaba</groupId> <artifactId>fastjson</artifactId> <version>1.2.62</version> </dependency> </dependencies>
2、Java 对 HBase 的增删改查操作
(1)Junit 方式来实现操作,最开始需要连接 Hbase。
Connection connection = null Admin admin = null Table table = null @Before public void setUp() throw IOException { Configuration conf = new Configuration(); conf.set("hbase.rootdir", "hdfs://localhost:8020/hbase"); conf.set("hbase.zookeeper.quorum", "localhost:2181"); try { connection = ConnectionFactory.createConnection(conf); admin = connection.getAdmin(); } catch(IOException e) { if(null != admin) { e.printStackTrace(); } } }
首先 HBase 的配置,Configuration 需要配置 hbase.rootdir 和 hbase.zookeeper.quorum。hbase.rootdir 是 HBase 地址下的hbase文件夹,hbase.zookeeper.quorum 是 Zookeeper 的地址。HBase 的 connection 是通过 ConnectionFactory 来获取的,其中就需要参数 conf。
然后我们还需要拿到 connection 中的 admin,admin 的作用是做一些删除表、创建表的操作。
(2)创建 HBase 表
@Test public void createTable() throws Exception { TableName table = TableName.valueOf(tableName); // 首先判断HBase中是否存在预创建的表 if(admin.tableExists(table)) { System.out.println(tableName + " 已经存在 。。。") } else { // 设置表名 HTableDescriptor tableDescriptor = new HTableDescriptor(table); // 设置表的列族,列族需要在创建表的时候就设置 tableDescriptor.addFamily(new HColumnDescriptor("info")); tableDescriptor.addFamily(new HColumnDescriptor("address")); // 利用 admin 创建表 admin.createTable(tableDescriptor); System.out.println(tableName + " 创建成功。。。"); } }
(3)向创建表中插入数据
@Test public void testPut() throws Exception { table = connection.getTable(TableName.valueOf(tableName)); // 插入单条诗句 // 添加 rowKey Put put = new Put(Bytes.toBytes("hunter")); // 通过Put设置要添加的数据的CF、qualifier、value put.addColumn(Bytes.toBytes("info"), Bytes.toBytes("age"), Bytes.toBytes("23")); put.addColumn(Bytes.toBytes("info"), Bytes.toBytes("birthday"), Bytes.toBytes("1997-12-12")); put.addColumn(Bytes.toBytes("info"), Bytes.toBytes("company"), Bytes.toBytes("HaHa")); put.addColumn(Bytes.toBytes("address"), Bytes.toBytes("country"), Bytes.toBytes("CN")); put.addColumn(Bytes.toBytes("address"), Bytes.toBytes("provimce"), Bytes.toBytes("LN")); put.addColumn(Bytes.toBytes("address"), Bytes.toBytes("city"), Bytes.toBytes("DL")); // 将数据put到HBase中去 table.put(put); // 插入多条诗句 // 创建一个 Put 的 List,用作将多条 put 数据存入其中 List<Put> puts = new ArrayList<Put>(); // 第一个 Put Put put1 = new Put(Bytes.toBytes("jepson")); put1.addColumn(Bytes.toBytes("info"), Bytes.toBytes("age"), Bytes.toBytes("15")); put1.addColumn(Bytes.toBytes("info"), Bytes.toBytes("birthday"), Bytes.toBytes("xxxx-xx-xx")); put1.addColumn(Bytes.toBytes("info"), Bytes.toBytes("company"), Bytes.toBytes("WaWa")); put1.addColumn(Bytes.toBytes("address"), Bytes.toBytes("country"), Bytes.toBytes("CN")); put1.addColumn(Bytes.toBytes("address"), Bytes.toBytes("provimce"), Bytes.toBytes("BJ")); put1.addColumn(Bytes.toBytes("address"), Bytes.toBytes("city"), Bytes.toBytes("BJ")); // 第二个 Put Put put2 = new Put(Bytes.toBytes("okey")); put2.addColumn(Bytes.toBytes("info"), Bytes.toBytes("age"), Bytes.toBytes("19")); put2.addColumn(Bytes.toBytes("info"), Bytes.toBytes("birthday"), Bytes.toBytes("yy-yy-yy")); put2.addColumn(Bytes.toBytes("info"), Bytes.toBytes("company"), Bytes.toBytes("DuoDuo")); put2.addColumn(Bytes.toBytes("address"), Bytes.toBytes("country"), Bytes.toBytes("CN")); put2.addColumn(Bytes.toBytes("address"), Bytes.toBytes("provimce"), Bytes.toBytes("SH")); put2.addColumn(Bytes.toBytes("address"), Bytes.toBytes("city"), Bytes.toBytes("SH")); // 全部 add 到 puts list 中 puts.add(put1); puts.add(put2); // 保存到 HBase 中 table.put(puts); }
(4)HBase 表中查询数据
@Test //通过RowKey来获取Hbase数据 public void testGetByRowkey() throws IOException { // 获取表 table = connection.getTable(TableName.valueOf(tableName)); // Get get = new Get("20200107_1003629081".getBytes()); // 用来设置返回的列。参数:(列族,列名) // get.addColumn(Bytes.toBytes("info"), Bytes.toBytes("age")); Result result = table.get(get); printResult(result); } private void printResult(Result result) { for(Cell cell: result.rawCells()) { System.out.println(Bytes.toString(result.getRow())+ " " + Bytes.toString(CellUtil.cloneFamily(cell)) + " " + Bytes.toString(CellUtil.cloneQualifier(cell)) + " " + Bytes.toString(CellUtil.cloneValue(cell)) + " " + cell.getTimestamp() ); } } @Test public void testScan() throws IOException { table = connection.getTable(TableName.valueOf(tableName)); Scan scan = new Scan(); scan.addFamily(Bytes.toBytes("info")); // scan.addColumn(Bytes.toBytes("info"), Bytes.toBytes("company")); // Scan scan = new Scan(Bytes.toBytes("jepson")); // >= // Scan scan = new Scan(new Get(Bytes.toBytes("jepson"))); // 明确查询 // Scan scan = new Scan(Bytes.toBytes("jepson"), Bytes.toBytes("okey")); // 包含起始,不包含结束 // ResultScanner results = table.getScanner(Bytes.toBytes("info"), Bytes.toBytes("company")); ResultScanner results = table.getScanner(scan); for(Result result: results) { printResult(result); System.out.println("~~~~~~~~~~~~~~~~~~~~~~~~~~~~~"); } }
以上是关于Spark 读取 HBase 数据的主要内容,如果未能解决你的问题,请参考以下文章
如何使用Spark Streaming读取HBase的数据并写入到HDFS