Spark-sql 中的 NullPointerException

Posted

技术标签:

【中文标题】Spark-sql 中的 NullPointerException【英文标题】:NullPointerException in spark-sql 【发布时间】:2014-07-18 19:20:49 【问题描述】:

我正在编写一个程序来使用 spark-sql 将两个文件连接到一个公共参数上。我认为我的代码很好,但是当我尝试将其保存为文本文件时,我遇到了错误。我将我的代码如下:-

import java.util.regex.Pattern;

import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.Function;
import org.apache.spark.sql.api.java.JavaSQLContext;
import org.apache.spark.sql.api.java.JavaSchemaRDD;



import java.io.Serializable;


public class JoinCSV 
    @SuppressWarnings("serial")
    public static class CompleteSample implements Serializable 
        private String ASSETNUM;
        private String ASSETTAG;
        private String CALNUM;



        public String getASSETNUM() 
            return ASSETNUM;
        
        public void setASSETNUM(String aSSETNUM) 
            ASSETNUM = aSSETNUM;
        
        public String getASSETTAG() 
            return ASSETTAG;
        
        public void setASSETTAG(String aSSETTAG) 
            ASSETTAG = aSSETTAG;
        
        public String getCALNUM() 
            return CALNUM;
        
        public void setCALNUM(String cALNUM) 
            CALNUM = cALNUM;
        


      

    @SuppressWarnings("serial")
    public static class ExtendedSample implements Serializable 

        private String ASSETNUM;
        private String CHANGEBY;
        private String CHANGEDATE;


        public String getASSETNUM() 
            return ASSETNUM;
        
        public void setASSETNUM(String aSSETNUM) 
            ASSETNUM = aSSETNUM;
        
        public String getCHANGEBY() 
            return CHANGEBY;
        
        public void setCHANGEBY(String cHANGEBY) 
            CHANGEBY = cHANGEBY;
        
        public String getCHANGEDATE() 
            return CHANGEDATE;
        
        public void setCHANGEDATE(String cHANGEDATE) 
            CHANGEDATE = cHANGEDATE;
        
    

    private static final Pattern comma = Pattern.compile(",");
    @SuppressWarnings("serial")
    public static void main(String[] args) throws Exception 
        String path="C:/Users/cyg_server/Documents/bigDataExample/AssetsImportCompleteSample.csv";
        String path1="C:/Users/cyg_server/Documents/bigDataExample/AssetsImportExtendedSample.csv";

          JavaSparkContext ctx = new JavaSparkContext("local[2]", "JavaSparkSQL");
          JavaSQLContext sqlCtx = new JavaSQLContext(ctx);

          JavaRDD<CompleteSample> cs = ctx.textFile("C:/Users/cyg_server/Documents/bigDataExample/AssetsImportCompleteSample.csv").map(
                  new Function<String, CompleteSample>() 
                    public CompleteSample call(String line) throws Exception 
                      String[] parts = line.split(",");

                      CompleteSample cs = new CompleteSample();
                      cs.setASSETNUM(parts[0]);
                      cs.setASSETTAG(parts[1]);
                      cs.setCALNUM(parts[2]);

                      return cs;
                    
                  );

          JavaRDD<ExtendedSample> es = ctx.textFile("C:/Users/cyg_server/Documents/bigDataExample/AssetsImportExtendedSample.csv").map(
                  new Function<String, ExtendedSample>() 
                    public ExtendedSample call(String line) throws Exception 
                      String[] parts = line.split(",");

                      ExtendedSample es = new ExtendedSample();
                      es.setASSETNUM(parts[0]);
                      es.setCHANGEBY(parts[1]);
                      es.setCHANGEDATE(parts[2]);

                      return es;
                    
                  );

          JavaSchemaRDD complete = sqlCtx.applySchema(cs, CompleteSample.class);
            complete.registerAsTable("cs");

          JavaSchemaRDD extended = sqlCtx.applySchema(es, ExtendedSample.class);
          extended.registerAsTable("es");

          JavaSchemaRDD fs= sqlCtx.sql("SELECT ASSETTAG, CALNUM FROM cs INNER JOIN es ON cs.ASSETNUM=es.ASSETNUM;");
          fs.saveAsTextFile("result");                   //Here I am getting error
    


我的错误如下:-

    14/07/19 00:40:13 INFO TaskSchedulerImpl: Cancelling stage 0
    org.apache.spark.SparkException: Job aborted due to stage failure: Task 0.0:0 failed 1 times, most recent failure: Exception failure in TID 4 on host localhost: java.lang.NullPointerException
            java.lang.ProcessBuilder.start(Unknown Source)
            org.apache.hadoop.util.Shell.runCommand(Shell.java:404)
            org.apache.hadoop.util.Shell.run(Shell.java:379)
            org.apache.hadoop.util.Shell$ShellCommandExecutor.execute(Shell.java:589)
            org.apache.hadoop.util.Shell.execCommand(Shell.java:678)
------------
------------

 14/07/19 00:40:11 ERROR Shell: Failed to locate the winutils binary in the hadoop binary path
    java.io.IOException: Could not locate executable null\bin\winutils.exe in the Hadoop binaries.
        at org.apache.hadoop.util.Shell.getQualifiedBinPath(Shell.java:278)
        at org.apache.hadoop.util.Shell.getWinUtilsPath(Shell.java:300)
        at org.apache.hadoop.util.Shell.<clinit>(Shell.java:293)
        at org.apache.hadoop.util.StringUtils.<clinit>(StringUtils.java:76)
        at org.apache.hadoop.mapred.FileInputFormat.setInputPaths(FileInputFormat.java:362)
        at org.apache.spark.SparkContext$$anonfun$22.apply(SparkContext.scala:546)
        at org.apache.spark.SparkContext$$anonfun$22.apply(SparkContext.scala:546)
-----------------
-----------------

无论我使用的是 spark、spark-sql 还是 spark-streaming,都会出现第二个错误。我不知道这个错误是什么。但似乎第二个错误对代码没有影响,因为即使在这个错误之后,结果也可以正常使用。但是每次运行程序看到一个未知的错误还是很烦人的。

有人可以帮我理解这个问题吗?我非常坚持这一点。谢谢

【问题讨论】:

你在 Linux 上也遇到同样的错误吗? 不,我在这里尝试使用“saveAsTextFile”将文件保存在 Windows OS 上的本地文件系统上。对于本地文件系统,“saveAs”选项都不起作用。然而,这些选项在将文件保存在 hdfs 上时工作得非常完美。 对我来说似乎是一个特定于 Windows 的问题。我不知道如何解决它,但如果它只是本地保存,你可以解决这个问题。使用RDD.collect() 获取数据,然后通过常规Java FileOutputStream 保存。 是的,我发现很多其他人也面临着类似的问题,所以我认为它的错误或者可能是它的代码没有正确编写。无论如何,我现在会尝试常规的 java。谢谢 更新:- 我猜这不是 spark-sql 的问题,而是“JavaRDD.saveAsTextFile("path/to/file")”本身有问题。通过使用这种方法,我可以在我的本地文件系统上创建目录 abd 文件也在其中创建,但这些文件中没有写入任何文本。回答我收到“NullPointerException”。 【参考方案1】:

对于 Windows 上的 rdd.saveAsTextFile() 错误有一个解决方法。它修复了 SparkExceptionIOException 错误,我在本地模式下在 Windows 8.1 上使用 Spark v1.1.0 时也遇到了这些错误。

http://qnalist.com/questions/4994960/run-spark-unit-test-on-windows-7

以下是该链接中的步骤:

1)download compiled winutils.exe;

2) 把它放在像c:\winutil\bin 这样的地方;

3) 将此行添加到您的代码中:System.setProperty("hadoop.home.dir", "c:\\winutil\\")

希望这对你有用。

【讨论】:

非常感谢 Dylan 提供了这个出色的解决方案。我忽略了与 saveTextAsFile 无关的 winutils WARN 消息。现在我已经按照您的步骤进行操作,我的应用程序一直运行顺利。 @DylanHogg 嗨,先生,您能帮我解决这个问题,同时保存到 cassandra 我有问题***.com/questions/53042545/…

以上是关于Spark-sql 中的 NullPointerException的主要内容,如果未能解决你的问题,请参考以下文章

控制 spark-sql 和数据帧中的字段可空性

spark-sql 与 spark-shell REPL 中的 Spark SQL 性能差异

Spark-sql 数据砖中的变量动态分配值

spark-sql/Scala 中的反透视列名是数字

hbase报错org.apache.hadoop.ipc.RemoteException(java.lang.NullPointerException): java.lang.NullPointerE

通过spark-sql快速读取hive中的数据