Spark 根据现有列的映射值创建新列
Posted
技术标签:
【中文标题】Spark 根据现有列的映射值创建新列【英文标题】:Spark creating a new column based on a mapped value of an existing column 【发布时间】:2019-06-24 22:09:20 【问题描述】:我正在尝试将我的数据框中的一列的值映射到一个新值并使用 UDF 将其放入一个新列中,但我无法让 UDF 接受一个不是列的参数.例如我有一个这样的数据框dfOriginial
:
+-----------+-----+
|high_scores|count|
+-----------+-----+
| 9| 1|
| 21| 2|
| 23| 3|
| 7| 6|
+-----------+-----+
我正在尝试了解数值所在的 bin,因此我可以构建一个 bin 列表,如下所示:
case class Bin(binMax:BigDecimal, binWidth:BigDecimal)
val binMin = binMax - binWidth
// only one of the two evaluations can include an "or=", otherwise a value could fit in 2 bins
def fitsInBin(value: BigDecimal): Boolean = value > binMin && value <= binMax
def rangeAsString(): String =
val sb = new StringBuilder()
sb.append(trimDecimal(binMin)).append(" - ").append(trimDecimal(binMax))
sb.toString()
然后我想像这样转换我的旧数据框以制作dfBin
:
+-----------+-----+---------+
|high_scores|count|bin_range|
+-----------+-----+---------+
| 9| 1| 0 - 10 |
| 21| 2| 20 - 30 |
| 23| 3| 20 - 30 |
| 7| 6| 0 - 10 |
+-----------+-----+---------+
这样我就可以通过调用.groupBy("bin_range").count()
来最终获得垃圾箱实例的计数。
我正在尝试通过使用带有 UDF 的 withColumn
函数来生成 dfBin
。
这是我尝试使用的 UDF 代码:
val convertValueToBinRangeUDF = udf((value:String, binList:List[Bin]) =>
val number = BigDecimal(value)
val bin = binList.find( bin => bin.fitsInBin(number)).getOrElse(Bin(BigDecimal(0), BigDecimal(0)))
bin.rangeAsString()
)
val binList = List(Bin(10, 10), Bin(20, 10), Bin(30, 10), Bin(40, 10), Bin(50, 10))
val dfBin = dfOriginal.withColumn("bin_range", convertValueToBinRangeUDF(col("high_scores"), binList))
但它给了我一个类型不匹配:
Error:type mismatch;
found : List[Bin]
required: org.apache.spark.sql.Column
val valueCountsWithBin = valuesCounts.withColumn(binRangeCol, convertValueToBinRangeUDF(col(columnName), binList))
看到 UDF 的定义让我觉得它应该可以很好地处理转换,但显然不是,有什么想法吗?
【问题讨论】:
【参考方案1】:问题是UDF
的参数都应该是列类型。一种解决方案是将binList
转换为一列并将其传递给UDF
,类似于当前代码。
不过,稍微调整一下UDF
,将其变成def
,会更简单。这样就可以方便的传递其他非列类型的数据了:
def convertValueToBinRangeUDF(binList: List[Bin]) = udf((value:String) =>
val number = BigDecimal(value)
val bin = binList.find( bin => bin.fitsInBin(number)).getOrElse(Bin(BigDecimal(0), BigDecimal(0)))
bin.rangeAsString()
)
用法:
val dfBin = valuesCounts.withColumn("bin_range", convertValueToBinRangeUDF(binList)($"columnName"))
【讨论】:
这行得通,只需添加评论,当在withColumn
调用中调用它时,它必须被柯里化:val dfBin = valuesCounts.withColumn("bin_range", convertValueToBinRangeUDF(binList)(valuesCounts.col(columnName)))
@NateH06:啊,是的,我忘了提这个。我在答案中添加了您的使用代码。【参考方案2】:
试试这个 -
scala> case class Bin(binMax:BigDecimal, binWidth:BigDecimal)
| val binMin = binMax - binWidth
|
| // only one of the two evaluations can include an "or=", otherwise a value could fit in 2 bins
| def fitsInBin(value: BigDecimal): Boolean = value > binMin && value <= binMax
|
| def rangeAsString(): String =
| val sb = new StringBuilder()
| sb.append(binMin).append(" - ").append(binMax)
| sb.toString()
|
|
defined class Bin
scala> val binList = List(Bin(10, 10), Bin(20, 10), Bin(30, 10), Bin(40, 10), Bin(50, 10))
binList: List[Bin] = List(Bin(10,10), Bin(20,10), Bin(30,10), Bin(40,10), Bin(50,10))
scala> spark.udf.register("convertValueToBinRangeUDF", (value: String) =>
| val number = BigDecimal(value)
| val bin = binList.find( bin => bin.fitsInBin(number)).getOrElse(Bin(BigDecimal(0), BigDecimal(0)))
| bin.rangeAsString()
| )
res13: org.apache.spark.sql.expressions.UserDefinedFunction = UserDefinedFunction(<function1>,StringType,Some(List(StringType)))
//-- Testing with one record
scala> val dfOriginal = spark.sql(s""" select "9" as `high_scores`, "1" as count """)
dfOriginal: org.apache.spark.sql.DataFrame = [high_scores: string, count: string]
scala> dfOriginal.createOrReplaceTempView("dfOriginal")
scala> val dfBin = spark.sql(s""" select high_scores, count, convertValueToBinRangeUDF(high_scores) as bin_range from dfOriginal """)
dfBin: org.apache.spark.sql.DataFrame = [high_scores: string, count: string ... 1 more field]
scala> dfBin.show(false)
+-----------+-----+---------+
|high_scores|count|bin_range|
+-----------+-----+---------+
|9 |1 |0 - 10 |
+-----------+-----+---------+
希望这会有所帮助。
【讨论】:
这很好,但我应该指定,binList 是动态的,需要作为参数传递给 UDF,而不是硬编码,这让我感到困惑。 我编辑了我的帖子。 binList 可以在注册 udf 之前在 udf 之外声明。您可以通过这种方式保持 binList 动态。此外,您可以决定是否要从某个文件或配置单元表中填充 binList 以控制来自外部代码的范围值。以上是关于Spark 根据现有列的映射值创建新列的主要内容,如果未能解决你的问题,请参考以下文章
如何使用 Scala/Spark 添加不基于数据框中现有列的新列? [复制]
Scala Spark Dataframe 创建一个新列,其中包含另一列的最大先前值和当前值