使用 pyspark 中 json 文件中的模式读取固定宽度文件

Posted

技术标签:

【中文标题】使用 pyspark 中 json 文件中的模式读取固定宽度文件【英文标题】:Read fixed width file using schema from json file in pyspark 【发布时间】:2018-12-17 14:57:28 【问题描述】:

我有如下固定宽度的文件

00120181120xyz12341
00220180203abc56792
00320181203pqr25483 

以及指定架构的相应JSON文件:

"Column":"id","From":"1","To":"3"
"Column":"date","From":"4","To":"8"
"Column":"name","From":"12","To":"3"
"Column":"salary","From":"15","To":"5"

我使用以下方法将架构文件读入 DataFrame:

SchemaFile = spark.read\
    .format("json")\
    .option("header","true")\
    .json('C:\Temp\schemaFile\schema.json')

SchemaFile.show()
#+------+----+---+
#|Column|From| To|
#+------+----+---+
#|    id|   1|  3|
#|  date|   4|  8|
#|  name|  12|  3|
#|salary|  15|  5|
#+------+----+---+

同样,我将固定宽度文件解析为 pyspark DataFrame,如下所示:

File = spark.read\
    .format("csv")\
    .option("header","false")\
    .load("C:\Temp\samplefile.txt")

File.show()
#+-------------------+
#|                _c0|
#+-------------------+
#|00120181120xyz12341|
#|00220180203abc56792|
#|00320181203pqr25483|
#+-------------------+

我显然可以硬编码每列的位置和长度的值以获得所需的输出:

from pyspark.sql.functions import substring
data = File.select(
    substring(File._c0,1,3).alias('id'),
    substring(File._c0,4,8).alias('date'),
    substring(File._c0,12,3).alias('name'),
    substring(File._c0,15,5).alias('salary')
)

data.show()
#+---+--------+----+------+
#| id|    date|name|salary|
#+---+--------+----+------+
#|001|20181120| xyz| 12341|
#|002|20180203| abc| 56792|
#|003|20181203| pqr| 25483|
#+---+--------+----+------+

但是如何使用SchemaFile DataFrame 来指定行的宽度和列名,以便在运行时动态应用架构(无需硬编码)?

【问题讨论】:

您有什么顾虑?你试过什么? 我想在数据上应用模式,同时作为运行时读取。非硬编码 【参考方案1】:

这里最简单的做法是 collect SchemaFile 的内容并遍历其行以提取所需的数据。

首先将模式文件作为 JSON 读入 DataFrame。然后调用 collect 并将每一行映射到一个字典:

sfDict = map(lambda x: x.asDict(), SchemaFile.collect())
print(sfDict)
#['Column': u'id', 'From': u'1', 'To': u'3',
# 'Column': u'date', 'From': u'4', 'To': u'8',
# 'Column': u'name', 'From': u'12', 'To': u'3',
# 'Column': u'salary', 'From': u'15', 'To': u'5']

现在您可以遍历 sfDict 中的行并使用这些值来为您的列添加子字符串:

from pyspark.sql.functions import substring
File.select(
    *[
        substring(
            str='_c0',
            pos=int(row['From']),
            len=int(row['To'])
        ).alias(row['Column']) 
        for row in sfDict
    ]
).show()
#+---+--------+----+------+
#| id|    date|name|salary|
#+---+--------+----+------+
#|001|20181120| xyz| 12341|
#|002|20180203| abc| 56792|
#|003|20181203| pqr| 25483|
#+---+--------+----+------+

请注意,我们必须将 ToFrom 转换为整数,因为它们在您的 json 文件中被指定为字符串。

【讨论】:

以上是关于使用 pyspark 中 json 文件中的模式读取固定宽度文件的主要内容,如果未能解决你的问题,请参考以下文章

从 Pyspark 中的嵌套 Json-String 列中提取模式

火花流到pyspark json文件中的数据帧

如何使用 pyspark 在 aws 胶水中展平嵌套 json 中的数组?

Pyspark 中的 JSON 文件解析

pyspark 将模式应用于 csv - 仅返回空值

基于pyspark中的键有效地推断数据帧模式