使用 numpy.npv 函数的输出向数据框中添加一列

Posted

技术标签:

【中文标题】使用 numpy.npv 函数的输出向数据框中添加一列【英文标题】:Adding a column to a data frame using the output of numpy.npv function 【发布时间】:2019-04-16 02:53:07 【问题描述】:

我试图在我的 pyspark 数据框中使用 2 列来使用 numpy.npv() 函数计算净现值。我收到以下错误

return (values / (1+rate)**np.arange(0, len(values))).sum(axis=0) TypeError: len() of unsized object

我也尝试使用 numpy.npv 函数作为 udf 失败

请求帮助以解决此问题

# Creating the DataFrame
df = sc.parallelize([('a',1,100),('a',2,200),('a',3,300),('a',4,400), 
('a',5,500),('a',6,600),('b',1,23),('b',2,32),('b',3,34),('b',4,55), 
('b',5,43)]).toDF(['Name','yr','cash'])
df.show()

# Loading the requisite packages
from pyspark.sql import Window
from pyspark.sql.functions import col, collect_list
import numpy as np
w = (Window.partitionBy('Name').orderBy(col('yr').desc()).rangeBetween(Window.unboundedPreceding, 0))

df = df.withColumn('cash_list', collect_list('cash').over(w))    
df.show(truncate=False)
df = df.withColumn('discount_rate', lit(0.3))

#calculate npv
df = df.withColumn('npv_value', np.npv(df.discount_rate, df.cash_list))

【问题讨论】:

假设Name=1 yr=1,你想要600/(1.3)**5 + 500/(1.3)**4 + ...200/(1.3)**1 + 100/ (1.3)**0?或者另一个在附近?顺便说一句,您不能以这种方式使用np.npv。为此,您必须使用UDF 正确!这就是我想要达到的目标。我认为 NPV 函数会为我做到这一点。但很明显,我离得很远。我尝试了UDF方式。重新编写函数如下@udf(returnType=DoubleType()) def calc_npv_value(rate, values): values = np.asarray(values) return (values / (1+rate)**np.arange(1,len( values)+1)).sum(axis=0) 但仍然遇到错误。如果您可以协助使用 UDF 方法,将会非常有帮助 【参考方案1】:

使用OP提供的代码,我们得到以下DataFrame -

df.show(truncate=False)
+----+---+----+------------------------------+-------------+
|Name|yr |cash|cash_list                     |discount_rate|
+----+---+----+------------------------------+-------------+
|b   |5  |43  |[43]                          |0.3          |
|b   |4  |55  |[43, 55]                      |0.3          |
|b   |3  |34  |[43, 55, 34]                  |0.3          |
|b   |2  |32  |[43, 55, 34, 32]              |0.3          |
|b   |1  |23  |[43, 55, 34, 32, 23]          |0.3          |
|a   |6  |600 |[600]                         |0.3          |
|a   |5  |500 |[600, 500]                    |0.3          |
|a   |4  |400 |[600, 500, 400]               |0.3          |
|a   |3  |300 |[600, 500, 400, 300]          |0.3          |
|a   |2  |200 |[600, 500, 400, 300, 200]     |0.3          |
|a   |1  |100 |[600, 500, 400, 300, 200, 100]|0.3          |
+----+---+----+------------------------------+-------------+

OP 想要计算Net Present Value (NPV),为此他想使用UDF。对于Name=a yr=1,NPV 如下 -

600/(1.3)^5 + 500/(1.3)^4 + 400/(1.3)^3 + 300/(1.3)^2 + 200/(1.3)^1 + 100/(1.3)

# Creating a function and it's corresponding UDF
from pyspark.sql.functions import udf
def calculate_npv(cash_list,rate):
   # Reverse the List
   cash_list = cash_list[::-1]
   return float(np.npv(rate,cash_list))
calculate_npv = udf(calculate_npv,FloatType())

# Applying the UDF to the DataFrame below
df = df.withColumn('NPV',calculate_npv('cash_list','discount_rate'))
df.show(truncate=False)
+----+---+----+------------------------------+-------------+----------+
|Name|yr |cash|cash_list                     |discount_rate|NPV       |
+----+---+----+------------------------------+-------------+----------+
|b   |5  |43  |[43]                          |0.3          |43.0      |
|b   |4  |55  |[43, 55]                      |0.3          |88.07692  |
|b   |3  |34  |[43, 55, 34]                  |0.3          |101.75148 |
|b   |2  |32  |[43, 55, 34, 32]              |0.3          |110.27037 |
|b   |1  |23  |[43, 55, 34, 32, 23]          |0.3          |107.823364|
|a   |6  |600 |[600]                         |0.3          |600.0     |
|a   |5  |500 |[600, 500]                    |0.3          |961.53845 |
|a   |4  |400 |[600, 500, 400]               |0.3          |1139.645  |
|a   |3  |300 |[600, 500, 400, 300]          |0.3          |1176.65   |
|a   |2  |200 |[600, 500, 400, 300, 200]     |0.3          |1105.1154 |
|a   |1  |100 |[600, 500, 400, 300, 200, 100]|0.3          |950.08875 |
+----+---+----+------------------------------+-------------+----------+

【讨论】:

非常感谢。这行得通。我会试着弄清楚我做错了什么。减去列表的反转,我几乎做了同样的事情。我看到的一个区别是 UDF 的输入是 (discount_rate, cash_list) 这是您给我的一些非常有用的知识。非常感谢和感激! 希望你一切顺利。

以上是关于使用 numpy.npv 函数的输出向数据框中添加一列的主要内容,如果未能解决你的问题,请参考以下文章

通过用户定义的函数向数据框添加列

如何为存在的每一行向 Spark 数据框中添加新列?

向数据框中的新列添加值

如何创建 Pyspark UDF 以向数据框添加新列

向 JSON RPC 回调函数添加参数

如何在数据框中进行后向/前向数据分组?