将多个 Dataframe 行转换为一行
Posted
技术标签:
【中文标题】将多个 Dataframe 行转换为一行【英文标题】:Transform several Dataframe rows into a single row 【发布时间】:2019-07-21 12:22:38 【问题描述】:以下是 Dataframe sn-p 示例:
+-------------------+--------------------------------+-----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
|_lid |trace |message |
+-------------------+--------------------------------+-----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
|1103960793391132675|47c10fda9b40407c998c154dc71a9e8c|[app.py:208] Prediction label: "id": 617, "name": "CENSORED", score=0.3874854505062103 |
|1103960793391132676|47c10fda9b40407c998c154dc71a9e8c|[app.py:224] Similarity values: [0.6530804801919593, 0.6359653379418201] |
|1103960793391132677|47c10fda9b40407c998c154dc71a9e8c|[app.py:317] Predict=s3://CENSORED/scan_4745/scan4745_t1_r0_c9_2019-07-15-10-32-43.jpg trait_id=112 result=InferenceResult(predictions=[Prediction(label_id='230', label_name='H3', probability=0.0), Prediction(label_id='231', label_name='Other', probability=1.0)], selected=Prediction(label_id='231', label_name='Other', probability=1.0)). Took 1.3637824058532715 seconds |
+-------------------+--------------------------------+-----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
我有数百万个类似日志的结构,它们都可以按会话独有的跟踪进行分组。
我希望将这些行集转换为单行,本质上是对它们进行映射,对于此示例,我将从第一个名称中提取 "id": 617
,从第二行中提取值 0.6530804801919593, 0.6359653379418201
,从第三行中提取将Prediction(label_id='231', label_name='Other', probability=1.0)
值排成一行。
然后我将组成一个包含列的新表:
| trace | id | similarity | selected |
与价值观:
| 47c10fda9b40407c998c154dc71a9e8c | 617 | 0.6530804801919593, 0.6359653379418201 | 231 |
我应该如何在 pyspark 中的多行上实现这个组映射转换?
【问题讨论】:
【参考方案1】:为方便起见,我用 Scala 编写了以下示例,但它应该很容易转换为 Pyspark。
1) 通过“消息”字段上的regexp_extract
在数据框中创建新列。如果正则表达式匹配,这将产生所需的值,否则将产生空字符串:
scala> val dss = ds.select(
| 'trace,
| regexp_extract('message, "\"id\": (\\d+),", 1) as "id",
| regexp_extract('message, "Similarity values: \\[(\\-?[0-9\\.]+, \\-?[0-9\\.]+)\\]", 1) as "similarity",
| regexp_extract('message, "selected=Prediction\\(label_id='(\\d+)'", 1) as "selected"
| )
dss: org.apache.spark.sql.DataFrame = [trace: string, id: string ... 2 more fields]
scala> dss.show(false)
+--------------------------------+---+--------------------------------------+--------+
|trace |id |similarity |selected|
+--------------------------------+---+--------------------------------------+--------+
|47c10fda9b40407c998c154dc71a9e8c|617| | |
|47c10fda9b40407c998c154dc71a9e8c| |0.6530804801919593, 0.6359653379418201| |
|47c10fda9b40407c998c154dc71a9e8c| | |231 |
+--------------------------------+---+--------------------------------------+--------+
2) 按“trace”分组并消除正则表达式不匹配的情况。快速而肮脏的方法(如下所示)是选择每一列的 max
,但如果您希望每次跟踪遇到多个匹配项,则可能需要做一些更复杂的事情:
scala> val ds_final = dss.groupBy('trace).agg(max('id) as "id", max('similarity) as "similarity", max('selected) as "selected")
ds_final: org.apache.spark.sql.DataFrame = [trace: string, id: string ... 2 more fields]
scala> ds_final.show(false)
+--------------------------------+---+--------------------------------------+--------+
|trace |id |similarity |selected|
+--------------------------------+---+--------------------------------------+--------+
|47c10fda9b40407c998c154dc71a9e8c|617|0.6530804801919593, 0.6359653379418201|231 |
+--------------------------------+---+--------------------------------------+--------+
【讨论】:
【参考方案2】:我最终在
行中使用了一些东西expected_schema = StructType([
StructField("event_timestamp", TimestampType(), False),
StructField("trace", StringType(), False),
...
])
@F.pandas_udf(expected_schema, F.PandasUDFType.GROUPED_MAP)
# Input/output are both a pandas.DataFrame
def transform(pdf):
output =
for l in pdf.to_dict(orient='record'):
x = re.findall(r'^(\[.*:\d+\]) (.*)', l['message'])[0][1]
...
return pd.DataFrame(data=[output])
df.groupby('trace').apply(transform)
【讨论】:
以上是关于将多个 Dataframe 行转换为一行的主要内容,如果未能解决你的问题,请参考以下文章
将 pandas Dataframe 的行转换为可迭代的字符串列表
如何将 pandas DataFrame 行保存为 JSON 字符串?