Spark 读取 HBase 数据

Posted lyh1997

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了Spark 读取 HBase 数据相关的知识,希望对你有一定的参考价值。

1、pom.xml

版本号

<properties>
    <hbase.version>2.2.2</hbase.version>
    <hadoop.version>2.10.0</hadoop.version>
    <spark.version>2.4.2</spark.version>
</properties>

依赖包

<dependencies>
        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-core_2.11</artifactId>
            <version>${spark.version}</version>
            <exclusions>
                <exclusion>
                    <groupId>commons-beanutils</groupId>
                    <artifactId>commons-beanutils-core</artifactId>
                </exclusion>
            </exclusions>
        </dependency>

        <!-- Spark SQL-->
        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-sql_2.11</artifactId>
            <version>${spark.version}</version>
        </dependency>

        <!-- Hadoop-->
        <dependency>
            <groupId>org.apache.hadoop</groupId>
            <artifactId>hadoop-client</artifactId>
            <version>${hadoop.version}</version>
        </dependency>

        <dependency>
            <groupId>org.apache.hadoop</groupId>
            <artifactId>hadoop-common</artifactId>
            <version>${hadoop.version}</version>
        </dependency>

        <dependency>
            <groupId>org.apache.hadoop</groupId>
            <artifactId>hadoop-hdfs</artifactId>
            <version>${hadoop.version}</version>
        </dependency>

        <!-- HBase -->
        <dependency>
            <groupId>org.apache.hbase</groupId>
            <artifactId>hbase-client</artifactId>
            <version>${hbase.version}</version>
        </dependency>

        <dependency>
            <groupId>org.apache.hbase</groupId>
            <artifactId>hbase-hadoop2-compat</artifactId>
            <version>${hbase.version}</version>
        </dependency>

        <dependency>
            <groupId>org.apache.hbase</groupId>
            <artifactId>hbase-server</artifactId>
            <version>${hbase.version}</version>
        </dependency>

        <dependency>
            <groupId>org.apache.hbase</groupId>
            <artifactId>hbase-mapreduce</artifactId>
            <version>2.2.2</version>
        </dependency>

        <dependency>
            <groupId>org.apache.commons</groupId>
            <artifactId>commons-lang3</artifactId>
            <version>3.8.1</version>
        </dependency>

        <dependency>
            <groupId>org.scalaj</groupId>
            <artifactId>scalaj-http_2.11</artifactId>
            <version>2.2.2</version>
        </dependency>

        <dependency>
            <groupId>com.alibaba</groupId>
            <artifactId>fastjson</artifactId>
            <version>1.2.62</version>
        </dependency>
</dependencies>

 

2、Java 对 HBase 的增删改查操作

  (1)Junit 方式来实现操作,最开始需要连接 Hbase。

Connection connection = null
Admin admin = null
Table table = null

@Before
public void setUp() throw IOException {
    Configuration conf = new Configuration();
    conf.set("hbase.rootdir", "hdfs://localhost:8020/hbase");
    conf.set("hbase.zookeeper.quorum", "localhost:2181");

    try {
        connection = ConnectionFactory.createConnection(conf);
        admin = connection.getAdmin();
    } catch(IOException e) {
        if(null != admin) {
            e.printStackTrace();
        }
    }
}

 

    首先 HBase 的配置,Configuration 需要配置 hbase.rootdir 和 hbase.zookeeper.quorumhbase.rootdir 是 HBase 地址下的hbase文件夹,hbase.zookeeper.quorum 是 Zookeeper 的地址。HBaseconnection 是通过 ConnectionFactory 来获取的,其中就需要参数 conf

      然后我们还需要拿到 connection 中的 adminadmin 的作用是做一些删除表、创建表的操作。

  

  (2)创建 HBase  

@Test
public void createTable() throws Exception {
    TableName table = TableName.valueOf(tableName);
    
    // 首先判断HBase中是否存在预创建的表
    if(admin.tableExists(table)) {
        System.out.println(tableName + " 已经存在 。。。")
    } else {
        // 设置表名
        HTableDescriptor tableDescriptor = new HTableDescriptor(table);
        // 设置表的列族,列族需要在创建表的时候就设置
        tableDescriptor.addFamily(new HColumnDescriptor("info"));
        tableDescriptor.addFamily(new HColumnDescriptor("address"));

        // 利用 admin 创建表
        admin.createTable(tableDescriptor);

        System.out.println(tableName + " 创建成功。。。");
    }
}

 

  

  (3)向创建表中插入数据

 

@Test
public void testPut() throws Exception {
    table = connection.getTable(TableName.valueOf(tableName));

    // 插入单条诗句
    // 添加 rowKey
    Put put = new Put(Bytes.toBytes("hunter"));
    // 通过Put设置要添加的数据的CF、qualifier、value
    put.addColumn(Bytes.toBytes("info"), Bytes.toBytes("age"), Bytes.toBytes("23"));
    put.addColumn(Bytes.toBytes("info"), Bytes.toBytes("birthday"), Bytes.toBytes("1997-12-12"));
    put.addColumn(Bytes.toBytes("info"), Bytes.toBytes("company"), Bytes.toBytes("HaHa"));

    put.addColumn(Bytes.toBytes("address"), Bytes.toBytes("country"), Bytes.toBytes("CN"));
    put.addColumn(Bytes.toBytes("address"), Bytes.toBytes("provimce"), Bytes.toBytes("LN"));
    put.addColumn(Bytes.toBytes("address"), Bytes.toBytes("city"), Bytes.toBytes("DL"));

    // 将数据put到HBase中去
    table.put(put);

    // 插入多条诗句
    // 创建一个 Put 的 List,用作将多条 put 数据存入其中
    List<Put> puts = new ArrayList<Put>();
    
    // 第一个 Put
    Put put1 = new Put(Bytes.toBytes("jepson"));
    put1.addColumn(Bytes.toBytes("info"), Bytes.toBytes("age"), Bytes.toBytes("15"));
    put1.addColumn(Bytes.toBytes("info"), Bytes.toBytes("birthday"), Bytes.toBytes("xxxx-xx-xx"));
    put1.addColumn(Bytes.toBytes("info"), Bytes.toBytes("company"), Bytes.toBytes("WaWa"));
    put1.addColumn(Bytes.toBytes("address"), Bytes.toBytes("country"), Bytes.toBytes("CN"));
    put1.addColumn(Bytes.toBytes("address"), Bytes.toBytes("provimce"), Bytes.toBytes("BJ"));
    put1.addColumn(Bytes.toBytes("address"), Bytes.toBytes("city"), Bytes.toBytes("BJ"));
      
    // 第二个 Put
    Put put2 = new Put(Bytes.toBytes("okey"));
    put2.addColumn(Bytes.toBytes("info"), Bytes.toBytes("age"), Bytes.toBytes("19"));
    put2.addColumn(Bytes.toBytes("info"), Bytes.toBytes("birthday"), Bytes.toBytes("yy-yy-yy"));
    put2.addColumn(Bytes.toBytes("info"), Bytes.toBytes("company"), Bytes.toBytes("DuoDuo"));
    put2.addColumn(Bytes.toBytes("address"), Bytes.toBytes("country"), Bytes.toBytes("CN"));
    put2.addColumn(Bytes.toBytes("address"), Bytes.toBytes("provimce"), Bytes.toBytes("SH"));
    put2.addColumn(Bytes.toBytes("address"), Bytes.toBytes("city"), Bytes.toBytes("SH"));

    // 全部 add 到 puts list 中
    puts.add(put1);
    puts.add(put2);

    // 保存到 HBase 中
    table.put(puts);
}    

 

 

  (4)HBase 表中查询数据

 

@Test
//通过RowKey来获取Hbase数据
public void testGetByRowkey() throws IOException {
    // 获取表
    table = connection.getTable(TableName.valueOf(tableName));
    // 
    Get get = new Get("20200107_1003629081".getBytes());
    // 用来设置返回的列。参数:(列族,列名)
    // get.addColumn(Bytes.toBytes("info"), Bytes.toBytes("age"));

    Result result = table.get(get);
    printResult(result);
    }

    private void printResult(Result result) {
        for(Cell cell: result.rawCells()) {
            System.out.println(Bytes.toString(result.getRow())+ "	"
                    + Bytes.toString(CellUtil.cloneFamily(cell)) + "	"
                    + Bytes.toString(CellUtil.cloneQualifier(cell)) + "	"
                    + Bytes.toString(CellUtil.cloneValue(cell)) + "	"
                    + cell.getTimestamp()
            );
        }
    }

    @Test
    
    public void testScan() throws IOException {
        table = connection.getTable(TableName.valueOf(tableName));

      Scan scan = new Scan();
      scan.addFamily(Bytes.toBytes("info"));
     // scan.addColumn(Bytes.toBytes("info"), Bytes.toBytes("company"));
     // Scan scan = new Scan(Bytes.toBytes("jepson")); // >=
     // Scan scan = new Scan(new Get(Bytes.toBytes("jepson"))); 
     // 明确查询
     // Scan scan = new Scan(Bytes.toBytes("jepson"), 
    Bytes.toBytes("okey")); // 包含起始,不包含结束

     // ResultScanner results = table.getScanner(Bytes.toBytes("info"), Bytes.toBytes("company"));
    ResultScanner results =  table.getScanner(scan);

        for(Result result: results) {
            printResult(result);
   System.out.println("~~~~~~~~~~~~~~~~~~~~~~~~~~~~~");
        }
    }

 

 

 

 

 

 

 

    

以上是关于Spark 读取 HBase 数据的主要内容,如果未能解决你的问题,请参考以下文章

Spark 读取 HBase 数据

如何使用Spark Streaming读取HBase的数据并写入到HDFS

spark 从 hbase 读取数据,worker 是不是需要从远程驱动程序中获取分区数据?

Spark读取Hbase数据的几种方法

spark读取hbase数据

spark读取hbase数据