RocketMQ与Dubbo之间线程之间如何阻塞和唤醒

Posted

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了RocketMQ与Dubbo之间线程之间如何阻塞和唤醒相关的知识,希望对你有一定的参考价值。

参考技术A 在上一篇 RocketMQ与Dubbo相爱相杀引起的FullGC 文章中,我们讲解了由于Dubbo接口调用耗时太久,而消息生产者发送的消息非常快,导致消息消费者不能及时消费消息,造成消息队列堆积,最终导致FullGC.
本篇文章,我们看一下RocketMQ线程和Dubbo线程如何协作的.

我们向MQ消费者发送一个消息,我们分析MQ线程是如何调用Dubbo的线程,以及接收到Dubbo的返回值之后,Dubbo线程又是如何和MQ线程交互的.

Dubbo调用者的配置如下

作为Dubbo调用者,我们会配置接口调用超时时间.上面我们配置timeout=600000ms. 我们要看下这个timeout是给哪个线程使用的.配置成600000ms这么长是为了我们Debug分析的时候,不至于调用太快而受影响.

同时为了让我们通过工具可以观察线程的状态,我们也刻意让Dubbo提供者睡眠60s.
Dubbo提供者的具体实现如下

先启动Dubbo提供者,再启动MQ消费者(它的内部会调用Dubbo提供者的接口),最后给MQ消费者发送一条消息.

可以看到,当前线程是ConsumeMessageThread_1这个线程,准备调用Dubbo提供者的接口,继续进入

线程依然是ConsumeMessageThread_1,同时我们也看到timeout=600000,就是我们在Dubbo消费者配置文件中配置的超时时间.request方法内部会使用Netty将请求发送给Dubbo提供者.最后会调用get()方法.

MQ线程最后使用我们配置的超时时间timeout=600000,调用await(timeout)阻塞.

我们再使用JDK自带的jvisualvm工具将线程堆栈信息dump.

观察线程堆栈信息,ConsumeMessageThread_1线程处于TIMED_WAITING状态.也就是说,MQ线程在调用Dubbo接口的时候,如果一直没有返回结果,那么MQ线程就会一直阻塞,直到超时.

当收到Dubbo提供者返回的结果时

线程DubboClientHandler-192.168.0.102:20880-thread-1会唤醒之前被阻塞的MQ线程.

最后,MQ线程拿到返回结果,继续后面的逻辑处理.

MQ线程通过ReentrantLock和Condition与Dubbo线程完成阻塞和唤醒.

同时timeout这个值是一个重要的调优参数,如果Dubbo接口调用耗时很久,而timeout设置的又很大,就会严重阻塞MQ线程.所以,timeout这个值是需要特别关注的.

实现有界缓冲区(读取器和写入器之间的非阻塞,读取器之间的阻塞,写入器之间的阻塞)

【中文标题】实现有界缓冲区(读取器和写入器之间的非阻塞,读取器之间的阻塞,写入器之间的阻塞)【英文标题】:implement bounded buffer (non-block between reader and writer, block among readers, block among writers) 【发布时间】:2016-09-13 21:52:14 【问题描述】:

实现有界队列

阅读: 如果队列为空,则等待它可以返回一个超时值 如果另一个线程正在从队列中读取,则等待该线程完成 从队列中移除第一个元素并返回它 如果线程正在写入队列,请不要阻塞

写: 如果队列已满,请等待读取一个值并超时 如果另一个线程正在写入队列,请等待该线程完成 将元素写入队列末尾 如果线程正在从队列中读取,请不要阻塞

我不确定我的实现是否正确

using namespace std;

template <typename T, int N>
class BoundedBuffer 
private:
    std::array<T, N> buffer;
    int read_pos;
    int write_pos;

    std::mutex reader_mutex; //mutex for between readers
    std::mutex writer_mutex; //mutex for between writers

    std::mutex shared_mutex;
    std::condition_variable reader_queue;
    std::condition_variable writer_queue;
    int timeout; //timeout in millisecond


public:
    BoundedBuffer(const BoundedBuffer&) = delete;
    BoundedBuffer& operator=(const BoundedBuffer&) = delete;

    BoundedBuffer(int t) :
        read_pos(0),
        write_pos(0),
        timeout(t) 
    

    inline bool empty() 
        return read_pos == write_pos;
    

    inline bool full() 
        return write_pos >= read_pos + N;
    

    bool put(const T& data) 
        unique_lock<mutex> writer_lock(writer_mutex);

        
            unique_lock<mutex> shared_lock(shared_mutex);
            if (full())  //buffer full
                if (writer_queue.wait_for(shared_lock, std::chrono::milliseconds(timeout)) ==
                    std::cv_status::timeout)
                    return false;
            
        

        buffer[write_pos%N] = data;
        write_pos++;
        reader_queue.notify_one();
        return true;
    

    pair<T, bool> get() 
        unique_lock<mutex> reader_lock(reader_mutex);

        
            unique_lock<mutex> shared_lock(shared_mutex);
            if (empty())  //buffer empty
                if (reader_queue.wait_for(shared_lock, std::chrono::milliseconds(timeout)) ==
                    std::cv_status::timeout) 
                    T t;
                    return make_pair(t, false);
                
            
        

        pair<T, bool> result = make_pair(buffer[read_pos%N], true);
        read_pos++;
        writer_queue.notify_one();
        return result;
    
;

【问题讨论】:

【参考方案1】:

在我的代码中发现了一个错误: 在 put() 和 get() 方法中,它调用 wait_for() 并检查返回值。 如果超时,则返回 false,否则假定满足等待条件并且代码继续放置/获取数据。 如果 wait_for() 由于虚假唤醒而返回,并且当时条件不满足: put() 不满,get() 不空, 那么它将覆盖现有数据或读取错误数据。

解决方法是在 wait_for() 中使用谓词。它将忽略虚假唤醒。

    bool put(const T& data) 
            unique_lock<mutex> writer_lock(writer_mutex);

            
                    unique_lock<mutex> shared_lock(shared_mutex);
                    if (full())  //buffer full
                            if (writer_queue.wait_for(shared_lock, std::chrono::milliseconds(timeout),
                                    [this]()  return !full(); ) == false) 
                                    return false;
                            
                    
            

            buffer[write_pos%N] = data;
            write_pos++;
            reader_queue.notify_one();
            return true;
    


    pair<T, bool> get(int i) 
            unique_lock<mutex> reader_lock(reader_mutex);
            cout << "i : " << i << endl;

            
                    unique_lock<mutex> shared_lock(shared_mutex);
                    if (empty())  //buffer empty
                            if (reader_queue.wait_for(shared_lock, std::chrono::milliseconds(timeout),
                                    [this]()  return !empty(); )==false) 
                                    T t;
                                    return make_pair(t, false);
                            
                    
            

            pair<T, bool> result = make_pair(buffer[read_pos%N], true);
            read_pos++;
            writer_queue.notify_one();
            return result;
    

【讨论】:

以上是关于RocketMQ与Dubbo之间线程之间如何阻塞和唤醒的主要内容,如果未能解决你的问题,请参考以下文章

Dubbo的通讯方式:NIO+长连接

阻塞非阻塞同步异步

RocketMQ 端云一体化设计与实践

Java并发编程从入门到精通 - 第5章:多线程之间的交互:线程阀

实现有界缓冲区(读取器和写入器之间的非阻塞,读取器之间的阻塞,写入器之间的阻塞)

dubbo调用多次服务的问题