kafka 到 pyspark 结构化流,将 json 解析为数据帧

Posted

技术标签:

【中文标题】kafka 到 pyspark 结构化流,将 json 解析为数据帧【英文标题】:kafka to pyspark structured streaming, parsing json as dataframe 【发布时间】:2018-03-21 12:13:57 【问题描述】:

我正在尝试使用 spark 结构化流 (spark v2.2.0) 来使用来自 kafka 的 json 数据。但是我遇到了以下错误。

pyspark.sql.utils.StreamingQueryException: '缺少必需的 没有默认值的配置“partition.assignment.strategy” 价值。

有人知道为什么吗?该作业是使用下面的 spark-submit 提交的。

spark-submit --packages org.apache.spark:spark-sql-kafka-0-10_2.11:2.2.0 sparksstream.py

这是整个 python 脚本。

from pyspark.sql import SparkSession
from pyspark.sql.functions import *
from pyspark.sql.types import *

spark = SparkSession \
    .builder \
    .appName("test") \
    .getOrCreate()

# Define schema of json
schema = StructType() \
        .add("Session-Id", StringType()) \
        .add("TransactionTimestamp", IntegerType()) \
        .add("User-Name", StringType()) \
        .add("ID", StringType()) \
        .add("Timestamp", IntegerType())

# load data into spark-structured streaming
df = spark \
      .readStream \
      .format("kafka") \
      .option("kafka.bootstrap.servers", "xxxx:9092") \
      .option("subscribe", "topicName") \
      .load() \
      .select(from_json(col("value").cast("string"), schema).alias("parsed_value"))

# Print output
query = df.writeStream \
            .outputMode("append") \
            .format("console") \
            .start()

【问题讨论】:

你在这方面有什么进展吗? 你好,不是真的~我放弃了,用另一种方法代替解析为json~ @jake,我的查询与 col 函数有关。我在函数模块中没有找到任何调用函数。如何使用 col 函数。 @Jake 你用了什么方法?我和你一样。 【参考方案1】:

使用它来提交:

spark-submit \
--conf "spark.driver.extraClassPath=$SPARK_HOME/jars/kafka-clients-1.1.0.jar"  \
--packages org.apache.spark:spark-sql-kafka-0-10_2.11:2.2.0 \
sparksstream.py

假设您已将 kafka-clients*jar 下载到 $SPARK_HOME/jars 文件夹中

【讨论】:

以上是关于kafka 到 pyspark 结构化流,将 json 解析为数据帧的主要内容,如果未能解决你的问题,请参考以下文章

我可以使用spark 2.3.0和pyspark从Kafka进行流处理吗?

Pyspark 结构化流处理

如何在 pyspark 结构化流中使用 maxOffsetsPerTrigger?

使 Spark 结构化流中的 JSON 可以在 python (pyspark) 中作为没有 RDD 的数据帧访问

KafkaConsumer 对于多线程访问 pyspark 是不安全的

PySpark 处理流数据并将处理后的数据保存到文件