基于列索引的 Spark Dataframe 选择

Posted

技术标签:

【中文标题】基于列索引的 Spark Dataframe 选择【英文标题】:Spark Dataframe select based on column index 【发布时间】:2017-04-22 00:14:28 【问题描述】:

如何在 Scala 中选择具有特定索引的数据框的所有列?

例如,如果一个数据框有 100 列,而我只想提取列 (10,12,13,14,15),该怎么做?

下面从数据框 df 中选择所有列,其中列名在 Array colNames 中提到:

df = df.select(colNames.head,colNames.tail: _*)

如果有类似的,colNos数组里面有

colNos = Array(10,20,25,45)

如何转换上述df.select 以仅获取特定索引处的那些列。

【问题讨论】:

【参考方案1】:

你可以通过mapcolumns:

import org.apache.spark.sql.functions.col

df.select(colNos map df.columns map col: _*)

或:

df.select(colNos map (df.columns andThen col): _*)

或:

df.select(colNos map (col _ compose df.columns): _*)

上面显示的所有方法都是等效的,不会造成性能损失。以下映射:

colNos map df.columns 

只是一个本地 Array 访问 (constant time access for each index) 并在基于 StringColumnselect 变体之间进行选择不会影响执行计划:

val df = Seq((1, 2, 3 ,4, 5, 6)).toDF

val colNos = Seq(0, 3, 5)

df.select(colNos map df.columns map col: _*).explain
== Physical Plan ==
LocalTableScan [_1#46, _4#49, _6#51]
df.select("_1", "_4", "_6").explain
== Physical Plan ==
LocalTableScan [_1#46, _4#49, _6#51]

【讨论】:

【参考方案2】:

@user6910411 上面的回答就像一个魅力,任务/逻辑计划的数量类似于我下面的方法。 但是我的方法要快一些。 所以, 我建议您使用column names 而不是column numbersColumn names 比使用 numbers安全得多轻得多。您可以使用以下解决方案:

val colNames = Seq("col1", "col2" ...... "col99", "col100")

val selectColNames = Seq("col1", "col3", .... selected column names ... )

val selectCols = selectColNames.map(name => df.col(name))

df = df.select(selectCols:_*)

如果你对写完所有 100 个列名犹豫不决,那么也有一个快捷方法

val colNames = df.schema.fieldNames

【讨论】:

【参考方案3】:

示例:使用 Scala 按索引获取 Spark Dataframe 的前 14 列。

import org.apache.spark.sql.functions.col

// Gives array of names by index (first 14 cols for example)
val sliceCols = df.columns.slice(0, 14)
// Maps names & selects columns in dataframe
val subset_df = df.select(sliceCols.map(name=>col(name)):_*)

不能简单地这样做(我尝试过但失败了):

// Gives array of names by index (first 14 cols for example)
val sliceCols = df.columns.slice(0, 14)
// Maps names & selects columns in dataframe
val subset_df = df.select(sliceCols)

原因是您必须将 Array[String] 的数据类型转换为 Array[org.apache.spark.sql.Column] 才能使切片起作用。

使用 Currying 将其包装在一个函数中(为此我的同事高五):

// Subsets Dataframe to using beg_val & end_val index.
def subset_frame(beg_val:Int=0, end_val:Int)(df: DataFrame): DataFrame = 
  val sliceCols = df.columns.slice(beg_val, end_val)
  return df.select(sliceCols.map(name => col(name)):_*)


// Get first 25 columns as subsetted dataframe
val subset_df:DataFrame = df_.transform(subset_frame(0, 25))

【讨论】:

以上是关于基于列索引的 Spark Dataframe 选择的主要内容,如果未能解决你的问题,请参考以下文章

将数组列表作为列附加到具有相同列索引的熊猫数据框中

从结合两个多索引dfs和列索引的元组列表构建dict

pandas读取csv数据index_col参数指定作为行索引的数据列索引列表形成复合(多层)行索引使用iloc基于行索引位置筛选dataframe的指定单个数据行

pandas读取csv数据index_col参数指定作为行索引的数据列索引列表形成复合(多层)行索引使用iloc基于行索引位置筛选dataframe的指定单个数据行

pandas读取csv数据index_col参数指定作为行索引的数据列索引列表形成复合(多层)行索引使用iloc基于行索引位置和列索引位置筛选dataframe数据指定数据格内容

pandas使用read_csv函数读取csv数据header参数指定作为列索引的行索引列表形成复合(多层)列索引使用方括号[]基于最外层列索引名称索引列数据