ActiveMQ CMS 客户端多线程通过 pthread_create 确认消息

Posted

技术标签:

【中文标题】ActiveMQ CMS 客户端多线程通过 pthread_create 确认消息【英文标题】:ActiveMQ CMS Client multi-thread to ack message by pthread_create 【发布时间】:2019-11-12 12:01:29 【问题描述】:

我对带有本地 C++ CMS 客户端 3.9.3 的 ActiveMQ 5.11 代理有疑问。我修改了official site 中的示例代码,以使用 pthread_create 函数生成一个新线程并尝试从新线程确认消息(CLIENT_ACK 模式)。事实证明存在分段错误。我们如何实现从新生成的线程而不是当前线程返回 ack? ActiveMQ C++ Client 是否支持多线程确认消息?

void* sendAckThreadFunc(void *pMessage) 
    sleep(1);
    const Message* message = (const Message*) pMessage;
    message->acknowledge();
    printf("ACK sent out.");        
    return NULL;




   virtual void onMessage(const Message* message) 

        static int count = 0;

        try 
            count++;
            const TextMessage* textMessage = dynamic_cast<const TextMessage*>(message);
            string text = "";

            if (textMessage != NULL) 
                text = textMessage->getText();
             else 
                text = "NOT A TEXTMESSAGE!";
            


            if (clientAck) 

                //message->acknowledge();  --> instead of ack the message in the onMessage function, they use pthread_create to generate a new thread and trying to ack the message from there. Is is a supported way??
                pthread_t sendAckThread;                
                if (pthread_create(&sendAckThread, NULL, sendAckThreadFunc,
                    (void*) message)) 
                    printf("Error occured when create threads.");
                
            

            printf("A Message #%d Received: %s\n", count, text.c_str());
         catch (CMSException& e) 
            e.printStackTrace();
        
    

当我运行消费者时,它甚至无法尝试确认一条消息:

[root@amq6-283-1 examples]# ./simple_async_consumer
=====================================================
Starting the example:
-----------------------------------------------------
The Connection's Transport has been Restored.
Press 'q' to quit
A Message #1 Received: Hello world! from thread 140486368756208
Segmentation fault (core dumped)

这里的事情是,一旦消息对象退出 OnMessage 函数,所有的资源都没有了,无法传递给其他线程。

CMS API 文档清楚地说明了这一点:

        /**
         * Called asynchronously when a new message is received, the message
         * reference can be to any of the Message types. a dynamic cast is used
         * to find out what type of message this is.  The lifetime of this
         * object is only guaranteed to be for the life of the onMessage function
         * after this call-back returns the message may no longer exist. Users should
         * copy the data or clone the message if they wish to retain information that
         * was contained in this Message.
         *
         * It is considered a programming error for this method to throw an
         * exception.  The method has been tagged with the 'throw()' qualifier,
         * this implies that you application will segfault if you throw an error
         * from an implementation of this method.
         *
         * @param message
         *      Message object const pointer recipient does not own.
         */

我知道这个示例只是用于串行处理,但我真诚地要求并行处理,这意味着所有的事情都不是在一个线程中完成的。如果是串行的,在当前消息被处理并返回 ack 之前,当前线程无法接收更多批次的消息。确实不能满足客户的性能需求。

那么任何人都可以说明 CMS API 是如何设计来处理并行性的吗? Receiver 线程只专注于接收OnMessage 函数内部的消息,而其他业务线程则专注于业务处理并根据结果返回 ack。我只想知道 CMS API 如何处理并行性。这就是他们使用 CLIENT ACK 模式的方式。谁能提供一个并行示例?

【问题讨论】:

【参考方案1】:

不确定“onMessage”API 文档的哪一部分在这里不清楚,但为了帮助,我将其粘贴在这里:

    /**
     * Called asynchronously when a new message is received, the message
     * reference can be to any of the Message types. a dynamic cast is used
     * to find out what type of message this is.  The lifetime of this
     * object is only guaranteed to be for life of the onMessage function
     * after this call-back returns the message may no longer exist.  Users should
     * copy the data or clone the message if they wish to retain information that
     * was contained in this Message.
     *
     * It is considered a programming error for this method to throw an
     * exception.  The method has been tagged with the 'throw()' qualifier,
     * this implies that you application will segfault if you throw an error
     * from an implementation of this method.
     *
     * @param message
     *      Message object const pointer recipient does not own.
     */
    virtual void onMessage(const Message* message) = 0;

因此,如果您想存储消息以供以后确认,您需要使用内置的消息对象'clone' API 克隆它,这似乎很清楚。

【讨论】:

以上是关于ActiveMQ CMS 客户端多线程通过 pthread_create 确认消息的主要内容,如果未能解决你的问题,请参考以下文章

SpringBoot 整合 ActiveMq

线程 属性取消线程多线程

在 CMS ActiveMQ 中使用 VM

ActiveMQ CMS:在创建消费者和设置监听器之间会丢失消息吗?

消费端从activemq中取出一定量消息后,是一个一个进行处理,还是开启多个线程同时处理呢?

AIX 服务器 6.1 中的 ActiveMQ C 客户端