无法使用 spark scala 从数据集中的行中获取第一列的值
Posted
技术标签:
【中文标题】无法使用 spark scala 从数据集中的行中获取第一列的值【英文标题】:Unable to get the value of first column from a row in dataset using spark scala 【发布时间】:2022-01-08 15:40:09 【问题描述】:我正在尝试使用 foreachpartition 迭代数据框以将值插入数据库。我使用了 foreachpartition 并对行进行分组,并使用 foreach 迭代每一行。请在下面找到我的代码,
val endDF=spark.read.parquet(path).select("pc").filter(col("pc").isNotNull);
endDF.foreachpartition((partition: Iterator[Row]) =>
class.forname(driver)
val con=DriverManager.connection(jdbcurl,user,pwd)
partition.grouped(100).foreach(batch =>
val st=con.createStatement()
batch.foreach(row =>
val pc=row.get(0).toString()
val in=s"""insert tshdim (pc) values($pc)""".stripMargin
st.addBatch(in)
)
st.executeLargeBatch
)
con.close()
)
当我尝试从行 (val pc=row.get(0).toString()) 获取 pc 值时,它会引发以下异常。我在 spark-shell 中这样做
org.apache.spark.SparkException:任务不可序列化。 .
原因:
Java.io.NotSerializable 异常: org.apache.spark.sql.DataSet$RDDQueryExecution$ 序列化栈: 对象不可序列化 (类:org.apache.spark.sql.DataSet$RDDQueryExecution$,值: org.apache.spark.sql.DataSet$RDDQueryExecution$@jfaf ) -field(类:org.apache.spark.sql.DataSet,名称:RDDQueryExecutionModule,类型: org.apache.spark.sql.DataSet$RDDQueryExecution$) -object(class:org.apache.spark.sql.DataSet,[pc:String])
【问题讨论】:
如果你只是想插入值为什么不直接使用dataframe write api而不是使用foreachpartition循环? 【参考方案1】:foreachpartition
中的函数需要被序列化并传递给执行器。
因此,在您的情况下,spark 正在尝试为您的 jdbc 连接序列化 DriverManager 类和所有内容,其中一些是不可序列化的。
foreachPartition 在没有 DriverManager 的情况下工作 -
endDF.foreachPartition((partition: Iterator[Row]) =>
partition.grouped(100).foreach(batch =>
batch.foreach(row =>
val pc=row.get(0)
println(pc)
)
)
)
要将其保存在您的数据库中,请先执行.collect
【讨论】:
感谢您的解决方案。当我在集群中作为纱线作业执行时,我在问题中提到的代码运行良好。 可能,在纱线集群中,这些类在执行器的类路径中。 为什么叫class.forname(driver)
,driver
的价值是什么?以上是关于无法使用 spark scala 从数据集中的行中获取第一列的值的主要内容,如果未能解决你的问题,请参考以下文章
从 Spark 数据框列中 ArrayType 类型的行中获取不同的元素
Spark Window 函数:是不是可以直接从使用第一个/最后一个函数找到的行中获取其他值?
从 scala Spark 中的 RDD[type] 获取不同的行
使用 Java 在 Apache Spark 中从数据集中复制一行 n 次