在 PySpark 作业上打印 Kafka 调试消息

Posted

技术标签:

【中文标题】在 PySpark 作业上打印 Kafka 调试消息【英文标题】:Printing Kafka debug message on PySpark job 【发布时间】:2019-04-30 04:46:32 【问题描述】:

在运行 PySpark 作业时,有没有办法打印 Kafka 调试消息(我正在考虑类似于 librdkafka 调试消息或 kafkacat -D 选项的日志消息)?

问题是我在 PySpark 上使用以下代码连接到名为 A 的 Kafka 集群,每次有新消息进来时,它都会工作并将内容打印到控制台。但是当我切换到另一个集群时,称为B 并以与集群 A 相同的方式设置,当有新消息进入时,它不会在屏幕上打印任何内容,我可以看到消息在两个集群上都使用 kafkacat 工具处理得很好。

consumer.py

from pyspark import SparkContext
from pyspark.sql import SQLContext, SparkSession

from pyspark.sql import SparkSession
from pyspark.streaming import StreamingContext
from pyspark.streaming.kafka import KafkaUtils

spark = SparkSession.builder.appName("KafkaConsumer").getOrCreate()
sc = spark.sparkContext
sqlc = SQLContext(sc)

hosts = "host1:9092,host2:9092,host3:9092"
topic = "myTopic"
securityProtocol = "SASL_PLAINTEXT"
saslMechanism = "PLAIN"

try:
  df = sqlc \
    .readStream \
    .format("kafka") \
    .option("kafka.bootstrap.servers", hosts) \
    .option("kafka.security.protocol", securityProtocol) \
    .option("kafka.sasl.mechanism", saslMechanism) \
    .option("startingOffsets", "earliest") \
    .option("subscribe", topic) \
    .load()

  dss = df.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)") \
    .writeStream.outputMode('append') \
    .format("console") \
    .start()

  dss.awaitTermination()
except KeyboardInterrupt:    
  print 'shutting down...'

kafka.jaas

KafkaClient 
   org.apache.kafka.common.security.plain.PlainLoginModule required
   username="user1"
   password="sssshhhh"
   serviceName="kafka";
;

shell 命令:

spark-submit \
    --packages org.apache.spark:spark-sql-kafka-0-10_2.11:2.3.1 \
    --files "kafka.jaas" \
    --driver-java-options "-Djava.security.auth.login.config=kafka.jaas" \
    --conf "spark.executor.extraJavaOptions=-Djava.security.auth.login.config=kafka.jaas" \
    "./consumer.py"

似乎kafka 集群 B 是可访问的,因为我能够从中获取偏移信息,但它只是没有读取消息。

【问题讨论】:

【参考方案1】:

该问题是由于工作节点连接到 Kafka 集群造成的,工作节点 IP 地址不在 Kafka 集群的防火墙白名单上。上面的代码导致工作节点超时并继续重试连接到 Kafka 集群,直到给出中断信号。

关于错误消息本身,由于工作节点仍在尝试连接到 Kafka 集群,因此没有向 Master 节点生成错误消息,但不时在 Master 控制台上打印出一条消息说它失败了与工作节点通信(或一些消息,如“收集信息”)。

注意:这是我假设在工作节点中发生的(由于管理员权限,我无法登录),但可能有一个日志存储在工作节点上。 (如果有人可以支持或证明其他情况。将不胜感激)

至于 Kafka 调试消息本身,如果出现错误、信息或警告,则默认情况下似乎已经打印到屏幕上,具体取决于记录器级别设置,并且在像这样的一些奇怪实例中,日志消息可能不会直接在屏幕上可见。

【讨论】:

你可以像这样覆盖日志设置***.com/a/55596632/2308683

以上是关于在 PySpark 作业上打印 Kafka 调试消息的主要内容,如果未能解决你的问题,请参考以下文章

来自 Kafka 的 pySpark Structured Streaming 不会输出到控制台进行调试

从 Docker 容器将 PySpark 连接到 Kafka

在 YARN 集群上部署 pyspark 作业时出现 FileNotFoundException

将 Pyspark 与 Kafka 连接起来

Dataproc Pyspark 作业仅在一个节点上运行

无法让 pyspark 作业在 hadoop 集群的所有节点上运行