使 Spark 结构化流中的 JSON 可以在 python (pyspark) 中作为没有 RDD 的数据帧访问
Posted
技术标签:
【中文标题】使 Spark 结构化流中的 JSON 可以在 python (pyspark) 中作为没有 RDD 的数据帧访问【英文标题】:Make JSON in Spark's structured streaming accessible in python (pyspark) as dataframe without RDD 【发布时间】:2019-08-29 09:02:57 【问题描述】:我使用 Spark 2.4.3 并希望使用来自 Kafka 源的数据进行结构化流式传输。到目前为止,以下代码有效:
from pyspark.sql import SparkSession
from ast import literal_eval
spark = SparkSession.builder \
.appName("streamer") \
.getOrCreate()
# Create DataFrame representing the stream
dsraw = spark.readStream \
.format("kafka") \
.option("kafka.bootstrap.servers", "localhost:9092") \
.option("subscribe", "test") \
.option("startingOffsets", """"test":"0":2707422""") \
.load()
# Convert Kafka stream to something readable
ds = dsraw.selectExpr("CAST(value AS STRING)")
# Do query on the raw data
rawQuery = dsraw \
.writeStream \
.queryName("qraw") \
.format("memory") \
.start()
raw = spark.sql("select * from qraw")
# Do query on the converted data
dsQuery = ds \
.writeStream \
.queryName("qds") \
.format("memory") \
.start()
sdf = spark.sql("select * from qds")
# I have to access raw otherwise I get errors...
raw.select("value").show()
sdf.show()
# Make the json stuff accessable
sdf2 = sdf.rdd.map(lambda val: literal_eval(val['value']))
print(sdf2.first())
但我真的想知道倒数第二行的转换是否是最有用/最快的。你有其他想法吗?我可以使用 (Spark) 数据帧而不是 RDD 吗?
脚本的输出是
+--------------------+
| value|
+--------------------+
|
"Signal": "[...|
|
"Signal": "[...|
+--------------------+
only showing top 20 rows
'Signal': '[1234]', 'Value': 0.0, 'Timestamp': '2019-08-27T13:51:43.7146327Z'
【问题讨论】:
【参考方案1】:有一些解决方案,但只有这个经过调整的解决方案才有效(感谢 https://***.com/a/51070457/3021134):
from pyspark.sql.functions import from_json, col
from pyspark.sql.types import StructField, StructType, StringType, DoubleType
schema = StructType(
[
StructField("Signal", StringType()),
StructField("Value", DoubleType()),
StructField("Timestamp", StringType())
]
)
sdf.withColumn("value", from_json("value", schema))\
.select(col('value.*'))\
.show()
输出:
+--------+-----------+--------------------+
| Signal| Value| Timestamp|
+--------+-----------+--------------------+
|[123456]| 0.0|2019-08-27T13:51:...|
|[123457]| 0.0|2019-08-27T13:51:...|
|[123458]| 318.880859|2019-08-27T13:51:...|
|[123459]| 285.5808|2019-08-27T13:51:...|
【讨论】:
以上是关于使 Spark 结构化流中的 JSON 可以在 python (pyspark) 中作为没有 RDD 的数据帧访问的主要内容,如果未能解决你的问题,请参考以下文章
SPARK 结构化流中的 StructField 是不是存在错误