两个 Spark DataFrame 的简单连接因“org.apache.spark.sql.AnalysisException:无法解析列名”而失败

Posted

技术标签:

【中文标题】两个 Spark DataFrame 的简单连接因“org.apache.spark.sql.AnalysisException:无法解析列名”而失败【英文标题】:Simple join of two Spark DataFrame failing with "org.apache.spark.sql.AnalysisException: Cannot resolve column name" 【发布时间】:2015-09-02 14:47:00 【问题描述】:

更新 事实证明,这与 Databricks Spark CSV 阅读器创建 DataFrame 的方式有关。在下面这个不起作用的示例中,我使用 Databricks CSV 阅读器读取人员和地址 CSV,然后将生成的 DataFrame 以 Parquet 格式写入 HDFS。

我更改了创建 DataFrame 的代码:(与 people.csv 类似)

JavaRDD<Address> address = context.textFile("/Users/sfelsheim/data/address.csv").map(
            new Function<String, Address>() 
                public Address call(String line) throws Exception 
                    String[] parts = line.split(",");

                    Address addr = new Address();
                    addr.setAddrId(parts[0]);
                    addr.setCity(parts[1]);
                    addr.setState(parts[2]);
                    addr.setZip(parts[3]);

                    return addr;
                
            );

然后将生成的 DataFrame 以 Parquet 格式写入 HDFS,连接按预期工作

在这两种情况下,我都在读取完全相同的 CSV。


在尝试对从 HDFS 上的两个不同 parquet 文件创建的两个 DataFrame 执行简单连接时遇到问题。


[main] INFO org.apache.spark.SparkContext - 运行 Spark 版本 1.4.1

使用 Hadoop 2.7.0

中的 HDFS

这里有一个示例来说明。

 public void testStrangeness(String[] args) 
    SparkConf conf = new SparkConf().setMaster("local[*]").setAppName("joinIssue");
    JavaSparkContext context = new JavaSparkContext(conf);
    SQLContext sqlContext = new SQLContext(context);

    DataFrame people = sqlContext.parquetFile("hdfs://localhost:9000//datalake/sample/people.parquet");
    DataFrame address = sqlContext.parquetFile("hdfs://localhost:9000//datalake/sample/address.parquet");

    people.printSchema();
    address.printSchema();

    // yeah, works
    DataFrame cartJoin = address.join(people);
    cartJoin.printSchema();

    // boo, fails 
    DataFrame joined = address.join(people,
            address.col("addrid").equalTo(people.col("addressid")));

    joined.printSchema();


人物内容

first,last,addressid 
your,mom,1 
fred,flintstone,2

地址内容

addrid,city,state,zip
1,sometown,wi,4444
2,bedrock,il,1111

people.printSchema(); 

结果...

root
 |-- first: string (nullable = true)
 |-- last: string (nullable = true)
 |-- addressid: integer (nullable = true)

address.printSchema();

结果...

root
 |-- addrid: integer (nullable = true)
 |-- city: string (nullable = true)
 |-- state: string (nullable = true)
 |-- zip: integer (nullable = true)


DataFrame cartJoin = address.join(people);
cartJoin.printSchema();

笛卡尔连接工作正常,printSchema() 结果...

root
 |-- addrid: integer (nullable = true)
 |-- city: string (nullable = true)
 |-- state: string (nullable = true)
 |-- zip: integer (nullable = true)
 |-- first: string (nullable = true)
 |-- last: string (nullable = true)
 |-- addressid: integer (nullable = true)

这个加入...

DataFrame joined = address.join(people,
address.col("addrid").equalTo(people.col("addressid")));

导致以下异常。

Exception in thread "main" org.apache.spark.sql.AnalysisException: **Cannot resolve column name "addrid" among (addrid, city, state, zip);**
    at org.apache.spark.sql.DataFrame$$anonfun$resolve$1.apply(DataFrame.scala:159)
    at org.apache.spark.sql.DataFrame$$anonfun$resolve$1.apply(DataFrame.scala:159)
    at scala.Option.getOrElse(Option.scala:121)
    at org.apache.spark.sql.DataFrame.resolve(DataFrame.scala:158)
    at org.apache.spark.sql.DataFrame.col(DataFrame.scala:558)
    at dw.dataflow.DataflowParser.testStrangeness(DataflowParser.java:36)
    at dw.dataflow.DataflowParser.main(DataflowParser.java:119)

我尝试更改它,以便人员和地址具有共同的关键属性(addressid)并使用..

address.join(people, "addressid");

但得到了相同的结果。

有什么想法吗??

谢谢

【问题讨论】:

【参考方案1】:

原来问题在于 CSV 文件是带有 BOM 的 UTF-8 格式。 DataBricks CSV 实现不处理带有 BOM 的 UTF-8。将文件转换为 UTF-8 没有 BOM,一切正常。

【讨论】:

您能解释一下这里的 BOM 是什么吗? BOM 是字节顺序标记***.com/questions/2223882/…【参考方案2】:

可以通过使用 Notepad++ 来解决这个问题。在“编码”菜单下,我将它从“UTF-8 BOM 编码”切换到“UTF-8 编码”。

【讨论】:

以上是关于两个 Spark DataFrame 的简单连接因“org.apache.spark.sql.AnalysisException:无法解析列名”而失败的主要内容,如果未能解决你的问题,请参考以下文章

使用多列作为存储在 Apache Spark 中的数组中的键来连接两个 Dataframe

在 Spark 中连接两个数据帧

Spark SCALA - 连接两个数据帧,其中一个数据帧中的连接值位于第二个数据帧中的两个字段之间

Spark Dataframe Join shuffle

Spark-on-Hbase:通过Spark的DataFrame访问Hbase表

[Spark][Python]DataFrame的左右连接例子