Big Data (5e) Spark: Reading and Writing HBase Phoenix Tables in Scala

Posted by 小基基o_O


Creating the Table with Phoenix

Create the HBase table through Phoenix; it will be used to count the number of users per hour.

create table uv_hour (
    uid varchar,
    ymdh varchar
    constraint primary_key primary key (uid, ymdh)
);

Dependencies

<properties>
    <spark.version>3.0.3</spark.version>
</properties>

<dependencies>
    <dependency>
        <groupId>org.apache.spark</groupId>
        <artifactId>spark-core_2.12</artifactId>
        <version>${spark.version}</version>
    </dependency>
    <dependency>
        <groupId>org.apache.spark</groupId>
        <artifactId>spark-sql_2.12</artifactId>
        <version>${spark.version}</version>
    </dependency>
    <dependency>
        <groupId>org.apache.spark</groupId>
        <artifactId>spark-streaming_2.12</artifactId>
        <version>${spark.version}</version>
    </dependency>
    <dependency>
        <groupId>org.apache.phoenix</groupId>
        <artifactId>phoenix-spark</artifactId>
        <version>5.0.0-HBase-2.0</version>
        <exclusions>
            <exclusion>
                <groupId>org.glassfish</groupId>
                <artifactId>javax.el</artifactId>
            </exclusion>
        </exclusions>
    </dependency>
</dependencies>

Writing to the Phoenix Table

import org.apache.spark.{SparkConf, SparkContext}
import org.apache.hadoop.hbase.HBaseConfiguration // HBase configuration
import org.apache.phoenix.spark._ // implicit conversions that provide saveToPhoenix
// Create the SparkContext
val conf = new SparkConf().setAppName("A0").setMaster("local[2]")
val sc = new SparkContext(conf)
// Create the RDD
val rdd = sc.makeRDD(Seq(
  ("u1", "2021-08-08 09"),
  ("u2", "2021-08-08 09"),
  ("u1", "2021-08-08 10")
))
// Write to the Phoenix table in HBase
rdd.saveToPhoenix(
  "UV_HOUR", // table name
  Seq("UID", "YMDH"), // column names
  HBaseConfiguration.create(), // HBase configuration
  Some("hadoop102:2181") // ZooKeeper URL
)
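The same write can also be expressed through the generic DataFrame writer that phoenix-spark registers as the "org.apache.phoenix.spark" data source. Below is a minimal sketch (not from the original post), reusing the table name and ZooKeeper address above:

import org.apache.spark.sql.{SaveMode, SparkSession}

val spark = SparkSession.builder().appName("A0").master("local[2]").getOrCreate()
import spark.implicits._

// DataFrame whose column names match the Phoenix columns (Phoenix upper-cases unquoted identifiers)
val df = Seq(
  ("u1", "2021-08-08 09"),
  ("u2", "2021-08-08 09"),
  ("u1", "2021-08-08 10")
).toDF("UID", "YMDH")

// phoenix-spark only accepts SaveMode.Overwrite here; rows are upserted by primary key,
// the table is not truncated
df.write
  .format("org.apache.phoenix.spark")
  .mode(SaveMode.Overwrite)
  .option("table", "UV_HOUR")
  .option("zkUrl", "hadoop102:2181")
  .save()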

HBaseConfiguration.create

Use the create method to build the HBase configuration:

public static Configuration create() {
  Configuration conf = new Configuration();
  // In case HBaseConfiguration is loaded from a different classloader than
  // Configuration, conf needs to be set with appropriate class loader to resolve
  // HBase resources.
  conf.setClassLoader(HBaseConfiguration.class.getClassLoader());
  return addHbaseResources(conf);
}

It automatically picks up the hbase-default.xml and hbase-site.xml configuration files:

public static Configuration addHbaseResources(Configuration conf) {
  conf.addResource("hbase-default.xml");
  conf.addResource("hbase-site.xml");

  checkDefaultsVersion(conf);
  return conf;
}
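If hbase-site.xml is not on the classpath, the ZooKeeper address can also be set on the Configuration by hand before passing it to saveToPhoenix. A small sketch with placeholder host names:

import org.apache.hadoop.hbase.HBaseConfiguration

val hbaseConf = HBaseConfiguration.create()
// Standard HBase client settings; the hosts below are placeholders for your own cluster
hbaseConf.set("hbase.zookeeper.quorum", "hadoop102,hadoop103,hadoop104")
hbaseConf.set("hbase.zookeeper.property.clientPort", "2181")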

sqlline.py

The rows written above can be verified in the Phoenix sqlline.py shell; the example below reads the same table back from Spark.

import org.apache.spark.sql.SparkSession
import org.apache.phoenix.spark.toSparkSqlContextFunctions
val spark = SparkSession
  .builder()
  .appName("phoenix-test")
  .master("local[2]")
  .getOrCreate()
// Read the Phoenix table in HBase
spark.sqlContext.phoenixTableAsDataFrame(
  table = "UV_HOUR",
  columns = Seq("UID", "YMDH"),
  predicate = Some("UID='u1'"),
  zkUrl = Some("hadoop102:2181")
).show()
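Since the table is meant for counting users per hour, the returned DataFrame can be aggregated directly. A hedged sketch (not in the original post) that computes the distinct-user count per hour:

import org.apache.spark.sql.functions.countDistinct

val uvHour = spark.sqlContext.phoenixTableAsDataFrame(
  table = "UV_HOUR",
  columns = Seq("UID", "YMDH"),
  zkUrl = Some("hadoop102:2181")
)

// Hourly UV: number of distinct users in each hour bucket
uvHour.groupBy("YMDH")
  .agg(countDistinct("UID").as("UV"))
  .orderBy("YMDH")
  .show()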

phoenixTableAsDataFrame

import org.apache.hadoop.conf.Configuration
import org.apache.spark.sql.{DataFrame, SQLContext}

class SparkSqlContextFunctions(@transient val sqlContext: SQLContext) extends Serializable {

  /*
  This will return a Spark DataFrame, with Phoenix types converted to Spark SQL catalyst types

  'table' is the corresponding Phoenix table
  'columns' is a sequence of columns to query
  'predicate' is a set of statements to go after a WHERE clause, e.g. "TID = 123"
  'zkUrl' is an optional Zookeeper URL to use to connect to Phoenix
  'conf' is a Hadoop Configuration object. If zkUrl is not set, the "hbase.zookeeper.quorum"
    property will be used
 */
  def phoenixTableAsDataFrame(table: String, columns: Seq[String],
                               predicate: Option[String] = None,
                               zkUrl: Option[String] = None,
                               tenantId: Option[String] = None,
                               conf: Configuration = new Configuration): DataFrame = {

    // Create the PhoenixRDD and convert it to a DataFrame
    new PhoenixRDD(sqlContext.sparkContext, table, columns, predicate, zkUrl, conf, tenantId = tenantId)
      .toDataFrame(sqlContext)
  }
}
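For comparison, the same read can also go through the generic DataFrame reader that phoenix-spark registers. A hedged sketch, assuming the SparkSession and table from the earlier examples:

val uv = spark.read
  .format("org.apache.phoenix.spark")
  .option("table", "UV_HOUR")
  .option("zkUrl", "hadoop102:2181")
  .load()

// Same filter as the predicate used above
uv.filter("UID = 'u1'").show()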

Output

With the predicate UID='u1', the result contains only the two rows for user u1: (u1, 2021-08-08 09) and (u1, 2021-08-08 10).
