在 Pyspark/Hive 中处理不断变化的数据类型
Posted
技术标签:
【中文标题】在 Pyspark/Hive 中处理不断变化的数据类型【英文标题】:Handling changing datatypes in Pyspark/Hive 【发布时间】:2020-10-10 03:37:10 【问题描述】:我在解析 pyspark 中不一致的数据类型时遇到问题。如下面的示例文件所示,SA 键始终包含一个字典,但有时它可以显示为字符串值。当我尝试获取 SA.SM.Name 列时,出现如下所示的异常。
如何将 pyspark/hive 中的 SA.SM.Name 列设置为空值具有 JSON 以外的值。有人可以帮帮我吗?
我尝试转换为不同的数据类型,但没有任何效果,或者我可能做错了什么。
输入文件内容:mypath
"id":1,"SA":"SM": "Name":"John","Email":"John@example.com"
"id":2,"SA":"SM": "Name":"Jerry","Email":"Jerry@example.com"
"id":3,"SA":"STRINGVALUE"
df=spark.read.json(my_path)
df.registerTempTable("T")
spark.sql("""select id,SA.SM.Name from T """).show()
Traceback(最近一次调用最后一次):文件“”,第 1 行,in 文件“/usr/lib/spark/python/pyspark/sql/session.py”,行 767,在sql中 返回 DataFrame(self._jsparkSession.sql(sqlQuery), self._wrapped) 文件 "/usr/lib/spark/python/lib/py4j-0.10.7-src.zip/py4j/java_gateway.py", 第 1257 行,在 call 文件中 “/usr/lib/spark/python/pyspark/sql/utils.py”,第 69 行,在 deco raise AnalysisException(s.split(': ', 1)[1], stackTrace) pyspark.sql.utils.AnalysisException: "Can't extract value from SA#6.SM:需要结构类型但得到字符串;第 1 行 pos 10"
【问题讨论】:
你能在这个问题上发布你的 df 的 shcema 吗? print(df.schema),如果你的 SA 是 STRING TYPE,那么你有写函数来转换为 MAP TYPE >>> df.schema StructType(List(StructField(SA,StructType(List(StructField(SM,StringType,true))),true),StructField(id,LongType,true))) @e.zy : 你能帮忙看看如何转换成 MAP 类型吗? 【参考方案1】:这在使用数据帧时是不可能的,因为当 spark 加载它时,列 SA 被读取为字符串。但是您可以使用 sparkContext 作为 rdd 加载文件/表,然后使用更清洁的函数将空 dict 值映射到 SA。这里我将文件加载为textFile,但如果它是hadoopfile,则进行必要的实现。
def cleaner(record):
output = ""
print(type(record))
try:
output = json.loads(record)
except Exception as e:
print("exception happened")
finally:
if isinstance(output.get("SA"), str ):
print("This is string")
output["SA"] =
return output
dfx = spark.sparkContext.textFile("file://"+my_path)
dfx2 = dfx.map(cleaner)
new_df = spark.createDataFrame(dfx2)
new_df.show(truncate=False)
+---------------------------------------------------+---+
|SA |id |
+---------------------------------------------------+---+
|[SM -> [Email -> John@example.com, Name -> John]] |1 |
|[SM -> [Email -> Jerry@example.com, Name -> Jerry]]|2 |
|[] |3 |
+---------------------------------------------------+---+
new_df.printSchema()
root
|-- SA: map (nullable = true)
| |-- key: string
| |-- value: map (valueContainsNull = true)
| | |-- key: string
| | |-- value: string (valueContainsNull = true)
|-- id: long (nullable = true)
注意:如果 name 的输出值必须写入同一个表/列,此解决方案可能不起作用,如果您尝试将加载的数据帧写回同一个表,则会导致 SA 列break,您将根据 qn 的 cmets 中提供的架构获得姓名和电子邮件列表。
【讨论】:
非常感谢@HArdRese7。过滤记录解决我的问题至少忽略不是字符串类型的记录。以上是关于在 Pyspark/Hive 中处理不断变化的数据类型的主要内容,如果未能解决你的问题,请参考以下文章