如何使用 Spark Streaming 读取 MQ 中的消息,即 ZeroMQ,RabbitMQ?

Posted

技术标签:

【中文标题】如何使用 Spark Streaming 读取 MQ 中的消息,即 ZeroMQ,RabbitMQ?【英文标题】:How to read messages in MQs using spark streaming,i.e ZeroMQ,RabbitMQ? 【发布时间】:2018-11-09 06:03:41 【问题描述】:

正如 spark docs 所说,它支持 kafka 作为数据流源。但是我使用 ZeroMQ,并且没有 ZeroMQUtils。所以我该如何使用它?一般来说,其他MQ怎么样。我对火花和火花流媒体完全陌生,所以如果问题很愚蠢,我很抱歉。谁能给我一个解决方案。谢谢 顺便说一句,我使用 python。

更新,我终于用自定义接收器在 java 中做到了。以下是我的解决方案

public class ZeroMQReceiver extends Receiver<T> 

    private static final ObjectMapper mapper = new ObjectMapper();

    public ZeroMQReceiver() 

        super(StorageLevel.MEMORY_AND_DISK_2());
    

    @Override
    public void onStart() 
        // Start the thread that receives data over a connection
        new Thread(this::receive).start();
    

    @Override
    public void onStop() 
        // There is nothing much to do as the thread calling receive()
        // is designed to stop by itself if isStopped() returns false
    

    /** Create a socket connection and receive data until receiver is stopped */
    private void receive() 
        String message = null;

        try 

            ZMQ.Context context = ZMQ.context(1); 
            ZMQ.Socket subscriber = context.socket(ZMQ.SUB);     
            subscriber.connect("tcp://ip:port");    
            subscriber.subscribe("".getBytes());  

            // Until stopped or connection broken continue reading
            while (!isStopped() && (message = subscriber.recvStr()) != null) 
                List<T> results = mapper.readValue(message,
                        new TypeReference<List<T>>() );
                for (T item : results) 
                    store(item);
                
            
            // Restart in an attempt to connect again when server is active again
            restart("Trying to connect again");
         catch(Throwable t) 
            // restart if there is any other error
            restart("Error receiving data", t);
        
    

【问题讨论】:

Spark 2.0.0 twitter streaming driver is no longer available - bahir.apache.org 结构化流或 DStream? @Bernhard Stadler,两个都可以,就像我说的,我是全新的,有什么解决办法吗? @youngjack 我发布的答案如何(结构化流媒体通常从开发人员 POV 获得更好)? :) 【参考方案1】:

我假设您在谈论结构化流。

我不熟悉 ZeroMQ,但 Spark Structured Streaming 源中的一个重点是可重播性(以确保容错),如果我理解正确,ZeroMQ 不会提供开箱即用的功能。

一种实用的方法是在 Kafka 中缓冲数据并使用 KafkaSource 或作为(本地 FS/NFS、HDFS、S3)目录中的文件并使用 FileSource 进行读取。参照。 Spark Docs。如果您使用 FileSource,请确保不要将任何内容附加到 FileSource 的输入目录中的现有文件,而是自动将它们移动到目录中。

【讨论】:

我终于用自定义接收器在 java 中完成了,但仍然感谢,我会检查结构化流。 @youngjack - 你能发布解决方案吗?我需要类似的连接器。

以上是关于如何使用 Spark Streaming 读取 MQ 中的消息,即 ZeroMQ,RabbitMQ?的主要内容,如果未能解决你的问题,请参考以下文章

如何使用Spark Structured Streaming连续监视目录

如何将 Spark Streaming DStream 制作为 SQL 表

失败后读取 Spark Streaming 检查点

通过 Apache Spark Streaming 从 RabbitMq 读取消息

[Spark][Streaming]Spark读取网络输入的例子

Spark Streaming 如何在驱动程序和执行程序之间调度映射任务?