连接 Apache Spark DataFrame 中的列

Posted

技术标签:

【中文标题】连接 Apache Spark DataFrame 中的列【英文标题】:Concatenate columns in Apache Spark DataFrame 【发布时间】:2015-07-16 09:49:07 【问题描述】:

我们如何连接 Apache Spark DataFrame 中的两列? Spark SQL中有什么函数可以使用吗?

【问题讨论】:

【参考方案1】:

对于原始 SQL,您可以使用 CONCAT:

在 Python 中

df = sqlContext.createDataFrame([("foo", 1), ("bar", 2)], ("k", "v"))
df.registerTempTable("df")
sqlContext.sql("SELECT CONCAT(k, ' ',  v) FROM df")

在 Scala 中

import sqlContext.implicits._

val df = sc.parallelize(Seq(("foo", 1), ("bar", 2))).toDF("k", "v")
df.registerTempTable("df")
sqlContext.sql("SELECT CONCAT(k, ' ',  v) FROM df")

从 Spark 1.5.0 开始,您可以将 concat 函数与 DataFrame API 一起使用:

在 Python 中:

from pyspark.sql.functions import concat, col, lit

df.select(concat(col("k"), lit(" "), col("v")))

在 Scala 中:

import org.apache.spark.sql.functions.concat, lit

df.select(concat($"k", lit(" "), $"v"))

还有concat_ws 函数,它以字符串分隔符作为第一个参数。

【讨论】:

【参考方案2】:

这是自定义命名的方法

import pyspark
from pyspark.sql import functions as sf
sc = pyspark.SparkContext()
sqlc = pyspark.SQLContext(sc)
df = sqlc.createDataFrame([('row11','row12'), ('row21','row22')], ['colname1', 'colname2'])
df.show()

给,

+--------+--------+
|colname1|colname2|
+--------+--------+
|   row11|   row12|
|   row21|   row22|
+--------+--------+

通过连接创建新列:

df = df.withColumn('joined_column', 
                    sf.concat(sf.col('colname1'),sf.lit('_'), sf.col('colname2')))
df.show()

+--------+--------+-------------+
|colname1|colname2|joined_column|
+--------+--------+-------------+
|   row11|   row12|  row11_row12|
|   row21|   row22|  row21_row22|
+--------+--------+-------------+

【讨论】:

lit 创建一列_【参考方案3】:

在 Spark Scala 中连接字符串列的一个选项是使用 concat

有必要检查空值。因为如果其中一列为空,即使其他一列确实有信息,结果也会为空。

使用concatwithColumn

val newDf =
  df.withColumn(
    "NEW_COLUMN",
    concat(
      when(col("COL1").isNotNull, col("COL1")).otherwise(lit("null")),
      when(col("COL2").isNotNull, col("COL2")).otherwise(lit("null"))))

使用concatselect

val newDf = df.selectExpr("concat(nvl(COL1, ''), nvl(COL2, '')) as NEW_COLUMN")

使用这两种方法,您将拥有一个 NEW_COLUMN,其值是列的串联:来自原始 df 的 COL1 和 COL2。

【讨论】:

我在 pyspark 中尝试过你的方法,但没有成功,警告“col 应该是 Column”。 @Samson 抱歉,我只检查了 Scala API @IgnacioAlorre 如果您使用concat_ws 而不是concat,则可以避免检查NULL。【参考方案4】:

concat(*cols)

v1.5 及更高版本

将多个输入列连接到一个列中。该函数适用于字符串、二进制和兼容的数组列。

例如:new_df = df.select(concat(df.a, df.b, df.c))


concat_ws(sep, *cols)

v1.5 及更高版本

类似于concat,但使用指定的分隔符。

例如:new_df = df.select(concat_ws('-', df.col1, df.col2))


map_concat(*cols)

v2.4 及更高版本

用于连接地图,返回所有给定地图的并集。

例如:new_df = df.select(map_concat("map1", "map2"))


使用concat 运算符(||):

v2.3 及更高版本

例如:df = spark.sql("select col_a || col_b || col_c as abc from table_x")

参考:Spark sql doc

【讨论】:

【参考方案5】:

如果你想使用 DF,你可以使用 udf 在现有列的基础上添加一个新列。

val sqlContext = new SQLContext(sc)
case class MyDf(col1: String, col2: String)

//here is our dataframe
val df = sqlContext.createDataFrame(sc.parallelize(
    Array(MyDf("A", "B"), MyDf("C", "D"), MyDf("E", "F"))
))

//Define a udf to concatenate two passed in string values
val getConcatenated = udf( (first: String, second: String) =>  first + " " + second  )

//use withColumn method to add a new column called newColName
df.withColumn("newColName", getConcatenated($"col1", $"col2")).select("newColName", "col1", "col2").show()

【讨论】:

【参考方案6】:

从 Spark 2.3(SPARK-22771) Spark SQL 支持连接运算符||

例如;

val df = spark.sql("select _c1 || _c2 as concat_column from <table_name>")

【讨论】:

【参考方案7】:

这是 pyspark 的另一种方法:

#import concat and lit functions from pyspark.sql.functions 
from pyspark.sql.functions import concat, lit

#Create your data frame
countryDF = sqlContext.createDataFrame([('Ethiopia',), ('Kenya',), ('Uganda',), ('Rwanda',)], ['East Africa'])

#Use select, concat, and lit functions to do the concatenation
personDF = countryDF.select(concat(countryDF['East Africa'], lit('n')).alias('East African'))

#Show the new data frame
personDF.show()

----------RESULT-------------------------

84
+------------+
|East African|
+------------+
|   Ethiopian|
|      Kenyan|
|     Ugandan|
|     Rwandan|
+------------+

【讨论】:

【参考方案8】:

当您不知道 Dataframe 中列的数量或名称时,这里有一个建议。

val dfResults = dfSource.select(concat_ws(",",dfSource.columns.map(c => col(c)): _*))

【讨论】:

【参考方案9】:

我们有没有对应下面流程的java语法

val dfResults = dfSource.select(concat_ws(",",dfSource.columns.map(c => col(c)): _*))

【讨论】:

【参考方案10】:

在 Spark 2.3.0 中,您可以这样做:

spark.sql( """ select '1' || column_a from table_a """)

【讨论】:

【参考方案11】:

在 Java 中,您可以这样做来连接多个列。示例代码旨在为您提供一个场景以及如何使用它以便更好地理解。

SparkSession spark = JavaSparkSessionSingleton.getInstance(rdd.context().getConf());
Dataset<Row> reducedInventory = spark.sql("select * from table_name")
                        .withColumn("concatenatedCol",
                                concat(col("col1"), lit("_"), col("col2"), lit("_"), col("col3")));


class JavaSparkSessionSingleton 
    private static transient SparkSession instance = null;

    public static SparkSession getInstance(SparkConf sparkConf) 
        if (instance == null) 
            instance = SparkSession.builder().config(sparkConf)
                    .getOrCreate();
        
        return instance;
    

上面的代码将 col1,col2,col3 用“_”分隔来创建一个名为“concatenatedCol”的列。

【讨论】:

【参考方案12】:

就我而言,我想要一个 Pipe-'I' 分隔的行。

from pyspark.sql import functions as F
df.select(F.concat_ws('|','_c1','_c2','_c3','_c4')).show()

这就像黄油上的热刀一样效果很好。

【讨论】:

【参考方案13】:

在 pySpark 中使用 sqlContext 的另一种方法...

#Suppose we have a dataframe:
df = sqlContext.createDataFrame([('row1_1','row1_2')], ['colname1', 'colname2'])

# Now we can concatenate columns and assign the new column a name 
df = df.select(concat(df.colname1, df.colname2).alias('joined_colname'))

【讨论】:

【参考方案14】:

确实,有一些漂亮的内置抽象供您完成连接,而无需实现自定义函数。既然你提到了 Spark SQL,所以我猜你正试图通过 spark.sql() 将它作为声明性命令传递。如果是这样,您可以通过 SQL 命令直接完成,例如: SELECT CONCAT(col1, '&lt;delimiter&gt;', col2, ...) AS concat_column_name FROM &lt;table_name&gt;;

此外,从 Spark 2.3.0 开始,您可以在以下行中使用命令: SELECT col1 || col2 AS concat_column_name FROM &lt;table_name&gt;;

其中,是您首选的分隔符(也可以是空格),是您尝试从中读取的临时表或永久表。

【讨论】:

【参考方案15】:

我们也可以简单地使用SelectExpr

df1.selectExpr("*","upper(_2||_3) as new")

【讨论】:

【参考方案16】:

像这样使用 concat 方法:

Dataset<Row> DF2 = DF1
            .withColumn("NEW_COLUMN",concat(col("ADDR1"),col("ADDR2"),col("ADDR3"))).as("NEW_COLUMN")

【讨论】:

【参考方案17】:
val newDf =
  df.withColumn(
    "NEW_COLUMN",
    concat(
      when(col("COL1").isNotNull, col("COL1")).otherwise(lit("null")),
      when(col("COL2").isNotNull, col("COL2")).otherwise(lit("null"))))

注意:要使此代码正常工作,您需要将括号“()”放在“isNotNull”函数中。 -> 正确的是“isNotNull()”。

val newDf =
  df.withColumn(
    "NEW_COLUMN",
    concat(
      when(col("COL1").isNotNull(), col("COL1")).otherwise(lit("null")),
      when(col("COL2").isNotNull(), col("COL2")).otherwise(lit("null"))))

【讨论】:

以上是关于连接 Apache Spark DataFrame 中的列的主要内容,如果未能解决你的问题,请参考以下文章

连接 Apache Spark DataFrame 中的列

两个 Spark DataFrame 的简单连接因“org.apache.spark.sql.AnalysisException:无法解析列名”而失败

使用多列作为存储在 Apache Spark 中的数组中的键来连接两个 Dataframe

如何使延迟加载 Apache Spark Dataframe 连接到 REST API

Scala Dataframe 仅连接并获取正确的表列

java的怎么操作spark的dataframe