Conditional column select based on column (PySpark)
Posted: 2021-07-05 17:31:18

【Problem description】: I have an input dataset with all the elements:
+-------------------------+----------------+------------------------+-----+----------+------------+---------+---------------------------+-----------------------------------------------------------+---------------+
|ingestion_connection_name|ingestion_source|hostname |port |database |schema |username |topicname |brokers |startingoffsets|
+-------------------------+----------------+------------------------+-----+----------+------------+---------+---------------------------+-----------------------------------------------------------+---------------+
|oradom_conn |ORACLE |wdummyRdbora01 |1521 |domabcr |n4domop2 |n4domop2 | | | |
|hdfs_connection |HDFS | | | | | | | | |
|kafka_connection |KAFKA | | | | | |streaming.ELK.poc |abcdedfdev001:9092,abcdedfdev002:9092,abcdedfdev003:9092| |
+-------------------------+----------------+------------------------+-----+----------+------------+---------+---------------------------+-----------------------------------------------------------+---------------+
Expected output:
"ingestion_connection_details" : [
"ingestion_connection_name": "oradom_conn",
"ingestion_source" : "ORACLE",
"hostname" : "wdummyRdbora01",
"port" :"1521",
"database" :"domabcr",
"schema" :"domabcrsc",
"username" :"n4domop2"
,
"ingestion_connection_name": "kafka_connection",
"ingestion_source" : "kafka",
"topicname" : "streaming.ELK.poc",
"brokers" : "abcdedfdev001:9092,abcdedfdev002:9092,abcdedfdev003:9092",
"startingOffsets" : ""
,
"ingestion_connection_name": "hdfs_connection",
"ingestion_source" : "hdfs"
]
Code:
import json

results = df.toJSON().map(lambda j: json.loads(j)).collect()
with open('/hadoopData/bdipoc/poc/logs/data.json', 'w', encoding='utf-8') as f:
    json.dump(results, f, ensure_ascii=False, indent=4)
This code only produces an array of records that contains all the columns. How can I instead write the input to a flat file wrapped in an ingestion_connection_details tag, with only the columns that are relevant to each source type?
【Answer 1】: You can use a combination of Spark's SQL functions to get the desired result:
from pyspark.sql import functions as F

df.withColumn("map", F.map_from_arrays(F.array([F.lit(c) for c in df.columns]), F.array(df.columns))) \
  .withColumn("filtered_map", F.expr("map_filter(map, (k,v) -> v is not null)")) \
  .withColumn("json", F.to_json("filtered_map"))
The map column contains a map from every column name to its respective value.
In the filtered_map column, only the keys whose values are not null are kept.
The json column contains the final JSON string.
Starting from the test data
+----+----+----+---+
| c1| c2| c3| c4|
+----+----+----+---+
| A| B|null| D|
| 10|null| 20| 30|
|null|null|null| 30|
+----+----+----+---+
the final result is
+----+----+----+---+----------------------------------------------+------------------------------+-------------------------------+
|c1  |c2  |c3  |c4 |map                                           |filtered_map                  |json                           |
+----+----+----+---+----------------------------------------------+------------------------------+-------------------------------+
|A   |B   |null|D  |{c1 -> A, c2 -> B, c3 -> null, c4 -> D}       |{c1 -> A, c2 -> B, c4 -> D}   |{"c1":"A","c2":"B","c4":"D"}   |
|10  |null|20  |30 |{c1 -> 10, c2 -> null, c3 -> 20, c4 -> 30}    |{c1 -> 10, c3 -> 20, c4 -> 30}|{"c1":"10","c3":"20","c4":"30"}|
|null|null|null|30 |{c1 -> null, c2 -> null, c3 -> null, c4 -> 30}|{c4 -> 30}                    |{"c4":"30"}                    |
+----+----+----+---+----------------------------------------------+------------------------------+-------------------------------+
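For reference, here is a self-contained sketch of the same approach run on the test data above. It assumes Spark 3.0+ (where map_filter is available) and a locally created SparkSession; the session setup is illustrative, not taken from the question.

# Self-contained sketch of the approach above, run on the same test data.
# Assumes Spark 3.0+ (map_filter) and a local SparkSession; the setup is illustrative.
from pyspark.sql import SparkSession
from pyspark.sql import functions as F

spark = SparkSession.builder.master("local[*]").appName("conditional-select").getOrCreate()

df = spark.createDataFrame(
    [("A", "B", None, "D"), ("10", None, "20", "30"), (None, None, None, "30")],
    "c1 string, c2 string, c3 string, c4 string",
)

result = (
    df.withColumn(
        "map",
        F.map_from_arrays(F.array([F.lit(c) for c in df.columns]), F.array(df.columns)),
    )
    .withColumn("filtered_map", F.expr("map_filter(map, (k, v) -> v is not null)"))
    .withColumn("json", F.to_json("filtered_map"))
)
result.show(truncate=False)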
For Spark versions below 3.0, a UDF can be used instead:
import json

from pyspark.sql import functions as F
from pyspark.sql import types as T

def to_json(cols, values):
    # Build a dict of only the non-null column values and serialize it.
    data = {}
    for i, c in enumerate(cols):
        if values[i] != None:
            data[c] = values[i]
    return json.dumps(data)

u = F.udf(to_json, T.StringType())

df.withColumn("json", u(F.array([F.lit(c) for c in df.columns]), F.array(df.columns))).show(truncate=False)
【Discussion】:
Unfortunately I am still on HDP with Spark 2.3.2 and get a "function map_from_arrays is not found" error.
@Rata For older Spark versions, a UDF can be used.
if values[i] != None did not work for me, so I changed it to if len(values[i]) > 0: and got the first set of desired output. I will group and apply a struct on the data to get the required output.
When I try to write this back to a file, the strings come out escaped with \" in place of ": df = df.withColumn("jsongrp",F.lit('1')) df_2=df.groupby('jsongrp').agg(F.collect_list('json').alias("ingestion_connection_details")) results = df.toJSON().map(lambda j: json.loads(j)).collect() with open('/hadoopData/bdipoc/poc/logs/data.json', 'w', encoding='utf-8') as f: json.dump(results, f, ensure_ascii=False, indent=4) (see the reformatted sketch below)
Which Python version are you using?
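Reformatted for readability, the grouping approach from that comment might look like the following sketch. It is not a verified solution: it assumes df already carries the per-row json column produced above, and it collects from df_2 (the comment itself collects from df, which looks like a typo).

# Sketch of the commenter's grouping-and-write approach, reformatted for readability.
# Assumes `df` already has the per-row "json" column from the answer above.
import json

from pyspark.sql import functions as F

# Put every row into one group so collect_list gathers all JSON strings together.
df = df.withColumn("jsongrp", F.lit("1"))
df_2 = df.groupby("jsongrp").agg(
    F.collect_list("json").alias("ingestion_connection_details")
)

# Collect to the driver and write a single flat JSON file.
# Note: the comment collects from df; df_2 is assumed to be the intended DataFrame.
results = df_2.toJSON().map(lambda j: json.loads(j)).collect()
with open("/hadoopData/bdipoc/poc/logs/data.json", "w", encoding="utf-8") as f:
    json.dump(results, f, ensure_ascii=False, indent=4)

The escaped \" strings the comment describes come from re-serializing strings that are already JSON; parsing each element first, for example with json.loads, would be one way to avoid the double escaping.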