无法从 spark 读取 Azure Eventhub 主题

Posted

技术标签:

【中文标题】无法从 spark 读取 Azure Eventhub 主题【英文标题】:Unable to read Azure Eventhub topics from spark 【发布时间】:2021-04-25 15:09:53 【问题描述】:

环境细节

    火花版本:3.x Python 3.8 版和 java 8 版 azure-eventhubs-spark_2.12-2.3.17.jar
import json
from pyspark.sql import SparkSession


#the below command getOrCreate() uses the SparkSession shared across the jobs instead of using one SparkSession per job.
spark = SparkSession.builder.appName('ntorq_eventhub_load').getOrCreate()

#ntorq adls checkpoint location.
ntorq_connection_string = "connection-string"

ehConf = 
ehConf['eventhubs.connectionString'] = spark.sparkContext._jvm.org.apache.spark.eventhubs.EventHubsUtils.encrypt(ntorq_connection_string)
# ehConf['eventhubs.connectionString'] = ntorq_connection_string
ehConf['eventhubs.consumerGroup'] = "$default"

OFFSET_START = "-1"   # the beginning
OFFSET_END = "@latest"

# Create the positions
startingEventPosition = 
  "offset": OFFSET_START ,
  "seqNo": -1,            #not in use
  "enqueuedTime": None,   #not in use
  "isInclusive": True


endingEventPosition = 
  "offset": OFFSET_END,           #not in use
  "seqNo": -1,              #not in use
  "enqueuedTime": None,
  "isInclusive": True


# Put the positions into the Event Hub config dictionary
ehConf["eventhubs.startingPosition"] = json.dumps(startingEventPosition)
ehConf["eventhubs.endingPosition"] = json.dumps(endingEventPosition)


df = spark \
  .readStream \
  .format("eventhubs") \
  .options(**ehConf) \
  .load() \
  .selectExpr("cast(body as string) as body_str")

df.writeStream \
    .format("console") \
    .start()

错误

21/04/25 20:17:53 WARN Utils: Your hostname,resolves to a loopback address: 127.0.0.1; using 192.168.1.202 instead (on interface en0)
21/04/25 20:17:53 WARN Utils: Set SPARK_LOCAL_IP if you need to bind to another address
21/04/25 20:17:53 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
Using Spark's default log4j profile: org/apache/spark/log4j-defaults.properties
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
Traceback (most recent call last):
  File "/Users/PycharmProjects/pythonProject/test.py", line 12, in <module>
    ehConf['eventhubs.connectionString'] = spark.sparkContext._jvm.org.apache.spark.eventhubs.EventHubsUtils.encrypt(ntorq_connection_string)
TypeError: 'JavaPackage' object is not callable

代码在 databricks 环境中运行良好,但无法使用来自 eventthub 的所有消息我尝试在每次运行之前清除默认检查点文件夹,但仍然面临问题,所以想在本地系统上尝试。 在尝试面临 JavaPackage 问题的本地环境时。 感谢任何帮助。 谢谢你

【问题讨论】:

本地环境如何运行 我尝试使用本地环境中的 Azure Eventhub 主题。我在 pycharm Ide 中运行 spark 代码。 但要使用它,您需要有 EventHubs 库等。您是如何添加它的? 【参考方案1】:

创建会话时需要添加EventHubs包:

park = SparkSession.builder.appName('ntorq_eventhub_load')\
  .config("spark.jars.packages", "com.microsoft.azure:azure-eventhubs-spark_2.12:2.3.18")\
  .getOrCreate()

【讨论】:

嗨,亚历克斯,感谢您的评论。我已手动将 azure-eventhub jar 添加到 spark 类路径。请随时发表评论。 这真的很奇怪 - 加密的代码是正确的。 嗨,我找到了解决方案。我尝试设置所有 spark 环境变量,然后它就像一个魅力。 你使用了哪些具体的环境变量? SPARK_HOME、PYSPARK_PYTHON、PYSPARK_DRIVER_PYTHON 环境变量在 pycharm 中设置,然后它就像一个魅力。

以上是关于无法从 spark 读取 Azure Eventhub 主题的主要内容,如果未能解决你的问题,请参考以下文章

Azure databricks - 无法使用来自 datalake 存储 gen2 服务的 spark 作业读取 .csv 文件

如何从 ADLS 将自定义数据框写入 eventthub

Azure Databricks Spark XML 库 - 尝试读取 xml 文件

我无法从数据块中的 spark 数据帧创建加载数据到 Azure Synapse (DWH)

Azure Datalake Store Gen2 使用 scala spark 库从 Databricks 读取文件

无法使用 jdbc 和 spark 连接器从 databricks 集群连接到 Azure 数据库 for MySQL 服务器