将运行时 7.3LTS(Spark3.0.1) 升级到 9.1LTS(Spark3.1.2) 后创建 PySpark 数据帧 Databricks 时,json 文件中的重复列会引发错误

Posted

技术标签:

【中文标题】将运行时 7.3LTS(Spark3.0.1) 升级到 9.1LTS(Spark3.1.2) 后创建 PySpark 数据帧 Databricks 时,json 文件中的重复列会引发错误【英文标题】:Duplicate column in json file throw error when creating PySpark dataframe Databricks after upgrading runtime 7.3LTS(Spark3.0.1) to 9.1LTS(Spark3.1.2) 【发布时间】:2021-12-27 07:17:43 【问题描述】:

问题陈述:在升级 Databricks 运行时版本时,重复列在创建数据框时抛出错误。在较低的运行时间中,创建了数据框,并且由于下游不需要重复列,因此它被简单地排除在选择中。

文件位置:存储在 ADLS Gen2 (Azure) 上的 Json 文件。 集群模式:标准

代码: 我们在 Azure Databricks 中阅读如下。

intermediate_df = spark.read.option("multiline","true").json(f"path/IN-109418_Part_1.json")

json 文件是嵌套的,其中之一出现在 tags 下,这是重复的列(下图)。 读入数据框后,我们选择所需的列。无论如何,我们需要这个重复的tags

之前我们在 Databricks 运行时 7.3LTS(Spark3.0.1) 上运行,它创建了包含重复列的数据框,但由于我们没有进一步使用它,所以它没有受到伤害。

但是,我们现在正在升级到运行时 9.1LTS(Spark3.1.2),它会在创建数据框本身时引发列重复的错误。 错误信息:Found duplicate column(s) in the data schema: `tags`

图片重复栏:- Duplicate column in json file: tags. Dataframe was created successfully in runtime 7.3LTS(Spark3.0.1)

结论: 我一读取数据框就尝试选择列,但没有成功。 我有一种预感,因为现在升级后的 Databricks 运行时版本默认情况下更倾向于 Delta 表(增量表不支持其中的重复列),可能有一个属性我们必须关闭才能忽略此检查整个笔记本或只是在读入数据框时。

虽然这个确切的错误发生在 json 上,但我相信如果其他文件格式(如 csv)有重复的列,它可能会发生。

该文件非常嵌套,并且为所有必需的列定义架构不是很实用,因为它很繁琐并且容易出错,以防将来需要更多列(这将是次要解决方案)。文件由供应商使用自动化流程生成,预计所有文件将保持与已交付历史文件相同的格式。

运行时 9.1LTS(Spark3.1.2) 上的完全错误:

AnalysisException                         Traceback (most recent call last)
<command-4270018894919110> in <module>
----> 1 intermediate_df = spark.read.option("multiline","true").json(f"path/IN-109418_Part_1.json")

/databricks/spark/python/pyspark/sql/readwriter.py in json(self, path, schema, primitivesAsString, prefersDecimal, allowComments, allowUnquotedFieldNames, allowSingleQuotes, allowNumericLeadingZero, allowBackslashEscapingAnyCharacter, mode, columnNameOfCorruptRecord, dateFormat, timestampFormat, multiLine, allowUnquotedControlChars, lineSep, samplingRatio, dropFieldIfAllNull, encoding, locale, pathGlobFilter, recursiveFileLookup, allowNonNumericNumbers, modifiedBefore, modifiedAfter)
    370             path = [path]
    371         if type(path) == list:
--> 372             return self._df(self._jreader.json(self._spark._sc._jvm.PythonUtils.toSeq(path)))
    373         elif isinstance(path, RDD):
    374             def func(iterator):

/databricks/spark/python/lib/py4j-0.10.9-src.zip/py4j/java_gateway.py in __call__(self, *args)
   1302 
   1303         answer = self.gateway_client.send_command(command)
-> 1304         return_value = get_return_value(
   1305             answer, self.gateway_client, self.target_id, self.name)
   1306 

/databricks/spark/python/pyspark/sql/utils.py in deco(*a, **kw)
    121                 # Hide where the exception came from that shows a non-Pythonic
    122                 # JVM exception message.
--> 123                 raise converted from None
    124             else:
    125                 raise

AnalysisException: Found duplicate column(s) in the data schema: `tags`

编辑:预先定义模式的评论。

【问题讨论】:

更新我如何解决这个问题。 我在答案部分的解决方法。 【参考方案1】:

请使用 json.load 将 json 转换为字典并处理重复键

import json

#test json
test_json = """[
   "id": 1,
   "tags": "test1",
   "tags": "test1",
  "id": 2,
   "tags": "test2",
   "tags": "test2",
   "tags": "test3"]
"""

#function to handle duplicate keys:
def value_resolver(pairs):
    d = 
    i=1
    for k, v in pairs:
        if k in d:
           d[k + str(i)] = v
           i+=1
        else:
           d[k] = v
    return d

#load
our_dict = json.loads(test_json, object_pairs_hook=value_resolver)
print(our_dict)
>> ['id': 1, 'tags': 'test1', 'tags1': 'test1', 'id': 2, 'tags': 'test2', 'tags1': 'test2', 'tags2': 'test3']

#dict to dataframe
df = spark.createDataFrame(our_dict)
df.show()


+---+-----+-----+-----+
| id| tags|tags1|tags2|
+---+-----+-----+-----+
|  1|test1|test1| null|
|  2|test2|test2|test3|
+---+-----+-----+-----+

【讨论】:

【参考方案2】:

有不同的好建议,可能会有所帮助。

正如@ScootCork 所指出的,预先定义模式会有所帮助,因为 Spark 不必自己创建模式。但是,我的文件非常庞大且嵌套严重,因为手动定义架构会很麻烦。

最后我确实使用了架构,但找到了一种解决方法,因此我不必手动创建它。 即使有重复的列,我也能够在 7.3 LTS 运行时 中创建数据框,如原始问题中所述。因此,我在此运行时读取了一个文件并将其写入 ADLS Gen2(您可以将其存储在任何地方)。这是一次性活动,现在您可以在每次运行代码时读回此文件(多行在读回时不需要为真),使用.schema 获取其架构,并使用此架构来读取新的json 文件。由于 spark 不必自行推断模式,因此不会为重复列抛出错误。请注意,重复的列仍然存在,如果您尝试使用它,您将收到ambiguous 错误。但是,如果由于剪切大小和复杂的 json 结构而手动定义模式不是很实用,并且如果重复的列没有用,则此方法非常有用。描述如下:-

7.3 LTS 运行时的一次性活动

# Few columns were coming as duplicate in raw file. e.g.: languages[0].groupingsets[0].element.attributes.tags[0] was repeated twice.
# This caused errror while creating dataframe.
# However, we are able to read it in Databricks Runtime 7.3 LTS. Hence used this runtime to read a file and write it to ADLS as ONE-TIME activity.
# For all further runs, this file can be read using multiline as false, then use its schema while reading the other new files (which in this case needs multiline as true). In this way spark does not have to create schema on its own hence does not throw error eben in higher runtime versions.
# Have used a historical file initially delivered which had a lot of records due to historical data. This ensures we cover all possibilities.
# Can be created again using 7.3 LTS runtime cluster if this schema is deleted. 

dfOldRuntime = spark.read.option("multiline","true").json(pathOneFile) # Can take any file to creat sample schema.
dfOldRuntime.coalesce(1).write.mode('overwrite').format('json').save(pathSchema)

现在将这个写入的文件用于所有未来的运行,即使是在更高的运行时。

# Read sample which was created using 7.3 LTS runtime.
# The multiline does NOT have to be true for this.
# Get its schema and use it to read new files even on higher runtime without error which was caused due to duplicate columns.
dfSchema = spark.read.json(pathSchema)
schema = dfSchema.schema

# Read new json files using this schema by using `.schema()`. Works on higher runtimes as well since spark now does not have to create schema on its own.
intermediate_df = spark.read.option("multiline","true").schema(schema).json(f"json_path")

【讨论】:

【参考方案3】:

spark documentation 中目前没有此选项。对于具有重复键值的 json 的有效性以及如何处理它们,似乎也存在不同的意见/标准 (SO discussion)。

提供不带重复键字段的架构会导致加载成功。它采用 json 中最后一个键的值。

架构取决于您的源文件。

test.json


    "id": 1,
    "tags": "test1",
    "tags": "test2"

蟒蛇

from pyspark.sql.types import *

schema = StructType([
    StructField('id', LongType(), True),
    StructField('tags', StringType(), True)
])

df = spark.read.schema(schema).json("test.json", multiLine=True)

df.show()

+---+-----+
| id| tags|
+---+-----+
|  1|test2|
+---+-----+

在 pyspark 3.1.1 上本地运行

【讨论】:

以上是关于将运行时 7.3LTS(Spark3.0.1) 升级到 9.1LTS(Spark3.1.2) 后创建 PySpark 数据帧 Databricks 时,json 文件中的重复列会引发错误的主要内容,如果未能解决你的问题,请参考以下文章

唯品会SPARK3.0升级之路

How to install Zabbix4.0 LTS version with Yum on the Oracle Linux 7.3 system?

升级集群的 Databricks Runtime 后调试 PySpark 时出错

Ubuntu 12.04 LTS:在不破坏依赖关系的情况下将 python 2.7.3 更新到 2.7.6 [关闭]

在 ubuntu 16.04 LTS 上运行 mongodb

编辑 XIB 时 XCode 7.3 崩溃