ActiveMQ 主从故障转移丢失消息

Posted

技术标签:

【中文标题】ActiveMQ 主从故障转移丢失消息【英文标题】:ActiveMQ master slave failover loses messages 【发布时间】:2019-11-06 13:40:43 【问题描述】:

当涉及故障转移时,ActiveMQ 会丢失大量消息(仅在主题上)。 生产者在主题中写入 1000 条消息,而(同时)消费者正在从同一主题中读取。在这个过程的中间,我关闭了 ActiveMQ 主服务器,并继续使用 ActiveMQ 从服务器。进行转换时,会丢失很多消息(约 100 条消息)。我正在开发的产品涉及不丢失消息。 我可以做些什么来坚持主题? 制作人:

#include <activemq\library\ActiveMQCPP.h>
#include <cms\Connection.h>
#include <cms\Session.h>
#include <activemq\core\ActiveMQConnection.h>
#include <activemq\core\ActiveMQConnectionFactory.h>
#include <activemq\core\ActiveMQSession.h>
#include <activemq\core\ActiveMQConsumer.h>
#include <activemq\core\ActiveMQQueueBrowser.h>
#include <activemq\core\PrefetchPolicy.h>
#include <activemq/core/policies/DefaultPrefetchPolicy.h>
#include <activemq\util\ActiveMQProperties.h>
#include <activemq\util\Config.h>
#include <decaf\util\Properties.h>
#include <cms\Queue.h>
#include <cms\Topic.h>
#include <cms\MapMessage.h>
#include <cms\MessageEnumeration.h>
#include <cms\CMSException.h>
#include <activemq\commands\Command.h>
#include <decaf\lang\Pointer.h>
#include <activemq\transport\Transport.h>
#include <activemq\transport\TransportFilter.h>
#include <activemq\commands\SessionInfo.h>
#include <activemq\commands\ActiveMQMessage.h>
#include <activemq\commands\BrokerInfo.h>

#include <activemq\exceptions\ConnectionFailedException.h>
#include <string>
#include <vector>
#include <iostream>
#include <fstream>
#include <decaf\lang\Throwable.h>

std::string _amqURI("failover:(tcp://host1:61616,tcp://host2:61616)?initialReconnectDelay=5000&maxReconnectAttempts=2");
const std::string _username("user");
const std::string _password("pass");
const std::string _host("localhost");
const std::string _destination("Test.AMQ.bogcretu.Topic");

std::string _garbageMessage("GARBAGE0_GARBAGE1_GARBAGE2_GARBAGE3_GARBAGE4_GARBAGE5_GARBAGE6_GARBAGE7_GARBAGE8_GARBAGE9");
int _countMessages = 1000;
int _multiplyFactor = 100;
std::string _bodyMessage = "";

void CreateMessage()

    for (int i = 0; i < _multiplyFactor; i++) 
        _bodyMessage += _garbageMessage;
    


int main()

    activemq::library::ActiveMQCPP::initializeLibrary();
    CreateMessage();
    activemq::core::ActiveMQConnectionFactory factory;
    factory.setBrokerURI(_amqURI);
    std::auto_ptr<cms::TextMessage> message;
    std::auto_ptr<cms::Connection> connection(factory.createConnection(_username, _password));

    connection->start();

    std::auto_ptr<cms::Session> session(connection->createSession());
    std::auto_ptr<cms::Destination> destionation(session->createTopic(_destination));
    std::auto_ptr<cms::MessageProducer> producer(session->createProducer(destionation.get()));

    producer->setDeliveryMode(cms::DeliveryMode::PERSISTENT);

    for (int i = 0; i < _countMessages; i++) 
        std::stringstream ss;
        ss << i;
        std::string number = ss.str();
        message.reset(session->createTextMessage(number));
        producer->send(message.get());
        std::cout << i << std::endl;
    

    //message.reset(session->createTextMessage("DONE"));
    //producer->send(message.get());

    //connection->close();

    //activemq::library::ActiveMQCPP::shutdownLibrary();

    return 0;

消费者:

#include <activemq\library\ActiveMQCPP.h>
#include <cms\Connection.h>
#include <cms\Session.h>
#include <activemq\core\ActiveMQConnection.h>
#include <activemq\core\ActiveMQConnectionFactory.h>
#include <activemq\core\ActiveMQSession.h>
#include <activemq\core\ActiveMQConsumer.h>
#include <activemq\core\ActiveMQQueueBrowser.h>
#include <activemq\core\PrefetchPolicy.h>
#include <activemq/core/policies/DefaultPrefetchPolicy.h>
#include <activemq\util\ActiveMQProperties.h>
#include <activemq\util\Config.h>
#include <decaf\util\Properties.h>
#include <cms\Queue.h>
#include <cms\Topic.h>
#include <cms\MapMessage.h>
#include <cms\MessageEnumeration.h>
#include <cms\CMSException.h>
#include <cms\MessageListener.h>
#include <cms\Message.h>
#include <activemq\commands\Command.h>
#include <decaf\lang\Pointer.h>
#include <activemq\transport\Transport.h>
#include <activemq\transport\TransportFilter.h>
#include <activemq\commands\SessionInfo.h>
#include <activemq\commands\ActiveMQMessage.h>
#include <activemq\commands\BrokerInfo.h>

#include <activemq\exceptions\ConnectionFailedException.h>
#include <string>
#include <vector>
#include <iostream>
#include <chrono>
#include <thread>
#include <decaf\lang\Throwable.h>

std::string amqURI("failover:(tcp://host1:61616,tcp://host2:61616)?initialReconnectDelay=5000&maxReconnectAttempts=2");

class MsgListener : public cms::MessageListener

public:
    std::string _amqURI;
    cms::Connection *_connection;
    cms::Session* _session;
    cms::Destination* _destination;
    cms::MessageConsumer* _consumer;
    bool _sessionTransacted;
    bool _useTopic;

    MsgListener(std::string amqURI, bool sessionTransacted, bool useTopic = false) : _amqURI(amqURI), _sessionTransacted(sessionTransacted), _useTopic(useTopic), _connection(NULL), _session(NULL), _destination(NULL), _consumer(NULL)
    
        this->_connection = cms::ConnectionFactory::createCMSConnectionFactory(this->_amqURI)->createConnection();
        this->_connection->start();

        /*if (this->_sessionTransacted == true) 
            this->_session = this->_connection->createSession(cms::Session::SESSION_TRANSACTED);
        
        else 
            this->_session = this->_connection->createSession(cms::Session::AUTO_ACKNOWLEDGE);
        */

        this->_session = this->_connection->createSession(cms::Session::DUPS_OK_ACKNOWLEDGE);

        if (useTopic) 
            this->_destination = this->_session->createTopic("Test.AMQ.bogcretu.Topic");
        
        else 
            this->_destination = this->_session->createQueue("Test.AMQ.bogcretu.Topic");
        

        this->_consumer = this->_session->createConsumer(this->_destination);
        this->_consumer->setMessageListener(this);

        /*std::cout.flush();
        std::cerr.flush();*/


    

    ~MsgListener()
    

    

    void onMessage(const cms::Message* CMSMessage)
    
        static int count = 0;

        try
        

            const cms::TextMessage* textMessage = dynamic_cast<const cms::TextMessage*> (CMSMessage);
            std::string text = "";
            if (textMessage != NULL) 
                text = textMessage->getText();
            
            else 
                text = "NOT A TEXTMESSAGE!";

            

            std::cout << "(" << count << ", " << text << ")" << std::endl;
            count++;

        
        catch (cms::CMSException& e)
        
            e.printStackTrace();
        

        if (this->_sessionTransacted) 
            this->_session->commit();
        

        std::this_thread::sleep_for(std::chrono::milliseconds(10));
    
;

int main()

    activemq::library::ActiveMQCPP::initializeLibrary();
    MsgListener consumer(amqURI, true, true);
    while (true);
    //activemq::library::ActiveMQCPP::shutdownLibrary();

Consumer_durable:

#include <activemq\library\ActiveMQCPP.h>
#include <cms\Connection.h>
#include <cms\Session.h>
#include <activemq\core\ActiveMQConnection.h>
#include <activemq\core\ActiveMQConnectionFactory.h>
#include <activemq\core\ActiveMQSession.h>
#include <activemq\core\ActiveMQConsumer.h>
#include <activemq\core\ActiveMQQueueBrowser.h>
#include <activemq\core\PrefetchPolicy.h>
#include <activemq/core/policies/DefaultPrefetchPolicy.h>
#include <activemq\util\ActiveMQProperties.h>
#include <activemq\util\Config.h>
#include <decaf\util\Properties.h>
#include <cms\Queue.h>
#include <cms\Topic.h>
#include <cms\MapMessage.h>
#include <cms\MessageEnumeration.h>
#include <cms\CMSException.h>
#include <cms\MessageListener.h>
#include <cms\Message.h>
#include <activemq\commands\Command.h>
#include <decaf\lang\Pointer.h>
#include <activemq\transport\Transport.h>
#include <activemq\transport\TransportFilter.h>
#include <activemq\commands\SessionInfo.h>
#include <activemq\commands\ActiveMQMessage.h>
#include <activemq\commands\BrokerInfo.h>

#include <activemq\exceptions\ConnectionFailedException.h>
#include <string>
#include <vector>
#include <iostream>
#include <chrono>
#include <thread>
#include <decaf\lang\Throwable.h>

std::string amqURI("failover:(tcp://host1:61616,tcp://host2:61616)?initialReconnectDelay=5000&maxReconnectAttempts=2");

class MsgListener : public cms::MessageListener

public:
    std::string _amqURI;
    cms::Connection *_connection;
    cms::Session* _session;
    cms::Destination* _destination;
    cms::MessageConsumer* _consumer;
    bool _sessionTransacted;
    bool _useTopic;

    MsgListener(std::string amqURI, bool sessionTransacted, bool useTopic = false) : _amqURI(amqURI), _sessionTransacted(sessionTransacted), _useTopic(useTopic), _connection(NULL), _session(NULL), _destination(NULL), _consumer(NULL)
    
        this->_connection = cms::ConnectionFactory::createCMSConnectionFactory(this->_amqURI)->createConnection();
        this->_connection->start();

        /*if (this->_sessionTransacted == true) 
            this->_session = this->_connection->createSession(cms::Session::SESSION_TRANSACTED);
        
        else 
            this->_session = this->_connection->createSession(cms::Session::AUTO_ACKNOWLEDGE);
        */

        this->_session = this->_connection->createSession(cms::Session::DUPS_OK_ACKNOWLEDGE);

        if (useTopic) 
            this->_destination = this->_session->createTopic("Test.AMQ.bogcretu.Topic");
        
        else 
            this->_destination = this->_session->createQueue("Test.AMQ.bogcretu.Topic");
        

        //this->_consumer = this->_session->createConsumer(this->_destination);



        static const cms::Topic * topic = dynamic_cast<const cms::Topic*>(this->_destination);
        this->_consumer = this->_session->createDurableConsumer(topic, "sub_name", "");
        this->_consumer->setMessageListener(this);

        /*std::cout.flush();
        std::cerr.flush();*/


    

    ~MsgListener()
    

    

    void onMessage(const cms::Message* CMSMessage)
    
        static int count = 0;

        try
        

            const cms::TextMessage* textMessage = dynamic_cast<const cms::TextMessage*> (CMSMessage);
            std::string text = "";
            if (textMessage != NULL) 
                text = textMessage->getText();
            
            else 
                text = "NOT A TEXTMESSAGE!";
            

            std::cout << "(" << count << ", " << text << ")" << std::endl;
            count++;

        
        catch (cms::CMSException& e)
        
            e.printStackTrace();
        

        if (this->_sessionTransacted) 
            this->_session->commit();
        

        std::this_thread::sleep_for(std::chrono::milliseconds(10));
    
;

int main()

    activemq::library::ActiveMQCPP::initializeLibrary();
    MsgListener consumer(amqURI, true, true);
    while (true);
    //activemq::library::ActiveMQCPP::shutdownLibrary();

【问题讨论】:

【参考方案1】:

如果您想要消息持久性,那么您应该使用队列,或者使用持久主题订阅。无论生产者的持久模式如何,主题本身都不会持久化消息,事实上,如果没有消费者订阅并且您将消息发送到主题,则消息将被丢弃,同样,用于控制主题的持续挂起消息限制的 ActiveMQ 配置将丢弃消费者无法跟上的主题上的旧消息,因为主题的服务保证级别较低。

您需要使用队列并在生产者上设置持久性,或者确保您具有预先存在的持久主题订阅并使用分配持久性标志的生产者发送消息,如果您希望将消息写入存储和在代理故障转移时恢复。

【讨论】:

我修改了我的代码。我必须使用持久消费者而不是队列(因此​​队列不是产品的选项)。我已经编辑了问题(添加了 Consumer_durable)。现在的问题是监听器正在处理 100 条消息,然后什么也没有发生。我切断了连接(与主人)。然后侦听器处理另外 100 条消息。生产者将所有 1000 条消息都放入主题中。我没有在 ActiveMQ 文档上找到有关此的内容。正常吗? 您需要进行一些调试。我看不到您在连接上设置客户端 ID 可能会导致问题。 ActiveMQ 的 C++ 客户端代码未维护,因此它可能是一个错误。我会尝试 Java 客户端,因为它仍在维护中。 我在监听器中添加了clientID,但同样的事情正在复制,100 条消息被处理然后停止。我所看到的是连接重新启动(主代理关闭)另外 100 个被读取。然后做同样的事情大约 9 次 => 读取所有这 1000 条消息。通过“做同样的事情”,我的意思是重新打开关闭的主代理,现在将成为从属代理(锁定由旧的从属代理获取)并关闭新的主代理。我需要批量 ACK 或类似的东西吗?我没有发现任何关于这种行为的信息。 PS:java不是一个选项:(

以上是关于ActiveMQ 主从故障转移丢失消息的主要内容,如果未能解决你的问题,请参考以下文章

如何使用故障转移传输处理 Activemq 的最大帧大小异常

ActiveMQ NMS:当代理关闭时,connection.start() 使用故障转移协议挂起

32 Consumer消息零丢失方案:手动提交offset + 自动故障转移

ActiveMQ 故障转移传输 - 为啥有这么多连接?

Mule JMS ActiveMQ 传输失败到故障转移

11 RocketMQ主从原理读写分离与故障转移