Pyspark:由于数据类型 str 而不是 StringType,无法将 RDD 转换为 DataFrame
Posted
技术标签:
【中文标题】Pyspark:由于数据类型 str 而不是 StringType,无法将 RDD 转换为 DataFrame【英文标题】:Pyspark: Unable to turn RDD into DataFrame due to data type str instead of StringType 【发布时间】:2020-10-21 12:34:49 【问题描述】:我正在 Pyspark 中执行一些复杂的操作,其中最后一个操作是 flatMap
,它产生一个类型为 pyspark.rdd.PipelinedRDD
的对象,其内容只是一个字符串列表:
print(output_data.take(8))
> ['a', 'abc', 'a', 'aefgtr', 'bcde', 'bc', 'bhdsjfk', 'b']
我正在像这样开始我的 Spark-Session(用于测试的本地会话):
spark = SparkSession.builder.appName("my_app")\
.config('spark.sql.shuffle.partitions', '2').master("local").getOrCreate()
我的输入数据如下所示:
input_data = (('a', ('abc', [[('abc', 23)], 23, False, 3])),
('a', ('abcde', [[('abcde', 17)], 17, False, 5])),
('a', ('a', [[('a', 66)], 66, False, 1])),
('a', ('aefgtr', [[('aefgtr', 65)], 65, False, 6])),
('b', ('bc', [[('bc', 25)], 25, False, 2])),
('b', ('bcde', [[('bcde', 76)], 76, False, 4])),
('b', ('b', [[('b', 13)], 13, False, 1])),
('b', ('bhdsjfk', [[('bhdsjfk', 36)], 36, False, 7])))
input_data = sc.parallelize(input_data)
我想把那个输出 RDD 变成一个 DataFrame,其中有一列是这样的:
schema = StructType([StructField("term", StringType())])
df = spark.createDataFrame(output_data, schema=schema)
这不起作用,我收到此错误:
TypeError: StructType can not accept object 'a' in type <class 'str'>
所以我在没有schema
的情况下尝试了它并得到了这个错误:
TypeError: Can not infer schema for type: <class 'str'>
编辑:尝试toDF()
时会发生同样的错误。
所以出于某种原因,我有一个 pyspark.rdd.PipelinedRDD
,其元素不是 StringType
,而是标准 Python str
。
我对 Pyspark 比较陌生,所以有人能告诉我为什么会发生这种情况吗?
我很惊讶 Pyspark 无法将 str
隐式转换为 StringType
。
我不能发布整个代码,只是说我正在用字符串做一些复杂的事情,包括字符串比较和 for 循环。不过,我并没有明确地进行任何类型转换。
【问题讨论】:
我想这就是你要找的东西:***.com/questions/48111066/…。也值得一读:***.com/questions/32788387/… 我已经阅读了那些,遗憾的是toDF
对我没有帮助,它会引发同样的错误。
【参考方案1】:
一种解决方案是将String
的RDD 转换为Row
的RDD,如下所示:
from pyspark.sql import Row
df = spark.createDataFrame(output_data.map(lambda x: Row(x)), schema=schema)
# or with a simple list of names as a schema
df = spark.createDataFrame(output_data.map(lambda x: Row(x)), schema=['term'])
# or even use `toDF`:
df = output_data.map(lambda x: Row(x)).toDF(['term'])
# or another variant
df = output_data.map(lambda x: Row(term=x)).toDF()
有趣的是,正如您所提到的,为像字符串这样的原始类型的 RDD 指定模式是行不通的。但是,如果我们只指定类型,它可以工作,但您不能指定名称。因此,另一种方法是这样做并重命名名为 value
的列,如下所示:
from pyspark.sql import functions as F
df = spark.createDataFrame(output_data, StringType())\
.select(F.col('value').alias('term'))
# or similarly
df = spark.createDataFrame(output_data, "string")\
.select(F.col('value').alias('term'))
【讨论】:
以上是关于Pyspark:由于数据类型 str 而不是 StringType,无法将 RDD 转换为 DataFrame的主要内容,如果未能解决你的问题,请参考以下文章
PySpark:TypeError:StructType 不能接受类型为 <type 'unicode'> 或 <type 'str'> 的对象
nTypeError:无法合并类型 <class \'pyspark.sql.types.DoubleType\'> 和 <class \'pyspark.sql.types.Str