How to store a Spark Streaming dataframe to a MySQL table?


Posted: 2022-01-23 19:55:57

[Problem description]:

I am trying to read data from an Azure Event Hub and store the resulting dataframe into a MySQL table in Spark streaming mode.

Below is my pyspark code:

from pyspark.sql import SparkSession
from pyspark.sql.functions import *
from pyspark.sql.types import *
import json
from datetime import datetime as dt
from pyspark.sql import DataFrameWriter

try:
    session = SparkSession.builder.master("local").appName("dataingestion")
    spark = session.getOrCreate()
    print("Successfully built spark session")
except Exception:
    print("Failed to build spark session")
    raise

startOffset = "-1"
startingEventPosition = 
    "offset": startOffset,
    "seqNo": -1,  # not in use
    "enqueuedTime": None,  # not in use
    "isInclusive": True,

endTime = dt.now().strftime("%Y-%m-%dT%H:%M:%S.%fZ")

endingEventPosition = 
    "offset": None,  # not in use
    "seqNo": -1,  # not in use
    "enqueuedTime": endTime,
    "isInclusive": True

ehreadConf = 
ehreadConf["eventhubs.startingPosition"] = json.dumps(startingEventPosition)
ehreadConf["eventhubs.endingPosition"] = json.dumps(endingEventPosition)

connectionString = "eventhub-connection-string"
ehreadConf['eventhubs.connectionString'] = connectionString

try:
    inputStream = spark.readStream.format("eventhubs").options(**ehreadConf).load()
    print("Successfully connected to the event hub")
    print("Check whether streaming has started: ", inputStream.isStreaming)
    inputStream.printSchema()
except Exception:
    print("Failed to connect to the Azure event hub")
    raise
sparkDf = inputStream.withColumn("body", inputStream["body"].cast("string"))


server_name = "jdbc:mysql://localhost:3306"
database_name = "eventhub"
jdbcurl = server_name + "/" + database_name
print('%' * 100)
print(jdbcurl)
table_name = "stream_cdr_data"
username = "user"
password = "data@123"

try:
    print("Trying to connect MySql sql : ")
    sparkDf.writeStream \
        .format("jdbc") \
        .outputMode("append") \
        .option("url", jdbcurl) \
        .option("dbtable", table_name) \
        .option("user", username) \
        .option("password", password) \
        .option("driver", "com.mysql.jdbc.Driver") \
        .option("checkpointLocation", "./checkpoint") \
        .start().awaitTermination(True)
    print("Connection to the MySql is successful : ")
except ValueError as error:
    print("Connector write failed", error)

spark.sparkContext.stop()
spark.stop()

But I am unable to store this spark dataframe into the MySQL table. I get an error saying that the data source jdbc does not support spark streaming:

py4j.protocol.Py4JJavaError: An error occurred while calling o68.start. : java.lang.UnsupportedOperationException: Data source jdbc does not support streamed writing


[Answer 1]:

As the error says, writing a stream to a JDBC sink is not yet supported by Spark (I guess). Instead of writing the stream directly to MySQL, you can use foreachBatch on the streaming DataFrame and perform a batch write.jdbc for each micro-batch:

server_name = "jdbc:mysql://localhost:3306"
database_name = "eventhub"
jdbcurl = server_name + "/" + database_name
table_name = "stream_cdr_data"
db_properties = "user":""user"", "password":"data@123"

def write_to_mysql(df, epoch_id):
    dfwriter = df.write.mode("append")
    dfwriter.jdbc(url=jdbcurl, table=table_name, properties=db_properties)  # if this is not working, use the line below
    # df.write.jdbc(url=jdbcurl, table=table_name, properties=db_properties, mode="append")

query = sparkDf.writeStream.outputMode("append").foreachBatch(write_to_mysql).start()

query.awaitTermination()
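
A note beyond the original answer: with foreachBatch, the checkpointLocation option from the question's jdbc attempt can still be set on the streaming query, so that after a restart Spark knows which micro-batches were already committed. A minimal sketch, reusing sparkDf and write_to_mysql from above ("./checkpoint" is an illustrative local path; on a real cluster point it at durable storage):

# Sketch only: the same foreachBatch sink plus a checkpoint directory,
# so a restarted query does not re-write already-committed batches.
query = (sparkDf.writeStream
         .outputMode("append")
         .option("checkpointLocation", "./checkpoint")
         .foreachBatch(write_to_mysql)
         .start())
query.awaitTermination()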

[Comments]:

Py4JJavaError: An error occurred while calling o425.awaitTermination. : org.apache.spark.sql.streaming.StreamingQueryException: An exception was raised by the Python Proxy. Return Message: File "/databricks/spark/python/lib/py4j-0.10.7-src.zip/py4j/protocol.py", line 328, in get_return_value format(target_id, ".", name), value) py4j.protocol.Py4JJavaError: An error occurred while calling o466.jdbc. : org.apache.spark.sql.AnalysisException: Table or view 'stream_job_data' already exists. SaveMode: ErrorIfExists

Hi Badusha, thanks for your answer. I tried this approach, but it only inserts the first micro-batch of records; say there are 50000 records in the event hub, it stores only the first micro-batch of 1000 records into MySQL, and then I get the above error.

@akshay, are you using append mode? Please see the changes in the code above.

Let me know which solution works for you.
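
For reference on the "SaveMode: ErrorIfExists" failure quoted above: DataFrameWriter.jdbc defaults to SaveMode.ErrorIfExists, so the first micro-batch creates the table and every later batch then fails, which matches the "only the first micro-batch is written" symptom. A hedged sketch of the batch writer with the mode spelled out, reusing the answer's variables (the driver entry is an assumption for MySQL Connector/J 8.x; older 5.x jars use "com.mysql.jdbc.Driver" instead):

# Assumption: a MySQL Connector/J 8.x jar is on the Spark classpath.
db_properties = {
    "user": "user",
    "password": "data@123",
    "driver": "com.mysql.cj.jdbc.Driver",
}

def write_to_mysql(df, epoch_id):
    # Explicit append mode: the default SaveMode is ErrorIfExists,
    # which raises "Table or view ... already exists" on the second
    # micro-batch once the table has been created.
    df.write.jdbc(url=jdbcurl, table=table_name,
                  properties=db_properties, mode="append")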
