将 1 到 n 个项目连接到新的 spark 列中

Posted

技术标签:

【中文标题】将 1 到 n 个项目连接到新的 spark 列中【英文标题】:Concat 1 to n items into new spark column 【发布时间】:2021-05-28 03:15:18 【问题描述】:

我尝试动态连接字段,根据一些配置设置,目标是创建一个包含 1 到 n 个字段合并值的新字段。

language = "JP;EN"
language = list(str(item) for item in language.split(";"))
no_langs = len(language)

# check if columns for multi-language exists
for lang in language:
   doc_lang = "doctor.name_" + lang
   if doc_lang not in case_df.columns:
      case_df_final = AddColumn(case_df, doc_lang)

### combine translations of masterdata
case_df = case_df.withColumn(
   "doctor",
    F.concat(
       F.col(("doctor.name_" + language[0])),
       F.lit(" // "),
       F.col(("doctor.name_" + language[1])),
  ),
)

我想要实现的是,新列是动态的,具体取决于配置的语言数量。例如。如果只使用一种语言,结果会是这样。

case_df = case_df.withColumn(
   "doctor",
    F.col(("doctor.name_" + lang[0]))
)

对于 2 种或更多语言,它应该根据列表中的顺序选择所有语言。 谢谢你的帮助。 我正在使用 Spark 2.4。使用 Python 3

预期的输出如下

【问题讨论】:

火花版本? @Srinivas Spark 2.4 与 Python3 预期输出? @Srinivas 我添加了一张图片,它显示了 3 种配置语言的预期输出,如果只配置了一种语言,它只会是例如Japanese_name 您可以使用concat_ws()。它接受一个分隔符,在您的情况下是'\\',并将连接由该字符分隔的后续列值。如果是null,则跳过该字段。意思是,如果你有一种语言,那么它根本不会使用分隔符。例如- concate_ws('//', 'doctor.name_jp', 'doctor.name_en', 'doctor.name_de') 将生成您预期的输出。 【参考方案1】:

最终的工作代码如下:

# check if columns for multi-language exists
for lang in language:
    doc_lang = "doctor.name_" + lang
    if doc_lang not in case_df.columns:
        case_df = AddColumn(case_df, doc_lang)
    doc_lang_new = doc_lang.replace(".", "_")
    case_df = case_df.withColumnRenamed(doc_lang, doc_lang_new)

doc_fields = list(map(lambda k: "doctor_name_" + k, language))
case_df = case_df.withColumn("doctor", F.concat_ws(" // ", *doc_fields))

感谢大家的帮助和提示。

【讨论】:

以上是关于将 1 到 n 个项目连接到新的 spark 列中的主要内容,如果未能解决你的问题,请参考以下文章

如何将新的 firebase 项目连接到现有的 Google Analytics(而不是创建新的 GA 属性)

连接到 RDBMS 时在 Spark 中进行分区

Laravel Spark - 无法连接到 repo

将模块的输出连接到另一个并循环此逻辑以获得值列表

如何将表数据连接到一个逗号分隔的列中[重复]

连接到 Spark 集群时的序列化问题