将 Spark DataFrame 架构转换为新架构
Posted
技术标签:
【中文标题】将 Spark DataFrame 架构转换为新架构【英文标题】:Convert Spark DataFrame schema to new schema 【发布时间】:2018-08-01 16:52:39 【问题描述】:我有多个从不同来源读取的 spark 作业,它们具有不同的架构,但它们非常接近,我想要做的是将它们全部写入同一个 Redshift 表,因此我需要统一所有 DataFrame架构,最好的方法是什么?
假设第一个输入数据的架构如下:
val schema1 = StructType(Seq(
StructField("date", DateType),
StructField("campaign_id", StringType),
StructField("campaign_name", StringType),
StructField("platform", StringType),
StructField("country", StringType),
StructField("views", DoubleType),
StructField("installs", DoubleType),
StructField("spend", DoubleType)
))
而 seconf inout 源的 Schema 是这样的:
val schema2 = StructType(Seq(
StructField("date", DateType),
StructField("creator_id", StringType),
StructField("creator_name", StringType),
StructField("platform", StringType),
StructField("views", DoubleType),
StructField("installs", DoubleType),
StructField("spend", DoubleType),
StructField("ecpm", DoubleType)
))
表架构(预期统一数据帧):
val finalSchema = StructType(Seq(
StructField("date", DateType),
StructField("account_name", StringType),
StructField("adset_id", StringType),
StructField("adset_name", StringType),
StructField("campaign_id", StringType),
StructField("campaign_name", StringType),
StructField("pub_id", StringType),
StructField("pub_name", StringType),
StructField("creative_id", StringType),
StructField("creative_name", StringType),
StructField("platform", StringType),
StructField("install_source", StringType),
StructField("views", IntegerType),
StructField("clicks", IntegerType),
StructField("installs", IntegerType),
StructField("cost", DoubleType)
))
正如您在最终架构中看到的,我有一些列可能不在输入架构中,因此它应该为空,一些列名也应该重命名。像ecpm
这样的一些列应该被删除。
【问题讨论】:
【参考方案1】:根据index
将index
columns
添加到dataframes
和join
中,这样就会有一对一的映射。之后select
只来自joined
dataframe
的您想要的columns
。
如果你有两个dataframes
,如下所示
// df1.show
+-----+---+
| name|age|
+-----+---+
|Alice| 25|
| Bob| 29|
| Tom| 26|
+-----+---+
//df2.show
+--------+-------+
| city|country|
+--------+-------+
| Delhi| India|
|New York| USA|
| London| UK|
+--------+-------+
现在添加index
columns
并获得一对一映射
import org.apache.spark.sql.functions._
val df1Index=df1.withColumn("index1",monotonicallyIncreasingId)
val df2Index=df2.withColumn("index2",monotonicallyIncreasingId)
val joinedDf=df1Index.join(df2Index,df1Index("index1")===df2Index("index2"))
//joinedDf
+-----+---+------+--------+-------+------+
| name|age|index1| city|country|index2|
+-----+---+------+--------+-------+------+
|Alice| 25| 0| Delhi| India| 0|
| Bob| 29| 1|New York| USA| 1|
| Tom| 26| 2| London| UK| 2|
+-----+---+------+--------+-------+------+
现在您可以编写如下查询
val queryList=List(col("name"),col("age"),col("country"))
joinedDf.select(queryList:_*).show
//Output df
+-----+---+-------+
| name|age|country|
+-----+---+-------+
|Alice| 25| India|
| Bob| 29| USA|
| Tom| 26| UK|
+-----+---+-------+
【讨论】:
你怎么知道它们是相关的? @thebluephantom 他没有提供数据,但他需要实现连接以组合来自两个数据帧的数据。我做了一对一的映射。他还可以在日期等常用列上申请加入。 它们是独立的工作我不能加入DataFrames,每个工作都有自己的架构但应该将统一的版本写入数据库【参考方案2】:不确定是否有实现此目的的全自动方法。如果您的架构是固定的并且不是特别复杂,您可以手动调整架构并union
结果。
为了论证,假设您想包含来自frame1
的列col1
和col2
,并包含col2
和col4
的frame2
。
import org.apache.spark.sql.functions._
val subset1 = frame1.select($"col1", $"col2", lit(null).as("col4"))
val subset2 = frame2.select(lit(null).as("col1"), $"col2", $"col4")
val result = subset1 union subset2
做到了这一点。我们手动指定每一列,因此我们可以跳过我们喜欢的任何列。
【讨论】:
你的方法是我现在正在做的,但我希望找到更好的方法。以上是关于将 Spark DataFrame 架构转换为新架构的主要内容,如果未能解决你的问题,请参考以下文章
如何将 Pandas DataFrame 中字典的字符串表示形式转换为新列?
将 CSV 数据加载到 Dataframe 并使用 Apache Spark (Java) 转换为 Array
如何使用 Java 将 unix 纪元的列转换为 Apache spark DataFrame 中的日期?