在pyspark中将字典列拆分为多列

Posted

技术标签:

【中文标题】在pyspark中将字典列拆分为多列【英文标题】:splitting dictionary column into multiple columns in pyspark 【发布时间】:2019-01-21 06:04:42 【问题描述】:
Column Names
Production_uint_id,batch_id,items_produced,items_discarded
Data:
P188    gv962   'scissor': 141, 'paper': 274, 'rock': 218
'scissor': 14,'paper': 135, 'rock': 24
P258    mr005   'scissor': 151, 'paper': 143, 'rock': 225
'scissor': 24, 'paper': 60, 'rock': 17

代码:

from pyspark.sql.types import *
sc = spark.sparkContext
production_rdd = sc.textFile("/Production_logs.tsv")
production_parts = production_rdd.map(lambda l: l.split("\t"))
production = production_parts.map(lambda p: (p[0], p[1], p[2], p[3].strip()))
schemaStringProduction = "production_unit_id batch_id items_produced items_discarded"
fieldsProduction = [StructField(field_name, StringType(), True) for field_name in schemaStringProduction.split()]
schemaProduction = StructType(fieldsProduction)
schemaProductionDF = spark.createDataFrame(production, schemaProduction)

I am Trying to explode
exploding = schemaProductionDF.select("production_unit_id",  explode("items_produced").alias("item_p", "item_p_count"), "items_discarded")

收到此错误:

pyspark.sql.utils.AnalysisException: u"cannot resolve 'explode(`items_produced`)' due to data type mismatch: 
input to function explode should be array or map type, not string;

请帮忙

【问题讨论】:

所有列在您的架构 fieldsProduction = [StructField(field_name, StringType(), True) for field_name in schemaStringProduction.split()] 中设置为 StringType 。将其更改为正确的数据类型。 嗨,Suresh,我应该提到哪种数据类型? 您可以将MapType(StringType(),LongType()) 用于列items_produced,items_discarded How to query JSON data column using Spark DataFrames?的可能重复 【参考方案1】:

Explode 是 UDTF 函数,它将为每个数组元素返回新的 Row。 爆炸:Explode in PySpark

对于您的问题,请尝试以下代码:

from pyspark import SparkContext
from pyspark.sql import Row
sc= SparkContext.getOrCreate()
import pandas as pd
rdd1=sc.textFile("D:\MOCK_DATA\*_dict.txt")
lineRDD=rdd1.map(lambda line: line.split("\t"))
header="Production_uint_id,batch_id,items_produced,items_discarded"
col_name=[x.encode("utf-8") for x in header.split(',')] 
production = lineRDD.map(lambda p: (eval(p[0]), eval(p[1]), eval(p[2]), eval(p[3]).strip()))
flatRDD=lineRDD.map(lambda a : ((a[0],a[1],eval(a[2]).values(),eval(a[3]).values())))
DF1=flatRDD.toDF(col_name)
DF1.printSchema()
from pyspark.sql import functions as f
DF2=DF1
lst='scissor,paper,rock'
col_lst='items_produced,items_discarded'
for col_ele in col_lst.split(","):
    count=0
    for i in lst.split(','):
        DF2=DF2.withColumn(col_ele+'.'+i, DF2[col_ele][count])
        count=count+1

DF2.show()

【讨论】:

如果您遇到任何问题,请告诉我,如果符合您的要求,请将其标记为正确。

以上是关于在pyspark中将字典列拆分为多列的主要内容,如果未能解决你的问题,请参考以下文章

将列表的列拆分为同一 PySpark 数据框中的多列

Pyspark 数据框将 json 列值拆分为***多列

如何在pyspark中将列转换为行?

在pyspark中将带有字符串json字符串的列转换为带有字典的列

如何将 map_keys() 中的值拆分为 PySpark 中的多列

在 Pyspark 中将字典转换为数据框