使用带有过滤器和其他列的 pyspark 将 CSV 转换为 JSON [关闭]

Posted

技术标签:

【中文标题】使用带有过滤器和其他列的 pyspark 将 CSV 转换为 JSON [关闭]【英文标题】:Converting CSV to JSON using pyspark with filter and additional columns [closed] 【发布时间】:2020-11-22 00:49:36 【问题描述】:

给定一个带有标题的 CSV 输入文件:

"CorrelationID", "Message", "EventTimeStamp", "Flag", "RandomColumns"
12345, "Hello", "2019-06-09 04:25:15", "True", "blah"
12345, "Hello", "2019-06-09 04:25:18", "False", "blah"
45678, "Brick", "2019-06-09 04:26:23", "True", "blah"
78912, "Stone", "2019-06-09 04:29:50", "False", "blah"

考虑那些同时具有真假标志的 CorrelationID。忽略“flag”列中不包含“true”和“false”值的其余行

EventTimeStamp True 标志的值为 StartTimeEventTimeStamp False 标志的值为 EndTime

JSON 文件格式作为输出:

"CorrelationID": "12345","Message":"Hello","StartTime":"2019-06-09 04:25:15","EndTime":"2019-06-09 04:25:18"

【问题讨论】:

这是一个值得重新讨论的好问题! 【参考方案1】:

使用groupBy & 在agg 函数内部使用collect_set,first & last 函数。检查下面的代码。

from pyspark.sql import functions as F
df \
.withColumn(\ # casting eventtimestamp to timestamp
    "eventtimestamp", \
    F.col("eventtimestamp").cast("timestamp")\
) \
.orderBy(F.col("eventtimestamp").asc) \ # sorting eventtimestamp asc
.groupBy(F.col("correlationid"),F.col("Message")) \ # grouping records based on correlationid
.agg( \
    F.first(F.col("eventtimestamp")).cast("string").alias("StartTime"),\ # First value of eventtimestamp as StartTime
    F.last(F.col("eventtimestamp")).cast("string").alias("EndTime"), \ # Last value of eventtimestamp as End Time
    F.collect_set(F.col("flag")).alias("flag")\ # Collecting Set Of flags & Use size of this value in filter to get only records which has true and false for correlationid.
) \
.filter(F.size(F.col("flag")) === 2) \ 
.select( \
    F.to_json(\ # Adding required columns to inside struct to make json record
        F.struct(\
            F.col("CorrelationID"),\
            F.col("Message"), \
            F.col("StartTime"), \
            F.col("EndTime") \
        ).alias("json_data")\
    ) \
) \
.show(false)
+-------------------------------------------------------------------------------------------------------------+
|json_data                                                                                                    |
+-------------------------------------------------------------------------------------------------------------+
|"CorrelationID":"12345","Message":"Hello","StartTime":"2019-06-09 04:25:15","EndTime":"2019-06-09 04:25:18"|
+-------------------------------------------------------------------------------------------------------------+

【讨论】:

以上是关于使用带有过滤器和其他列的 pyspark 将 CSV 转换为 JSON [关闭]的主要内容,如果未能解决你的问题,请参考以下文章

带有过滤器的pyspark窗口函数

PySpark:使用过滤器功能后取一列的平均值

过滤 Pyspark 中列的动态唯一组合

在pyspark中创建带有arraytype列的数据框

PySpark - 将另一列的值作为 spark 函数的参数传递

如何通过 Pyspark 中同一数据框中另一列的正则表达式值过滤数据框中的一列