来自 Kafka 的 pySpark Structured Streaming 不会输出到控制台进行调试

Posted

技术标签:

【中文标题】来自 Kafka 的 pySpark Structured Streaming 不会输出到控制台进行调试【英文标题】:pySpark Structured Streaming from Kafka does not output to console for debugging 【发布时间】:2020-04-20 04:21:09 【问题描述】:

下面是我的代码。我尝试了许多不同的选择变体,但应用程序运行,但没有显示每秒写入的消息。我有一个 Spark Streaming 示例,它使用 pprint() 确认 kafka 实际上每秒都在获取消息。 Kafka 中的消息是 JSON 格式的,请参阅字段/列标签的架构:

from pyspark.sql.functions import *
from pyspark.sql.types import *
import statistics


KAFKA_TOPIC = "vehicle_events_fast_testdata"
KAFKA_SERVER = "10.2.0.6:2181"

if __name__ == "__main__":
    print("NXB PySpark Structured Streaming with Kafka Demo Started")

    spark = SparkSession \
        .builder \
        .appName("PySpark Structured Streaming with Kafka Demo") \
        .master("local[*]") \
        .config("spark.jars", "/home/cldr/streams-dev/libs/spark-sql-kafka-0-10_2.11-2.4.4.jar,/home/cldr/streams-dev/libs/kafka-clients-2.0.0.jar") \
        .config("spark.executor.extraClassPath", "/home/cldr/streams-dev/libs/spark-sql-kafka-0-10_2.11-2.4.4.jar:/home/cldr/streams-dev/libs/kafka-clients-2.0.0.jar") \
        .config("spark.executor.extraLibrary", "/home/cldr/streams-dev/libs/spark-sql-kafka-0-10_2.11-2.4.4.jar:/home/cldr/streams-dev/libs/kafka-clients-2.0.0.jar") \
        .config("spark.driver.extraClassPath", "/home/cldr/streams-dev/libs/spark-sql-kafka-0-10_2.11-2.4.4.jar:/home/cldr/streams-dev/libs/kafka-clients-2.0.0.jar") \
        .getOrCreate()

    spark.sparkContext.setLogLevel("ERROR")

    schema = StructType() \
        .add("WheelAngle", IntegerType()) \
        .add("acceleration", IntegerType()) \
        .add("heading", IntegerType()) \
        .add("reading_time", IntegerType()) \
        .add("tractionForce", IntegerType()) \
        .add("vel_latitudinal", IntegerType()) \
        .add("vel_longitudinal", IntegerType()) \
        .add("velocity", IntegerType()) \
        .add("x_pos", IntegerType()) \
        .add("y_pos", IntegerType()) \
        .add("yawrate", IntegerType())


 # Construct a streaming DataFrame that reads from testtopic
    trans_det_df = spark \
        .readStream \
        .format("kafka") \
        .option("kafka.bootstrap.servers", KAFKA_SERVER) \
        .option("subscribe", KAFKA_TOPIC) \
        .option("startingOffsets", "latest") \
        .load() \
        .selectExpr("CAST(value as STRING)", "CAST(timestamp as STRING)","CAST(topic as STRING)")


#(from_json(col("value").cast("string"),schema))

    #Q1 =  trans_det_df.select(from_json(col("value"), schema).alias("parsed_value"), "timestamp")
    #Q2 =  trans_det_d.select("parsed_value*", "timestamp")


    query = trans_det_df.writeStream \
            .format("console") \
            .option("truncate","false") \
            .start() \
            .awaitTermination()

【问题讨论】:

已解决。我不得不启用 DEBUG,并注意到存在连接问题,因此将上述端口更新为 9092 解决了该问题。我使用了与 Spark Streaming 相同的端口,就像我在本练习中所做的那样,即结构化流。 【参考方案1】:

kafka.bootstrap.serversKafka 代理 地址(默认端口 9092),而不是 Zookeeper(端口 2181)

还要注意您的起始偏移量是最新的,因此您必须在启动流应用程序后生成数据。

如果要查看现有主题数据,请使用最早的偏移量。

【讨论】:

嗨 Cricket,是的,在更新所有这些设置后,我开始 writeStream 确认我能够看到值“json 编码中的 kafka 主题消息。但是,当我为 json 列创建查询时,我正如您在上面看到的那样,我提供了架构,现在我只得到空值。例如,我能够看到速度读数执行对速度列的查询......你有什么想法吗? 您可能希望接受此答案并发布一个包含这些问题和新代码的新问题。但是,如果您得到空值,通常您的架构是错误的。您没有在问题中显示您的消息,所以我不确定解决方案是什么

以上是关于来自 Kafka 的 pySpark Structured Streaming 不会输出到控制台进行调试的主要内容,如果未能解决你的问题,请参考以下文章

如何通过在 PySpark 中选择 struct-array 列的一个字段来提取数组列

pyspark:使用从 kafka 检索到的数据训练 kmeans 流式传输

kafka 到 pyspark 结构化流,将 json 解析为数据帧

在 Pyspark 中爆炸不是数组的结构列

如何在pyspark上更改JSON结构?

如何解析 pyspark 的 DataStreamReader 中的 json 字符串列并创建数据框