使 Spark 结构化流中的 JSON 可以在 python (pyspark) 中作为没有 RDD 的数据帧访问

Posted

技术标签:

【中文标题】使 Spark 结构化流中的 JSON 可以在 python (pyspark) 中作为没有 RDD 的数据帧访问【英文标题】:Make JSON in Spark's structured streaming accessible in python (pyspark) as dataframe without RDD 【发布时间】:2019-08-29 09:02:57 【问题描述】:

我使用 Spark 2.4.3 并希望使用来自 Kafka 源的数据进行结构化流式传输。到目前为止,以下代码有效:

from pyspark.sql import SparkSession
from ast import literal_eval

spark = SparkSession.builder \
    .appName("streamer") \
    .getOrCreate()

# Create DataFrame representing the stream
dsraw = spark.readStream \
  .format("kafka") \
  .option("kafka.bootstrap.servers", "localhost:9092") \
  .option("subscribe", "test") \
  .option("startingOffsets", """"test":"0":2707422""") \
  .load()

# Convert Kafka stream to something readable
ds = dsraw.selectExpr("CAST(value AS STRING)")

# Do query on the raw data
rawQuery = dsraw \
     .writeStream \
     .queryName("qraw") \
     .format("memory") \
     .start()
raw = spark.sql("select * from qraw")

# Do query on the converted data
dsQuery = ds \
     .writeStream \
     .queryName("qds") \
     .format("memory") \
     .start()
sdf = spark.sql("select * from qds")

# I have to access raw otherwise I get errors...
raw.select("value").show()

sdf.show()

# Make the json stuff accessable
sdf2 = sdf.rdd.map(lambda val: literal_eval(val['value']))
print(sdf2.first())

但我真的想知道倒数第二行的转换是否是最有用/最快的。你有其他想法吗?我可以使用 (Spark) 数据帧而不是 RDD 吗?

脚本的输出是

+--------------------+
|               value|
+--------------------+
|
  "Signal": "[...|
|
  "Signal": "[...|
+--------------------+
only showing top 20 rows

'Signal': '[1234]', 'Value': 0.0, 'Timestamp': '2019-08-27T13:51:43.7146327Z'

【问题讨论】:

【参考方案1】:

有一些解决方案,但只有这个经过调整的解决方案才有效(感谢 https://***.com/a/51070457/3021134):

from pyspark.sql.functions import from_json, col
from pyspark.sql.types import StructField, StructType, StringType, DoubleType

schema = StructType(
        [
                StructField("Signal", StringType()),
                StructField("Value", DoubleType()),
                StructField("Timestamp", StringType())
        ]
)

sdf.withColumn("value", from_json("value", schema))\
    .select(col('value.*'))\
    .show()

输出:

+--------+-----------+--------------------+
|  Signal|      Value|           Timestamp|
+--------+-----------+--------------------+
|[123456]|        0.0|2019-08-27T13:51:...|
|[123457]|        0.0|2019-08-27T13:51:...|
|[123458]| 318.880859|2019-08-27T13:51:...|
|[123459]|   285.5808|2019-08-27T13:51:...|

【讨论】:

以上是关于使 Spark 结构化流中的 JSON 可以在 python (pyspark) 中作为没有 RDD 的数据帧访问的主要内容,如果未能解决你的问题,请参考以下文章