PySpark AnalysisException:无法解析列名

Posted

技术标签:

【中文标题】PySpark AnalysisException:无法解析列名【英文标题】:PySpark AnalysisException : Cannot resolve column name 【发布时间】:2020-03-28 16:59:19 【问题描述】:

我正在尝试解决以下错误,我已经看到了一些关于此的帖子,但无法解决。 org.apache.spark.sql.AnalysisException:无法解析*所有列中的列名“特征”

我尝试过的事情:

tempList = [] 
for col in Df.columns:
    new_name = col.strip()
    new_name = "".join(new_name.split())
    new_name = new_name.replace('.','')
    tempList.append(new_name) 
Df = Df.toDF(*tempList) 

我的 Spark 数据框中只有 6 列,所有列都有字符和下划线。 数据框方案:

StructType(List(StructField(A,ShortType,true),StructField(B,ShortType,true),StructField(C,IntegerType,true),StructField(D,IntegerType,true),StructField(E,StringType,true),StructField(F,DoubleType,true),StructField(G,IntegerType,true)))

我正在尝试从这里实现 PCA,https://www.nodalpoint.com/pca-in-spark-1-5/

参考代码:

df = sc.parallelize([[1,2,3], [2,3,4]]).toDF(("a_1", "b", "c"))

def estimateCovariance(df):
    m = df.select(df['features']).map(lambda x: x[0]).mean()
    dfZeroMean = df.select(df['features']).map(lambda x: x[0]).map(lambda x: x-m)  # subtract the mean

    return dfZeroMean.map(lambda x: np.outer(x,x)).sum()/df.count()

def pca(df, k=2):
  cov = estimateCovariance(df)
  col = cov.shape[1]
  eigVals, eigVecs = eigh(cov)
  inds = np.argsort(eigVals)
  eigVecs = eigVecs.T[inds[-1:-(col+1):-1]]  
  components = eigVecs[0:k]
  eigVals = eigVals[inds[-1:-(col+1):-1]]  # sort eigenvalues
  score = df.select(df['features']).map(lambda x: x[0]).map(lambda x: np.dot(x, components.T) )
  scoreDF = sqlContext.createDataFrame(score.map(lambda x: (DenseVector(x),)), ['pca_features'])
  # Return the `k` principal components, `k` scores, and all eigenvalues

  return components.T, scoreDF, eigVals

comp, score, eigVals = pca(df)
score.collect()

对可能出现的问题有什么想法吗?

【问题讨论】:

你能提供一个minimal reproducible example吗? @oldwooki 创建了一个数据框来复制相同的错误(请参阅编辑)。希望这没问题? 【参考方案1】:

来自您链接到的文章:

pca 过程的输入包含一个 Spark 数据框,其中包含一个名为 features 的列,其中包含 DenseVectors 的特征。

再进一步,给你一个如何构建样本数据集的示例:

>>> data = [(Vectors.dense([0.0, 1.0, 0.0, 7.0, 0.0]),),
...     (Vectors.dense([2.0, 0.0, 3.0, 4.0, 5.0]),),
...     (Vectors.dense([4.0, 0.0, 0.0, 6.0, 7.0]),)]
>>> df = sqlContext.createDataFrame(data,["features"])

您的数据集包含许多不同列中的数据。您需要将其转换为单列向量。 Spark ML 为此提供了一个工具,即pyspark.ml.feature.VectorAssembler

在你的情况下,你需要这样的东西:

from pyspark.ml.feature import VectorAssembler

vectorAssembler = VectorAssembler(inputCols=["a_1", "b", "c"], outputCol="features")
comp, score, eigVals = pca(vectorAssembler.transform(df))

【讨论】:

【参考方案2】:

您似乎没有列 features - 如果我对问题的理解正确,则在此示例中所有列都是特征,因此您需要选择所有列。

【讨论】:

以上是关于PySpark AnalysisException:无法解析列名的主要内容,如果未能解决你的问题,请参考以下文章

pyspark.sql.utils.AnalysisException:找不到数据源:kafka

PySpark AnalysisException:无法解析列名

Pyspark 从 S3 存储桶读取 csv 文件:AnalysisException:路径不存在

pyspark.sql.utils.AnalysisException:参考“标题”不明确,可能是:标题,标题

发生异常:pyspark.sql.utils.AnalysisException '必须使用 writeStream.start();;\nkafka' 执行带有流式源的查询

pyspark.sql.utils.AnalysisException:u'无法推断Parquet的模式。必须手动指定。