Spark- How to concatenate DataFrame columns

Posted xiagnming

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了Spark- How to concatenate DataFrame columns相关的知识,希望对你有一定的参考价值。

使用concat()concat_ws()SQL函数,可以将一个或多个列连接到Spark DataFrame上的单个列中。在文本中,将学习如何使用这些函数,还可以使用原始SQL通过Scala示例来连接列。

Preparing Data & DataFrame

  val data = Seq(("James","A","Smith","2018","M",3000),
    ("Michael","Rose","Jones","2010","M",4000),
    ("Robert","K","Williams","2010","M",4000),
    ("Maria","Anne","Jones","2005","F",4000),
    ("Jen","Mary","Brown","2010","",-1)
  )

  val columns = Seq("fname","mname","lname","dob_year","gender","salary")
  import spark.sqlContext.implicits._
  val df = data.toDF(columns:_*)
  df.show(false)

注意,我们需要导入spark对象上的implicits,它是SparkSession的一个实例,以便在Seq集合上使用toDF(),并在输出下面使用df.show()

+-------+-----+--------+--------+------+------+
|fname  |mname|lname   |dob_year|gender|salary|
+-------+-----+--------+--------+------+------+
|James  |A    |Smith   |2018    |M     |3000  |
|Michael|Rose |Jones   |2010    |M     |4000  |
|Robert |K    |Williams|2010    |M     |4000  |
|Maria  |Anne |Jones   |2005    |F     |4000  |
|Jen    |Mary |Brown   |2010    |      |-1    |
+-------+-----+--------+--------+------+------+

Using Concat() function to concatenate DataFrame columns

spark sql提供了concat()函数来连接二个或多个DataFrame的列,使其变为一列。

语法

concat(exprs: Columns*): Column

它还可以获取不同整数类型的列,并将它们连接到单个列中。例如,它支持String,Int,Boolean和数据。

df.select(concat(col("fname"), lit(","), col("mname"), lit(","), col("lname")).as("FullName"))

该语句通过串联以分隔符逗号分割的fname,mname,lname列来创建"FullName"列。要添加定界符,我们使用了lit()函数。这样产生的输出只有一个串联的列。

+------------------+
|FullName          |
+------------------+
|James,A,Smith     |
|Michael,Rose,Jones|
|Robert,K,Williams |
|Maria,Anne,Jones  |
|Jen,Mary,Brown    |
+------------------+

在withColumn中使用Concat()函数

让我们来看另一个在withColumn()上使用concat()函数的例子,这里我们将通过连接列名添加一个新的列FullName。

df.withColumn("FullName", concat(col("fname"), lit(","), col("mname"), lit(','),col("lname"))).show(false)

上面的代码段还保留了各个名称,如果不需要,可以使用下面的语句删除它们。

  df.withColumn("FullName",concat(col("fname"),lit(','),
    col("mname"),lit(','),col("lname")))
    .drop("fname")
    .drop("mname")
    .drop("lname")
    .show(false)

输出如下。

+--------+------+------+------------------+
|dob_year|gender|salary|FullName          |
+--------+------+------+------------------+
|2018    |M     |3000  |James,A,Smith     |
|2010    |M     |4000  |Michael,Rose,Jones|
|2010    |M     |4000  |Robert,K,Williams |
|2005    |F     |4000  |Maria,Anne,Jones  |
|2010    |      |-1    |Jen,Mary,Brown    |
+--------+------+------+------------------+

concat_ws()函数使用分隔符连接

concat_ws()函数可以轻松地在连接DataFrame列时添加分隔符。

语法

concat_ws(sep: String, exprs: Columns*): Colums

concat_ws()函数取第一个参数作为分隔符,来分隔需要连接的列。

  df.withColumn("FullName",concat_ws(",",col("fname"),col("mname"),col("lname")))
    .drop("fname")
    .drop("mname")
    .drop("lname")
      .show(false)

输出结果如下。

+--------+------+------+------------------+
|dob_year|gender|salary|FullName          |
+--------+------+------+------------------+
|2018    |M     |3000  |James,A,Smith     |
|2010    |M     |4000  |Michael,Rose,Jones|
|2010    |M     |4000  |Robert,K,Williams |
|2005    |F     |4000  |Maria,Anne,Jones  |
|2010    |      |-1    |Jen,Mary,Brown    |
+--------+------+------+------------------+

使用原生SQL

如果你有SQL背景,spark SQL还提供了一种使用Raw SQL语法进行连接的方法。但是,为了使用此功能,需要使用df.createOrReplaceTempView("EMP")创建一个临时视图。该操作将创建一个临时表"EMP"。

df.createOrReplaceTempView("EMP")
spark.sql("select CONCAT(fname, ' ', lanme, ' ', mname) as FullName from EMP").show(false)

我们同样可以在原始SQL语句使用concat()函数

+------------------+
|FullName          |
+------------------+
|James Smith A     |
|Michael Jones Rose|
|Robert Williams K |
|Maria Jones Anne  |
|Jen Brown Mary    |
+------------------+

到目前为止,我们已经使用了concat()函数,现在让我们看看另一种使用操作符||连接字符串的方法。

  spark.sql("select fname ||' '|| lname ||' '|| mname as FullName from EMP")
    .show(false)

这将产生与上述语句相同的输出。

以上是关于Spark- How to concatenate DataFrame columns的主要内容,如果未能解决你的问题,请参考以下文章

How to convert matrix to RDD[Vector] in spark

How to install Hadoop 2.7.3 cluster on CentOS 7.3

There Are Now 3 Apache Spark APIs. Here’s How to Choose the Right One

spark 怎么去连接 ElasticSearch

TypeError: can only concatenate str (not “int“) to str

python产生错误:can only concatenate str (not "int") to str