无法使用 spark scala 从数据集中的行中获取第一列的值

Posted

技术标签:

【中文标题】无法使用 spark scala 从数据集中的行中获取第一列的值【英文标题】:Unable to get the value of first column from a row in dataset using spark scala 【发布时间】:2022-01-08 15:40:09 【问题描述】:

我正在尝试使用 foreachpartition 迭代数据框以将值插入数据库。我使用了 foreachpartition 并对行进行分组,并使用 foreach 迭代每一行。请在下面找到我的代码,

val endDF=spark.read.parquet(path).select("pc").filter(col("pc").isNotNull);

endDF.foreachpartition((partition: Iterator[Row]) =>
    class.forname(driver)
    val con=DriverManager.connection(jdbcurl,user,pwd)
      partition.grouped(100).foreach(batch => 
        val st=con.createStatement()
           batch.foreach(row => 
         val pc=row.get(0).toString()
         val in=s"""insert tshdim (pc) values($pc)""".stripMargin
         st.addBatch(in)
      )
         st.executeLargeBatch
)
con.close()
)

当我尝试从行 (val pc=row.get(0).toString()) 获取 pc 值时,它会引发以下异常。我在 spark-shell 中这样做

org.apache.spark.SparkException:任务不可序列化。 .

原因:

Java.io.NotSerializable 异常: org.apache.spark.sql.DataSet$RDDQueryExecution$ 序列化栈: 对象不可序列化 (类:org.apache.spark.sql.DataSet$RDDQueryExecution$,值: org.apache.spark.sql.DataSet$RDDQueryExecution$@jfaf ) -field(类:org.apache.spark.sql.DataSet,名称:RDDQueryExecutionModule,类型: org.apache.spark.sql.DataSet$RDDQueryExecution$) -object(class:org.apache.spark.sql.DataSet,[pc:String])

【问题讨论】:

如果你只是想插入值为什么不直接使用dataframe write api而不是使用foreachpartition循环? 【参考方案1】:

foreachpartition 中的函数需要被序列化并传递给执行器。 因此,在您的情况下,spark 正在尝试为您的 jdbc 连接序列化 DriverManager 类和所有内容,其中一些是不可序列化的。 foreachPartition 在没有 DriverManager 的情况下工作 -

endDF.foreachPartition((partition: Iterator[Row]) => 
  partition.grouped(100).foreach(batch => 
    batch.foreach(row => 
      val pc=row.get(0)
      println(pc)
    )
  )
)

要将其保存在您的数据库中,请先执行.collect

【讨论】:

感谢您的解决方案。当我在集群中作为纱线作业执行时,我在问题中提到的代码运行良好。 可能,在纱线集群中,这些类在执行器的类路径中。 为什么叫class.forname(driver)driver的价值是什么?

以上是关于无法使用 spark scala 从数据集中的行中获取第一列的值的主要内容,如果未能解决你的问题,请参考以下文章

从 Spark 数据框列中 ArrayType 类型的行中获取不同的元素

Spark Window 函数:是不是可以直接从使用第一个/最后一个函数找到的行中获取其他值?

从 scala Spark 中的 RDD[type] 获取不同的行

使用 Java 在 Apache Spark 中从数据集中复制一行 n 次

如何使用Scala计算Spark中数据框中列的开始索引和结束索引之间的行的平均值?

如何使用过滤器从scala中的数据框中获取包含空值的行集