Transform several Dataframe rows into a single row

【Posted】2019-07-21 12:22:38 【Question】

Here is a sample snippet of the DataFrame:

+-------------------+--------------------------------+-----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
|_lid               |trace                           |message                                                                                                                                                                                                                                                                                                                                                                            |
+-------------------+--------------------------------+-----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
|1103960793391132675|47c10fda9b40407c998c154dc71a9e8c|[app.py:208] Prediction label: "id": 617, "name": "CENSORED", score=0.3874854505062103                                                                                                                                                                                                                                                                                           |
|1103960793391132676|47c10fda9b40407c998c154dc71a9e8c|[app.py:224] Similarity values: [0.6530804801919593, 0.6359653379418201]                                                                                                                                                                                                                                                                                                           |
|1103960793391132677|47c10fda9b40407c998c154dc71a9e8c|[app.py:317] Predict=s3://CENSORED/scan_4745/scan4745_t1_r0_c9_2019-07-15-10-32-43.jpg trait_id=112 result=InferenceResult(predictions=[Prediction(label_id='230', label_name='H3', probability=0.0), Prediction(label_id='231', label_name='Other', probability=1.0)], selected=Prediction(label_id='231', label_name='Other', probability=1.0)). Took 1.3637824058532715 seconds |
+-------------------+--------------------------------+-----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+

I have millions of similar log-like rows, and they can all be grouped by a trace that is unique to the session.

I am looking to transform each such set of rows into a single row, essentially mapping over them. For this example I would extract the "id": 617 from the name in the first row, the values 0.6530804801919593, 0.6359653379418201 from the second row, and the selected Prediction(label_id='231', label_name='Other', probability=1.0) from the third row, combining them into one row.

I would then compose a new table with the columns:

| trace | id | similarity | selected |

and the values:

| 47c10fda9b40407c998c154dc71a9e8c | 617 | 0.6530804801919593, 0.6359653379418201 | 231 |

How should I implement this grouped map transformation over multiple rows in pyspark?

【Comments】:

【Answer 1】:

For convenience I wrote the following example in Scala, but it should be straightforward to translate to Pyspark.

1) Create new columns in the DataFrame via regexp_extract on the "message" field. This produces the desired value when the regex matches, and an empty string otherwise:

scala> val dss = ds.select(
     | 'trace, 
     | regexp_extract('message, "\"id\": (\\d+),", 1) as "id", 
     | regexp_extract('message, "Similarity values: \\[(\\-?[0-9\\.]+, \\-?[0-9\\.]+)\\]", 1) as "similarity", 
     | regexp_extract('message, "selected=Prediction\\(label_id='(\\d+)'", 1) as "selected"
     | )
dss: org.apache.spark.sql.DataFrame = [trace: string, id: string ... 2 more fields]

scala> dss.show(false)
+--------------------------------+---+--------------------------------------+--------+
|trace                           |id |similarity                            |selected|
+--------------------------------+---+--------------------------------------+--------+
|47c10fda9b40407c998c154dc71a9e8c|617|                                      |        |
|47c10fda9b40407c998c154dc71a9e8c|   |0.6530804801919593, 0.6359653379418201|        |
|47c10fda9b40407c998c154dc71a9e8c|   |                                      |231     |
+--------------------------------+---+--------------------------------------+--------+
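Before wiring these patterns into Spark, they can be sanity-checked in plain Python against the sample messages from the question. Python's `re` syntax is close enough to Java's for these particular patterns; the small `extract` helper below is an assumption that mimics `regexp_extract`, which returns the first capture group on a match and an empty string otherwise:

```python
import re

# Sample messages taken from the question (the third one abbreviated).
messages = [
    '[app.py:208] Prediction label: "id": 617, "name": "CENSORED", score=0.3874854505062103',
    "[app.py:224] Similarity values: [0.6530804801919593, 0.6359653379418201]",
    "[app.py:317] Predict=... selected=Prediction(label_id='231', label_name='Other', probability=1.0)). Took 1.3637824058532715 seconds",
]

# The same three patterns as in the Scala answer, in Python syntax.
patterns = {
    "id": r'"id": (\d+),',
    "similarity": r"Similarity values: \[(-?[0-9.]+, -?[0-9.]+)\]",
    "selected": r"selected=Prediction\(label_id='(\d+)'",
}

def extract(message, pattern):
    # Mimic regexp_extract: first group on a match, "" otherwise.
    m = re.search(pattern, message)
    return m.group(1) if m else ""

for msg in messages:
    print({name: extract(msg, pat) for name, pat in patterns.items()})
```

Each message matches exactly one of the three patterns, which is what produces the sparse intermediate table above.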

2) Group by "trace" and eliminate the cases where the regex did not match. The quick and dirty way (shown below) is to take the max of each column, but you may need to do something more sophisticated if you expect more than one match per trace:

scala> val ds_final = dss.groupBy('trace).agg(max('id) as "id", max('similarity) as "similarity", max('selected) as "selected")
ds_final: org.apache.spark.sql.DataFrame = [trace: string, id: string ... 2 more fields]

scala> ds_final.show(false)
+--------------------------------+---+--------------------------------------+--------+
|trace                           |id |similarity                            |selected|
+--------------------------------+---+--------------------------------------+--------+
|47c10fda9b40407c998c154dc71a9e8c|617|0.6530804801919593, 0.6359653379418201|231     |
+--------------------------------+---+--------------------------------------+--------+
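The reason max works here as a collapse trick: regexp_extract leaves an empty string on the non-matching rows, and the empty string sorts before every non-empty string, so the column-wise max surfaces the single real match. The same ordering holds for plain Python strings:

```python
# One column of the grouped data: one match and two non-matches ("").
id_column = ["617", "", ""]

# "" < "617" lexicographically, so max() surfaces the matched value.
assert max(id_column) == "617"

# Caveat: with two or more real matches per trace, max() silently keeps
# only the lexicographically largest one, e.g. "9" beats "617".
assert max(["617", "9", ""]) == "9"
```

This is why the answer flags the max approach as quick and dirty: it is only safe when each trace contributes at most one match per column.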

【Discussion】:

【Answer 2】:

I ended up using something along the lines of:
import re

import pandas as pd
import pyspark.sql.functions as F
from pyspark.sql.types import StructType, StructField, TimestampType, StringType

expected_schema = StructType([
    StructField("event_timestamp", TimestampType(), False),
    StructField("trace", StringType(), False),
    ...
])

@F.pandas_udf(expected_schema, F.PandasUDFType.GROUPED_MAP)
def transform(pdf):
    # Input and output are both a pandas.DataFrame
    output = {}

    for l in pdf.to_dict(orient='records'):
        # Strip the "[file:line]" prefix and keep the message body
        x = re.findall(r'^(\[.*:\d+\]) (.*)', l['message'])[0][1]
        ...

    return pd.DataFrame(data=[output])

df.groupby('trace').apply(transform)
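Stripped of the Spark machinery, the per-group logic of such a UDF can be sketched and tested in plain Python. The sample trace and messages below come from the question (the third message abbreviated); the field regexes are an assumption matching the log format shown, with the "[file:line]" prefix handling folded into them:

```python
import re
from collections import defaultdict

# Sample (trace, message) rows for one session, from the question.
rows = [
    ("47c10fda9b40407c998c154dc71a9e8c",
     '[app.py:208] Prediction label: "id": 617, "name": "CENSORED", score=0.3874854505062103'),
    ("47c10fda9b40407c998c154dc71a9e8c",
     "[app.py:224] Similarity values: [0.6530804801919593, 0.6359653379418201]"),
    ("47c10fda9b40407c998c154dc71a9e8c",
     "[app.py:317] Predict=... selected=Prediction(label_id='231', label_name='Other', probability=1.0)). Took 1.36 seconds"),
]

# Assumed field patterns; each message populates at most one field.
PATTERNS = {
    "id": re.compile(r'"id": (\d+),'),
    "similarity": re.compile(r"Similarity values: \[([^\]]+)\]"),
    "selected": re.compile(r"selected=Prediction\(label_id='(\d+)'"),
}

def collapse(rows):
    """Group log rows by trace and collapse each group into one record."""
    groups = defaultdict(lambda: {"id": "", "similarity": "", "selected": ""})
    for trace, message in rows:
        for field, pattern in PATTERNS.items():
            m = pattern.search(message)
            if m:
                groups[trace][field] = m.group(1)
    return [{"trace": t, **fields} for t, fields in groups.items()]

print(collapse(rows))
```

Inside the real GROUPED_MAP UDF, `pdf` holds exactly one trace's rows, so the body reduces to the inner loop over messages returning a single-row DataFrame.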

【Discussion】:
