将 Pyspark 与 Kafka 连接起来

Posted

技术标签:

【中文标题】将 Pyspark 与 Kafka 连接起来【英文标题】:Connecting Pyspark with Kafka 【发布时间】:2022-01-19 06:45:20 【问题描述】:

我在理解如何连接 Kafka 和 PySpark 时遇到问题。

我在 Windows 10 上安装了 kafka,主题很好地流式传输数据。 我已经安装了运行正常的 pyspark——我能够毫无问题地创建测试 DataFrame。

但是当我尝试连接到 Kafka 流时,它给了我错误:

AnalysisException:找不到数据源:kafka。请部署 根据“结构化流式传输”的部署部分应用程序- Kafka 集成指南”。

Spark 文档并没有真正的帮助 - 它说: ... groupId = org.apache.spark artifactId = spark-sql-kafka-0-10_2.12 版本 = 3.2.0 ...

对于 Python 应用程序,您需要在部署应用程序时添加上述库及其依赖项。请参阅下面的部署小节。

然后当您转到部署部分时,它会说:

与任何 Spark 应用程序一样,spark-submit 用于启动您的应用程序。 spark-sql-kafka-0-10_2.12 及其依赖可以直接使用 --packages 添加到 spark-submit 中,例如, ./bin/spark-submit --packages org.apache.spark:spark-sql-kafka-0-10_2.12:3.2.0 ...

我正在开发应用程序,我不想部署它。 如果我正在开发 pyspark 应用程序,在哪里以及如何添加这些依赖项?

尝试了几个教程最终变得更加困惑。

看到回答说

"您需要将 kafka-clients JAR 添加到您的 --packages".so-answer

更多的步骤可能会有用,因为对于新手来说这是不清楚的。

版本

卡夫卡2.13-2.8.1 火花3.1.2 java 11.0.12

所有环境变量和路径都设置正确。

编辑

我已经加载了:

   os.environ['PYSPARK_SUBMIT_ARGS'] = '--packages org.apache.spark:spark-sql-kafka-0-10_2.12:3.1.2,org.apache.kafka:kafka-clients:2.8.1'

按照建议,但仍然出现相同的错误。 我已经三次检查了 kafka、scala 和 spark 版本并尝试了各种组合,但没有成功,我仍然遇到同样的错误:

AnalysisException:找不到数据源:kafka。请部署 按照“Structured Streaming-Kafka Integration Guide”的部署部分应用程序。

编辑 2

我安装了最新的 Spark 3.2.0 和 Hadoop 3.3.1 和 kafka 版本 kafka_2.12-2.8.1。更改了所有环境变量,测试了 Spark 和 Kafka - 工作正常。

我的环境变量现在看起来像这样:

os.environ['PYSPARK_SUBMIT_ARGS'] = '--packages org.apache.spark:spark-sql-kafka-0-10_2.12:3.2.0,org.apache.kafka:kafka-clients:2.8.1'

仍然没有运气,我得到同样的错误:(

【问题讨论】:

【参考方案1】:

Spark 文档并没有真正的帮助 - 它说 ... artifactId = spark-sql-kafka-0-10_2.12 version = 3.2.0 ...

是的,这是正确的......但对于最新版本的 Spark

版本:

火花3.1.2

你试过查看version specific docs吗?

换句话说,您需要匹配的spark-sql-kafka 3.1.2 版本。

bin/spark-submit --packages org.apache.spark:spark-sql-kafka-0-10_2.12:3.1.2

或者在 Python 中,

import os

spark_version = '3.1.2'
os.environ['PYSPARK_SUBMIT_ARGS'] = '--packages org.apache.spark:spark-sql-kafka-0-10_2.12:'.format(spark_version)

# init spark here

需要在上面添加这个库及其依赖

正如您在我之前的回答中找到的,还使用逗号分隔列表附加 kafka-clients 包。

--packages org.apache.spark:spark-sql-kafka-0-10_2.12:3.1.2,org.apache.kafka:kafka-clients:2.8.1


我正在开发应用程序,我不想部署它。

“部署”是 Spark 术语。本地运行还是“部署

【讨论】:

非常感谢您!我还不清楚一件事 - 我如何“附加 kafka-clients 包”?那个包裹在哪里? 另外spark.apache.org/docs/latest/… 说:“不要手动添加对 org.apache.kafka 工件的依赖项(例如 kafka-clients)。spark-streaming-kafka-0-10 工件具有适当的传递依赖项已经,并且不同的版本可能以难以诊断的方式不兼容。” 上次我检查过,PySpark 不会提取传递依赖项(因此,您链接的答案中报告的错误),只有在使用 SBT / Maven / Gradle 之类的工具时,它才会这样做。至于“哪里”——Maven Central。您将字符串“附加”到 packages 参数,如图所示 我已经按照您的建议进行了尝试 - 请参阅我的问题的编辑。它仍然无法正常工作:( @user12 是对的,也被other times回答了

以上是关于将 Pyspark 与 Kafka 连接起来的主要内容,如果未能解决你的问题,请参考以下文章

PySpark:如何将数据框与存储在其他变量中的列名连接起来

如何使用 groupby 和聚合将 pyspark 数据框中的行与多列连接起来

Kafka 将 Debezium 与 ms sql 服务器连接起来。密钥提取配置问题

我可以使用spark 2.3.0和pyspark从Kafka进行流处理吗?

无法将 Kafka 与 InfluxDB Sink Connector 连接

我可以使用 spark 2.3.0 和 pyspark 从 Kafka 进行流处理吗?