来自 Kafka 的 pySpark Structured Streaming 不会输出到控制台进行调试
Posted
技术标签:
【中文标题】来自 Kafka 的 pySpark Structured Streaming 不会输出到控制台进行调试【英文标题】:pySpark Structured Streaming from Kafka does not output to console for debugging 【发布时间】:2020-04-20 04:21:09 【问题描述】:下面是我的代码。我尝试了许多不同的选择变体,但应用程序运行,但没有显示每秒写入的消息。我有一个 Spark Streaming 示例,它使用 pprint() 确认 kafka 实际上每秒都在获取消息。 Kafka 中的消息是 JSON 格式的,请参阅字段/列标签的架构:
from pyspark.sql.functions import *
from pyspark.sql.types import *
import statistics
KAFKA_TOPIC = "vehicle_events_fast_testdata"
KAFKA_SERVER = "10.2.0.6:2181"
if __name__ == "__main__":
print("NXB PySpark Structured Streaming with Kafka Demo Started")
spark = SparkSession \
.builder \
.appName("PySpark Structured Streaming with Kafka Demo") \
.master("local[*]") \
.config("spark.jars", "/home/cldr/streams-dev/libs/spark-sql-kafka-0-10_2.11-2.4.4.jar,/home/cldr/streams-dev/libs/kafka-clients-2.0.0.jar") \
.config("spark.executor.extraClassPath", "/home/cldr/streams-dev/libs/spark-sql-kafka-0-10_2.11-2.4.4.jar:/home/cldr/streams-dev/libs/kafka-clients-2.0.0.jar") \
.config("spark.executor.extraLibrary", "/home/cldr/streams-dev/libs/spark-sql-kafka-0-10_2.11-2.4.4.jar:/home/cldr/streams-dev/libs/kafka-clients-2.0.0.jar") \
.config("spark.driver.extraClassPath", "/home/cldr/streams-dev/libs/spark-sql-kafka-0-10_2.11-2.4.4.jar:/home/cldr/streams-dev/libs/kafka-clients-2.0.0.jar") \
.getOrCreate()
spark.sparkContext.setLogLevel("ERROR")
schema = StructType() \
.add("WheelAngle", IntegerType()) \
.add("acceleration", IntegerType()) \
.add("heading", IntegerType()) \
.add("reading_time", IntegerType()) \
.add("tractionForce", IntegerType()) \
.add("vel_latitudinal", IntegerType()) \
.add("vel_longitudinal", IntegerType()) \
.add("velocity", IntegerType()) \
.add("x_pos", IntegerType()) \
.add("y_pos", IntegerType()) \
.add("yawrate", IntegerType())
# Construct a streaming DataFrame that reads from testtopic
trans_det_df = spark \
.readStream \
.format("kafka") \
.option("kafka.bootstrap.servers", KAFKA_SERVER) \
.option("subscribe", KAFKA_TOPIC) \
.option("startingOffsets", "latest") \
.load() \
.selectExpr("CAST(value as STRING)", "CAST(timestamp as STRING)","CAST(topic as STRING)")
#(from_json(col("value").cast("string"),schema))
#Q1 = trans_det_df.select(from_json(col("value"), schema).alias("parsed_value"), "timestamp")
#Q2 = trans_det_d.select("parsed_value*", "timestamp")
query = trans_det_df.writeStream \
.format("console") \
.option("truncate","false") \
.start() \
.awaitTermination()
【问题讨论】:
已解决。我不得不启用 DEBUG,并注意到存在连接问题,因此将上述端口更新为 9092 解决了该问题。我使用了与 Spark Streaming 相同的端口,就像我在本练习中所做的那样,即结构化流。 【参考方案1】:kafka.bootstrap.servers
是 Kafka 代理 地址(默认端口 9092),而不是 Zookeeper(端口 2181)
还要注意您的起始偏移量是最新的,因此您必须在启动流应用程序后生成数据。
如果要查看现有主题数据,请使用最早的偏移量。
【讨论】:
嗨 Cricket,是的,在更新所有这些设置后,我开始 writeStream 确认我能够看到值“json 编码中的 kafka 主题消息。但是,当我为 json 列创建查询时,我正如您在上面看到的那样,我提供了架构,现在我只得到空值。例如,我能够看到速度读数执行对速度列的查询......你有什么想法吗? 您可能希望接受此答案并发布一个包含这些问题和新代码的新问题。但是,如果您得到空值,通常您的架构是错误的。您没有在问题中显示您的消息,所以我不确定解决方案是什么以上是关于来自 Kafka 的 pySpark Structured Streaming 不会输出到控制台进行调试的主要内容,如果未能解决你的问题,请参考以下文章
如何通过在 PySpark 中选择 struct-array 列的一个字段来提取数组列
pyspark:使用从 kafka 检索到的数据训练 kmeans 流式传输