具有不匹配模式的 Spark 合并数据帧,无需额外的磁盘 IO

Posted

技术标签:

【中文标题】具有不匹配模式的 Spark 合并数据帧,无需额外的磁盘 IO【英文标题】:Spark merge dataframe with mismatching schemas without extra disk IO 【发布时间】:2016-10-05 08:35:47 【问题描述】:

我想将 2 个数据框与(可能)不匹配的架构合并

org.apache.spark.sql.DataFrame = [name: string, age: int, height: int]
org.apache.spark.sql.DataFrame = [name: string, age: int]

scala> A.unionAll(B)

会导致:

org.apache.spark.sql.AnalysisException: Union can only be performed on tables with the same number of columns, but the left table has 2 columns and the right has 3;

我想在 Spark 中执行此操作。 但是,Spark 文档只建议将整个 2 个数据帧写入一个目录,然后使用 spark.read.option("mergeSchema", "true") 将它们读回。

link to docs

所以工会并没有帮助我,文档也没有。如果可能的话,我希望将这些额外的 I/O 排除在我的工作之外。我是否遗漏了一些无证信息,还是(目前)不可能?

【问题讨论】:

已经有几年了,但我想补充一点:不要递归地这样做,否则你的 DAG 会成倍增长,我还没有找到一种方法来告诉 spark 放弃这部分达格。如果您对多次迭代/DF 执行此操作,您将耗尽内存,并且 I/O 操作会更有效率。 【参考方案1】:

您可以将空列附加到框架 B 和联合 2 个框架之后:

import org.apache.spark.sql.functions._
val missingFields = A.schema.toSet.diff(B.schema.toSet)
var C: DataFrame = null
for (field <- missingFields) 
   C = A.withColumn(field.name, expr("null")); 
 
A.unionAll(C)

【讨论】:

所以基本上我需要提取两个模式的比较它们并将额外的字段添加到 df 缺少任何内容作为空值? 它为了合并你的模式必须匹配的数据框。所以你需要引导一个单一的方案。在您的情况下,您需要丢弃 A 帧中的高度字段还是将其添加到 B 帧。它会比保存和加载到磁盘更快。 val missingFields = A.schema.toSet.diff(B.schema.toSet); var C: DataFrame = null; for (field &lt;- missingFields) C = A.withColumn(field.name, expr("null")); A.unionAll(C) @prudenko 我们如何在 pyspark 中实现同样的目标 您将如何使用嵌套的 StructType 字段来做到这一点? spark需要做的就是暴露他们的mergeSchema机制【参考方案2】:

parquet 模式合并默认禁用,通过以下方式开启此选项:

(1) set global option: spark.sql.parquet.mergeSchema=true

(2) write code: sqlContext.read.option("mergeSchema", "true").parquet("my.parquet")

【讨论】:

只对镶木地板有效吗?我尝试使用 json 它错误我们说它无法推断架构。 虽然这是真的,但我已经在我的问题中说这不是我想要的。【参考方案3】:

这是一个 pyspark 解决方案。

假设如果由于一个数据框缺少另一列中包含的列而无法进行合并,那么正确的做法是使用空值添加缺少的列。

另一方面,如果由于两个数据框共享具有冲突类型或可空性的列而无法进行合并,那么正确的做法是引发 TypeError(因为这是您可能想知道的冲突) .

def harmonize_schemas_and_combine(df_left, df_right):
    left_types = f.name: f.dataType for f in df_left.schema
    right_types = f.name: f.dataType for f in df_right.schema
    left_fields = set((f.name, f.dataType, f.nullable) for f in df_left.schema)
    right_fields = set((f.name, f.dataType, f.nullable) for f in df_right.schema)

    # First go over left-unique fields
    for l_name, l_type, l_nullable in left_fields.difference(right_fields):
        if l_name in right_types:
            r_type = right_types[l_name]
            if l_type != r_type:
                raise TypeError, "Union failed. Type conflict on field %s. left type %s, right type %s" % (l_name, l_type, r_type)
            else:
                raise TypeError, "Union failed. Nullability conflict on field %s. left nullable %s, right nullable %s"  % (l_name, l_nullable, not(l_nullable))
        df_right = df_right.withColumn(l_name, lit(None).cast(l_type))

    # Now go over right-unique fields
    for r_name, r_type, r_nullable in right_fields.difference(left_fields):
        if r_name in left_types:
            l_type = right_types[r_name]
            if r_type != l_type:
                raise TypeError, "Union failed. Type conflict on field %s. right type %s, left type %s" % (r_name, r_type, l_type)
            else:
                raise TypeError, "Union failed. Nullability conflict on field %s. right nullable %s, left nullable %s" % (r_name, r_nullable, not(r_nullable))
        df_left = df_left.withColumn(r_name, lit(None).cast(r_type))       
    return df_left.union(df_right)

【讨论】:

我收到此错误 left_fields = set((f.name, f.dataType, f.nullable) for f in df.schema) Traceback(最近一次调用最后一次):文件“ ",第 1 行,在 TypeError: 'StructType' object is not iterable【参考方案4】:

感谢@conradlee!我修改了您的解决方案以通过添加强制转换和删除可空性检查来允许联合。它对我有用。

def harmonize_schemas_and_combine(df_left, df_right):
    '''
    df_left is the main df; we try to append the new df_right to it. 
    Need to do three things here: 
    1. Set other claim/clinical features to NULL
    2. Align schemas (data types)
    3. Align column orders
    '''
    left_types = f.name: f.dataType for f in df_left.schema
    right_types = f.name: f.dataType for f in df_right.schema
    left_fields = set((f.name, f.dataType) for f in df_left.schema)
    right_fields = set((f.name, f.dataType) for f in df_right.schema)
#     import pdb; pdb.set_trace() #pdb debugger

    # I. First go over left-unique fields: 
    # For columns in the main df, but not in the new df: add it as Null
    # For columns in both df but w/ different datatypes, use casting to keep them consistent w/ main df (Left)
    for l_name, l_type in left_fields.difference(right_fields): #1. find what Left has, Right doesn't
        if l_name in right_types: #2A. if column is in both, then something's off w/ the schema 
            r_type = right_types[l_name] #3. tell me what's this column's type in Right
            df_right = df_right.withColumn(l_name,df_right[l_name].cast(l_type)) #4. keep them consistent w/ main df (Left)
            print("Casting magic happened on column %s: Left type: %s, Right type: %s. Both are now: %s." % (l_name, l_type, r_type, l_type))
        else: #2B. if Left column is not in Right, add a NULL column to Right df
            df_right = df_right.withColumn(l_name, F.lit(None).cast(l_type))

    # Make sure Right columns are in the same order of Left
    df_right = df_right.select(df_left.columns)

    return df_left.union(df_right)

【讨论】:

【参考方案5】:

这是另一个解决方案。我使用 rdd union 是因为 dataFrame union 操作不支持多个 dataFrame。 注意 - 这不应该用于合并大量具有不同模式的数据帧。将空列添加到数据帧的成本将很快导致内存不足错误。 (即:尝试合并 1000 个缺少 10 列的数据帧将导致 10,000 次转换) 如果您的用例是从具有不同模式的存储中读取数据帧,该模式由具有不同模式的多个路径组成,那么更好的选择是首先将您的数据保存为镶木地板,然后使用“mergeSchema”选项读取数据帧。

def unionDataFramesAndMergeSchema(spark, dfsList):
'''
This function can perform a union between x dataFrames with different schemas.
Non-existing columns will be filled with null.
Note: If a column exist in 2 dataFrames with different types, an exception will be thrown.
:example:
>>> df1 = spark.createDataFrame([
>>>    
>>>        'A': 1,
>>>        'B': 1,
>>>        'C': 1
>>>    ])
>>> df2 = spark.createDataFrame([
>>>    
>>>        'A': 2,
>>>        'C': 2,
>>>        'DNew' : 2
>>>    ])
>>> unionDataFramesAndMergeSchema(spark,[df1,df2]).show()
>>> +---+----+---+----+
>>> |  A|   B|  C|DNew|
>>> +---+----+---+----+
>>> |  2|null|  2|   2|
>>> |  1|   1|  1|null|
>>> +---+----+---+----+
:param spark: The Spark session.
:param dfsList: A list of dataFrames.
:return: A union of all dataFrames, with schema merged.
'''
if len(dfsList) == 0:
    raise ValueError("DataFrame list is empty.")
if len(dfsList) == 1:
    logging.info("The list contains only one dataFrame, no need to perform union.")
    return dfsList[0]

logging.info("Will perform union between 0 dataFrames...".format(len(dfsList)))

columnNamesAndTypes = 
logging.info("Calculating unified column names and types...")
for df in dfsList:
    for columnName, columnType in dict(df.dtypes).iteritems():
        if columnNamesAndTypes.has_key(columnName) and columnNamesAndTypes[columnName] != columnType:
            raise ValueError(
                "column '0' exist in at least 2 dataFrames with different types ('1' and '2'"
                    .format(columnName, columnType, columnNamesAndTypes[columnName]))
        columnNamesAndTypes[columnName] = columnType
logging.info("Unified column names and types: 0".format(columnNamesAndTypes))

logging.info("Adding null columns in dataFrames if needed...")
newDfsList = []
for df in dfsList:
    newDf = df
    dfTypes = dict(df.dtypes)
    for columnName, columnType in dict(columnNamesAndTypes).iteritems():
        if not dfTypes.has_key(columnName):
            # logging.info("Adding null column for '0'.".format(columnName))
            newDf = newDf.withColumn(columnName, func.lit(None).cast(columnType))
    newDfsList.append(newDf)

dfsWithOrderedColumnsList = [df.select(columnNamesAndTypes.keys()) for df in newDfsList]
logging.info("Performing a flat union between all dataFrames (as rdds)...")
allRdds = spark.sparkContext.union([df.rdd for df in dfsWithOrderedColumnsList])
return allRdds.toDF()

【讨论】:

【参考方案6】:

如果您从存储文件中读取两个数据帧,您可以只使用预定义的架构:

val schemaForRead = 
StructType(List(
    StructField("userId", LongType,true), 
    StructField("dtEvent", LongType,true), 
    StructField("goodsId", LongType,true)
))

val dfA = spark.read.format("parquet").schema(schemaForRead).load("/tmp/file1.parquet")      
val dfB = spark.read.format("parquet").schema(schemaForRead).load("/tmp/file2.parquet")

val dfC = dfA.union(dfB)

请注意,文件file1file2 中的架构可以不同,也可以不同于schemaForRead。如果file1 不包含来自schemaForRead 数据帧的字段A 将有带有null 的空字段。如果文件包含 schemaForRead 数据框中未显示的附加字段,则不会包含它。

【讨论】:

【参考方案7】:

这是 Scala 中的版本也在此处回答 - (Spark - Merge / Union DataFrame with Different Schema (column names and sequence) to a DataFrame with Master common schema)-

需要合并数据框列表..提供所有数据框中相同命名的列应该具有相同的数据类型..

def unionPro(DFList: List[DataFrame], spark: org.apache.spark.sql.SparkSession): DataFrame = 

    /**
     * This Function Accepts DataFrame with same or Different Schema/Column Order.With some or none common columns
     * Creates a Unioned DataFrame
     */

    import spark.implicits._

    val MasterColList: Array[String] = DFList.map(_.columns).reduce((x, y) => (x.union(y))).distinct

    def unionExpr(myCols: Seq[String], allCols: Seq[String]): Seq[org.apache.spark.sql.Column] = 
      allCols.toList.map(x => x match 
        case x if myCols.contains(x) => col(x)
        case _                       => lit(null).as(x)
      )
    

    // Create EmptyDF , ignoring different Datatype in StructField and treating them same based on Name ignoring cases

    val masterSchema = StructType(DFList.map(_.schema.fields).reduce((x, y) => (x.union(y))).groupBy(_.name.toUpperCase).map(_._2.head).toArray)

    val masterEmptyDF = spark.createDataFrame(spark.sparkContext.emptyRDD[Row], masterSchema).select(MasterColList.head, MasterColList.tail: _*)

    DFList.map(df => df.select(unionExpr(df.columns, MasterColList): _*)).foldLeft(masterEmptyDF)((x, y) => x.union(y))

  

这是它的示例测试 -


    val aDF = Seq(("A", 1), ("B", 2)).toDF("Name", "ID")
    val bDF = Seq(("C", 1, "D1"), ("D", 2, "D2")).toDF("Name", "Sal", "Deptt")
    unionPro(List(aDF, bDF), spark).show

输出为 -

+----+----+----+-----+
|Name|  ID| Sal|Deptt|
+----+----+----+-----+
|   A|   1|null| null|
|   B|   2|null| null|
|   C|null|   1|   D1|
|   D|null|   2|   D2|
+----+----+----+-----+

【讨论】:

【参考方案8】:

如果您使用的是 spark 版本 > 2.3.0,那么您可以使用 unionByName 内置函数来获得所需的输出。

链接到包含 unionByName 代码的 Git 存储库: https://github.com/apache/spark/blame/cee4ecbb16917fa85f02c635925e2687400aa56b/sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala#L1894

【讨论】:

以上是关于具有不匹配模式的 Spark 合并数据帧,无需额外的磁盘 IO的主要内容,如果未能解决你的问题,请参考以下文章

合并具有索引的数据帧,其中一个包含另一个(但不相同)

当函数在具有自动检测模式的 spark 数据帧中不起作用时

火花连接数据帧和合并模式

spark:合并两个数据帧,如果两个数据帧中的ID重复,则df1中的行覆盖df2中的行

在内存中缓存 Spark 数据帧是不是有额外的开销?

我们如何在 Spark 中使用 Dataframes(由 structtype 方法创建)合并具有不同列数的 2 个表?