在 Pyspark/Hive 中处理不断变化的数据类型

Posted

技术标签:

【中文标题】在 Pyspark/Hive 中处理不断变化的数据类型【英文标题】:Handling changing datatypes in Pyspark/Hive 【发布时间】:2020-10-10 03:37:10 【问题描述】:

我在解析 pyspark 中不一致的数据类型时遇到问题。如下面的示例文件所示,SA 键始终包含一个字典,但有时它可以显示为字符串值。当我尝试获取 SA.SM.Name 列时,出现如下所示的异常。

如何将 pyspark/hive 中的 SA.SM.Name 列设置为空值具有 JSON 以外的值。有人可以帮帮我吗?

我尝试转换为不同的数据类型,但没有任何效果,或者我可能做错了什么。

输入文件内容:mypath

"id":1,"SA":"SM": "Name":"John","Email":"John@example.com"

"id":2,"SA":"SM": "Name":"Jerry","Email":"Jerry@example.com"

"id":3,"SA":"STRINGVALUE"

df=spark.read.json(my_path)
df.registerTempTable("T")
spark.sql("""select id,SA.SM.Name from T """).show()

Traceback(最近一次调用最后一次):文件“”,第 1 行,in 文件“/usr/lib/spark/python/pyspark/sql/session.py”,行 767,在sql中 返回 DataFrame(self._jsparkSession.sql(sqlQuery), self._wrapped) 文件 "/usr/lib/spark/python/lib/py4j-0.10.7-src.zip/py4j/java_gateway.py", 第 1257 行,在 call 文件中 “/usr/lib/spark/python/pyspark/sql/utils.py”,第 69 行,在 deco raise AnalysisException(s.split(': ', 1)[1], stackTrace) pyspark.sql.utils.AnalysisException: "Can't extract value from SA#6.SM:需要结构类型但得到字符串;第 1 行 pos 10"

【问题讨论】:

你能在这个问题上发布你的 df 的 shcema 吗? print(df.schema),如果你的 SA 是 STRING TYPE,那么你有写函数来转换为 MAP TYPE >>> df.schema StructType(List(StructField(SA,StructType(List(StructField(SM,StringType,true))),true),StructField(id,LongType,true))) @e.zy : 你能帮忙看看如何转换成 MAP 类型吗? 【参考方案1】:

这在使用数据帧时是不可能的,因为当 spark 加载它时,列 SA 被读取为字符串。但是您可以使用 sparkContext 作为 rdd 加载文件/表,然后使用更清洁的函数将空 dict 值映射到 SA。这里我将文件加载为textFile,但如果它是hadoopfile,则进行必要的实现。

def cleaner(record):
    output = ""
    print(type(record))
    try:
        output = json.loads(record)
    except Exception as e:
        print("exception happened")
    finally:
        if isinstance(output.get("SA"), str ):
            print("This is string")
            output["SA"] = 
    return output

dfx = spark.sparkContext.textFile("file://"+my_path)

dfx2 = dfx.map(cleaner)

new_df = spark.createDataFrame(dfx2)
new_df.show(truncate=False)
+---------------------------------------------------+---+
|SA                                                 |id |
+---------------------------------------------------+---+
|[SM -> [Email -> John@example.com, Name -> John]]  |1  |
|[SM -> [Email -> Jerry@example.com, Name -> Jerry]]|2  |
|[]                                                 |3  |
+---------------------------------------------------+---+

new_df.printSchema()
root
 |-- SA: map (nullable = true)
 |    |-- key: string
 |    |-- value: map (valueContainsNull = true)
 |    |    |-- key: string
 |    |    |-- value: string (valueContainsNull = true)
 |-- id: long (nullable = true)


注意:如果 name 的输出值必须写入同一个表/列,此解决方案可能不起作用,如果您尝试将加载的数据帧写回同一个表,则会导致 SA 列break,您将根据 qn 的 cmets 中提供的架构获得姓名和电子邮件列表。

【讨论】:

非常感谢@HArdRese7。过滤记录解决我的问题至少忽略不是字符串类型的记录。

以上是关于在 Pyspark/Hive 中处理不断变化的数据类型的主要内容,如果未能解决你的问题,请参考以下文章

Pyspark/Hive 中带条件的加权运行总计

PySpark/Hive:如何使用 LazySimpleSerDe 创建表以转换布尔值“t”/“f”?

PySpark/HIVE:附加到现有表

为啥 df.limit 在 Pyspark 中不断变化?

能适应不断变化的环境的"液体"机器学习系统

计算不断变化的数据的均值