在 pyspark 的 StructStreaming 中;如何将 DataFrame 中的每一行(json 格式的字符串)转换为多列

Posted

技术标签:

【中文标题】在 pyspark 的 StructStreaming 中;如何将 DataFrame 中的每一行(json 格式的字符串)转换为多列【英文标题】:In StructStreaming of pyspark; How do I convert each row (a json-formatted string) in the DataFrame into multiple columns 【发布时间】:2020-04-21 06:49:26 【问题描述】:

我的 DataFrame 结构如下所示

+--------------------+
|              values|
+--------------------+
|"user_id":"00000...|
+--------------------+

然后这里的字符串结构是这样的


    "user_id":"00000000002",
    "client_args":
        "order_by":"id",
        "page":"4",
        "keyword":"Blue flowers",
        "reverse":"false"
    ,
    "keyword_tokenizer":[
        "Blue",
        "flowers"
    ],
    "items":[
        "00000065678",
        "00000065707",
        "00000065713",
        "00000065741",
        "00000065753",
        "00000065816",
        "00000065875",
        "00000066172"
    ]

我希望这个 DataFrame 看起来像这样

+---------------+-------------------+------------------+----------------------------+
|    user_id    |     client_args   | keyword_tokenizer|            items          |
+---------------+-------------------+------------------+----------------------------+
|00000000000001 |"order_by":"",...|["Blue","flowers"]|["000006578","00002458",...]|
+---------------+-------------------+------------------+----------------------------+

我的代码是这样的

lines = spark_session\
    .readStream\
    .format("socket")\
    .option("host", "127.0.0.1")\
    .option("port", 9998)\
    .load()

@f.udf("struct<user_id:string,client_args:string,keyword_tokenizer:array>")
def str_to_json(s):
    return json.loads(s)

lines.select(str_to_json(lines.values))

但这只会将它们转换为 JSON,不能转换为列拆分。 我该怎么办?

此外: 后来我找到了这个方法来解决这个问题。 效率低吗?

schema = StructType([StructField("user_id",StringType()),
                     StructField("client_args", StructType([
                         StructField("order_by", StringType()),
                         StructField("page", StringType()),
                         StructField("keyword", StringType()),
                         StructField("reverse", StringType()),
                     ])),
                     StructField("keyword_tokenizer", ArrayType(StringType())),
                     StructField("items", ArrayType(StringType()))])

new_df = lines.withColumn("tmp", f.from_json(lines.values, schema))\
    .withColumn("user_id", f.col("tmp").getItem("user_id"))\
    .withColumn("client_args", f.col("tmp").getItem("client_args"))\
    .withColumn("keyword_tokenizer", f.col("tmp").getItem("keyword_tokenizer"))\
    .withColumn("items", f.col("tmp").getItem("items"))\
    .drop("value", "tmp")

【问题讨论】:

这能回答你的问题吗? Pyspark: Parse a column of json strings 我很欣赏这一点,但是我的代码在'Struct Streaming'中工作,我使用了您提供的链接中的方法,并且发生了错误:必须使用writeStream执行带有流源的查询。开始 ();; \ ntextSocket ' 你能帮帮我吗? 这不是相关的错误......基本上你需要做以下事情:1)为数据创建一个模式(例如你可以读取一个JSON文件并允许推断模式;2)使用from_json使用创建的模式将字符串转换为列。这是 Scala 中的示例 - 我没有 Python 示例:github.com/alexott/dse-playground/blob/master/spark-dse/src/… 谢谢,我看了你的代码,在python中复制过来看看是不是效率低下,我把代码加到问题里了。 解码JSON后可以.select("*", "tmp.*").drop("tmp),这样就不用一一提取嵌套字段了... 【参考方案1】:

使用 pyspark 读取为 json 文件

df = spark.read.json("test.json") 
df.show()

+--------------------+--------------------+-----------------+-----------+
|         client_args|               items|keyword_tokenizer|    user_id|
+--------------------+--------------------+-----------------+-----------+
|[Blue flowers, id...|[00000065678, 000...|  [Blue, flowers]|00000000002|
+--------------------+--------------------+-----------------+-----------+

【讨论】:

对不起,我的这段代码是一个流处理器,然后用数据流传过来

以上是关于在 pyspark 的 StructStreaming 中;如何将 DataFrame 中的每一行(json 格式的字符串)转换为多列的主要内容,如果未能解决你的问题,请参考以下文章

PYSPARK:如何将带有多个 case 语句的 SQL 查询转换为 Pyspark/Pyspark-SQL?

pyspark:在日期和时间上重新采样 pyspark 数据帧

PYSPARK:如何在 pyspark 数据框中找到两列的余弦相似度?

Pyspark / pyspark 内核在 jupyter notebook 中不起作用

避免在 pyspark 代码中使用 collect() 函数的最佳方法是啥?编写优化pyspark代码的最佳方法?

在 Pyspark 代码中读取嵌套的 Json 文件。 pyspark.sql.utils.AnalysisException: