sparkSession.sql 抛出 NullPointerException
Posted
技术标签:
【中文标题】sparkSession.sql 抛出 NullPointerException【英文标题】:sparkSession.sql throwing NullPointerException 【发布时间】:2019-01-16 08:18:56 【问题描述】:我有两个 scala 类作为我的 spark-sql 工作的一部分,即 Driver.scala 和 ExtractorOne.scala。
Driver.scala 将不同的参数(如 sparkSession 对象等)传递给不同的提取器,如 ExtractorOne.scala 等。
在 Extractor 类中,我从 oracle 中提取数据并将其作为 parquet 文件写入 hdfs 位置。
作为业务逻辑的一部分,我必须调用 sparkSession.sql() 来执行一些操作。但是在提取器/调用类 sparkSession 的 extract() 方法内部导致 Nullpointer 异常......所以我尝试通过调用 sparkSession.sql("show tables").show() 在调用函数中检查它,它给出了结果即对象没有问题。什么时候调用相同的,即 sparkSession.sql("show tables").show() 在被调用函数中抛出 Nullpointer 异常......知道我在这里做错了什么吗?
' Driver.scala
val spark = ConfigUtils.getSparkSession( ...); //spark session initialization successful
val parquetDf = spark.read.format("parquet"); // able to read parquet file data and got the dataframe.
val extractors : LinkedHashMap[String, (DataFrameReader, SparkSession, String, String,String,String) => Unit] = Utils.getAllDefinedExtractors();
///ExtractorOne.scala ExtractorTwo.scala ..etc are extractors as shown in other scala file
for ( key:String <- extractors.keys)
extractors.get(key).map
spark.sql("show tables").show() ///output
fun => fun(ora_df_options_conf,spark,keyspace,key.trim(),"","")
'
spark.sql("show tables").show() :::
的输出 spark.sql("show tables").show()
> Blockquote
+--------+---------+-----------+
|database|tableName|isTemporary|
+--------+---------+-----------+
+--------+---------+-----------+
但相同的东西在 ExtractorOne.scala 中出现错误
'
ExtractorOne.scala
def extract(oraOptionDfConfig: DataFrameReader, sparkSession: SparkSession, keyspace: String,
columnFamilyName: String, fromDate:String , toDate:String ) : Unit =
val company_df = ..// some opeartion to read the data from oracle to company_df
val dist_df = company_df.distinct("id")
company_df.createOrReplaceTempView("company")
dist_df.foreach( row =>
if(row.anyNull)
else
val sqlQuery:String = s" select * from company where id='%s' and quarter='%s' and year='%s' ".format( row.get(0) , row.get(1) , row.get(2))
sparkSession.sql("show tables").show() ///output...
var partitionDf = sparkSession.sql(sqlQuery)
partitionDf.show(1)
writeAsParquet(...) ///save as parquet file/s
'
sparkSession.sql("show tables").show() :::
的输出错误:
' 引起:java.lang.NullPointerException 在 org.apache.spark.sql.SparkSession.sessionState$lzycompute(SparkSession.scala:142) 在 org.apache.spark.sql.SparkSession.sessionState(SparkSession.scala:140) 在 org.apache.spark.sql.SparkSession.sql(SparkSession.scala:641) 在 com.snp.extractors.CompanyModelValsExtractor$$anonfun$extract$1.apply(ExtractorOne.scala:126) 在 com.snp.extractors.CompanyModelValsExtractor$$anonfun$extract$1.apply(ExtractorOne.scala:113) 在 scala.collection.Iterator$class.foreach(Iterator.scala:891) 在 scala.collection.AbstractIterator.foreach(Iterator.scala:1334)
'
【问题讨论】:
【参考方案1】:您不能在执行程序端代码中使用 SparkSession(即在 dist_df.foreach
-loop 中),在这种情况下 Spark Session 为空(它只存在于驱动程序上)
【讨论】:
先生如何处理这个?或者我有什么,请提供样品吗? intestingly 我在另一个 scala 类中使用带有 out (for) 循环的 sparkSesssion.sql () ,它工作正常。 在哪里可以阅读或了解更多信息,如何以及在驱动程序端和执行器端分离什么?即最佳性能编码的最佳实践。 先生这仍然给出同样的错误...``` dist_company_model_vals_df.foreach(row => var partitionDf = company_model_vals_df.select("*").where( col("model_id").= ==(row.getString(0)).and(col("fiscal_quarter").===(row.getString(1))).and(col("fiscal_year").===(row.getString( 2))) ) ````这里有什么问题? @user3252097 你在这里做同样的事情,你不能在执行程序代码中使用数据帧(df.foreach
在执行程序上执行),数据帧只存在于驱动程序中。解决方法:首先将数据帧收集到驱动程序,然后阅读有关 spark 的基础知识以上是关于sparkSession.sql 抛出 NullPointerException的主要内容,如果未能解决你的问题,请参考以下文章
SparkSession.sql 和 Dataset.sqlContext.sql 有啥区别?