Converting CSV to JSON using pyspark with filter and additional columns [closed]
Posted: 2020-11-22 00:49:36

Given a CSV input file with a header:
"CorrelationID", "Message", "EventTimeStamp", "Flag", "RandomColumns"
12345, "Hello", "2019-06-09 04:25:15", "True", "blah"
12345, "Hello", "2019-06-09 04:25:18", "False", "blah"
45678, "Brick", "2019-06-09 04:26:23", "True", "blah"
78912, "Stone", "2019-06-09 04:29:50", "False", "blah"
Consider only those CorrelationIDs that have both a True and a False flag; ignore the remaining rows whose "Flag" column does not come in a True/False pair. The EventTimeStamp of the True-flag row becomes StartTime, and the EventTimeStamp of the False-flag row becomes EndTime.
Expected JSON output format:

{"CorrelationID":"12345","Message":"Hello","StartTime":"2019-06-09 04:25:15","EndTime":"2019-06-09 04:25:18"}
Comments:

This is a good question worth reopening!

Answer 1:

Use groupBy, and inside the agg function use the collect_set, first and last functions. Check the code below.
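If you are starting from the raw file, here is a minimal loading sketch first (the file path input.csv is hypothetical; header and inferSchema match the sample data above):

from pyspark.sql import SparkSession

spark = SparkSession.builder.appName("csv-to-json").getOrCreate()

# Read the sample CSV; header=True uses the first line as column names.
df = (
    spark.read
    .option("header", True)
    .option("inferSchema", True)
    .csv("input.csv")  # hypothetical path
)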
from pyspark.sql import functions as F

result = (
    df
    # cast eventtimestamp to timestamp so the sort is chronological
    .withColumn("eventtimestamp", F.col("eventtimestamp").cast("timestamp"))
    # sort ascending so first()/last() pick the earliest/latest row per group
    .orderBy(F.col("eventtimestamp").asc())
    # group records by correlationid (Message is included so it survives the aggregation)
    .groupBy(F.col("correlationid"), F.col("Message"))
    .agg(
        # first value of eventtimestamp becomes StartTime
        F.first(F.col("eventtimestamp")).cast("string").alias("StartTime"),
        # last value of eventtimestamp becomes EndTime
        F.last(F.col("eventtimestamp")).cast("string").alias("EndTime"),
        # collect the set of flags; its size tells us whether this
        # correlationid has both True and False rows
        F.collect_set(F.col("flag")).alias("flag"),
    )
    # keep only correlationids that have both flag values
    .filter(F.size(F.col("flag")) == 2)
    # pack the required columns into a struct and serialize it as a JSON string
    .select(
        F.to_json(
            F.struct(
                F.col("CorrelationID"),
                F.col("Message"),
                F.col("StartTime"),
                F.col("EndTime"),
            )
        ).alias("json_data")
    )
)

result.show(truncate=False)
+-------------------------------------------------------------------------------------------------------------+
|json_data                                                                                                    |
+-------------------------------------------------------------------------------------------------------------+
|{"CorrelationID":"12345","Message":"Hello","StartTime":"2019-06-09 04:25:15","EndTime":"2019-06-09 04:25:18"}|
+-------------------------------------------------------------------------------------------------------------+
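One caveat: Spark does not guarantee that first/last see rows in the orderBy order after a shuffle, so a deterministic variant is to aggregate with min/max, which matches the earliest/latest semantics the answer intends. A hedged sketch of that alternative, reusing the same df, then writing the result out one JSON object per line (the output directory result_json is hypothetical):

from pyspark.sql import functions as F

result = (
    df
    .withColumn("eventtimestamp", F.col("eventtimestamp").cast("timestamp"))
    .groupBy("correlationid", "Message")
    .agg(
        # min/max are order-independent, so no prior orderBy is needed
        F.min("eventtimestamp").cast("string").alias("StartTime"),
        F.max("eventtimestamp").cast("string").alias("EndTime"),
        F.collect_set("flag").alias("flag"),
    )
    .filter(F.size("flag") == 2)
    .select(
        F.to_json(
            F.struct("CorrelationID", "Message", "StartTime", "EndTime")
        ).alias("json_data")
    )
)

# json_data already holds serialized JSON strings, so write them as text
# rather than .json() (which would encode them a second time).
result.write.mode("overwrite").text("result_json")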