pyspark pandas object as dataframe - TypeError

Posted: 2019-09-30 06:24:02

Question:

EDIT - SOLVED: I think the problem was the multidimensional array generated by the Elmo inference. I average all the vectors, then use the final averaged vector over all words in the sentence as the output, and it can now be converted to a dataframe. Next I have to make it faster; I will check whether using threads helps.

I am trying to generate Elmo embeddings for the sentences in a pyspark dataframe, using the ElmoForManyLangs pretrained model from GitHub. However, I am not able to convert the resulting object to a dataframe.

https://github.com/HIT-SCIR/ELMoForManyLangs

import sys
import pandas as pd
import numpy as np
from pyspark.sql.functions import split, trim

sys.path.append('/tmp/python-elmo/elmoManyLangs/elmoManyLangsGit/ELMoForManyLangs-master')
from elmoformanylangs import Embedder
e = Embedder('/mnt/tmp/python-elmo/elmoManyLangs/english/')

new_list = []

input = spark.read.parquet("/path/to/input/file")

# Split each description into a list of words
words = input.withColumn("wordlist", split(trim(input["description"]), " ")) \
             .dropna() \
             .select("product_name", "wordlist") \
             .limit(1)

wordsPd = words.toPandas()

# Average the Elmo vectors over all words of each row (t[2] is the wordlist column)
for t in wordsPd.itertuples():
    new_list.append(np.average(np.array([np.average(x, axis=0) for x in e.sents2elmo(t[2])]), axis=0).tolist())

wordsPd = wordsPd.assign(embeddings=new_list)
myDf = spark.createDataFrame(wordsPd)
myDf.registerTempTable("myDf")


wordsPd

0  my_product_name  ...  [[0.1606223, 0.09298285, -0.3494971, 0.2...

[1 rows x 3 columns]

wordsPd.dtypes

product_name    object 
description      object 
embeddings    object 
dtype: object

This is the error when creating the dataframe:

Traceback (most recent call last):
  File "/usr/lib/spark/python/pyspark/sql/types.py", line 1068, in _infer_type
    return _infer_schema(obj)
  File "/usr/lib/spark/python/pyspark/sql/types.py", line 1094, in _infer_schema
    raise TypeError("Can not infer schema for type: %s" % type(row))
TypeError: Can not infer schema for type: <class 'object'>
During handling of the above exception, another exception occurred:
Traceback (most recent call last):
  File "/usr/lib/spark/python/pyspark/sql/types.py", line 1068, in _infer_type
    return _infer_schema(obj)
  File "/usr/lib/spark/python/pyspark/sql/types.py", line 1096, in _infer_schema
    fields = [StructField(k, _infer_type(v), True) for k, v in items]
  File "/usr/lib/spark/python/pyspark/sql/types.py", line 1096, in <listcomp>
    fields = [StructField(k, _infer_type(v), True) for k, v in items]
  File "/usr/lib/spark/python/pyspark/sql/types.py", line 1070, in _infer_type
    raise TypeError("not supported type: %s" % type(obj))
TypeError: not supported type: <class 'object'>
During handling of the above exception, another exception occurred:
Traceback (most recent call last):
  File "/usr/lib/spark/python/pyspark/sql/types.py", line 1068, in _infer_type
    return _infer_schema(obj)
.........
.........
    raise TypeError("not supported type: %s" % type(obj))
TypeError: not supported type: <class 'pandas.core.indexes.range.RangeIndex'>
During handling of the above exception, another exception occurred:
Traceback (most recent call last):
  File "/tmp/zeppelin_pyspark-7355529425587840217.py", line 360, in <module>
    exec(code, _zcUserQueryNameSpace)
...........
...........  
  File "/usr/lib/spark/python/pyspark/sql/types.py", line 1070, in _infer_type
    raise TypeError("not supported type: %s" % type(obj))
TypeError: not supported type: <class 'pandas.core.series.Series'>
During handling of the above exception, another exception occurred:
Traceback (most recent call last):
  File "/tmp/zeppelin_pyspark-7355529425587840217.py", line 367, in <module>
    raise Exception(traceback.format_exc())
.........
.........
    raise TypeError("Can not infer schema for type: %s" % type(row))
TypeError: Can not infer schema for type: <class 'object'>
During handling of the above exception, another exception occurred:
Traceback (most recent call last):
  File "/usr/lib/spark/python/pyspark/sql/types.py", line 1068, in _infer_type
    return _infer_schema(obj)
.........
.........
  File "/usr/lib/spark/python/pyspark/sql/types.py", line 1070, in _infer_type
    raise TypeError("not supported type: %s" % type(obj))
TypeError: not supported type: <class 'object'>
During handling of the above exception, another exception occurred:
Traceback (most recent call last):
  File "/usr/lib/spark/python/pyspark/sql/types.py", line 1068, in _infer_type
    return _infer_schema(obj)
........
........
........
........
  File "/usr/lib/spark/python/pyspark/sql/types.py", line 1070, in _infer_type
    raise TypeError("not supported type: %s" % type(obj))
TypeError: not supported type: <class 'pandas.core.indexes.range.RangeIndex'>
During handling of the above exception, another exception occurred:
Traceback (most recent call last):
  File "/tmp/zeppelin_pyspark-7355529425587840217.py", line 360, in <module>
    exec(code, _zcUserQueryNameSpace)
  File "<stdin>", line 17, in <module>
  File "/usr/lib/spark/python/pyspark/sql/session.py", line 691, in createDataFrame
    rdd, schema = self._createFromLocal(map(prepare, data), schema)
........
........
........
........
  File "/usr/lib/spark/python/pyspark/sql/types.py", line 1070, in _infer_type
    raise TypeError("not supported type: %s" % type(obj))
TypeError: not supported type: <class 'pandas.core.series.Series'>
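The traceback suggests that Spark's schema inference walks each cell of the pandas frame and rejects anything it cannot map to a Spark type: a cell holding a NumPy ndarray (or nested ndarrays) shows up as dtype `object`, and `_infer_type` raises. Converting to plain Python floats via `.tolist()` sidesteps this. A numpy-only sketch of that difference (no Spark session needed; the 4-dimensional vectors are hypothetical stand-ins for Elmo's output):

```python
import numpy as np

# Hypothetical stand-in for e.sents2elmo output: one (num_words x dim) array per sentence
fake_elmo_output = [np.random.rand(5, 4).astype(np.float32),
                    np.random.rand(3, 4).astype(np.float32)]

# Average each sentence over its words -> one (num_sentences x dim) array
per_sentence = np.array([np.average(x, axis=0) for x in fake_elmo_output])

as_array = per_sentence          # elements are numpy scalars -> Spark cannot infer a type
as_list = per_sentence.tolist()  # elements are plain Python floats -> inferable as array<double>

print(type(as_array[0][0]))  # a numpy scalar type
print(type(as_list[0][0]))   # built-in float
```

This is why appending `.tolist()` to the averaged vector makes `createDataFrame` succeed.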

Comments:

Possible duplicate of: Converting Pandas dataframe into Spark dataframe error

Answer 1:

I needed to use the following to aggregate the vectors, which turns the multidimensional array into a flat list:

for t in wordsPd.itertuples():
    new_list.append(np.average(np.array([np.average(x, axis=0) for x in e.sents2elmo(t[2])]), axis=0).tolist())
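For context, here is the same aggregation run end-to-end on a toy pandas frame, with a hypothetical `fake_sents2elmo` returning 4-dimensional vectors in place of the real embedder. The key point is that the resulting `embeddings` column holds flat lists of plain Python floats, which Spark can then infer:

```python
import numpy as np
import pandas as pd

wordsPd = pd.DataFrame({"product_name": ["my_product_name"],
                        "wordlist": [["some", "words", "here"]]})

def fake_sents2elmo(words):
    # stand-in for Embedder.sents2elmo: one (1 x dim) array per word
    return [np.random.rand(1, 4) for _ in words]

new_list = []
for t in wordsPd.itertuples():
    # t[0] is the index, t[1] product_name, t[2] the wordlist
    per_word = np.array([np.average(x, axis=0) for x in fake_sents2elmo(t[2])])
    new_list.append(np.average(per_word, axis=0).tolist())

wordsPd = wordsPd.assign(embeddings=new_list)
print(wordsPd["embeddings"].iloc[0])  # a flat list of 4 Python floats
```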

