在 spark java api( org.apache.spark.SparkException ) 中使用 filter(),map(),... 时出错

Posted

技术标签:

【中文标题】在 spark java api( org.apache.spark.SparkException ) 中使用 filter(),map(),... 时出错【英文标题】:error when use filter(),map(),... in spark java api( org.apache.spark.SparkException ) 【发布时间】:2018-02-05 17:56:54 【问题描述】:

我是 spark 的新手 当我在java api中使用spark过滤器时,我得到这个错误(如果collect()所有表都正确工作,我可以看到所有数据都来自cassandra。)我检查了master和workers版本是否相同,并且当应用程序启动时spark的web ui我可以看到但是:

[Stage 0:>                                                          (0 + 0) / 6]
[Stage 0:>                                                          (0 + 2) / 6]
[Stage 0:>                                                          (0 + 4) / 6]

2017-08-28 16:37:16,239 错误 TaskSetManager:70 - 阶段 0.0 中的任务 1 失败 4 次;中止工作 2017-08-28 16:37:21,351 错误 DefaultExceptionMapper:170 - 发生意外错误 org.apache.wicket.WicketRuntimeException:针对接口org.apache.wicket.behavior.IBehaviorListener的方法onRequest org.apache.wicket.extensions.ajax.markup.html.AjaxLazyLoadPanel$1@e7e7465 在组件 [AjaxLazyLoadPanel [Component id = panel]] 上抛出了一个 例外在 org.apache.wicket.RequestListenerInterface.internalInvoke(RequestListenerInterface.java:268) 在 org.apache.wicket.RequestListenerInterface.invoke(RequestListenerInterface.java:241) 在 org.apache.wicket.core.request.handler.ListenerInterfaceRequestHandler.invokeListener(ListenerInterfaceRequestHandler.java:248) 在 org.apache.wicket.core.request.handler.ListenerInterfaceRequestHandler.respond(ListenerInterfaceRequestHandler.java:234) 在 org.apache.wicket.request.cycle.RequestCycle$HandlerExecutor.respond(RequestCycle.java:895) 在 org.apache.wicket.request.RequestHandlerStack.execute(RequestHandlerStack.java:64) 在 org.apache.wicket.request.cycle.RequestCycle.execute(RequestCycle.java:265) 在 org.apache.wicket.request.cycle.RequestCycle.processRequest(RequestCycle.java:222) 在 org.apache.wicket.request.cycle.RequestCycle.processRequestAndDetach(RequestCycle.java:293) 在 org.apache.wicket.protocol.http.WicketFilter.processRequestCycle(WicketFilter.java:261) 在 org.apache.wicket.protocol.http.WicketFilter.processRequest(WicketFilter.java:203) 在 org.apache.wicket.protocol.http.WicketFilter.doFilter(WicketFilter.java:284) 在 org.apache.catalina.core.ApplicationFilterChain.internalDoFilter(ApplicationFilterChain.java:239) 在 org.apache.catalina.core.ApplicationFilterChain.doFilter(ApplicationFilterChain.java:206) 在 org.apache.catalina.core.StandardWrapperValve.invoke(StandardWrapperValve.java:217) 在 org.apache.catalina.core.StandardContextValve.invoke(StandardContextValve.java:106) 在 org.apache.catalina.authenticator.AuthenticatorBase.invoke(AuthenticatorBase.java:502) 在 org.apache.catalina.core.StandardHostValve.invoke(StandardHostValve.java:142) 在 org.apache.catalina.valves.ErrorReportValve.invoke(ErrorReportValve.java:79) 在 org.apache.catalina.valves.AbstractAccessLogValve.invoke(AbstractAccessLogValve.java:616) 在 org.apache.catalina.core.StandardEngineValve.invoke(StandardEngineValve.java:88) 在 org.apache.catalina.connector.CoyoteAdapter.service(CoyoteAdapter.java:518) 在 org.apache.coyote.http11.AbstractHttp11Processor.process(AbstractHttp11Processor.java:1091) 在 org.apache.coyote.AbstractProtocol$AbstractConnectionHandler.process(AbstractProtocol.java:673) 在 org.apache.tomcat.util.net.NioEndpoint$SocketProcessor.doRun(NioEndpoint.java:1500) 在 org.apache.tomcat.util.net.NioEndpoint$SocketProcessor.run(NioEndpoint.java:1456) 在 java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142) 在 java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617) 在 org.apache.tomcat.util.threads.TaskThread$WrappingRunnable.run(TaskThread.java:61) 在 java.lang.Thread.run(Thread.java:748)

引起:java.lang.reflect.InvocationTargetException at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) 在 sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) 在 java.lang.reflect.Method.invoke(Method.java:498) 在 org.apache.wicket.RequestListenerInterface.internalInvoke(RequestListenerInterface.java:258) ... 29 更多

原因:java.lang.RuntimeException:无法构造面板 me.SparkTestPanel。在...

原因:org.apache.spark.SparkException:作业因阶段失败而中止:阶段 0.0 中的任务 1 失败 4 次,最近一次 失败:在 0.0 阶段丢失任务 1.3(TID 10、21.1.0.41、执行者 1): java.lang.ClassNotFoundException: me.SparkTestPanel$1 at java.net.URLClassLoader.findClass(URLClassLoader.java:381) 在 java.lang.ClassLoader.loadClass(ClassLoader.java:424) 在 java.lang.ClassLoader.loadClass(ClassLoader.java:357) 在 java.lang.Class.forName0(本机方法)在 java.lang.Class.forName(Class.java:348) 在 org.apache.spark.serializer.JavaDeserializationStream$$anon$1.resolveClass(JavaSerializer.scala:67) 在 java.io.ObjectInputStream.readNonProxyDesc(ObjectInputStream.java:1826) 在 java.io.ObjectInputStream.readClassDesc(ObjectInputStream.java:1713) 在 java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2000) 在 java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1535) 在 java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2245) 在 java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2169) 在 java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2027) 在 java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1535) 在 java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2245) 在 java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2169) 在 java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2027) 在 java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1535) 在 java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2245) 在 java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2169) 在 java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2027) 在 java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1535) 在 java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2245) 在 java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2169) 在 java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2027) 在 java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1535) 在 java.io.ObjectInputStream.readObject(ObjectInputStream.java:422) 在 org.apache.spark.serializer.JavaDeserializationStream.readObject(JavaSerializer.scala:75) 在 org.apache.spark.serializer.JavaSerializerInstance.deserialize(JavaSerializer.scala:114) 在 org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:80) 在 org.apache.spark.scheduler.Task.run(Task.scala:99) 在 org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:322) 在 java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149) 在 java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624) 在 java.lang.Thread.run(Thread.java:748)

驱动程序堆栈跟踪:在 org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1435) 在 org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1423) 在 org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1422) 在 scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59) 在 scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:48) 在 org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1422) 在 org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:802) 在 org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:802) 在 scala.Option.foreach(Option.scala:257) 在 org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:802) 在 org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:1650) 在 org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1605) 在 org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1594) 在 org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:48) 在 org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:628) 在 org.apache.spark.SparkContext.runJob(SparkContext.scala:1925) 在 org.apache.spark.SparkContext.runJob(SparkContext.scala:1938) 在 org.apache.spark.SparkContext.runJob(SparkContext.scala:1951) 在 org.apache.spark.SparkContext.runJob(SparkContext.scala:1965) 在 org.apache.spark.rdd.RDD.count(RDD.scala:1158) 在 org.apache.spark.api.java.JavaRDDLike$class.count(JavaRDDLike.scala:455) 在 org.apache.spark.api.java.AbstractJavaRDDLike.count(JavaRDDLike.scala:45) 在 me.SparkTestPanel.(SparkTestPanel.java:77) 在 sun.reflect.NativeConstructorAccessorImpl.newInstance0(本机方法) 在 sun.reflect.NativeConstructorAccessorImpl.newInstance(NativeConstructorAccessorImpl.java:62) 在 sun.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45) 在 java.lang.reflect.Constructor.newInstance(Constructor.java:423) ... 39 更多

原因:java.lang.ClassNotFoundException: me.SparkTestPanel$1 at java.net.URLClassLoader.findClass(URLClassLoader.java:381) at java.lang.ClassLoader.loadClass(ClassLoader.java:424) 在 java.lang.ClassLoader.loadClass(ClassLoader.java:357) 在 java.lang.Class.forName0(本机方法)在 java.lang.Class.forName(Class.java:348) 在 org.apache.spark.serializer.JavaDeserializationStream$$anon$1.resolveClass(JavaSerializer.scala:67) 在 java.io.ObjectInputStream.readNonProxyDesc(ObjectInputStream.java:1826) 在 java.io.ObjectInputStream.readClassDesc(ObjectInputStream.java:1713) 在 java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2000) 在 java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1535) 在 java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2245) 在 java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2169) 在 java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2027) 在 java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1535) 在 java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2245) 在 java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2169) 在 java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2027) 在 java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1535) 在 java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2245) 在 java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2169) 在 java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2027) 在 java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1535) 在 java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2245) 在 java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2169) 在 java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2027) 在 java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1535) 在 java.io.ObjectInputStream.readObject(ObjectInputStream.java:422) 在 org.apache.spark.serializer.JavaDeserializationStream.readObject(JavaSerializer.scala:75) 在 org.apache.spark.serializer.JavaSerializerInstance.deserialize(JavaSerializer.scala:114) 在 org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:80) 在 org.apache.spark.scheduler.Task.run(Task.scala:99) 在 org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:322) 在 java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149) 在 java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624) ... 1 更多

我的代码是:

import com.datastax.spark.connector.japi.CassandraJavaUtil;
import static com.datastax.spark.connector.japi.CassandraJavaUtil.javaFunctions;
import static com.datastax.spark.connector.japi.CassandraJavaUtil.mapColumnTo;
import com.datastax.spark.connector.japi.CassandraRow;
import com.datastax.spark.connector.japi.rdd.CassandraTableScanJavaRDD;

import java.util.List;
import org.apache.log4j.Logger;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.Function;

import org.apache.wicket.markup.html.form.Form;

/**
 *
 * @author mohamadreza
 */
public class SparkTestPanel extends Panel 

    private Form form;

    public SparkTestPanel(String id) 
        super(id);
        form = new Form("form");
        form.setOutputMarkupId(true);
        this.add(form);             
        SparkConf conf = new SparkConf(true);
        conf.setAppName("Spark Test");
        conf.setMaster("spark://192.16.11.18:7049");
        conf.set("spark.closure.serializer","org.apache.spark.serializer.JavaSerializer");
        conf.set("spark.serializer","org.apache.spark.serializer.JavaSerializer");

        conf.set("spark.cassandra.connection.host", "192.16.11.18");
        conf.set("spark.cassandra.connection.port", "7005");
        conf.set("spark.cassandra.auth.username", "user");
        conf.set("spark.cassandra.auth.password", "password");
        JavaSparkContext sc = null;
        try 
            sc = new JavaSparkContext(conf);
            JavaRDD<CassandraRow> cache = javaFunctions(sc).cassandraTable("keyspace", "test").cache();
            Long count = cache.filter(new Function<CassandraRow, Boolean>() 
                @Override
                public Boolean call(CassandraRow t1) throws Exception 
                    return t1.getString("value").contains("test");
                
            ).count();
            String a = count.toString();
         finally 
            sc.stop();
        
    

还有 spark 2.1.1 版、scala 2.11 版、JAVA 8 和我的 pom.xml:

<!-- https://mvnrepository.com/artifact/org.apache.spark/spark-core_2.11 -->
    <dependency>
        <groupId>org.apache.spark</groupId>
        <artifactId>spark-core_2.11</artifactId>
        <version>2.1.1</version>
    </dependency>
    <dependency>
        <groupId>org.apache.spark</groupId>
        <artifactId>spark-sql_2.11</artifactId>
        <version>2.1.1</version>
    </dependency>
    <dependency>
        <groupId>org.apache.spark</groupId>
        <artifactId>spark-streaming_2.11</artifactId>
        <version>2.1.1</version>
        <scope>provided</scope>
    </dependency>
    <dependency>
        <groupId>com.datastax.spark</groupId>
        <artifactId>spark-cassandra-connector_2.11</artifactId>
        <version>2.0.5</version>
    </dependency>

我将 docker 用于 cassandra 和 spark 节点。(cassandra 3.0 版) 谁能帮帮我?

【问题讨论】:

【参考方案1】:

问题解决了 :)

当您想使用 Apache Spark 的 JAVA Api 时,您必须将项目的 .jar(位于项目根目录的目标目录中)复制到每个 Spark 节点(主节点和工作节点)中的 $SPARK_PATH/jars/。如果您的 @987654323 @非常大,你可以拆分ui和spark代码,只复制spark代码项目的.jar,并在你的ui项目中使用这个spark代码。

【讨论】:

barik bar javan irani aziz

以上是关于在 spark java api( org.apache.spark.SparkException ) 中使用 filter(),map(),... 时出错的主要内容,如果未能解决你的问题,请参考以下文章

PySpark 是不是调用 java api,然后 java api 在 Apache Spark 中调用 scala api?

PySpark 是不是调用 java api,然后 java api 在 Apache Spark 中调用 scala api?

在 spark java api( org.apache.spark.SparkException ) 中使用 filter(),map(),... 时出错

在 java spark 中从 REST API 读取 csv

Spark Java API 之 CountVectorizer

java异常求助~高手来啊