独立运行 UDF 的 Spark 错误

Posted

技术标签:

【中文标题】独立运行 UDF 的 Spark 错误【英文标题】:Spark errors with UDFs running on standalone 【发布时间】:2021-01-11 12:00:11 【问题描述】:

我正在运行我的 spark 程序,该程序在本地工作但不是远程工作。 我的程序有这些组件(容器):

我的应用程序基于 spring(用于 REST 调用),它启动驱动程序(使用 getOrCreate 的 Spark Session)并具有我构建的所有转换器。 基于 bitnami 图像的 Spark Master。 Spark Worker 基于 bitnami 映像,但也具有我的应用程序的所有依赖项(即 /dependencies 目录下的所有 jar)。

本地一切正常,但远程运行带有 UDF 的转换器时出现该错误(其余转换器(即没有 UDF)工作正常):

原因:java.lang.ClassCastException:无法将 scala.collection.immutable.List$SerializationProxy 的实例分配给 org.apache.spark.rdd.RDD.org$apache$spark$rdd$RDD$$dependencies_ 的字段在 java.io.ObjectStreamClass$FieldReflector.setObjFieldValues(ObjectStreamClass.java:2301) 的 java.io.ObjectStreamClass.setObjFieldValues(ObjectStreamClass.java:1431) 的 org.apache.spark.rdd.MapPartitionsRDD 实例中键入 scala.collection.Seq在 java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2350) 在 java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2268) 在 java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2126)在 java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1625) 在 java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2344) 在 java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2268)在 java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2126) 在 java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1625) 在 java.io.ObjectInputStream.readObject(ObjectInputStream.java:465)在 java.io.ObjectInputStream.readObject(ObjectInputStream.java:423) 在 scala.collection.immutable.List$SerializationProxy.readObject(List.scala:490) 在 sun.reflect.GeneratedMethodAccessor3.invoke(未知来源)在 sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) ...

这是我的 Spark 会话代码:

val sparkConf = new SparkConf()
.setMaster("spark://spark-master:7077")
.setAppName("My-App")
.set("spark.executor.extraClassPath", "/dependencies/*")

val spark = SparkSession.builder().config(sparkConf).getOrCreate()

因此,具有外部依赖项的作业可以正常工作,但 UDF 会产生上述错误。 我还尝试将我的应用程序 jar(其中包含驱动程序和 spring 代码以及工作程序中已经存在的所有其他依赖项)添加到工作程序中的依赖项文件夹,但仍然会产生错误。还尝试将其放置在与驱动程序相同的位置的工作人员中,并使用“spark.jars”将其位置添加到 sparkConf,但没有成功。 有什么建议吗?

【问题讨论】:

这对***.com/questions/39953245/…有帮助吗? 不,我写在问题的底部。尝试使用“spark.jars”将我的应用程序 jar 添加到工作映像,但没有成功 【参考方案1】:

经过大量谷歌搜索后,我发现了如何集成 Spring-Boot 和 Spark 的解决方案。 我需要更改我的 pom 以使用 shade 插件制作一个 uber-jar。所以我替换了这个:

            <plugin>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-maven-plugin</artifactId>
            <configuration>
                <fork>true</fork>
                <executable>true</executable>
            </configuration>
            <executions>
                <execution>
                    <goals>
                        <goal>repackage</goal>
                    </goals>
                </execution>
            </executions>
        </plugin>

与:

            <plugin>
            <groupId>org.apache.maven.plugins</groupId>
            <artifactId>maven-shade-plugin</artifactId>
            <version>3.2.4</version>
            <dependencies>
                <dependency>
                    <groupId>org.springframework.boot</groupId>
                    <artifactId>spring-boot-maven-plugin</artifactId>
                    <version>$spring-boot.version</version>
                </dependency>
            </dependencies>
            <configuration>
                <keepDependenciesWithProvidedScope>false</keepDependenciesWithProvidedScope>
                <createDependencyReducedPom>false</createDependencyReducedPom>
                <filters>
                    <filter>
                        <artifact>*:*</artifact>
                        <excludes>
                            <exclude>module-info.class</exclude>
                            <exclude>META-INF/*.SF</exclude>
                            <exclude>META-INF/*.DSA</exclude>
                            <exclude>META-INF/*.RSA</exclude>
                        </excludes>
                    </filter>
                </filters>
                <transformers>
                    <transformer
                            implementation="org.apache.maven.plugins.shade.resource.AppendingTransformer">
                        <resource>META-INF/spring.handlers</resource>
                    </transformer>
                    <transformer
                            implementation="org.springframework.boot.maven.PropertiesMergingResourceTransformer">
                        <resource>META-INF/spring.factories</resource>
                    </transformer>
                    <transformer
                            implementation="org.apache.maven.plugins.shade.resource.AppendingTransformer">
                        <resource>META-INF/spring.schemas</resource>
                    </transformer>
                    <transformer
                            implementation="org.apache.maven.plugins.shade.resource.ServicesResourceTransformer" />
                    <transformer
                            implementation="org.apache.maven.plugins.shade.resource.ManifestResourceTransformer">
                        <mainClass>$start-class</mainClass>
                    </transformer>
                </transformers>
            </configuration>
            <executions>
                <execution>
                    <phase>package</phase>
                    <goals>
                        <goal>shade</goal>
                    </goals>
                </execution>
            </executions>
        </plugin>

然后,我将项目 jar 添加到每个工作人员,并将这些配置添加到 Spark 会话:

    "spark.executor.extraClassPath", "/path/app.jar",
    "spark.driver.extraClassPath", "/path/app.jar",
    "spark.jars", "/path/app.jar",

【讨论】:

以上是关于独立运行 UDF 的 Spark 错误的主要内容,如果未能解决你的问题,请参考以下文章

注册 UDF 时出现 Spark 错误:不支持 AnyRef 类型的架构

使用 spark.sparkContext.addPyFile 导入 Pandas UDF

Apache Spark - UDF 似乎不适用于 spark-submit

我可以让 Spark 仅在必要的行上运行 UDF 吗?

使用 udf 的 pyspark 出错:您必须使用 Hive 构建 Spark。导出 'SPARK_HIVE=true' 并运行 build/sbt 程序集

在 spark 数据框中运行 UDF 时,不支持获取 org.apache.spark.sql.Column 类型的架构