Reading message from RabbitMq via Apache Spark Streaming

Posted: 2015-11-18 06:43:44

Question:

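I am trying to write a simple "Hello World" type application using Spark Streaming and RabbitMq, in which Apache Spark Streaming reads messages from RabbitMq via the RabbitMqReceiver and prints them to the console. However, I am unable to print the strings read from RabbitMq to the console. The Spark Streaming code prints the following instead: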

Value Received BlockRDD[1] at ReceiverInputDStream at RabbitMQInputDStream.scala:33
Value Received BlockRDD[2] at ReceiverInputDStream at RabbitMQInputDStream.scala:33

The message is sent to RabbitMq with the simple code below:

package helloWorld;

import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;

public class Send {

  private final static String QUEUE_NAME = "hello1";

  public static void main(String[] argv) throws Exception {
    ConnectionFactory factory = new ConnectionFactory();
    factory.setHost("localhost");
    Connection connection = factory.newConnection();
    Channel channel = connection.createChannel();

    // Arguments: queue, durable, exclusive, autoDelete, arguments
    channel.queueDeclare(QUEUE_NAME, false, false, false, null);
    String message = "Hello World! is a code. Hi Hello World!";
    channel.basicPublish("", QUEUE_NAME, null, message.getBytes("UTF-8"));
    System.out.println(" [x] Sent '" + message + "'");

    channel.close();
    connection.close();
  }
}

I am trying to read the message via Apache Spark Streaming as follows:

package rabbitmq.example;

import java.util.*;

import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.function.Function;
import org.apache.spark.streaming.Durations;
import org.apache.spark.streaming.api.java.JavaReceiverInputDStream;
import org.apache.spark.streaming.api.java.JavaStreamingContext;

import com.stratio.receiver.RabbitMQUtils;

public class RabbitMqEx {

    public static void main(String[] args) {
        System.out.println("Creating    Spark   Configuration");
        SparkConf conf = new SparkConf();
        conf.setAppName("RabbitMq Receiver Example");
        conf.setMaster("local[2]");

        System.out.println("Retreiving  Streaming   Context from    Spark   Conf");
        JavaStreamingContext streamCtx = new JavaStreamingContext(conf,
                Durations.seconds(2));

        Map<String, String> rabbitMqConParams = new HashMap<String, String>();
        rabbitMqConParams.put("host", "localhost");
        rabbitMqConParams.put("queueName", "hello1");
        System.out.println("Trying to connect to RabbitMq");
        JavaReceiverInputDStream<String> receiverStream = RabbitMQUtils.createJavaStream(streamCtx, rabbitMqConParams);
        receiverStream.foreachRDD(new Function<JavaRDD<String>, Void>() {
            @Override
            public Void call(JavaRDD<String> arg0) throws Exception {
                // arg0.toString() yields the RDD's description, not its records
                System.out.println("Value Received " + arg0.toString());
                return null;
            }
        });
        streamCtx.start();
        streamCtx.awaitTermination();
    }
}

The console output contains only the following messages:

Creating    Spark   Configuration
Retreiving  Streaming   Context from    Spark   Conf
Trying to connect to RabbitMq
Value Received BlockRDD[1] at ReceiverInputDStream at RabbitMQInputDStream.scala:33
Value Received BlockRDD[2] at ReceiverInputDStream at RabbitMQInputDStream.scala:33

In the log I see the following:

15/11/18 13:20:45 INFO SparkContext: Running Spark version 1.5.2
15/11/18 13:20:45 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
15/11/18 13:20:45 WARN Utils: Your hostname, jabong1143 resolves to a loopback address: 127.0.1.1; using 192.168.1.3 instead (on interface wlan0)
15/11/18 13:20:45 WARN Utils: Set SPARK_LOCAL_IP if you need to bind to another address
15/11/18 13:20:45 INFO SecurityManager: Changing view acls to: jabong
15/11/18 13:20:45 INFO SecurityManager: Changing modify acls to: jabong
15/11/18 13:20:45 INFO SecurityManager: SecurityManager: authentication disabled; ui acls disabled; users with view permissions: Set(jabong); users with modify permissions: Set(jabong)
15/11/18 13:20:46 INFO Slf4jLogger: Slf4jLogger started
15/11/18 13:20:46 INFO Remoting: Starting remoting
15/11/18 13:20:46 INFO Remoting: Remoting started; listening on addresses :[akka.tcp://sparkDriver@192.168.1.3:42978]
15/11/18 13:20:46 INFO Utils: Successfully started service 'sparkDriver' on port 42978.
15/11/18 13:20:46 INFO SparkEnv: Registering MapOutputTracker
15/11/18 13:20:46 INFO SparkEnv: Registering BlockManagerMaster
15/11/18 13:20:46 INFO DiskBlockManager: Created local directory at /tmp/blockmgr-9309b35f-a506-49dc-91ab-5c340cd3bdd1
15/11/18 13:20:46 INFO MemoryStore: MemoryStore started with capacity 947.7 MB
15/11/18 13:20:46 INFO HttpFileServer: HTTP File server directory is /tmp/spark-736f4b9c-764c-4b85-9b37-1cece102c95a/httpd-29196fa0-eb3f-4b7d-97ad-35c5325b09e5
15/11/18 13:20:46 INFO HttpServer: Starting HTTP Server
15/11/18 13:20:46 INFO Utils: Successfully started service 'HTTP file server' on port 37150.
15/11/18 13:20:46 INFO SparkEnv: Registering OutputCommitCoordinator
15/11/18 13:20:52 INFO Utils: Successfully started service 'SparkUI' on port 4040.
15/11/18 13:20:52 INFO SparkUI: Started SparkUI at http://192.168.1.3:4040
15/11/18 13:20:52 WARN MetricsSystem: Using default name DAGScheduler for source because spark.app.id is not set.
15/11/18 13:20:52 INFO Executor: Starting executor ID driver on host localhost
15/11/18 13:20:52 INFO Utils: Successfully started service 'org.apache.spark.network.netty.NettyBlockTransferService' on port 47306.
15/11/18 13:20:52 INFO NettyBlockTransferService: Server created on 47306
15/11/18 13:20:52 INFO BlockManagerMaster: Trying to register BlockManager
15/11/18 13:20:52 INFO BlockManagerMasterEndpoint: Registering block manager localhost:47306 with 947.7 MB RAM, BlockManagerId(driver, localhost, 47306)
15/11/18 13:20:52 INFO BlockManagerMaster: Registered BlockManager
Trying to connect to RabbitMq
15/11/18 13:20:53 INFO ReceiverTracker: Starting 1 receivers
15/11/18 13:20:53 INFO ReceiverTracker: ReceiverTracker started
15/11/18 13:20:53 INFO ForEachDStream: metadataCleanupDelay = -1
15/11/18 13:20:53 INFO RabbitMQInputDStream: metadataCleanupDelay = -1
15/11/18 13:20:53 INFO RabbitMQInputDStream: Slide time = 2000 ms
15/11/18 13:20:53 INFO RabbitMQInputDStream: Storage level = StorageLevel(false, false, false, false, 1)
15/11/18 13:20:53 INFO RabbitMQInputDStream: Checkpoint interval = null
15/11/18 13:20:53 INFO RabbitMQInputDStream: Remember duration = 2000 ms
15/11/18 13:20:53 INFO RabbitMQInputDStream: Initialized and validated com.stratio.receiver.RabbitMQInputDStream@5d00adc2
15/11/18 13:20:53 INFO ForEachDStream: Slide time = 2000 ms
15/11/18 13:20:53 INFO ForEachDStream: Storage level = StorageLevel(false, false, false, false, 1)
15/11/18 13:20:53 INFO ForEachDStream: Checkpoint interval = null
15/11/18 13:20:53 INFO ForEachDStream: Remember duration = 2000 ms
15/11/18 13:20:53 INFO ForEachDStream: Initialized and validated org.apache.spark.streaming.dstream.ForEachDStream@4c132773
15/11/18 13:20:53 INFO RecurringTimer: Started timer for JobGenerator at time 1447833054000
15/11/18 13:20:53 INFO JobGenerator: Started JobGenerator at 1447833054000 ms
15/11/18 13:20:53 INFO JobScheduler: Started JobScheduler
15/11/18 13:20:53 INFO StreamingContext: StreamingContext started
15/11/18 13:20:53 INFO DAGScheduler: Got job 0 (start at RabbitMqEx.java:38) with 1 output partitions
15/11/18 13:20:53 INFO DAGScheduler: Final stage: ResultStage 0(start at RabbitMqEx.java:38)
15/11/18 13:20:53 INFO ReceiverTracker: Receiver 0 started
15/11/18 13:20:53 INFO DAGScheduler: Parents of final stage: List()
15/11/18 13:20:53 INFO DAGScheduler: Missing parents: List()
15/11/18 13:20:53 INFO DAGScheduler: Submitting ResultStage 0 (Receiver 0 ParallelCollectionRDD[0] at makeRDD at ReceiverTracker.scala:556), which has no missing parents
15/11/18 13:20:53 INFO MemoryStore: ensureFreeSpace(46496) called with curMem=0, maxMem=993735475
15/11/18 13:20:53 INFO MemoryStore: Block broadcast_0 stored as values in memory (estimated size 45.4 KB, free 947.7 MB)
15/11/18 13:20:53 INFO MemoryStore: ensureFreeSpace(15206) called with curMem=46496, maxMem=993735475
15/11/18 13:20:53 INFO MemoryStore: Block broadcast_0_piece0 stored as bytes in memory (estimated size 14.8 KB, free 947.6 MB)
15/11/18 13:20:53 INFO BlockManagerInfo: Added broadcast_0_piece0 in memory on localhost:47306 (size: 14.8 KB, free: 947.7 MB)
15/11/18 13:20:53 INFO SparkContext: Created broadcast 0 from broadcast at DAGScheduler.scala:861
15/11/18 13:20:53 INFO DAGScheduler: Submitting 1 missing tasks from ResultStage 0 (Receiver 0 ParallelCollectionRDD[0] at makeRDD at ReceiverTracker.scala:556)
15/11/18 13:20:53 INFO TaskSchedulerImpl: Adding task set 0.0 with 1 tasks
15/11/18 13:20:53 INFO TaskSetManager: Starting task 0.0 in stage 0.0 (TID 0, localhost, NODE_LOCAL, 2729 bytes)
15/11/18 13:20:53 INFO Executor: Running task 0.0 in stage 0.0 (TID 0)
15/11/18 13:20:53 INFO RecurringTimer: Started timer for BlockGenerator at time 1447833053800
15/11/18 13:20:53 INFO BlockGenerator: Started BlockGenerator
15/11/18 13:20:53 INFO BlockGenerator: Started block pushing thread
15/11/18 13:20:53 INFO ReceiverTracker: Registered receiver for stream 0 from 192.168.1.3:42978
15/11/18 13:20:53 INFO ReceiverSupervisorImpl: Starting receiver
15/11/18 13:20:53 INFO RabbitMQReceiver: Rabbit host addresses are :localhost
15/11/18 13:20:53 INFO RabbitMQReceiver: Address localhost
15/11/18 13:20:53 INFO RabbitMQReceiver: creating new connection and channel
15/11/18 13:20:53 INFO RabbitMQReceiver: No virtual host configured
15/11/18 13:20:53 INFO RabbitMQReceiver: created new connection and channel
15/11/18 13:20:53 INFO RabbitMQReceiver: onStart, Connecting..
15/11/18 13:20:53 INFO ReceiverSupervisorImpl: Called receiver onStart
15/11/18 13:20:53 INFO ReceiverSupervisorImpl: Waiting for receiver to be stopped
15/11/18 13:20:53 INFO RabbitMQReceiver: declaring direct queue
15/11/18 13:20:53 ERROR RabbitMQReceiver: Got this unknown exception: java.io.IOException
java.io.IOException
    at com.rabbitmq.client.impl.AMQChannel.wrap(AMQChannel.java:106)
    at com.rabbitmq.client.impl.AMQChannel.wrap(AMQChannel.java:102)
    at com.rabbitmq.client.impl.AMQChannel.exnWrappingRpc(AMQChannel.java:124)
    at com.rabbitmq.client.impl.ChannelN.queueDeclare(ChannelN.java:844)
    at com.rabbitmq.client.impl.ChannelN.queueDeclare(ChannelN.java:61)
    at com.stratio.receiver.RabbitMQReceiver.getQueueName(RabbitMQInputDStream.scala:126)
    at com.stratio.receiver.RabbitMQReceiver.com$stratio$receiver$RabbitMQReceiver$$receive(RabbitMQInputDStream.scala:86)
    at com.stratio.receiver.RabbitMQReceiver$$anon$1.run(RabbitMQInputDStream.scala:69)
Caused by: com.rabbitmq.client.ShutdownSignalException: channel error; protocol method: #method<channel.close>(reply-code=406, reply-text=PRECONDITION_FAILED - inequivalent arg 'durable' for queue 'hello1' in vhost '/': received 'true' but current is 'false', class-id=50, method-id=10)
    at com.rabbitmq.utility.ValueOrException.getValue(ValueOrException.java:67)
    at com.rabbitmq.utility.BlockingValueOrException.uninterruptibleGetValue(BlockingValueOrException.java:33)
    at com.rabbitmq.client.impl.AMQChannel$BlockingRpcContinuation.getReply(AMQChannel.java:361)
    at com.rabbitmq.client.impl.AMQChannel.privateRpc(AMQChannel.java:226)
    at com.rabbitmq.client.impl.AMQChannel.exnWrappingRpc(AMQChannel.java:118)
    ... 5 more
Caused by: com.rabbitmq.client.ShutdownSignalException: channel error; protocol method: #method<channel.close>(reply-code=406, reply-text=PRECONDITION_FAILED - inequivalent arg 'durable' for queue 'hello1' in vhost '/': received 'true' but current is 'false', class-id=50, method-id=10)
    at com.rabbitmq.client.impl.ChannelN.asyncShutdown(ChannelN.java:484)
    at com.rabbitmq.client.impl.ChannelN.processAsync(ChannelN.java:321)
    at com.rabbitmq.client.impl.AMQChannel.handleCompleteInboundCommand(AMQChannel.java:144)
    at com.rabbitmq.client.impl.AMQChannel.handleFrame(AMQChannel.java:91)
    at com.rabbitmq.client.impl.AMQConnection$MainLoop.run(AMQConnection.java:554)
    at java.lang.Thread.run(Thread.java:745)
15/11/18 13:20:53 INFO RabbitMQReceiver: it has been stopped
15/11/18 13:20:53 ERROR RabbitMQReceiver: error on close channel, ignoring
15/11/18 13:20:53 WARN ReceiverSupervisorImpl: Restarting receiver with delay 2000 ms: Trying to connect again
15/11/18 13:20:53 INFO ReceiverSupervisorImpl: Stopping receiver with message: Restarting receiver with delay 2000ms: Trying to connect again: 
15/11/18 13:20:53 INFO RabbitMQReceiver: onStop, doing nothing.. relaxing...
15/11/18 13:20:53 INFO ReceiverSupervisorImpl: Called receiver onStop
15/11/18 13:20:53 INFO ReceiverSupervisorImpl: Deregistering receiver 0
15/11/18 13:20:53 ERROR ReceiverTracker: Deregistered receiver for stream 0: Restarting receiver with delay 2000ms: Trying to connect again
15/11/18 13:20:53 INFO ReceiverSupervisorImpl: Stopped receiver 0
15/11/18 13:20:54 INFO JobScheduler: Added jobs for time 1447833054000 ms
15/11/18 13:20:54 INFO JobScheduler: Starting job streaming job 1447833054000 ms.0 from job set of time 1447833054000 ms
Value Received BlockRDD[1] at ReceiverInputDStream at RabbitMQInputDStream.scala:33
15/11/18 13:20:54 INFO JobScheduler: Finished job streaming job 1447833054000 ms.0 from job set of time 1447833054000 ms
15/11/18 13:20:54 INFO JobScheduler: Total delay: 0.031 s for time 1447833054000 ms (execution: 0.007 s)
15/11/18 13:20:54 INFO ReceivedBlockTracker: Deleting batches ArrayBuffer()
15/11/18 13:20:54 INFO InputInfoTracker: remove old batch metadata: 
15/11/18 13:20:55 INFO ReceiverSupervisorImpl: Starting receiver again
15/11/18 13:20:55 INFO ReceiverTracker: Registered receiver for stream 0 from 192.168.1.3:42978
15/11/18 13:20:55 INFO ReceiverSupervisorImpl: Starting receiver
15/11/18 13:20:55 INFO RabbitMQReceiver: Rabbit host addresses are :localhost
15/11/18 13:20:55 INFO RabbitMQReceiver: Address localhost
15/11/18 13:20:55 INFO RabbitMQReceiver: creating new connection and channel
15/11/18 13:20:55 INFO RabbitMQReceiver: No virtual host configured
15/11/18 13:20:55 INFO RabbitMQReceiver: created new connection and channel
15/11/18 13:20:55 INFO RabbitMQReceiver: onStart, Connecting..
15/11/18 13:20:55 INFO ReceiverSupervisorImpl: Called receiver onStart
15/11/18 13:20:55 INFO RabbitMQReceiver: declaring direct queue
15/11/18 13:20:55 INFO ReceiverSupervisorImpl: Receiver started again
15/11/18 13:20:55 ERROR RabbitMQReceiver: Got this unknown exception: java.io.IOException
java.io.IOException
    at com.rabbitmq.client.impl.AMQChannel.wrap(AMQChannel.java:106)
    at com.rabbitmq.client.impl.AMQChannel.wrap(AMQChannel.java:102)
    at com.rabbitmq.client.impl.AMQChannel.exnWrappingRpc(AMQChannel.java:124)
    at com.rabbitmq.client.impl.ChannelN.queueDeclare(ChannelN.java:844)
    at com.rabbitmq.client.impl.ChannelN.queueDeclare(ChannelN.java:61)
    at com.stratio.receiver.RabbitMQReceiver.getQueueName(RabbitMQInputDStream.scala:126)
    at com.stratio.receiver.RabbitMQReceiver.com$stratio$receiver$RabbitMQReceiver$$receive(RabbitMQInputDStream.scala:86)
    at com.stratio.receiver.RabbitMQReceiver$$anon$1.run(RabbitMQInputDStream.scala:69)
Caused by: com.rabbitmq.client.ShutdownSignalException: channel error; protocol method: #method<channel.close>(reply-code=406, reply-text=PRECONDITION_FAILED - inequivalent arg 'durable' for queue 'hello1' in vhost '/': received 'true' but current is 'false', class-id=50, method-id=10)
    at com.rabbitmq.utility.ValueOrException.getValue(ValueOrException.java:67)
    at com.rabbitmq.utility.BlockingValueOrException.uninterruptibleGetValue(BlockingValueOrException.java:33)
    at com.rabbitmq.client.impl.AMQChannel$BlockingRpcContinuation.getReply(AMQChannel.java:361)
    at com.rabbitmq.client.impl.AMQChannel.privateRpc(AMQChannel.java:226)
    at com.rabbitmq.client.impl.AMQChannel.exnWrappingRpc(AMQChannel.java:118)
    ... 5 more
Caused by: com.rabbitmq.client.ShutdownSignalException: channel error; protocol method: #method<channel.close>(reply-code=406, reply-text=PRECONDITION_FAILED - inequivalent arg 'durable' for queue 'hello1' in vhost '/': received 'true' but current is 'false', class-id=50, method-id=10)
    at com.rabbitmq.client.impl.ChannelN.asyncShutdown(ChannelN.java:484)
    at com.rabbitmq.client.impl.ChannelN.processAsync(ChannelN.java:321)
    at com.rabbitmq.client.impl.AMQChannel.handleCompleteInboundCommand(AMQChannel.java:144)
    at com.rabbitmq.client.impl.AMQChannel.handleFrame(AMQChannel.java:91)
    at com.rabbitmq.client.impl.AMQConnection$MainLoop.run(AMQConnection.java:554)
    at java.lang.Thread.run(Thread.java:745)
15/11/18 13:20:55 INFO RabbitMQReceiver: it has been stopped
15/11/18 13:20:55 ERROR RabbitMQReceiver: error on close channel, ignoring
15/11/18 13:20:55 WARN ReceiverSupervisorImpl: Restarting receiver with delay 2000 ms: Trying to connect again
15/11/18 13:20:55 INFO ReceiverSupervisorImpl: Stopping receiver with message: Restarting receiver with delay 2000ms: Trying to connect again: 
15/11/18 13:20:55 INFO RabbitMQReceiver: onStop, doing nothing.. relaxing...
15/11/18 13:20:55 INFO ReceiverSupervisorImpl: Called receiver onStop
15/11/18 13:20:55 INFO ReceiverSupervisorImpl: Deregistering receiver 0
15/11/18 13:20:55 ERROR ReceiverTracker: Deregistered receiver for stream 0: Restarting receiver with delay 2000ms: Trying to connect again
15/11/18 13:20:55 INFO ReceiverSupervisorImpl: Stopped receiver 0
15/11/18 13:20:56 INFO JobScheduler: Added jobs for time 1447833056000 ms
15/11/18 13:20:56 INFO JobScheduler: Starting job streaming job 1447833056000 ms.0 from job set of time 1447833056000 ms
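
The relevant error is easy to miss among the INFO lines above. As a rough sketch only (the class `ChannelErrorParser` and its fields are illustrative names, not part of Spark or the RabbitMQ client), a few lines of plain Java can pull the reply code and the mismatched argument out of a `Caused by:` line like the one in this log:

```java
import java.util.Optional;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

public class ChannelErrorParser {

    /** Hypothetical holder for the fields extracted from one log line. */
    public static final class ChannelError {
        public final int replyCode;
        public final String argument;
        public final String queue;
        public final String received;
        public final String current;

        ChannelError(int replyCode, String argument, String queue,
                     String received, String current) {
            this.replyCode = replyCode;
            this.argument = argument;
            this.queue = queue;
            this.received = received;
            this.current = current;
        }
    }

    // Matches the PRECONDITION_FAILED text as it appears in the log above.
    private static final Pattern INEQUIVALENT_ARG = Pattern.compile(
            "reply-code=(\\d+), reply-text=PRECONDITION_FAILED - inequivalent arg "
            + "'([^']+)' for queue '([^']+)' in vhost '[^']*': "
            + "received '([^']*)' but current is '([^']*)'");

    /** Returns the parsed error, or empty if the line has a different shape. */
    public static Optional<ChannelError> parse(String logLine) {
        Matcher m = INEQUIVALENT_ARG.matcher(logLine);
        if (!m.find()) {
            return Optional.empty();
        }
        return Optional.of(new ChannelError(Integer.parseInt(m.group(1)),
                m.group(2), m.group(3), m.group(4), m.group(5)));
    }

    public static void main(String[] args) {
        String line = "Caused by: com.rabbitmq.client.ShutdownSignalException: "
                + "channel error; protocol method: #method<channel.close>("
                + "reply-code=406, reply-text=PRECONDITION_FAILED - inequivalent arg "
                + "'durable' for queue 'hello1' in vhost '/': received 'true' but "
                + "current is 'false', class-id=50, method-id=10)";
        parse(line).ifPresent(e -> System.out.println(
                "queue " + e.queue + ": client asked for " + e.argument + "="
                + e.received + ", broker has " + e.argument + "=" + e.current));
    }
}
```

Read this way, the log says the receiver asked for a durable queue named hello1 while the existing queue is non-durable, so the receiver never starts consuming.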

Running list_queues shows the following:

sudo rabbitmqctl list_queues
Listing queues ...
hello1  2

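I also printed the value of arg0.count(); it reports 0. It seems that Spark Streaming is unable to read the messages from RabbitMq.

However, I am able to read the data from the queue using the simple Java receiver mentioned here.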

Environment

RabbitMq version 3.5.6, Spark 1.5.2, Java 8 (update 66)

Can anyone tell me what is going wrong?

Answer 1:

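One of the authors of rabbitmq-receiver has answered this question here. Quoting the reply below:

Hi,

You have declared the queue as non-durable. The receiver expects to read from a durable queue. Try declaring it like this: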

channel.queueDeclare(QUEUE_NAME, false, true, false, null);

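Note that in the RabbitMQ Java client the arguments of queueDeclare are (queue, durable, exclusive, autoDelete, arguments), so the call above still declares a non-durable queue. What worked for me was setting the second argument to true: channel.queueDeclare(QUEUE_NAME, true, false, false, null)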
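
To illustrate why the durable flag matters, here is a minimal in-memory sketch of the broker's check when a queue is declared again. It is a simplified model written for this answer, not the RabbitMQ client or broker code: the class `QueueDeclareModel` and its methods are hypothetical and track only the durable flag.

```java
import java.util.HashMap;
import java.util.Map;

public class QueueDeclareModel {

    /** Stands in for the channel being closed by the broker with a reply code. */
    public static final class ChannelClosed extends RuntimeException {
        public final int replyCode;

        ChannelClosed(int replyCode, String replyText) {
            super(replyText);
            this.replyCode = replyCode;
        }
    }

    // Queue name -> durable flag of the queue as it currently exists.
    private final Map<String, Boolean> queues = new HashMap<>();

    /** Creates the queue, or checks that an existing one has the same flag. */
    public void queueDeclare(String name, boolean durable) {
        Boolean current = queues.get(name);
        if (current == null) {
            queues.put(name, durable);
            return;
        }
        if (current.booleanValue() != durable) {
            throw new ChannelClosed(406, "PRECONDITION_FAILED - inequivalent arg "
                    + "'durable' for queue '" + name + "': received '" + durable
                    + "' but current is '" + current + "'");
        }
    }

    /** Passive declare: only checks existence, never creates or compares flags. */
    public void queueDeclarePassive(String name) {
        if (!queues.containsKey(name)) {
            throw new ChannelClosed(404, "NOT_FOUND - no queue '" + name + "'");
        }
    }

    public void queueDelete(String name) {
        queues.remove(name);
    }

    public static void main(String[] args) {
        QueueDeclareModel broker = new QueueDeclareModel();

        // The sender from the question creates a non-durable queue first.
        broker.queueDeclare("hello1", false);
        try {
            // The receiver then asks for a durable queue of the same name.
            broker.queueDeclare("hello1", true);
        } catch (ChannelClosed e) {
            System.out.println(e.replyCode + " " + e.getMessage());
        }

        // The old queue has to go before it can be recreated as durable.
        broker.queueDelete("hello1");
        broker.queueDeclare("hello1", true);   // sender, now durable
        broker.queueDeclare("hello1", true);   // receiver, flags match
        broker.queueDeclarePassive("hello1");  // passive check also succeeds
        System.out.println("declarations are equivalent");
    }
}
```

One consequence of this check is that changing the sender alone is not enough while the non-durable hello1 queue still exists: it has to be deleted first (the Java client offers `queueDelete` for this) or a new queue name has to be used. In the real Java client, a passive declaration is made with `queueDeclarePassive(queue)`.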

Comments:

For posterity, and to guard against link rot, could you edit your answer to include the relevant parts of the link? Many thanks.
