Pyspark 数据框:访问列(TypeError:列不可迭代)

Posted

技术标签:

【中文标题】Pyspark 数据框:访问列(TypeError:列不可迭代)【英文标题】:Pyspark Data Frame: Access to a Column (TypeError: Column is not iterable) 【发布时间】:2020-04-10 12:29:28 【问题描述】:

我正在为 PySpark 代码苦苦挣扎,特别是,我想在一个不可迭代的对象 col 上调用一个函数。

from pyspark.sql.functions import col, lower, regexp_replace, split
from googletrans import Translator

def clean_text(c):
  c = lower(c)
  c = regexp_replace(c, r"^rt ", "")
  c = regexp_replace(c, r"(https?\://)\S+", "")
  c = regexp_replace(c, "[^a-zA-Z0-9\\s]", "") #removePunctuation 
  c = regexp_replace(c, r"\n", " ")
  c = regexp_replace(c, r"   ", " ")
  c = regexp_replace(c, r"  ", " ")  
#   c = translator.translate(c, dest='en', src='auto')
  return c

clean_text_df = uncleanedText.select(clean_text(col("unCleanedCol")).alias("sentence"))
clean_text_df.printSchema()
clean_text_df.show(10)

只要我在c = translator.translate(c, dest='en', src='auto') 中运行代码,Spark 显示的错误就是TypeError: Column is not iterable

我想做的是逐字翻译:

发件人:

+--------------------+
|            sentence|
+--------------------+
|ciao team there a...|
|dear itteam i urg...|
|buongiorno segnal...|
|hi team regarding...|
|hello please add ...|
|ciao vorrei effet...|
|buongiorno ho vis...|
+--------------------+

收件人:

+--------------------+
|            sentence|
+--------------------+
|hello team there ...|
|dear itteam i urg...|
|goodmorning segna...|
|hi team regarding...|
|hello please add ...|
|hello would effet...|
|goodmorning I see...|
+--------------------+

DataFrame 的架构是:

root
 |-- sentence: string (nullable = true)

谁能帮帮我?

非常感谢

【问题讨论】:

您必须定义一个udf 才能使用Translator.translate。其他函数调用都是pyspark内置函数。 令人惊讶的是,我找不到这个问题的良好重复目标。我找到的最接近的是:TypeError: 'Column' object is not callable using WithColumn。如果有人找到更好的,请联系我。 【参考方案1】:

PySpark 只是为支持 Apache Spark 而编写的 Python API。如果要使用自定义 python 函数,则必须定义用户定义函数 (udf)。

保持clean_text() 函数原样(注释掉translate 行)并尝试以下操作:

from pyspark.sql.functions import udf
from pyspark.sql.Types import StringType

def translate(c):
  return translator.translate(c, dest='en', src='auto')

translateUDF = udf(translate, StringType())

clean_text_df = uncleanedText.select(
  translateUDF(clean_text(col("unCleanedCol"))).alias("sentence")
)

您原来的clean_textlowerregexp_replace)中的其他函数是内置的 spark 函数,并在pyspark.sql.Column 上运行。

请注意,使用此 udf 会降低性能。见:Spark functions vs UDF performance?

【讨论】:

以上是关于Pyspark 数据框:访问列(TypeError:列不可迭代)的主要内容,如果未能解决你的问题,请参考以下文章

如何访问pyspark数据框中的动态列

带有点“。”的数据框的 pyspark 访问列

pyspark pandas 对象作为数据框 - TypeError

在jupyter中访问数据框元素pyspark

PySpark - 将列表作为参数传递给 UDF + 迭代数据框列添加

使用 numpy.npv 函数的输出向数据框中添加一列