利用jvisualvm.exe搞一个关于生产者消费者的一个纠结的问题

Posted 不想下火车的人

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了利用jvisualvm.exe搞一个关于生产者消费者的一个纠结的问题相关的知识,希望对你有一定的参考价值。

  先看代码:

package com.wlf.service;

import java.util.LinkedList;
import java.util.Queue;
import java.util.concurrent.LinkedBlockingQueue;

/**
 * 生产者消费者模拟
 *
 * @author wulf
 * @since 20200708
 */
public class LinkedBlockingQueueTest {
    public static void main(String[] args) {
        Puter puter = new Puter(); // 生产者放(put)对象到队列中
        Poller poller = new Poller(); // 消费者从队列中取对象(poll)

        // 消费者线程,把生产者对象加为属性,以便获取生产者队列
        Thread thread1 = new Thread(() -> {
            System.out.println("I\'m coming thread1....");
            poller.setPuter(puter);
            poller.doPoll();
        });

        // 生产者线程,把消费者线程加为属性,以便生产发动后再去发动消费者线程
        Thread thread2 = new Thread(() -> {
            System.out.println("I\'m coming thread2....");
            puter.setThread(thread1);
            try {
                puter.doPut();
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        });

        // 发动生产者线程
        thread2.start();
    }
}

/**
 * 生产者
 */
class Puter {

    // 解耦生产者、消费者的队列
    private Queue<Integer> linkedQueue = new LinkedList<>();
//    private Queue<Integer> linkedQueue = new LinkedBlockingQueue<>();

    // 消费者线程
    private Thread thread;

    // 生产者已经启动生产标志位,默认是启动状态
    private boolean isStarted = true;

    public Queue<Integer> getLinkedQueue() {
        return linkedQueue;
    }

    public void setThread(Thread thread) {
        this.thread = thread;
    }

    public boolean isStarted() {
        return isStarted;
    }

    public void doPut() throws InterruptedException {

        Thread.sleep(1000); // 生产者准备一下,开始生产

        // 准备完成,启动消费者线程拉取poll
        if (thread != null) {
            System.out.println("thread not null....");
            thread.start();
        }

        Thread.sleep(20); // 这里很关键,生产者还得稍微准备一下,这就让消费者先去拉取一个空队列了

        for (int i = 0; i < 10000; i++) {
            Thread.sleep(2); // 模拟生产耗时
            linkedQueue.offer(i);
            System.out.println("producer put success: " + i);
        }

        isStarted = false; // 生产结束了,告诉消费者
        System.out.println("producing over.");
    }
}

/**
 * 消费者
 */
class Poller {
    private Puter puter;

    public void setPuter(Puter puter) {
        this.puter = puter;
    }

    public void doPoll() {
        // 队列不为空,或者生产已经开始生产,那么就去消费它,拉取队列中的对象
        while (puter.getLinkedQueue().size() > 0 || puter.isStarted()) {
            Integer element = puter.getLinkedQueue().poll();

            // 如果拉取到的对象是null,跳过继续拉取
            if (element == null) {
                continue;
            }

            // 不为null,拉取成功
            System.out.println("consumer poll success: " + element);
        }

        // 消费结束了,结束流程
        System.out.println("Game is over.");
    }
}

 

  运行现象:打印出了所有生产者对象,消费者一个不打,而且进程没有结束,卡着不动

 

 

 

  这是一个生产者消费者问题,模型简单,通过一个队列LinkedList来当中间人,它传递生产者的产品给消费者消费。打个经典的比方,生产者是洗碗工,消费者是擦盘工,队列是传送带。洗碗工只管埋头洗碗,擦盘工只管埋头擦碗,如果用隔板把他们隔开来,中间只有一个传送带,他们甚至可能都不认识彼此,因为他们只认识传送带。

  现在问题不在生产者,因为它正常打印出来了,那么消费者怎么回事,它为啥就给自己放假了?其实我们误会消费者了,人家也在很努力的干活,只不过我们看不见罢了。它在干什么活?毫无意义的活,浪费CPU的资源的空转。

  接下来需要借用JDK自带的JVisualVM这个工具来看一看消费者在做啥,在java的jdk目录下bin目录里,双击jvisualvm.exe运行:

 

 

  右键点击我们的程序,点击“线程Dump”:

 

  

  这是刚开始生产者还在运行时的情况:

2020-07-08 20:37:48
Full thread dump Java HotSpot(TM) 64-Bit Server VM (25.102-b14 mixed mode):

"Thread-0" #11 prio=5 os_prio=0 tid=0x000000001a9c2800 nid=0x9380 runnable [0x000000001bb6f000]
   java.lang.Thread.State: RUNNABLE
    at com.wlf.service.Poller.doPoll(LinkedBlockingQueueTest.java:108)
    at com.wlf.service.LinkedBlockingQueueTest.lambda$main$0(LinkedBlockingQueueTest.java:22)
    at com.wlf.service.LinkedBlockingQueueTest$$Lambda$1/1156060786.run(Unknown Source)
    at java.lang.Thread.run(Thread.java:745)

   Locked ownable synchronizers:
    - None

"DestroyJavaVM" #13 prio=5 os_prio=0 tid=0x00000000034a4000 nid=0x82c8 waiting on condition [0x0000000000000000]
   java.lang.Thread.State: RUNNABLE

   Locked ownable synchronizers:
    - None

"Thread-1" #12 prio=5 os_prio=0 tid=0x000000001a380000 nid=0x4a98 waiting on condition [0x000000001ba6f000]
   java.lang.Thread.State: TIMED_WAITING (sleeping)
    at java.lang.Thread.sleep(Native Method)
    at com.wlf.service.Puter.doPut(LinkedBlockingQueueTest.java:81)
    at com.wlf.service.LinkedBlockingQueueTest.lambda$main$1(LinkedBlockingQueueTest.java:30)
    at com.wlf.service.LinkedBlockingQueueTest$$Lambda$2/1709537756.run(Unknown Source)
    at java.lang.Thread.run(Thread.java:745)

   Locked ownable synchronizers:
    - None

"Service Thread" #10 daemon prio=9 os_prio=0 tid=0x000000001a128000 nid=0x5dfc runnable [0x0000000000000000]
   java.lang.Thread.State: RUNNABLE

   Locked ownable synchronizers:
    - None

 

  我们看到生产者线程是“Thread-1”,消费者是“Thread-0”。生产者因为生产者过程中需要休眠两毫秒,所以我们看到它是在休眠状态中,其实它是有在干活的,只不过干活的速度相比休眠太快了,就像白驹过隙,所以我们只看到了他在睡觉的假象,还好有日志证明了他的清白。消费者则相反,我们看到它在运行状态中,好像他一直在干活,但没有任何成果,所以他其实是在划水,一直在辛苦的空转着:

while (puter.getLinkedQueue().size() > 0 || puter.isStarted()) {
            Integer element = puter.getLinkedQueue().poll();

            // 如果拉取到的对象是null,跳过继续拉取
            if (element == null) {
                continue;
            }

            // 不为null,拉取成功
            System.out.println("consumer poll success: " + element);
        }

 

  上面标黄的地方就是消费者马不停蹄的做是事情。看似勤奋的消费者,做的确实劳而无功的事情。我们可以看下拉长时间线看下线程的运行状态:  

 

   绿色的条条就是运行状态,蓝色的是休眠状态。这个上面我们的分析吻合。可是为啥消费者要偷偷划水?继续看内存堆的情况,点击“堆Dump”:

 

 

  点击左上角的“类”,再点击“实例..."右边的三角形排个序,先找到只有实例数为1的类,再找到我们的生产者和消费者类:

 

 

  这时消费者程序已经空转很久了,双击消费者类“Poller”进入实例数,我们看看消费者实例的属性,特别是工作队列的情况:

 

 

  我们可以发现,队列已经满了,第一个元素是0,最后一个9999,队列大小10000,没有任何问题。我们发现过了很久,队列依然一直是满的,令人费解的消费者,队列明明有数据,而他为何不从队列中取出对象来消费?说来话长,回头看我们的代码,罪魁祸首仍然是LinkedList队列:

Thread.sleep(20); // 这里很关键,生产者还得稍微准备一下,这就让消费者先去拉取一个空队列了

  看看LinkedList趁着生产者眯眼的这20毫秒的准备时间中,消费者做了什么?

while (puter.getLinkedQueue().size() > 0 || puter.isStarted()) {
            Integer element = puter.getLinkedQueue().poll();

            // 如果拉取到的对象是null,跳过继续拉取
            if (element == null) {
                continue;
            }

  没错,消费者先去拉取队列中的元素了,而此时生产者尚未准备好,队列只能是空的,毫无疑问,element是一个null。看看poll的源码,它很关键,消费者空转的源头:

    /**
     * Retrieves and removes the head (first element) of this list.
     *
     * @return the head of this list, or {@code null} if this list is empty
     * @since 1.5
     */
    public E poll() {
        final Node<E> f = first;
        return (f == null) ? null : unlinkFirst(f);
    }

  unlinkFirst方法我们不用看,因为此时first是null,poll返回的就是null,然后几乎没有停顿又来到poll方法,因为CPU中间没有其他事情可以做。所以unlinkFirst永远不会进去,poll方法得到的element一直是null。因为每次取到一个null就去continue,所以就死循环了,CPU只能空转。为什么?上面堆里的实例不是看到LinkedList的first是0吗?是的,但那是后来生产者线程放进去的,放进去后它通知了消费者线程了吗?并没有,不信我们去看看offer的源码,最后来到这里:

    /**
     * Links e as last element.
     */
    void linkLast(E e) {
        final Node<E> l = last;
        final Node<E> newNode = new Node<>(l, e, null);
        last = newNode;
        if (l == null)
            first = newNode;
        else
            l.next = newNode;
        size++;
        modCount++;
    }

  我们看到无论是poll还是offer,都是线程不安全的。offer方法是给first赋值了,但此时消费者是毫不知情的,它依然蒙在鼓里,还在疯狂的循环中,它根本没有一刻的闲暇来看一眼队列的最新情况,只能取到一个过去的、老去的空队列。这就是并发,生产者在往队列中放对象的同时,消费者在取对象,可以看做它们对同一个队列分别做新增和删除操作,但彼此不知道对方在做什么,因为这个队列对这两个线程来说就是一个共享资源,而且是没有加锁、没有通知的竞态资源。

  接下来我们来看一个极端:把生产者的生产数目改下,从一万改为一,可以看到同样的事情还是发生了,因为消费者先去执行,从队列里取出了一个null,接下去就是空转:

 

 

   堆内存的实例属性显示队列此时只有一个元素0: 

  

  消费者在空转:

 

 

 

  怎么破?解决并发安全性问题的通用做法很简单,使用一个线程安全的队列,比如LinkedBlockingQueue。看看人家的源码就知道了:

    /**
     * Inserts the specified element at the tail of this queue if it is
     * possible to do so immediately without exceeding the queue\'s capacity,
     * returning {@code true} upon success and {@code false} if this queue
     * is full.
     * When using a capacity-restricted queue, this method is generally
     * preferable to method {@link BlockingQueue#add add}, which can fail to
     * insert an element only by throwing an exception.
     *
     * @throws NullPointerException if the specified element is null
     */
    public boolean offer(E e) {
        if (e == null) throw new NullPointerException();
        final AtomicInteger count = this.count;
        if (count.get() == capacity)
            return false;
        int c = -1;
        Node<E> node = new Node<E>(e);
        final ReentrantLock putLock = this.putLock;
        putLock.lock();
        try {
            if (count.get() < capacity) {
                enqueue(node);
                c = count.getAndIncrement();
                if (c + 1 < capacity)
                    notFull.signal();
            }
        } finally {
            putLock.unlock();
        }
        if (c == 0)
            signalNotEmpty();
        return c >= 0;
    }
    public E poll() {
        final AtomicInteger count = this.count;
        if (count.get() == 0)
            return null;
        E x = null;
        int c = -1;
        final ReentrantLock takeLock = this.takeLock;
        takeLock.lock();
        try {
            if (count.get() > 0) {
                x = dequeue();
                c = count.getAndDecrement();
                if (c > 1)
                    notEmpty.signal();
            }
        } finally {
            takeLock.unlock();
        }
        if (c == capacity)
            signalNotFull();
        return x;
    }

 

  其他不动,把生产者Puter类改一行代码:

    // 解耦生产者、消费者的队列
//    private Queue<Integer> linkedQueue = new LinkedList<>();
    private Queue<Integer> linkedQueue = new LinkedBlockingQueue<>();

 

  再次运行,结果如预期:

 

  另一种解法不够安全,但能让我们搞明白这个问题的其他变种。见利用jvisualvm.exe搞一个关于生产者消费者的另一些纠结的问题

 

  

 

以上是关于利用jvisualvm.exe搞一个关于生产者消费者的一个纠结的问题的主要内容,如果未能解决你的问题,请参考以下文章

关于Python的协程问题总结

用阻塞队列和线程池简单实现生产者和消费者场景

操作系统关于多线程同步中的死锁问题一篇文章让你彻底搞明白死锁到底是什么情况及如何解决死锁

java 多线程 22 :生产者/消费者模式 进阶 利用await()/signal()实现

java heap space以及jvisualvm.exe 工具

关于生产者/消费者/订阅者模式的那些事