使用 Spark Dataframe 列中的数据作为条件或输入另一个列表达式
Posted
技术标签:
【中文标题】使用 Spark Dataframe 列中的数据作为条件或输入另一个列表达式【英文标题】:Use data in Spark Dataframe column as condition or input in another column expression 【发布时间】:2016-08-30 19:05:52 【问题描述】:我有一个我想在 PySpark 2.0 中执行的操作,它很容易作为 df.rdd.map
执行,但由于出于性能原因我更愿意留在 Dataframe 执行引擎中,我想找到一种方法来仅使用 Dataframe 操作执行此操作。
RDD 风格的操作是这样的:
def precision_formatter(row):
formatter = "%.f".format(row.precision)
return row + [formatter % row.amount_raw / 10 ** row.precision]
df = df.rdd.map(precision_formatter)
基本上,我有一列告诉我,对于每一行,我的字符串格式化操作的精度应该是多少,我想根据该精度有选择地将“amount_raw”列格式化为字符串。
【问题讨论】:
【参考方案1】:我不知道如何使用一个或多个列的内容作为另一个 Column 操作的输入。我能想到的最接近的方法是建议使用Column.when
和一组外部定义的布尔运算,这些运算对应于一列或多列中的一组可能的布尔条件/情况。
在这种特定情况下,例如,如果您可以获得(或者更好的是,已经拥有)row.precision
的所有可能值,那么您可以遍历该集合并为每个值应用 Column.when
操作放。我相信这一套可以通过df.select('precision').distinct().collect()
获得。
因为pyspark.sql.functions.when
和Column.when
操作本身返回一个Column
对象,所以您可以迭代集合中的项目(无论它是如何获得的)并保持以编程方式相互“附加”when
操作,直到你已经用尽了这组:
import pyspark.sql.functions as PSF
def format_amounts_with_precision(df, all_precisions_set):
amt_col = PSF.when(df['precision'] == 0, df['amount_raw'].cast(StringType()))
for precision in all_precisions_set:
if precision != 0: # this is a messy way of having a base case above
fmt_str = '%.f'.format(precision)
amt_col = amt_col.when(df['precision'] == precision,
PSF.format_string(fmt_str, df['amount_raw'] / 10 ** precision)
return df.withColumn('amount', amt_col)
【讨论】:
【参考方案2】:您可以使用 python UDF 来完成。它们可以获取尽可能多的输入值(来自行列的值)并输出单个输出值。它看起来像这样:
from pyspark.sql import types as T, functions as F
from pyspark.sql.function import udf, col
# Create example data frame
schema = T.StructType([
T.StructField('precision', T.IntegerType(), False),
T.StructField('value', T.FloatType(), False)
])
data = [
(1, 0.123456),
(2, 0.123456),
(3, 0.123456)
]
rdd = sc.parallelize(data)
df = sqlContext.createDataFrame(rdd, schema)
# Define UDF and apply it
def format_func(precision, value):
format_str = ":." + str(precision) + "f"
return format_str.format(value)
format_udf = F.udf(format_func, T.StringType())
new_df = df.withColumn('formatted', format_udf('precision', 'value'))
new_df.show()
此外,如果您想使用全局值而不是列精度值,则可以在调用时使用 lit(..) 函数,如下所示:
new_df = df.withColumn('formatted', format_udf(F.lit(2), 'value'))
【讨论】:
以上是关于使用 Spark Dataframe 列中的数据作为条件或输入另一个列表达式的主要内容,如果未能解决你的问题,请参考以下文章
Spark DataFrame ArrayType 或 MapType 用于检查列中的值
将 Spark Dataframe 中的多个列发送到外部 API 并将结果存储在单独的列中
我们可以在 Spark DataFrame 列中使用 Pandas 函数吗?如果是这样,怎么做?