如何将 Pyspark 数据帧存储到 HBase

Posted

技术标签:

【中文标题】如何将 Pyspark 数据帧存储到 HBase【英文标题】:how to store Pyspark dataframe into HBase 【发布时间】:2018-11-29 06:59:10 【问题描述】:

我有一个将 Pyspark 流数据转换为数据帧的代码。我需要将此数据帧存储到 Hbase 中。帮我另外写代码。

import sys
from pyspark import SparkContext
from pyspark.streaming import StreamingContext
from pyspark.sql import Row, SparkSession

def getSparkSessionInstance(sparkConf):
if ('sparkSessionSingletonInstance' not in globals()):
    globals()['sparkSessionSingletonInstance'] = SparkSession\
        .builder\
        .config(conf=sparkConf)\
        .getOrCreate()
return globals()['sparkSessionSingletonInstance']


if __name__ == "__main__":
if len(sys.argv) != 3:
    print("Usage: sql_network_wordcount.py <hostname> <port> ", 
file=sys.stderr)
    exit(-1)
host, port = sys.argv[1:]
sc = SparkContext(appName="PythonSqlNetworkWordCount")
ssc = StreamingContext(sc, 1)
lines = ssc.socketTextStream(host, int(port))

def process(time, rdd):
    print("========= %s =========" % str(time))

    try:
        words = rdd.map(lambda line :line.split(" ")).collect()
        spark = getSparkSessionInstance(rdd.context.getConf())
        linesDataFrame = spark.createDataFrame(words,schema=["lat","lon"])

        linesDataFrame.show()
except :
pass

lines.foreachRDD(process)
ssc.start()
ssc.awaitTermination()

【问题讨论】:

【参考方案1】:

您可以使用 Spark-Hbase 连接器从 Spark 访问 HBase。它在低级 RDDDataframes 中都提供了 API。

连接器要求您为 HBase 表定义 Schema。下面是为名称为table1、行键为键和多个列 (col1-col8) 的 HBase 表定义的 Schema 示例。请注意,rowkey 还必须详细定义为具有特定 cf(行键)的列 (col0)。

def catalog = '
        "table":"namespace":"default", "name":"table1",\
        "rowkey":"key",\
        "columns":\
          "col0":"cf":"rowkey", "col":"key", "type":"string",\
          "col1":"cf":"cf1", "col":"col1", "type":"boolean",\
          "col2":"cf":"cf1", "col":"col2", "type":"double",\
          "col3":"cf":"cf1", "col":"col3", "type":"float",\
          "col4":"cf":"cf1", "col":"col4", "type":"int",\
          "col5":"cf":"cf2", "col":"col5", "type":"bigint",\
          "col6":"cf":"cf2", "col":"col6", "type":"smallint",\
          "col7":"cf":"cf2", "col":"col7", "type":"string",\
          "col8":"cf":"cf2", "col":"col8", "type":"tinyint"\
        \
      '

根据数据框的架构定义目录后,您可以使用以下方法将数据框写入 HBase:

df.write\
.options(catalog=catalog)\
.format("org.apache.spark.sql.execution.datasources.hbase")\
.save()

从 HBase 读取数据:

df = spark.\
read.\
format("org.apache.spark.sql.execution.datasources.hbase").\
option(catalog=catalog).\
load()

在提交 spark 应用程序时,您需要包含如下 Spark-HBase 连接器包。

pyspark --packages com.hortonworks:shc-core:1.1.1-2.1-s_2.11 --repositories http://repo.hortonworks.com/content/groups/public/

【讨论】:

感谢您的帮助。我需要澄清很多事情。什么是命名空间?以及我需要在“def 目录”上定义我的架构的地方 @ariunariun 命名空间是 HBase 表命名空间,默认为 'default' 。和目录需要在 PySpark 应用程序本身中定义。 你能通过私信帮助我吗?我怎么能和你联系?请帮帮我。 示例请参考github.com/hortonworks-spark/shc,如果您仍然遇到问题,请粘贴错误。谢谢!

以上是关于如何将 Pyspark 数据帧存储到 HBase的主要内容,如果未能解决你的问题,请参考以下文章

将数据帧从 pandas 转换为 pyspark 到 Foundry 的数据类型

Pyspark - 如何将多个数据帧的列连接成一个数据帧的列

Pyspark:将数据帧作为 JSON 存储在 MySQL 表列中

如何使用 pyspark 管理跨集群的数据帧的物理数据放置?

如何在限制行数的同时拆分 Pyspark 数据帧?

PySpark:将 PythonRDD 附加/合并到 PySpark 数据帧