如何在 PySpark 中使用 foreach 或 foreachBatch 写入数据库?

Posted

技术标签:

【中文标题】如何在 PySpark 中使用 foreach 或 foreachBatch 写入数据库?【英文标题】:How to use foreach or foreachBatch in PySpark to write to database? 【发布时间】:2020-03-05 02:11:02 【问题描述】:

我想使用 Python (PySpark) 从 Kafka 源到 MariaDB 执行 Spark Structured Streaming (Spark 2.4.x)。

我想使用流式 Spark 数据帧,而不是静态或 Pandas 数据帧。

似乎必须使用foreachforeachBatch,因为根据https://spark.apache.org/docs/latest/structured-streaming-programming-guide.html#output-sinks,流数据帧没有可能的数据库接收器。

这是我的尝试:

from pyspark.sql import SparkSession
import pyspark.sql.functions as F
from pyspark.sql.types import StructField, StructType, StringType, DoubleType, TimestampType
from pyspark.sql import DataFrameWriter
# configuration of target db
db_target_url = "jdbc:mysql://localhost/database"
db_target_properties = "user":"writer", "password":"1234"
# schema
schema_simple = StructType([StructField("Signal", StringType()),StructField("Value", DoubleType())])

# create spark session
spark = SparkSession.builder.appName("streamer").getOrCreate()

# create DataFrame representing the stream
df = spark.readStream \
  .format("kafka").option("kafka.bootstrap.servers", "localhost:9092") \
  .option("subscribe", "mytopic") \
  .load() \
  .selectExpr("Timestamp", "cast (value as string) as json") \
  .select("Timestamp", F.from_json("json", schema_simple).alias('json_wrapper')) \
  .selectExpr("Timestamp", "json_wrapper.Signal", "json_wrapper.Value")
df.printSchema()
# Do some dummy processing
df2 = df.filter("Value < 11111111111")
print("df2: ", df2.isStreaming)

def process_row(row):
    # Process row
    row.write.jdbc(url=db_target_url, table="mytopic", mode="append", properties=db_target_properties)
    pass
query = df2.writeStream.foreach(process_row).start()

我收到一个错误:

属性错误:写入

为什么?

【问题讨论】:

【参考方案1】:

tl;drforeach 替换为 foreachBatch


引用official documentation:

foreachforeachBatch 操作允许您在流式查询的输出上应用任意操作和编写逻辑。它们的用例略有不同——foreach 允许在每一行上自定义写入逻辑,foreachBatch 允许在每个微批处理的输出上进行任意操作和自定义逻辑。

换句话说,您的writeStream.foreach(process_row) 作用于没有write.jdbc 可用的单行(数据),因此出现错误。

将行视为一段数据,您可以使用任何 API 将其保存在任何位置。

如果您确实需要 Spark 的支持(并且确实使用 write.jdbc),您应该实际使用 foreachBatch

foreach 允许在每一行上自定义写入逻辑,foreachBatch 允许对每个微批处理的输出进行任意操作和自定义逻辑。

【讨论】:

foreachforeachBatch 是否会降低应用程序的速度? 在当前批次完成之前,不会再启动(微)批次。所以,请尽快输入foreachforeachBatch【参考方案2】:

在 Jacek 的支持下,我可以修复我的示例:

def process_row(df, epoch_id):
    df2.write.jdbc(url=db_target_url, table="mytopic", mode="append", properties=db_target_properties)
    pass
query = df2.writeStream.foreachBatch(process_row).start()

您还必须将 epoch_id 放入函数参数中。否则,您会在 spark 日志文件中看到 jupyter notebook 中未显示的错误。

【讨论】:

以上是关于如何在 PySpark 中使用 foreach 或 foreachBatch 写入数据库?的主要内容,如果未能解决你的问题,请参考以下文章

pyspark dataframe foreach 填充列表

pyspark:rdd.foreach(print)报错NameError

如何使用 Python 或 Pyspark 或 scala 在数据块中获取笔记本的作业运行结果日志

如何在使用转义或引号在pyspark中的文件中写入数据帧时获得完全匹配? [复制]

从 foreach 内部调用 Pyspark 保存不起作用

计算每个 pyspark RDD 分区中的元素数