How to convert string column to ArrayType in pyspark
Posted: 2019-09-10 09:43:47

Question:
I have a requirement to mask data stored in a Cassandra table using pyspark. I have a frozen data set in Cassandra that I fetch in pyspark as an Array. I converted it to a string to perform the masking. Now, I want to convert it back to an array type.
I am using Spark 2.3.2 to mask the data in the Cassandra table. I copied the data into a dataframe and converted it to a string to perform the masking. I tried to convert it back to an array, but I cannot preserve the original structure.
# Imports and faker setup implied by the snippet (omitted in the question)
from faker import Faker
from pyspark.sql.functions import array, regexp_replace
from pyspark.sql.types import StringType

faker = Faker()

table_df.createOrReplaceTempView("tmp")
networkinfos_df = sqlContext.sql('Select networkinfos, pid, eid, sid From tmp')
dfn1 = networkinfos_df.withColumn('networkinfos_ntdf', regexp_replace(networkinfos_df.networkinfos.cast(StringType()), r'\b(?:(?:25[0-5]|2[0-4][0-9]|[01]?[0-9][0-9]?)\.){3}(?:25[0-5]|2[0-4][0-9]|[01]?[0-9][0-9]?)\b', faker.ipv4_private(network=False, address_class=None))) \
    .withColumn('networkinfos_ntdf', regexp_replace('networkinfos_ntdf', r'([a-fA-F0-9]{2}[:|\-]?){6}', faker.mac_address())) \
    .withColumn('networkinfos_ntdf', regexp_replace('networkinfos_ntdf', r'(([0-9a-fA-F]{1,4}:){7,7}[0-9a-fA-F]{1,4}|([0-9a-fA-F]{1,4}:){1,7}:|([0-9a-fA-F]{1,4}:){1,6}:[0-9a-fA-F]{1,4}|([0-9a-fA-F]{1,4}:){1,5}(:[0-9a-fA-F]{1,4}){1,2}|([0-9a-fA-F]{1,4}:){1,4}(:[0-9a-fA-F]{1,4}){1,3}|([0-9a-fA-F]{1,4}:){1,3}(:[0-9a-fA-F]{1,4}){1,4}|([0-9a-fA-F]{1,4}:){1,2}(:[0-9a-fA-F]{1,4}){1,5}|[0-9a-fA-F]{1,4}:((:[0-9a-fA-F]{1,4}){1,6})|:((:[0-9a-fA-F]{1,4}){1,7}|:)|fe80:(:[0-9a-fA-F]{0,4}){0,4}%[0-9a-zA-Z]{1,}|::(ffff(:0{1,4}){0,1}:){0,1}((25[0-5]|(2[0-4]|1{0,1}[0-9]){0,1}[0-9])\.){3,3}(25[0-5]|(2[0-4]|1{0,1}[0-9]){0,1}[0-9])|([0-9a-fA-F]{1,4}:){1,4}:((25[0-5]|(2[0-4]|1{0,1}[0-9]){0,1}[0-9])\.){3,3}(25[0-5]|(2[0-4]|1{0,1}[0-9]){0,1}[0-9]))', faker.ipv6(network=False))) \
    .drop('networkinfos')
dfn2 = dfn1.withColumn("networkinfos_ntdf", array(dfn1["networkinfos_ntdf"]))
dfn2.show(30, False)
Its original structure is as follows:
|-- networkinfos: array (nullable = true)
| |-- element: struct (containsNull = true)
| | |-- vendor: string (nullable = true)
| | |-- product: string (nullable = true)
| | |-- dhcp_enabled: boolean (nullable = true)
| | |-- dhcp_server: string (nullable = true)
| | |-- dns_servers: array (nullable = true)
| | | |-- element: string (containsNull = true)
| | |-- ipv4: string (nullable = true)
| | |-- ipv6: string (nullable = true)
| | |-- subnet_mask_obsolete: string (nullable = true)
| | |-- default_ip_gateway: string (nullable = true)
| | |-- mac_address: string (nullable = true)
| | |-- logical_name: string (nullable = true)
| | |-- dhcp_lease_obtained: timestamp (nullable = true)
| | |-- dhcp_lease_expires: timestamp (nullable = true)
| | |-- ip_enabled: boolean (nullable = true)
| | |-- ipv4_list: array (nullable = true)
| | | |-- element: string (containsNull = true)
| | |-- ipv6_list: array (nullable = true)
| | | |-- element: string (containsNull = true)
| | |-- subnet_masks_obsolete: array (nullable = true)
| | | |-- element: string (containsNull = true)
| | |-- default_ip_gateways: array (nullable = true)
| | | |-- element: string (containsNull = true)
| | |-- wins_primary_server: string (nullable = true)
| | |-- wins_secondary_server: string (nullable = true)
| | |-- subnet_mask: string (nullable = true)
| | |-- subnet_masks: array (nullable = true)
| | | |-- element: string (containsNull = true)
| | |-- interface_index: integer (nullable = true)
| | |-- speed: long (nullable = true)
| | |-- dhcp_servers: array (nullable = true)
| | | |-- element: string (containsNull = true)
What I get instead is:
root
|-- pid: string (nullable = true)
|-- eid: string (nullable = true)
|-- sid: string (nullable = true)
|-- networkinfos_ntdf: array (nullable = false)
| |-- element: string (containsNull = true)
How can I convert it back to the original structure?

Comments:
You can use the Spark SQL function split to split it back: spark.apache.org/docs/latest/api/java/org/apache/spark/sql/…
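For reference, a minimal sketch of that split suggestion, reusing the masked column from the question (the bracket-stripping pattern is an assumption about how Spark renders the casted array as a string). Note that this yields only array<string>, not the original array of structs:

from pyspark.sql.functions import regexp_replace, split

# Strip the surrounding "[ ]" of the stringified array, then split on commas
df_split = dfn1.withColumn(
    'networkinfos_ntdf',
    split(regexp_replace('networkinfos_ntdf', r'^\[|\]$', ''), r',\s*'))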
Answer 1:
You can try pyspark.sql.functions.to_json() and pyspark.sql.functions.from_json(), provided your regexp_replace operations do not break the JSON data:
First, find the schema of the networkinfos field:
from pyspark.sql.types import ArrayType
from pyspark.sql.functions import regexp_replace, from_json, to_json
# get the schema of the array field `networkinfos` in JSON
schema_data = df.select('networkinfos').schema.jsonValue()['fields'][0]['type']
# convert it into pyspark.sql.types.ArrayType:
field_schema = ArrayType.fromJson(schema_data)
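As a side note, the same ArrayType can be pulled straight off the DataFrame schema without the JSON round trip, assuming the column is still named networkinfos:

# Equivalent one-liner: index the StructType by field name and take its dataType
field_schema = df.schema['networkinfos'].dataType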
Once you have field_schema, you can use from_json to set the modified JSON string back to its original schema:
dfn1 = networkinfos_df \
    .withColumn('networkinfos', to_json('networkinfos')) \
    .withColumn('networkinfos', regexp_replace('networkinfos', ...)) \
    ..... \
    .withColumn('networkinfos', from_json('networkinfos', field_schema))
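Putting the pieces together, a self-contained sketch of the full round trip; the single IPv4 pattern and the literal replacement below are simplified placeholders standing in for the question's full set of regexes and faker-generated values:

from pyspark.sql.functions import from_json, regexp_replace, to_json

# Capture the original array<struct<...>> schema before masking
field_schema = networkinfos_df.schema['networkinfos'].dataType

masked_df = networkinfos_df \
    .withColumn('networkinfos', to_json('networkinfos')) \
    .withColumn('networkinfos',
                regexp_replace('networkinfos',
                               r'\b(?:\d{1,3}\.){3}\d{1,3}\b',  # placeholder IPv4 pattern
                               '10.0.0.1')) \
    .withColumn('networkinfos', from_json('networkinfos', field_schema))

masked_df.printSchema()  # networkinfos is array<struct<...>> again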