ClassCastException when converting RDD to DataFrame in Spark Streaming

Posted: 2019-03-28 19:23:50

Hi everyone, I have the following problem. I am using Apache Spark Streaming v1.6.0 with Java to consume messages from IBM MQ. I wrote a custom receiver for MQ, and now I need to convert the RDDs of the JavaDStream into a DataFrame. To do that I iterate over the JavaDStream with foreachRDD and define a schema for the DataFrame, but when I run the job the first message throws the following exception:

java.lang.ClassCastException: org.apache.spark.rdd.BlockRDDPartition cannot be cast to org.apache.spark.rdd.ParallelCollectionPartition
    at org.apache.spark.rdd.ParallelCollectionRDD.compute(ParallelCollectionRDD.scala:102)
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:306)
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:270)
    at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:66)
    at org.apache.spark.scheduler.Task.run(Task.scala:89)
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:213)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
    at java.lang.Thread.run(Thread.java:748)
19/03/28 12:53:26 WARN TaskSetManager: Lost task 0.0 in stage 0.0 (TID 0, localhost): java.lang.ClassCastException: org.apache.spark.rdd.BlockRDDPartition cannot be cast to org.apache.spark.rdd.ParallelCollectionPartition
    ... (same stack trace as above)

After that, the code runs fine. It happens even when there are no messages in MQ; it is only the first message after I start the job.

This is my CustomMQReceiver:

    public CustomMQReceiver() {
        super(StorageLevel.MEMORY_ONLY_2());
    }

    @Override
    public void onStart() {
        new Thread() {
            @Override
            public void run() {
                try {
                    initConnection();
                    receive();
                } catch (JMSException ex) {
                    ex.printStackTrace();
                }
            }
        }.start();
    }

    @Override
    public void onStop() {
    }

    private void receive() {
        System.out.print("Started receiving messages from MQ");
        try {
            Message receivedMessage = null;
            while (!isStopped() && (receivedMessage = consumer.receiveNoWait()) != null) {
                String userInput = convertStreamToString(receivedMessage);
                System.out.println("Received data :'" + userInput + "'");
                store(userInput);
            }
            stop("No More Messages To read !");
            qCon.close();
            System.out.println("Queue Connection is Closed");
        } catch (Exception e) {
            e.printStackTrace();
            restart("Trying to connect again");
        } catch (Throwable t) {
            restart("Error receiving data", t);
        }
    }

    public void initConnection() throws JMSException {
        MQQueueConnectionFactory conFactory = new MQQueueConnectionFactory();
        conFactory.setHostName(HOST);
        conFactory.setPort(PORT);
        conFactory.setIntProperty(WMQConstants.WMQ_CONNECTION_MODE, WMQConstants.WMQ_CM_CLIENT);
        conFactory.setQueueManager(QMGR);
        conFactory.setChannel(CHANNEL);
        conFactory.setBooleanProperty(WMQConstants.USER_AUTHENTICATION_MQCSP, true);
        conFactory.setStringProperty(WMQConstants.USERID, APP_USER);
        conFactory.setStringProperty(WMQConstants.PASSWORD, APP_PASSWORD);

        qCon = (MQQueueConnection) conFactory.createConnection();
        MQQueueSession qSession = (MQQueueSession) qCon.createQueueSession(false, 1);
        MQQueue queue = (MQQueue) qSession.createQueue(QUEUE_NAME);
        consumer = (MQMessageConsumer) qSession.createConsumer(queue);
        qCon.start();
    }

    @Override
    public StorageLevel storageLevel() {
        return StorageLevel.MEMORY_ONLY_2();
    }

    private static String convertStreamToString(final Message jmsMsg) throws Exception {
        JMSTextMessage msg = (JMSTextMessage) jmsMsg;
        return msg.getText();
    }

And this is my Spark code:

SparkConf sparkConf = new SparkConf()
        .setAppName("MQStreaming")
        .set("spark.driver.allowMultipleContexts", "true")
        .setMaster("local[*]");

JavaSparkContext jsc = new JavaSparkContext(sparkConf);
final SQLContext sqlContext = new SQLContext(jsc);
JavaStreamingContext ssc = new JavaStreamingContext(sparkConf,
        new Duration(Long.parseLong(propertiesConf.getProperty("duration"))));

JavaDStream<String> customReceiverStream = ssc.receiverStream(new CustomMQReceiver());

customReceiverStream.foreachRDD(new VoidFunction<JavaRDD<String>>() {

    @Override
    public void call(JavaRDD<String> rdd) throws Exception {

        JavaRDD<Row> rddRow = rdd.map(new Function<String, Row>() {

            @Override
            public Row call(String v1) throws Exception {
                return RowFactory.create(v1);
            }
        });

        try {
            StructType schema = new StructType(new StructField[]{
                new StructField("trama", DataTypes.StringType, true, Metadata.empty())
            });

            DataFrame frame = sqlContext.createDataFrame(rddRow, schema);

            if (frame.count() > 0) {
                // Here is where the first message throws the exception
                frame.show();
                frame.write().mode(SaveMode.Append).json("file:///C:/tmp/");
            }
        } catch (Exception ex) {
            System.out.println(" INFO " + ex.getMessage());
        }
    }
});

ssc.start();
ssc.awaitTermination();

I can't change the Spark version, because this job will run on an old Cloudera cluster with Spark 1.6. I don't know whether I'm doing something wrong or it's just a bug. Help!!!!


Answer 1:

I solved my own problem. The exception was thrown because of the way I was creating the SQLContext: the correct way is to build the sqlContext from the JavaStreamingContext's own SparkContext.

//JavaStreamingContext jsc = ...
SQLContext sqlContext = new SQLContext(jsc.sparkContext());
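As a variation on the same fix, the Spark 1.6 streaming guide suggests obtaining the SQLContext lazily inside foreachRDD via SQLContext.getOrCreate, using the SparkContext that backs the incoming RDD. This guarantees the DataFrame is always built against the context that produced the RDD's BlockRDD partitions. A minimal sketch, reusing the stream and schema from the question (field names and the surrounding variables are assumed from the code above):

```java
customReceiverStream.foreachRDD(new VoidFunction<JavaRDD<String>>() {

    @Override
    public void call(JavaRDD<String> rdd) throws Exception {
        // getOrCreate returns a singleton SQLContext bound to the
        // SparkContext that actually backs this RDD, so the RDD's
        // partitions are resolved against the right context.
        SQLContext sqlContext = SQLContext.getOrCreate(rdd.context());

        JavaRDD<Row> rows = rdd.map(new Function<String, Row>() {
            @Override
            public Row call(String v1) throws Exception {
                return RowFactory.create(v1);
            }
        });

        StructType schema = new StructType(new StructField[]{
            new StructField("trama", DataTypes.StringType, true, Metadata.empty())
        });

        DataFrame frame = sqlContext.createDataFrame(rows, schema);
        if (frame.count() > 0) {
            frame.show();
        }
    }
});
```

Either way, the key point is the same: never create a second SparkContext (the `spark.driver.allowMultipleContexts` setting in the question masks that mistake); derive the SQLContext from the one context the streaming job is actually running on.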

