基于列 pyspark 的条件列选择

Posted

技术标签:

【中文标题】基于列 pyspark 的条件列选择【英文标题】:Conditional Column Select based on column pyspark 【发布时间】:2021-07-05 17:31:18 【问题描述】:

我有所有元素的输入数据集

+-------------------------+----------------+------------------------+-----+----------+------------+---------+---------------------------+-----------------------------------------------------------+---------------+
|ingestion_connection_name|ingestion_source|hostname                |port |database  |schema      |username |topicname                  |brokers                                                    |startingoffsets|
+-------------------------+----------------+------------------------+-----+----------+------------+---------+---------------------------+-----------------------------------------------------------+---------------+
|oradom_conn              |ORACLE          |wdummyRdbora01          |1521 |domabcr   |n4domop2    |n4domop2 |                           |                                                           |               |
|hdfs_connection          |HDFS            |                        |     |          |            |         |                           |                                                           |               |
|kafka_connection         |KAFKA           |                        |     |          |            |         |streaming.ELK.poc          |abcdedfdev001:9092,abcdedfdev002:9092,abcdedfdev003:9092|               |
+-------------------------+----------------+------------------------+-----+----------+------------+---------+---------------------------+-----------------------------------------------------------+---------------+

预期输出


    "ingestion_connection_details" : [
    
    "ingestion_connection_name": "oradom_conn",
    "ingestion_source" : "ORACLE",
    "hostname" : "wdummyRdbora01",
    "port" :"1521",
    "database" :"domabcr",
    "schema" :"domabcrsc",
    "username" :"n4domop2"
    ,
    
        "ingestion_connection_name": "kafka_connection",
        "ingestion_source" : "kafka",
        "topicname" : "streaming.ELK.poc",
        "brokers" : "abcdedfdev001:9092,abcdedfdev002:9092,abcdedfdev003:9092",
        "startingOffsets" :  ""
    ,
    
    "ingestion_connection_name": "hdfs_connection",
    "ingestion_source" : "hdfs"
    
    ]

代码

results = df.toJSON().map(lambda j: json.loads(j)).collect()
with open('/hadoopData/bdipoc/poc/logs/data.json', 'w', encoding='utf-8') as f:
    json.dump(results, f, ensure_ascii=False, indent=4)

此代码仅生成包含所有元素的记录数组。但是我如何希望将输入写入带有 ingestion_connection_details 标记的平面文件,并且只为每个源类型提供相关列

【问题讨论】:

【参考方案1】:

您可以使用 Spark 的sql functions 的组合来获得所需的结果:

from pyspark.sql import functions as F

df.withColumn("map", F.map_from_arrays(F.array([F.lit(c) for c in df.columns]),F.array(df.columns)))\
  .withColumn("filtered_map", F.expr("map_filter(map, (k,v) -> v is not null)")) \
  .withColumn("json", F.to_json("filtered_map"))
map 包含一个映射,将所有列名称映射到它们各自的值 在filtered_map 列中,仅保留值不为空的那些键 json 列包含最终的 json 字符串

从测试数据开始

+----+----+----+---+
|  c1|  c2|  c3| c4|
+----+----+----+---+
|   A|   B|null|  D|
|  10|null|  20| 30|
|null|null|null| 30|
+----+----+----+---+

最终结果是

+----+----+----+---+----------------------------------------------+------------------------------+-------------------------------+
|c1  |c2  |c3  |c4 |map                                           |filtered_map                  |json                           |
+----+----+----+---+----------------------------------------------+------------------------------+-------------------------------+
|A   |B   |null|D  |c1 -> A, c2 -> B, c3 -> null, c4 -> D       |c1 -> A, c2 -> B, c4 -> D   |"c1":"A","c2":"B","c4":"D"   |
|10  |null|20  |30 |c1 -> 10, c2 -> null, c3 -> 20, c4 -> 30    |c1 -> 10, c3 -> 20, c4 -> 30|"c1":"10","c3":"20","c4":"30"|
|null|null|null|30 |c1 -> null, c2 -> null, c3 -> null, c4 -> 30|c4 -> 30                    |"c4":"30"                    |
+----+----+----+---+----------------------------------------------+------------------------------+-------------------------------+

对于小于 3.0 的 Spark 版本,可以使用 UDF:

import json
from pyspark.sql import functions as F
from pyspark.sql import types as T

def to_json(cols, values):
  data = 
  for i, c in enumerate(cols):
    if values[i] != None:
      data[c] = values[i]
  return json.dumps(data)

u=F.udf(to_json, T.StringType())
df.withColumn("json", u(F.array([F.lit(c) for c in df.columns]),F.array(df.columns))).show(truncate=False)

【讨论】:

不幸的是,我仍在 HDP 中,使用 Spark 2.3.2 函数 map_from_arrays is not found 错误 @Rata 对于旧的 Spark 版本,可以使用 udf if values[i] != None - 对我不起作用,所以我改为 if len(values[i]) >0: 而是获得了第一组所需的输出。将分组并在数据上应用结构以获得所需的输出 当我尝试写回文件时,这些字符串用 \" for " df = df.withColumn("jsongrp",F.lit('1')) df_2=df.groupby( 'jsongrp').agg(F.collect_list('json').alias("ingestion_connection_details")) 结果 = df.toJSON().map(lambda j: json.loads(j)).collect() 与 open( '/hadoopData/bdipoc/poc/logs/data.json', 'w', encoding='utf-8') as f: json.dump(results, f, ensure_ascii=False, indent=4) 你使用哪个 Python 版本?

以上是关于基于列 pyspark 的条件列选择的主要内容,如果未能解决你的问题,请参考以下文章

在 PySpark 中使用列条件替换空值

如何在 pyspark 中创建新列,其中条件取决于列的后续值?

Pyspark:基于所有列减去/差异 pyspark 数据帧

带有选择位置的 pyspark 新列

使用条件结果列连接 PySpark 数据帧

如何根据pyspark中的行和列条件过滤多行