使用 Spark Dataframe 列中的数据作为条件或输入另一个列表达式

Posted

技术标签:

【中文标题】使用 Spark Dataframe 列中的数据作为条件或输入另一个列表达式【英文标题】:Use data in Spark Dataframe column as condition or input in another column expression 【发布时间】:2016-08-30 19:05:52 【问题描述】:

我有一个我想在 PySpark 2.0 中执行的操作,它很容易作为 df.rdd.map 执行,但由于出于性能原因我更愿意留在 Dataframe 执行引擎中,我想找到一种方法来仅使用 Dataframe 操作执行此操作。

RDD 风格的操作是这样的:

def precision_formatter(row):
    formatter = "%.f".format(row.precision)
    return row + [formatter % row.amount_raw / 10 ** row.precision]
df = df.rdd.map(precision_formatter)

基本上,我有一列告诉我,对于每一行,我的字符串格式化操作的精度应该是多少,我想根据该精度有选择地将“amount_raw”列格式化为字符串。

【问题讨论】:

【参考方案1】:

我不知道如何使用一个或多个列的内容作为另一个 Column 操作的输入。我能想到的最接近的方法是建议使用Column.when 和一组外部定义的布尔运算,这些运算对应于一列或多列中的一组可能的布尔条件/情况。

在这种特定情况下,例如,如果您可以获得(或者更好的是,已经拥有)row.precision 的所有可能值,那么您可以遍历该集合并为每个值应用 Column.when 操作放。我相信这一套可以通过df.select('precision').distinct().collect()获得。

因为pyspark.sql.functions.whenColumn.when 操作本身返回一个Column 对象,所以您可以迭代集合中的项目(无论它是如何获得的)并保持以编程方式相互“附加”when 操作,直到你已经用尽了这组:

import pyspark.sql.functions as PSF

def format_amounts_with_precision(df, all_precisions_set):
    amt_col = PSF.when(df['precision'] == 0, df['amount_raw'].cast(StringType()))
    for precision in all_precisions_set:
        if precision != 0:  # this is a messy way of having a base case above
            fmt_str = '%.f'.format(precision)
            amt_col = amt_col.when(df['precision'] == precision,
                           PSF.format_string(fmt_str, df['amount_raw'] / 10 ** precision)

    return df.withColumn('amount', amt_col)

【讨论】:

【参考方案2】:

您可以使用 python UDF 来完成。它们可以获取尽可能多的输入值(来自行列的值)并输出单个输出值。它看起来像这样:

from pyspark.sql import types as T, functions as F
from pyspark.sql.function import udf, col

# Create example data frame
schema = T.StructType([
    T.StructField('precision', T.IntegerType(), False),
    T.StructField('value', T.FloatType(), False)
])

data = [
    (1, 0.123456),
    (2, 0.123456),
    (3, 0.123456)
]

rdd = sc.parallelize(data)
df = sqlContext.createDataFrame(rdd, schema)

# Define UDF and apply it
def format_func(precision, value):
    format_str = ":." + str(precision) + "f"
    return format_str.format(value)

format_udf = F.udf(format_func, T.StringType())

new_df = df.withColumn('formatted', format_udf('precision', 'value'))
new_df.show()

此外,如果您想使用全局值而不是列精度值,则可以在调用时使用 lit(..) 函数,如下所示:

new_df = df.withColumn('formatted', format_udf(F.lit(2), 'value'))

【讨论】:

以上是关于使用 Spark Dataframe 列中的数据作为条件或输入另一个列表达式的主要内容,如果未能解决你的问题,请参考以下文章

Spark DataFrame ArrayType 或 MapType 用于检查列中的值

将 Spark Dataframe 中的多个列发送到外部 API 并将结果存储在单独的列中

我们可以在 Spark DataFrame 列中使用 Pandas 函数吗?如果是这样,怎么做?

如果 spark 数据框的特定列中的所有条目都为空,则删除

过滤包含Scala Spark数据帧中数组的列中的数组长度[重复]

定义一个接受 Spark DataFrame 中对象数组的 UDF?