有没有办法从 Scala 中数据框的现有列创建多个列?

Posted

技术标签:

【中文标题】有没有办法从 Scala 中数据框的现有列创建多个列?【英文标题】:Is there a way to create multiple columns from existing columns of a dataframe in Scala? 【发布时间】:2018-08-07 14:27:15 【问题描述】:

我正在尝试将 RDBMS 表引入 Hive。我通过以下方式获得了数据框:

val yearDF = spark.read.format("jdbc").option("url", connectionUrl)
                                                   .option("dbtable", "(select * from schema.tablename where source_system_name='DB2' and period_year='2017') as year2017")
                                                   .option("user", devUserName)
                                                   .option("password", devPassword)
                                                   .option("numPartitions",15)
                                                   .load()

这些是数据框的列:

geography:string|
project:string|
reference_code:string
product_line:string
book_type:string
cc_region:string
cc_channel:string
cc_function:string
pl_market:string
ptd_balance:double
qtd_balance:double
ytd_balance:double
xx_last_update_tms:timestamp
xx_last_update_log_id:int
xx_data_hash_code:string
xx_data_hash_id:bigint

ptd_balance, qtd_balance, ytd_balance 列是双精度数据类型。我们的项目希望通过创建新列将其数据类型从 Double 转换为 String:ptd_balance_text, qtd_balance_text, ytd_balance_text 使用相同的数据以避免任何数据截断。

withColumn 将在数据框中创建一个新列。 withColumnRenamed 将重命名现有列。

数据框有近 1000 万条记录。 有没有一种有效的方法来创建多个具有相同数据和不同类型的新列与数据框中的现有列?

【问题讨论】:

如果要发生截断,根据其精度将其加载到双列时就会发生截断。稍后将其转换为字符串有什么意义? @philantrovert 哦,我不知道。在这种情况下,我如何将数据直接读入 String ? 您正在从 RDBMS 中读取数据,并且您在此处获得的数据类型取决于您的源表。如果您想将其作为字符串,请将 db_table 参数中的 select 子句更改为 cast(ptd_balance as string)varchar 或您的 rdbms 支持的任何内容。 @philantrovert 这是您建议阅读表格的方式吗:选择地理、项目、reference_code、product_line、book_type、cc_region、cc_channel、cc_function、pl_market、ptd_balance、qtd_balance、ytd_balance、xx_last_update_tms、xx_last_update_log_id、 xx_data_hash_code,xx_data_hash_id, ptd_balance::character 变化为 'ptd_balance_text', qtd_balance::character 变化为 'qtd_balance_text', ytd_balance::character 变化为 'ytd_balance_text' from schema.tablename where period_year='2017' 我给出了演员表(:: ) 到字符串(Greenplum 中不同的字符)。 【参考方案1】:

如果我处于你的位置,我会在提取查询中进行更改或要求 BI 团队付出一些努力 :P 在提取时动态添加和转换字段,但无论如何你问的是可能的。

您可以从现有列中添加列,如下所示。检查addColsTosampleDFdataframe。我希望下面的 cmets 足以理解,如果您有任何问题,请随时添加 cmets,我将编辑我的答案。

scala> import org.apache.spark.sql.functions._
import org.apache.spark.sql.functions._

scala> import org.apache.spark.sql.DataFrame, Row, SparkSession
import org.apache.spark.sql.DataFrame, Row, SparkSession

scala> val ss = SparkSession.builder().appName("TEST").getOrCreate()
18/08/07 15:51:42 WARN SparkSession$Builder: Using an existing SparkSession; some configuration may not take effect.
ss: org.apache.spark.sql.SparkSession = org.apache.spark.sql.SparkSession@6de4071b

//Sample dataframe with int, double and string fields
scala> val sampleDf = Seq((100, 1.0, "row1"),(1,10.12,"col_float")).toDF("col1", "col2", "col3")
sampleDf: org.apache.spark.sql.DataFrame = [col1: int, col2: double ... 1 more field]

scala> sampleDf.printSchema
root
 |-- col1: integer (nullable = false)
 |-- col2: double (nullable = false)
 |-- col3: string (nullable = true)

//Adding columns col1_string from col1 and col2_doubletostring from col2 with casting and alias
scala> val addColsTosampleDF = sampleDf.
select(sampleDf.col("col1"),
sampleDf.col("col2"),
sampleDf.col("col3"),
sampleDf.col("col1").cast("string").alias("col1_string"),
sampleDf.col("col2").cast("string").alias("col2_doubletostring"))
addColsTosampleDF: org.apache.spark.sql.DataFrame = [col1: int, col2: double ... 3 more fields]

//Schema with added columns
scala> addColsTosampleDF.printSchema
root
 |-- col1: integer (nullable = false)
 |-- col2: double (nullable = false)
 |-- col3: string (nullable = true)
 |-- col1_string: string (nullable = false)
 |-- col2_doubletostring: string (nullable = false)

 scala> addColsTosampleDF.show()
+----+-----+---------+-----------+-------------------+
|col1| col2|     col3|col1_string|col2_doubletostring|
+----+-----+---------+-----------+-------------------+
| 100|  1.0|     row1|        100|                1.0|
|   1|10.12|col_float|          1|              10.12|
+----+-----+---------+-----------+-------------------+

【讨论】:

直接读取RDBMS表怎么样:select geography,project,reference_code,product_line,book_type,cc_region,cc_channel, cc_function,pl_market,ptd_balance,qtd_balance,ytd_balance,xx_last_update_tms, xx_last_update_log_id,xx_data_hash_code,xx_data_hash_id, ptd_balance ::character 变化为 'ptd_balance_text', qtd_balance::character 变化为 'qtd_balance_text', ytd_balance::character 变化为 'ytd_balance_text' from schema.tablename where period_year='2017' 正如@philantrovert 所说,甚至在将数据摄取到 Hive 之前, 数据在读入 spark 时已经被截断。 你可以在 JDBC 选择中做同样的事情。我建议先在数据库上运行查询,然后在 jdbc 数据框中使用它。 它正在工作。我刚刚测试了它的计数,我得到了与 RDBMS 表匹配的数字。 好的,如果您认为我的回答对您解决问题有帮助,请勾选答案并点赞。【参考方案2】:

您可以执行此操作,从所有 columns 创建 query,如下所示

import org.apache.spark.sql.types.StringType

//Input: 

scala> df.show
+----+-----+--------+--------+
|  id| name|  salary|   bonus|
+----+-----+--------+--------+
|1001|Alice| 8000.25|1233.385|
|1002|  Bob|7526.365| 1856.69|
+----+-----+--------+--------+


scala> df.printSchema
root
 |-- id: integer (nullable = false)
 |-- name: string (nullable = true)
 |-- salary: double (nullable = false)
 |-- bonus: double (nullable = false)

//solution approach:
val query=df.columns.toList.map(cl=>if(cl=="salary" || cl=="bonus") col(cl).cast(StringType).as(cl+"_text") else col(cl))

//Output: 

scala> df.select(query:_*).printSchema
root
 |-- id: integer (nullable = false)
 |-- name: string (nullable = true)
 |-- salary_text: string (nullable = false)
 |-- bonus_text: string (nullable = false)


scala> df.select(query:_*).show
+----+-----+-----------+----------+
|  id| name|salary_text|bonus_text|
+----+-----+-----------+----------+
|1001|Alice|    8000.25|  1233.385|
|1002|  Bob|   7526.365|   1856.69|
+----+-----+-----------+----------+

【讨论】:

以上是关于有没有办法从 Scala 中数据框的现有列创建多个列?的主要内容,如果未能解决你的问题,请参考以下文章

使用 scala 基于两个数据框的连接列创建新列

如何为scala中的空数据框现有列添加赋值?

在 spark scala 中对数据框的每一列进行排序

需要一种从现有数据框创建熊猫数据框的方法

从现有数据框的子集自动创建数据框

如何在scala spark中将数据框的特定列与另一个列连接[重复]