调用 onMessage 后 ActiveMQ 消费者内存使用量不断增加

Posted

技术标签:

【中文标题】调用 onMessage 后 ActiveMQ 消费者内存使用量不断增加【英文标题】:ActiveMQ consumer memory usage keep increasing after onMessage call 【发布时间】:2019-08-29 14:59:11 【问题描述】:

我正在尝试使用 Flatbuffers 的 ActiveMQ。生产者的一切似乎都运行良好,但进程运行的时间越长,消费者的内存就会不断增加。

生产者将消息标记为NON_PERSISTENT,每秒发送约30次。每条消息都是一个字节消息,大约 3000 字节。

Producer.cpp

void Producer::send_message(uint8_t* pointer, size_t size) 
    auto msg = std::unique_ptr<cms::BytesMessage>(session->createBytesMessage(pointer, size));
    producer->send(msg.get());


void Producer::run() 
    try 
        std::unique_ptr <activemq::core::ActiveMQConnectionFactory> connectionFactory(new activemq::core::ActiveMQConnectionFactory(brokerURI));

        connection.reset(connectionFactory->createConnection());
        session.reset(connection->createSession(cms::Session::AUTO_ACKNOWLEDGE));
        destination.reset(session->createTopic(destURI));
        producer.reset(session->createProducer(destination.get()));
        producer->setDeliveryMode(cms::DeliveryMode::NON_PERSISTENT);
        connection->start();
    
    catch (cms::CMSException& e) 
        e.printStackTrace();
    

Consumer.cpp

void Consumer::onMessage(const cms::Message * message)

    try
    
        const auto msg = dynamic_cast<const cms::BytesMessage*>(message);

        const auto data = msg->getBodyBytes();
        const auto size = msg->getBodyLength();
        flatbuffers::Verifier verifier((uint8_t*)(data), size);

        if (Ditto::VerifyDataBuffer(verifier)) 
            // Do something with the buffer
        
    
    catch (cms::CMSException& e) 
        e.printStackTrace();
    


void Consumer::run()

    try 
        std::unique_ptr<activemq::core::ActiveMQConnectionFactory> connectionFactory(new activemq::core::ActiveMQConnectionFactory(brokerURI));

        connection.reset(connectionFactory->createConnection());

        std::shared_ptr<activemq::core::ActiveMQConnection> amqConnection = std::dynamic_pointer_cast<activemq::core::ActiveMQConnection>(connection);
        if (amqConnection != nullptr) 
            amqConnection->addTransportListener(this);
        

        connection->start();
        connection->setExceptionListener(this);
        session.reset(connection->createSession(cms::Session::AUTO_ACKNOWLEDGE));
        destination.reset(session->createTopic(destURI));
        consumer.reset(session->createConsumer(destination.get()));
        consumer->setMessageListener(this);
    
    catch (cms::CMSException& e) 
        e.printStackTrace();
        activemq::library::ActiveMQCPP::shutdownLibrary();
    

然后,我打电话给Consumer

int main()

    activemq::library::ActiveMQCPP::initializeLibrary();
    Consumer consumer("failover:(tcp://127.0.0.1:61616)", "Test-Topic");
    consumer.run();

    while (1) 

    consumer.close();
    activemq::library::ActiveMQCPP::shutdownLibrary();


Consumer 能够接收和处理消息。但是,Consumer 的内存不断增加。运行 10 分钟后内存约为 200MB。在 CMS 概述中,他们提到传递给 onMessage 的指针是调用所拥有的,所以我不应该尝试删除它。但是,调用者似乎从未删除该消息,这使得内存不断增加。

有什么方法可以在每次onMessage 调用后释放消息的内存?

非常感谢您的时间和帮助。

【问题讨论】:

【参考方案1】:

我想通了。

getBodyBytes() 返回一个指向我应该在调用后清理的数组的指针。因此,我只需要将其包装在 std::unique_ptr 中即可正确清理。

onMessage() 应如下所示:

void Consumer::onMessage(const cms::Message * message)

    try
    
        const auto msg = dynamic_cast<const cms::BytesMessage*>(message);

        std::unique_ptr<unsigned char> data(msg->getBodyBytes());

        auto size = msg->getBodyLength();
        flatbuffers::Verifier verifier((uint8_t*)(data), size);

        if (Ditto::VerifyDataBuffer(verifier)) 
            // Do something with the buffer
        
    
    catch (cms::CMSException& e) 
        e.printStackTrace();
    

【讨论】:

以上是关于调用 onMessage 后 ActiveMQ 消费者内存使用量不断增加的主要内容,如果未能解决你的问题,请参考以下文章

消费端从activemq中取出一定量消息后,是一个一个进行处理,还是开启多个线程同时处理呢?

ActiveMq笔记2-消息持久化

ActiveMq 使用指北 - 1

activeMQ能否实现消息推送?

SSE:“onmessage”永远不会被调用

onMessage:对传入消息调用两次回调 Flutter web