Pyspark:由于数据类型 str 而不是 StringType,无法将 RDD 转换为 DataFrame

Posted

技术标签:

【中文标题】Pyspark:由于数据类型 str 而不是 StringType,无法将 RDD 转换为 DataFrame【英文标题】:Pyspark: Unable to turn RDD into DataFrame due to data type str instead of StringType 【发布时间】:2020-10-21 12:34:49 【问题描述】:

我正在 Pyspark 中执行一些复杂的操作,其中最后一个操作是 flatMap,它产生一个类型为 pyspark.rdd.PipelinedRDD 的对象,其内容只是一个字符串列表:

print(output_data.take(8))
> ['a', 'abc', 'a', 'aefgtr', 'bcde', 'bc', 'bhdsjfk', 'b']

我正在像这样开始我的 Spark-Session(用于测试的本地会话):

spark = SparkSession.builder.appName("my_app")\
    .config('spark.sql.shuffle.partitions', '2').master("local").getOrCreate()

我的输入数据如下所示:

input_data = (('a', ('abc', [[('abc', 23)], 23, False, 3])),
              ('a', ('abcde', [[('abcde', 17)], 17, False, 5])),
              ('a', ('a', [[('a', 66)], 66, False, 1])),
              ('a', ('aefgtr', [[('aefgtr', 65)], 65, False, 6])),
              ('b', ('bc', [[('bc', 25)], 25, False, 2])),
              ('b', ('bcde', [[('bcde', 76)], 76, False, 4])),
              ('b', ('b', [[('b', 13)], 13, False, 1])),
              ('b', ('bhdsjfk', [[('bhdsjfk', 36)], 36, False, 7])))
input_data = sc.parallelize(input_data)

我想把那个输出 RDD 变成一个 DataFrame,其中有一列是这样的:

schema = StructType([StructField("term", StringType())])
df = spark.createDataFrame(output_data, schema=schema)

这不起作用,我收到此错误:

TypeError: StructType can not accept object 'a' in type <class 'str'>

所以我在没有schema 的情况下尝试了它并得到了这个错误:

TypeError: Can not infer schema for type: <class 'str'>

编辑:尝试toDF() 时会发生同样的错误。

所以出于某种原因,我有一个 pyspark.rdd.PipelinedRDD,其元素不是 StringType,而是标准 Python str

我对 Pyspark 比较陌生,所以有人能告诉我为什么会发生这种情况吗?

我很惊讶 Pyspark 无法将 str 隐式转换为 StringType

我不能发布整个代码,只是说我正在用字符串做一些复杂的事情,包括字符串比较和 for 循环。不过,我并没有明确地进行任何类型转换。

【问题讨论】:

我想这就是你要找的东西:***.com/questions/48111066/…。也值得一读:***.com/questions/32788387/… 我已经阅读了那些,遗憾的是toDF 对我没有帮助,它会引发同样的错误。 【参考方案1】:

一种解决方案是将String 的RDD 转换为Row 的RDD,如下所示:

from pyspark.sql import Row
df = spark.createDataFrame(output_data.map(lambda x: Row(x)), schema=schema)
# or with a simple list of names as a schema
df = spark.createDataFrame(output_data.map(lambda x: Row(x)), schema=['term'])
# or even use `toDF`:
df = output_data.map(lambda x: Row(x)).toDF(['term'])
# or another variant
df = output_data.map(lambda x: Row(term=x)).toDF()

有趣的是,正如您所提到的,为像字符串这样的原始类型的 RDD 指定模式是行不通的。但是,如果我们只指定类型,它可以工作,但您不能指定名称。因此,另一种方法是这样做并重命名名为 value 的列,如下所示:

from pyspark.sql import functions as F
df = spark.createDataFrame(output_data, StringType())\
          .select(F.col('value').alias('term'))
# or similarly
df = spark.createDataFrame(output_data, "string")\
          .select(F.col('value').alias('term'))

【讨论】:

以上是关于Pyspark:由于数据类型 str 而不是 StringType,无法将 RDD 转换为 DataFrame的主要内容,如果未能解决你的问题,请参考以下文章

PySpark UDF,输入端只有 None 值

pyspark 检查每个名字是不是有3个数据

str() 函数避免类型错误

PySpark:TypeError:StructType 不能接受类型为 <type 'unicode'> 或 <type 'str'> 的对象

nTypeError:无法合并类型 <class \'pyspark.sql.types.DoubleType\'> 和 <class \'pyspark.sql.types.Str

从 pyspark 中的数据框数组类型列中获取“名称”元素