使 Spark 结构化流中的 JSON 可以在 python (pyspark) 中作为没有 RDD 的数据帧访问

Posted

技术标签:

【中文标题】使 Spark 结构化流中的 JSON 可以在 python (pyspark) 中作为没有 RDD 的数据帧访问【英文标题】:Make JSON in Spark's structured streaming accessible in python (pyspark) as dataframe without RDD 【发布时间】:2019-08-29 09:02:57 【问题描述】:

我使用 Spark 2.4.3 并希望使用来自 Kafka 源的数据进行结构化流式传输。到目前为止,以下代码有效:

from pyspark.sql import SparkSession
from ast import literal_eval

spark = SparkSession.builder \
    .appName("streamer") \
    .getOrCreate()

# Create DataFrame representing the stream
dsraw = spark.readStream \
  .format("kafka") \
  .option("kafka.bootstrap.servers", "localhost:9092") \
  .option("subscribe", "test") \
  .option("startingOffsets", """"test":"0":2707422""") \
  .load()

# Convert Kafka stream to something readable
ds = dsraw.selectExpr("CAST(value AS STRING)")

# Do query on the raw data
rawQuery = dsraw \
     .writeStream \
     .queryName("qraw") \
     .format("memory") \
     .start()
raw = spark.sql("select * from qraw")

# Do query on the converted data
dsQuery = ds \
     .writeStream \
     .queryName("qds") \
     .format("memory") \
     .start()
sdf = spark.sql("select * from qds")

# I have to access raw otherwise I get errors...
raw.select("value").show()

sdf.show()

# Make the json stuff accessable
sdf2 = sdf.rdd.map(lambda val: literal_eval(val['value']))
print(sdf2.first())

但我真的想知道倒数第二行的转换是否是最有用/最快的。你有其他想法吗?我可以使用 (Spark) 数据帧而不是 RDD 吗?

脚本的输出是

+--------------------+
|               value|
+--------------------+
|
  "Signal": "[...|
|
  "Signal": "[...|
+--------------------+
only showing top 20 rows

'Signal': '[1234]', 'Value': 0.0, 'Timestamp': '2019-08-27T13:51:43.7146327Z'

【问题讨论】:

【参考方案1】:

有一些解决方案,但只有这个经过调整的解决方案才有效(感谢 https://***.com/a/51070457/3021134):

from pyspark.sql.functions import from_json, col
from pyspark.sql.types import StructField, StructType, StringType, DoubleType

schema = StructType(
        [
                StructField("Signal", StringType()),
                StructField("Value", DoubleType()),
                StructField("Timestamp", StringType())
        ]
)

sdf.withColumn("value", from_json("value", schema))\
    .select(col('value.*'))\
    .show()

输出:

+--------+-----------+--------------------+
|  Signal|      Value|           Timestamp|
+--------+-----------+--------------------+
|[123456]|        0.0|2019-08-27T13:51:...|
|[123457]|        0.0|2019-08-27T13:51:...|
|[123458]| 318.880859|2019-08-27T13:51:...|
|[123459]|   285.5808|2019-08-27T13:51:...|

【讨论】:

以上是关于使 Spark 结构化流中的 JSON 可以在 python (pyspark) 中作为没有 RDD 的数据帧访问的主要内容,如果未能解决你的问题,请参考以下文章

Spark 结构化流中的临时视图

如何在 Spark 结构化流中保存通过水印丢弃的记录

SPARK 结构化流中的 StructField 是不是存在错误

带有自定义接收器的 Spark 结构化流中的输入行数

如何从 Spark 结构化流中的 Cassandra 等外部存储读取 Kafka 和查询?

有没有办法将生成的 groupby 流加入到 kafka-spark 结构化流中的原始流?