美团一面:如何实现一个100W ops 生产者消费者程序?

Posted 疯狂创客圈

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了美团一面:如何实现一个100W ops 生产者消费者程序?相关的知识,希望对你有一定的参考价值。

说在前面

在40岁老架构师 尼恩的读者社群(50+)中,最近有小伙伴拿到了一线互联网企业如极兔、有赞、希音、百度、网易的面试资格,遇到一几个很重要的面试题:

如何设计一个100W ops 生产者、消费者程序?

与之类似的、其他小伙伴遇到过的问题还有:

手写一个 生产者、消费者程序?

设计一个 高性能的 生产者、消费者程序?

这里尼恩给大家做一下系统化、体系化的线程池梳理,使得大家可以充分展示一下大家雄厚的 “技术肌肉”,让面试官爱到 “不能自已、口水直流”

也一并把这个题目以及参考答案,收入咱们的 《尼恩Java面试宝典 PDF》V61版本,供后面的小伙伴参考,提升大家的 3高 架构、设计、开发水平。

注:本文以 PDF 持续更新,最新尼恩 架构笔记、面试题 的PDF文件,请从这里获取:码云或者语雀

最佳答案至少要包括以下5个版本:

  • 版本1:不安全的生产者-消费者模式版本
  • 版本2:使用 内置锁实现的 生产者-消费者模式版本
  • 版本3:使用信号量实现(Semaphore)
  • 版本4:使用Blockingqueue 实现
  • 版本5:无锁实现生产者-消费者模式版本

什么是生产者-消费者模式

首先,来看什么是生产者-消费者问题?

生产者-消费者问题(Producer-Consumer Problem)也称有限缓冲问题(Bounded-Buffer Problem),是一个多线程同步问题的经典案例。

生产者-消费者问题描述了两个访问共享缓冲区的线程,即生产者线程和消费者线程,在实际运行时会发生的问题。生产者线程的主要功能是生成一定量的数据放到缓冲区中,然后重复此过程。消费者线程的主要功能是从缓冲区提取(或消耗)数据。

生产者―消费者问题关键是:

1)保证生产者不会在缓冲区满时加入数据,消费者也不会在缓冲区中为空时消耗数据。

2)保证在生产者加入过程、消费者消耗过程中,不会产生错误的数据和行为。

生产者-消费者问题不仅仅是一个多线程同步问题的经典案例,而且业内已经将解决该问题的方案,抽象成为了一种设计模式——“生产者-消费者”模式。

现在,来看看什么是生产者-消费者模式?

生产者-消费者模式是一个经典的多线程设计模式,它为多线程间的协作提供了良好的解决方案。

在生产者-消费者模式中,通常由两类线程,即生产者线程(若干个)和消费者线程(若干个)。生产者线程向数据缓冲区(DataBuffer)加入数据,消费者线程则从DataBuffer消耗数据。生产者和消费者、内存缓冲区之间的关系结构图如下:

生产者-消费者模式中,有4个关键点:

(1)生产者与生产者之间、消费者与消费者之间,对数据缓冲区的操作是并发进行的。

(2)数据缓冲区是有容量上限的。数据缓冲区满后,生产者不能再加入数据;DataBuffer空时,消费者不能再取出数据。

(3)数据缓冲区是线程安全的。在并发操作数据区的过程中,不能出现数据不一致情况;或者在多个线程并发更改共享数据后,不会造成出现脏数据的情况。

(4)生产者或者消费者线程在空闲时,需要尽可能阻塞而不是执行无效的空操作,尽量节约CPU资源。

面试题:如何实现一个100W ops 生产者、消费者程序?

尼恩提示,遇到这样的面试题,我们可以从基础的版本开始,一步一步进行性能优化。

  • 版本1:不安全的生产者-消费者模式版本
  • 版本2:使用 内置锁实现的 生产者-消费者模式版本
    顺便说说,锁的代价
  • 版本3:使用 内置锁实现的 生产者-消费者模式版本

版本1:不安全的生产者-消费者模式版本

根据生产者―消费者模式的结构图和描述先来实现一个非线程安全版本,包含了:

  • 数据缓冲区(DataBuffer)类、
  • 生产者(Producer)类、
  • 消费者(Consumer)类。

首先定义其数据缓冲区类,具体的代码如下:

//共享数据区,类定义
class NotSafeDataBuffer<T> 
    public static final int MAX_AMOUNT = 10;
    //保存具体数据元素
    private List<T> dataList = new LinkedList<>();

    //保存元素数量
    private AtomicInteger amount = new AtomicInteger(0);

    /**
     * 向数据区增加一个元素
     */
    public void add(T element) throws Exception 
        if (amount.get() > MAX_AMOUNT) 
            Print.tcfo("队列已经满了!");
            return;
        
        dataList.add(element);
        Print.tcfo(element + "");
        amount.incrementAndGet();

        //如果数据不一致,抛出异常
        if (amount.get() != dataList.size()) 
            throw new Exception(amount + "!=" + dataList.size());
        
    

    /**
     * 从数据区取出一个元素
     */
    public T fetch() throws Exception 
        if (amount.get() <= 0) 
            Print.tcfo("队列已经空了!");
            return null;
        
        T element = dataList.remove(0);
        Print.tcfo(element + "");
        amount.decrementAndGet();
        //如果数据不一致,抛出异常
        if (amount.get() != dataList.size()) 
            throw new Exception(amount + "!=" + dataList.size());
        
        return element;
    

上面的代码:

  • 在add()实例方法中,加入元素之前首先会对amount是否达到上限进行判断,如果数据区满了,则不能加入数据;
  • 在fetch()实例方法中,消耗元素前首先会对amount是否大于零进行判断,如果数据区空了,就不能取出数据。

生产者-消费者模式中,数据缓冲区(DataBuffer)类以及相应的生产、消费动作(Action)是可变的,生产者类、消费者类的执行逻辑是不同的,

那本着“分离变与不变”的软件设计基本原则,可以将生产者类、消费者类与具体的生产、消费Action解耦,

从而使得生产者类、消费者类的代码在后续可以复用,生产者、消费者逻辑与对应Action解耦后的类结构图如下:

通用Producer类组合了一个Callable类型的成员action实例,代表了生产数据所需要执行的实际动作,需要在构造Producer实例时传入。

通用生产者类的代码具体如下:

/**
 * 生产者任务的定义
 * Created by 尼恩@疯狂创客圈.  源码来自 《Java高并发核心编程 卷2 加强版》
 */
public class Producer implements Runnable 
    //生产的时间间隔,产一次等待的时间,默认为200ms
    public static final int PRODUCE_GAP = 200;

    //总次数
    // 注意:
    // 不是单个的次数
    // 是所有生产者的总的生产次数
    static final AtomicInteger TURN = new AtomicInteger(0);

    //生产者对象编号
    static final AtomicInteger PRODUCER_NO = new AtomicInteger(1);

    //生产者名称
    String name = null;

    //生产的动作
    Callable action = null;

    int gap = PRODUCE_GAP;

    public Producer(Callable action, int gap) 
        this.action = action;
        this.gap = gap;
        if (this.gap <= 0) 
            this.gap = PRODUCE_GAP;
        
        name = "生产者-" + PRODUCER_NO.incrementAndGet();

    

    public Producer(Callable action) 
        this.action = action;
        this.gap = PRODUCE_GAP;
        name = "生产者-" + PRODUCER_NO.incrementAndGet();

    

    @Override
    public void run() 
        while (true) 

            try 
                //执行生产动作
                Object out = action.call();
                //输出生产的结果
                if (null != out) 
                    Print.tcfo("第" + TURN.get() + "轮生产:" + out);
                
                //每一轮生产之后,稍微等待一下
                sleepMilliSeconds(gap);

                //增加生产轮次
                TURN.incrementAndGet();

             catch (Exception e) 
                e.printStackTrace();
            
        
    

通用Consumer类也组合了一个Callable类型的成员action实例,代表了消费者所需要执行的实际消耗动作,需要在构造Consumer实例时传入。

通用Consumer类的代码具体如下:

/**
 * 消费者任务的定义
 * Created by 尼恩@疯狂创客圈.   源码来自 《Java高并发核心编程 卷2 加强版》
 */
public class Consumer implements Runnable 

    //消费的时间间隔,默认等待100毫秒
    public static final int CONSUME_GAP = 100;


    //消费总次数
    // 注意:
    // 不是单个消费者的次数
    // 是所有消费者的总的消费次数
    static final AtomicInteger TURN = new AtomicInteger(0);

    //消费者对象编号
    static final AtomicInteger CONSUMER_NO = new AtomicInteger(1);

    //消费者名称
    String name;

    //消费的动作
    Callable action = null;

    //消费一次等待的时间,默认为1000ms
    int gap = CONSUME_GAP;

    public Consumer(Callable action, int gap) 
        this.action = action;
        this.gap = gap;
        name = "消费者-" + CONSUMER_NO.incrementAndGet();

    

    public Consumer(Callable action) 
        this.action = action;
        this.gap = gap;
        this.gap = CONSUME_GAP;
        name = "消费者-" + CONSUMER_NO.incrementAndGet();
    

    @Override
    public void run() 
        while (true) 
            //增加消费次数
            TURN.incrementAndGet();
            try 
                //执行消费动作
                Object out = action.call();
                if (null != out) 
                    Print.tcfo("第" + TURN.get() + "轮消费:" + out);
                
                //每一轮消费之后,稍微等待一下
                sleepMilliSeconds(gap);
             catch (Exception e) 
                e.printStackTrace();
            
        
    

在完成了数据缓冲区类的定义、生产者类定义、消费者类的定义之后,

接下来定义一下数据缓冲区实例、生产动作和消费动作,具体的代码如下:

// Created by 尼恩@疯狂创客圈.   源码来自 《Java高并发核心编程 卷2 加强版》

public class NotSafePetStore 
    //共享数据区,实例对象
    private static NotSafeDataBuffer<IGoods> notSafeDataBuffer = new NotSafeDataBuffer();

    //生产者执行的动作
    static Callable<IGoods> produceAction = () ->
    
        //首先生成一个随机的商品
        IGoods goods = Goods.produceOne();
        //将商品加上共享数据区
        try 
            notSafeDataBuffer.add(goods);
         catch (Exception e) 
            e.printStackTrace();
        
        return goods;
    ;
    //消费者执行的动作
    static Callable<IGoods> consumerAction = () ->
    
        // 从PetStore获取商品
        IGoods goods = null;
        try 
            goods = notSafeDataBuffer.fetch();

         catch (Exception e) 
            e.printStackTrace();
        
        return goods;
    ;

利用以上NotSafePetStore类所定义的三个静态成员,可以快速组装出一个简单的生产者-消费者模式的Java实现版本,具体的代码如下:


// Created by 尼恩@疯狂创客圈.   源码来自 《Java高并发核心编程 卷2 加强版》

public static void main(String[] args) throws InterruptedException 
    System.setErr(System.out);

    // 同时并发执行的线程数
    final int THREAD_TOTAL = 20;
    //线程池,用于多线程模拟测试
    ExecutorService threadPool = Executors.newFixedThreadPool(THREAD_TOTAL);
    for (int i = 0; i < 5; i++) 
        //生产者线程每生产一个商品,间隔500ms
        threadPool.submit(new Producer(produceAction, 500));
        //消费者线程每消费一个商品,间隔1500ms
        threadPool.submit(new Consumer(consumerAction, 1500));
    

在NotSafePetStore的main()方法中,利用for循环向线程池提交了5个生产者线程和5个消费者实例。

每个生产者实例生产一个商品间隔500毫秒;消费者实例每消费一个商品间隔1500毫秒;

也就是说,生产的速度大于消费的速度。

执行结果如下:

从以上异常可以看出,在向数据缓冲区进行元素的增加或者提取时,多个线程在并发执行对amount、dataList两个成员操作时次序已经混乱,导致了数据不一致和线程安全问题。

版本2:使用 内置锁实现的 生产者-消费者模式版本

前面说了,为了实现 线程安全的生产者-消费者模式实现,可以使用锁来实现。

  • 比较低级的办法是用内置锁,也就是synchronized+wait+notify方式解决
  • 更加高级一点的版本,使用显示锁 比如信号量(Semaphore)、Blockingqueue
  • 当然,终极版本是无锁编程 disruptor 的方式来解决

当然,咱们这个小节,仅仅关注有锁版本,后面的小节,再无锁编程。

咱们就挨个来看下。先看 内置锁实现的 生产者-消费者模式版本

synchronized+wait()+notify() 的实现方式

使用synchronized解决生产者和消费者模式,首先我们需要找出临界区资源和临界区代码块。

首先,我们来看下什么是临界区资源。临界区资源表示一种可以被多个线程使用的公共资源或共享数据,但是每一次只能有一个线程使用它。一旦临界区资源被占用,想使用该资源的其他线程则必须等待。在并发情况下,临界区资源是受保护的对象。

接下来,我们再来看下什么是临界区代码块。临界区代码段(Critical Section)是每个线程中访问临界资源的那段代码,多个线程必须互斥地对临界区资源进行访问。线程进入临界区代码段之前,必须在进入区申请资源,申请成功之后进行临界区代码段,执行完成之后释放资源。临界区代码段的进入和退出如下:

最后,我们来看下竟态条件(Race Conditions)可能是由于在访问临界区代码段时没有互斥地访问而导致的特殊情况。

如果多个线程在临界区代码段的并发执行结果可能因为代码的执行顺序不同而出现不同的结果,我们就说这时在临界区出现了竞态条件问题。

那咱们回过头来看生产者-消费者模式, 这个模式中, 生产者和消费者都需要操作DataBuffer(数据缓冲区)中,可以知道,临界区代码段在DataBuffer(数据缓冲区)中。

在数据缓冲区中,主要是数据进行操作, 那么 由两个临界区资源,分别是amount和dataList。

由生产者-消费者模式的关键点我们可知, 生产者与生产者之间、消费者与消费者之间,对数据缓冲区的操作是并发进行的。

那么添加数据和消耗数据是临界区代码,即其add()和fetch()两个方法。

那么创建建一个安全的数据缓存区类SafeDataBuffer类,在其add()和fetch()两个实例方法的public声明后面加上synchronized关键字即可。那线程安全的SafeDataBuffer类代码如下:

// Created by 尼恩@疯狂创客圈.   源码来自 《Java高并发核心编程 卷2 加强版》

//共享数据区,类定义
class SafeDataBuffer<T> 
    public static final int MAX_AMOUNT = 10;
    private List<T> dataList = new LinkedList<>();

    //保存数量
    private AtomicInteger amount = new AtomicInteger(0);

    /**
     * 向数据区增加一个元素
     */
    public synchronized void add(T element) throws Exception 
        if (amount.get() > MAX_AMOUNT) 
            Print.tcfo("队列已经满了!");
            return;
        
        dataList.add(element);
        Print.tcfo(element + "");
        amount.incrementAndGet();

        //如果数据不一致,抛出异常
        if (amount.get() != dataList.size()) 
            throw new Exception(amount + "!=" + dataList.size());
        
    

    /**
     * 从数据区取出一个元素
     */
    public synchronized T fetch() throws Exception 
        if (amount.get() <= 0) 
            Print.tcfo("队列已经空了!");
            return null;
        
        T element = dataList.remove(0);
        Print.tcfo(element + "");
        amount.decrementAndGet();
        //如果数据不一致,抛出异常
        if (amount.get() != dataList.size()) 
            throw new Exception(amount + "!=" + dataList.size());
        
        return element;
    

由于其他的代码没有发生变化,我们执行看下结果:

运行这个线程安全的生产者-消费者模式的实现版本,

等待一段时间,之前出现的amount数量和dataList的长度不相等的受检异常没有再抛出;

另外,之前出现的数据不一致情况以及线程安全问题也被完全解除。

目前的SafeDataBuffer类中,还存在一个性能的问题:消费者每一轮消费,不管数据区是否为空,都需要进行数据区的询问和判断。

循环的代码如下:


// Created by 尼恩@疯狂创客圈.   源码来自 《Java高并发核心编程 卷2 加强版》

/**
 * 从数据区取出一个元素
 */
public synchronized T fetch() throws Exception 
    if (amount.get() <= 0) 
        Print.tcfo("队列已经空了!");
        return null;
    
    ....

当数据区空时(amount <= 0),消费者无法取出数据,但是仍然做一个无用的数据区询问工作,白白耗费了CPU的时间片

对于生产者来说,也存在类似的无效轮询问题。

当数据区满时,生产者无法加入数据,这时候生产者执行add(T element)方法也白白耗费了CPU的时间片。


// Created by 尼恩@疯狂创客圈.   源码来自 《Java高并发核心编程 卷2 加强版》

/**
 * 向数据区增加一个元素
 */
public synchronized void add(T element) throws Exception 
    if (amount.get() > MAX_AMOUNT) 
        Print.tcfo("队列已经满了!");
        return;
    
    ....

在生产者或者消费者空闲时节约CPU时间片,免去巨大的CPU资源浪费的方法是使用“等待-通知”方式进行生产者与消费者之间的线程通信。

具体实现:

(1)在数据区满(amount.get() > MAX_AMOUNT)时,可以让生产者等待,等到下次数据区中可以加入数据时,给生产者发通知,让生产者唤醒。

(2)在数据区空(amount <= 0)时,可以让消费者等待,等到下次数据区中可以取出数据时,消费者才能被唤醒。

(3)可以在消费者取出一个数据后,由消费者去唤醒等待的生产者。

(4)可以在生产者加入一个数据后,由生产者去唤醒等待的消费者。

Java语言中“等待-通知”方式的线程间的通信使用对象的wait()、notify()两类方法来实现。

每个Java对象都有wait()、notify()两类实例方法,并且wait()、notify()方法和对象的监视器是紧密相关的。

Java对象中的wait()、notify()两类方法就如同信号开关,用来进行等待方和通知方之间的交互。

对象的wait()方法的主要作用是让当前线程阻塞并等待被唤醒。wait()方法与对象监视器紧密相关,使用wait()方法时也一定需要放在同步块中。

wait()方法的调用方法如下:


// Created by 尼恩@疯狂创客圈.   源码来自 《Java高并发核心编程 卷2 加强版》

synchronized(locko)

    //同步保护的代码块
    locko.wait();
        ...

对象的notify()方法的主要作用是唤醒在等待的线程。notify()方法与对象监视器紧密相关,使用notify()方法时也需要放在同步块中。notify()方法的调用方法如下:


// Created by 尼恩@疯狂创客圈.   源码来自 《Java高并发核心编程 卷2 加强版》

synchronized(locko)

    //同步保护的代码块
    locko.notify();
        ...

为了避免空轮询导致CPU时间片浪费,提高生产者-消费者实现版本的性能,接下来演示使用“等待-通知”的方式在生产者与消费者之间进行线程间通信。

使用“等待-通知”机制通信的生产者-消费者实现版本定义三个同步对象,具体如下:

(1)LOCK_OBJECT:用于临界区同步,临界区资源为数据缓冲区的dataList变量和amount 变量。

(2)NOT_FULL:用于数据缓冲区的未满条件等待和通知。生产者在添加元素前,需要判断数据区是否已满,如果是,生产者进入NOT_FULL的同步区去等待被通知,只要消费者消耗一个元素,数据区就是未满的,进入NOT_FULL的同步区发送通知。

(3)NOT_EMPTY:用于数据缓冲区的非空条件等待和通知。消费者在消耗元素前需要判断数据区是否已空,如果是,消费者进入NOT_EMPTY的同步区等待被通知,只要生产者添加一个元素,数据区就是非空的,生产者会进入NOT_EMPTY的同步区发送通知。

具体代码如下:


// Created by 尼恩@疯狂创客圈.   源码来自 《Java高并发核心编程 卷2 加强版》

public class CommunicatePetStore 

    public static final int MAX_AMOUNT = 10; //数据区长度

    //共享数据区,类定义
    static class DateBuffer<T> 
        //保存数据
        private List<T> dataList = new LinkedList<>();
        //保存数量
        private volatile int amount = 0;

        private final Object LOCK_OBJECT = new Object();
        private final Object NOT_FULL = new Object();
        private final Object NOT_EMPTY = new Object();

        // 向数据区增加一个元素
        public void add(T element) throws Exception 
            synchronized (NOT_FULL) 
                while (amount >= MAX_AMOUNT) 
                    Print.tcfo("队列已经满了!");
                    //等待未满通知
                    NOT_FULL.wait();
                
            
            synchronized (LOCK_OBJECT) 

                if (amount < MAX_AMOUNT)  // 加上双重检查,模拟双检锁在单例模式中应用
                    dataList.add(element);
                    amount++;
                
            
            synchronized (NOT_EMPTY) 
                //发送未空通知
                NOT_EMPTY.notify();
            
        

        /**
         * 从数据区取出一个商品
         */
        public T fetch() throws Exception 
            synchronized (NOT_EMPTY) 
                while (amount <= 0) 
                    Print.tcfo("队列已经空了!");
                    //等待未空通知
                    NOT_EMPTY.wait();
                
            

            T element = null;
            synchronized (LOCK_OBJECT) 
                if (amount > 0)   // 加上双重检查,模拟双检锁在单例模式中应用
                    element = dataList.remove(0);
                    amount--;
                
            

            synchronized (NOT_FULL) 
                //发送未满通知
                NOT_FULL.notify();
            
            return element;
        
    

那以上就是使用synchronized+wait+notify实现的线程安全的生产者-消费者模式。

虽然线程安全问题顺利解决,但是以上的解决方式使用了SafeDataBuffer的实例的对象锁作为同步锁,这样一来,所有的生产、消费动作在执行过程中都需要抢占同一个同步锁,最终的结果是所有的生产、消费动作都被串行化了。

而且在锁竞争激烈的情况下,synchronized锁会膨胀升级为重量级锁,严重的影响的程序的性能。

尼恩提示:

synchronized锁的膨胀底层原理,非常重要, 这部分内容可以阅读 《Java高并发核心编程 卷2 加强版》。

这里不做赘述。

版本3:使用信号量实现(Semaphore)

为了实现 线程安全的生产者-消费者模式实现,可以使用锁来实现。

  • 比较低级的办法是用内置锁,也就是synchronized+wait+notify方式解决
  • 更加高级一点的版本,使用显示锁 比如信号量(Semaphore)、Blockingqueue
  • 当然,终极版本是无锁编程 disruptor 的方式来解决,

当然,咱们这个小节,仅仅关注有锁版本,后面的小节,再无锁编程。

咱们就挨个来看下。那接下我们看下显示锁 的 核心成员之一 信号量(semaphore)实现线程安全的生产者-消费者模式。

什么是信号量?

信号量是Dijkstra在1965年提出的一种方法,它使用一个整型变量来累计唤醒次数,供以后使用。

在他的建议中引入了一个新的变量类型,称作信号量(semaphore)。

一个信号量的取值可以为0(表示没有保存下来的唤醒操作)或者正值(表示有一个或多个唤醒操作)。

Dijkstra建议设立两种操作:down和up(分别为一般化后的sleep和wakeup)。

对一个信号量执行down操作,则是检查其值是否大于0。若该值大于0,则将其减1(即用掉一个保存的唤醒信号)并继续;若该值为0,则进程将睡眠,而且此时down操作并未结束。

原子操作:所谓原子操作,是指一组相关联的操作要么都不间断地执行,要么不执行。

检查数值、修改变量值以及可能发生的睡眠操作,均作为一个单一的、不可分割的原子操作完成。保证一旦一个信号量操作开始,则在该操作完成或阻塞之前,其他进程均不允许访问该信号量。

这种原子性对于解决同步问题和避免竞争条件是绝对必要的。

up操作对信号量的值增1。

如果一个或多个进程在该信号量上睡眠,无法完成一个先前的down操作,则由系统选择其中的一个(如随机挑选)并允许该进程完成它的down操作。

于是,对一个有进程在其上睡眠的信号量执行一次up操作后,该信号量的值仍旧是0,但在其上睡眠的进程却少了一个。信号量的值增加1和唤醒一个进程同样也是不可分割的,不会有某个进程因执行up而阻塞,正如前面的模型中不会有进程因执行wakeup而阻塞一样。

Dijkstra论文中的信号量含义

在Dijkstra原来的论文中,他分别使用名称P和V而不是down和up,

荷兰语中,Proberen的意思是尝试,Verhogen的含义是增加或升高。

从物理上说明信号量的P、V操作的含义。

P(S)表示申请一个资源,S.value>0表示有资源可用,其值为资源的数目;S.value=0表示无资源可用;S.value<0, 则|S.value|表示S等待队列中的进程个数。

V(S)表示释放一个资源,信号量的初值应该大于等于0。P操作相当于“等待一个信号”,而V操作相当于“发送一个信号”,在实现同步过程中,V操作相当于发送一个信号说合作者已经完成了某项任务,在实现互斥过程中,V操作相当于发送一个信号说临界资源可用了。

实际上,在实现互斥时,P、V操作相当于申请资源和释放资源。

Dijkstra的解决方案使用了三个信号量:

一个称为full,用来记录充满缓冲槽数目,

一个称为empty,记录空的缓冲槽总数;

一个称为mutex,用来确保生产者和消费者不会同时访问缓冲区。

full的初值为0,empty的初值为缓冲区中槽的数目,mutex的初值为1。供两个或多个进程使用的信号量,其初值为1,保证同时只有一个进程可以进入临界区,称作二元信号量。如果每个进程在进入临界区前都执行down操作,并在刚刚退出时执行一个up操作,就能够实现互斥。

信号量的另一种用途是用于实现同步,信号量full和empty用来保证某种事件的顺序发生或不发生。在本例中,它们保证当缓冲区满的时候生产者停止运行,以及当缓冲区空的时候消费者停止运行。

对于无界缓冲区的生产者—消费者问题,两个进程共享一个不限大小的公共缓冲区。

由于是无界缓冲区(仓库是无界限制的),即生产者不用关心仓库是否满,只管往里面生产东西,但是消费者还是要关心仓库是否空。所以生产者不会因得不到缓冲区而被阻塞,不需要对空缓冲区进行管理,可以去掉在有界缓冲区中用来管理空缓冲区的信号量及其PV操作。

JUC中的信号量 Semaphore

在JUC中的信号量 Semaphore属于共享锁。Semaphore可以用来控制在同一时刻访问共享资源的线程数量,通过协调各个线程以保证共享资源的合理使用。Semaphore维护了一组虚拟许可,其数量可以通过构造器的参数指定。线程在访问共享资源前必须使用Semaphore的acquire()方法获得许可,如果许可数量为0,该线程就一直阻塞。线程访问完成资源后,必须使用Semaphore的release()方法释放许可。更形象的说法是:Semaphore是一个是许可管理器。

JUC包中Semaphore类的主要方法大致如下:

Semaphore类的主要方法大致如下:

(1) Semaphore(permits):构造一个Semaphore实例,初始化其管理的许可数量为permits参数值。

(2) Semaphore(permits,fair):构造一个Semaphore实例,初始化其管理的许可数量为permits参数值,以及是否以公平模式(fair参数是否为true)进行许可的发放。

Semaphore和ReentrantLock类似,Semaphore发放许可时有两种模式:公平模式和非公平模式,默认情况下使用非公平模式。

(3) availablePermits():获取Semaphore对象可用的许可数量。

(4) acquire():当前线程尝试获取Semaphore对象的一个许可。此过程是阻塞的,线程会一直等待Semaphore发放一个许可,直到发生以下任意一件事:

  • 当前线程获取了一个可用的许可。
  • 当前线程被中断,就会抛出InterruptedException异常,并停止等待,继续往下执行。

(5) acquire(permits) :当前线程尝试阻塞地获取permits个许可。此过程是阻塞的,线程会一直等待Semaphore发放permits个许可。如果没有足够的许可而当前线程被中断,就会抛出InterruptedException异常并终止阻塞。

(6) acquireUninterruptibly():当前线程尝试阻塞地获取一个许可,阻塞的过程不可中断,直到成功获取一个许可。

(7) acquireUninterruptibly(permits):当前线程尝试阻塞地获取permits个许可,阻塞的过程不可中断,直到成功获取permits个许可。

(8) tryAcquire():当前线程尝试获取一个许可。此过程是非阻塞的,它只是进行一次尝试,会立即返回。如果当前线程成功获取了一个许可,就返回true;如果当前线程没有获得许可,就返回false

(9) tryAcquire(permits):当前线程尝试获取permits个许可。此过程是非阻塞的,它只是进行一次尝试,会立即返回。如果当前线程成功获取了permits个许可,就返回true;如果当前线程没有获得permits个许可,就返回false。

(10) tryAcquire(timeout,TimeUnit):限时获取一个许可。此过程是阻塞的,会一直等待许可,直到发生以下任意一件事:

  • 当前线程获取了一个许可,则会停止等待,继续执行,并返回true。
  • 当前线程等待timeout后超时,则会停止等待,继续执行,并返回false。
  • 当前线程在timeout时间内被中断,则会抛出InterruptedException异常,并停止等待,继续执行。

(11) tryAcquire(permits,timeout,TimeUnit):与tryAcquire(timeout,TimeUnit)方法在逻辑上基本相同,不同之处在于:在获取许可的数量上不同,此方法用于获取permits个许可。

(12) release():当前线程释放一个可用的许可。

(13) release(permits):当前线程释放permits个可用的许可。

(14) drainPermits():当前线程获得剩余的所有可用许可。

(15) hasQueuedThreads():判断当前Semaphore对象上是否存在正在等待许可的线程。

(16) getQueueLength():获取当前Semaphore对象上正在等待许可的线程数量。

使用Semaphore实现的生产者-消费者模式

那接下来我们就看下使用Semaphore实现的生产者-消费者模式的代码,主要是针对临界区资源和临界区代码进行修改,具体修改如下:


// Created by 尼恩@疯狂创客圈.   源码来自 《Java高并发核心编程 卷2 加强版》

public class SemaphorePetStore 
    public static final int MAX_AMOUNT = 10; //数据区长度

    //共享数据区,类定义
    static class DateBuffer<T> 
        //保存数据
        private LinkedBlockingDeque<T> dataList = new LinkedBlockingDeque<>(MAX_AMOUNT);

        //保存数量
        private volatile int amount = 0;
        // 每次处理的次数
        private static final int times = 100;

        //信号量标识
        private static AtomicInteger signal = new AtomicInteger(0);


        // 向数据区增加一个元素
        public void add(T element) throws Exception 
            while (amount < times) 
                if (signal.get() >= 0 && dataList.size() == 0) 
                    synchronized (signal) 
                        //生产者: P操作 -1
                        Print.fo("生产者: P操作 -1 ");
                        signal.incrementAndGet();
                        Print.fo("生产者: 生产,放入一个对象");
                        dataList.add(element);
                        amount++;
                        //生产者: P操作 -1
                        Print.fo("生产者: V操作 +1");
                        signal.decrementAndGet();
                        Print.fo("生产者: 通知消费者,生产者阻塞");
                        signal.notifyAll();
                        // 阻塞
                        signal.wait();
                        ;
                    
                 else 
                    Thread.sleep(10);
                
            
        

        /**
         * 从数据区取出一个商品
         */
        public T fetch() throws Exception 
            T element = null;
            while (amount < times) 
                if (signal.get() <= 0 && dataList.size() > 0) 
                    synchronized (signal) 
                        //消费者: P操作 -1
                        Print.fo("消费者: P操作 -1 ");
                        signal.decrementAndGet();
                        Print.fo("消费者: 消费,取出一个对象");
                        element = dataList.take();
                        amount--;
                        //生产者: P操作 -1
                        Print.fo("消费者: V操作 +1");
                        signal.incrementAndGet();
                        Print.fo("消费者: 通知生产者,消费者阻塞");
                        signal.notifyAll();
                        // 阻塞
                        signal.wait();
                        ;
                    
                 else 
                    Thread.sleep(10);
                
            
            return element;
        
    

由于其他代码未做更改,小伙伴可参考前面的线程不安全的生产者类、消费者类以及组装生产者-消费者模式的实现。

部分执行结果如下:

版本4:使用Blockingqueue 实现

回顾前面: 为了实现 线程安全的生产者-消费者模式实现,可以使用锁来实现。

  • 比较低级的办法是用内置锁,也就是synchronized+wait+notify方式解决
  • 更加高级一点的版本,使用显示锁 比如信号量(Semaphore)、Blockingqueue
  • 当然,终极版本是无锁编程 disruptor 的方式来解决,

当然,咱们这个小节,仅仅关注有锁版本,后面的小节,再无锁编程。

咱们就挨个来看下。那接下我们看下基于 显示锁 实现的核心结构 Blockingqueue实现线程安全的生产者-消费者模式。

在多线程环境中,通过BlockingQueue(阻塞队列)可以很容易地实现多线程之间数据共享和通信。

阻塞队列与普通队列(ArrayDeque等)之间的最大不同点在于阻塞队列提供了阻塞式的添加和删除方法。

(1)阻塞添加

所谓的阻塞添加是指当阻塞队列元素已满时,队列会阻塞添加元素的线程,直队列元素不满时,才重新唤醒线程执行元素添加操作。

(2)阻塞删除

阻塞删除是指在队列元素为空时,删除队列元素的线程将被阻塞,直到队列不为空时,才重新唤醒删除线程再执行删除操作。

BlockingQueue的实现类有ArrayBlockingQueue、DelayQueue、LinkedBlockingDeque、LinkedBlockingQueue、PriorityBlockingQueue、SynchronousQueue等,具体如下:

ArrayBlockingQueue是一个常用的阻塞队列,是基于数组实现的,其内部使用一个定长数组存储元素。除了一个定长数组外,ArrayBlockingQueue内部还保存着两个整型变量,分别标识队列的头部和尾部在数组中的位置。ArrayBlockingQueue的添加和删除操作都是共用同一个锁对象,由此意味着添加和删除无法并行运行,这一点不同于LinkedBlockingQueue。ArrayBlockingQueue完全可以将添加和删除的锁分离,从而添加和删除操作完全并行。Doug Lea之所以没有这样去做,是因为ArrayBlockingQueue的数据写入和获取操作已经足够轻巧。

LinkedBlockingQueue是基于链表的阻塞队列,其内部也维持着一个数据缓冲队列(该队列由一个链表构成)。LinkedBlockingQueue对于添加和删除元素分别采用了独立的锁来控制数据同步,这也意味着在高并发的情况下生产者和消费者可以并行地操作队列中的数据,以此来提高整个队列的并发性能。

DelayQueue中的元素只有当其指定的延迟时间到了,才能从队列中获取到该元素。DelayQueue是一个没有大小限制的队列,因此往队列中添加数据的操作(生产者)永远不会被阻塞,而只有获取数据的操作(消费者)才会被阻塞。DelayQueue使用场景较少,但是相当巧妙,常见的例子比如使用一个DelayQueue来管理一个超时未响应的连接队列。

基于优先级的阻塞队列和DelayQueue类似,PriorityBlockingQueue并不会阻塞数据生产者,而只会在没有可消费的数据时,阻塞数据的消费者。在使用的时候要特别注意,生产者生产数据的速度绝对不能快于消费者消费数据的速度,否则时间一长,会最终耗尽所有的可用堆内存空间。

相对于有缓冲的阻塞队列(如LinkedBlockingQueue)来说,SynchronousQueue少了中间缓冲区(如仓库)的环节。如果有仓库,生产者直接把商品批发给仓库,不需要关心仓库最终会将这些商品发给哪些消费者,由于仓库可以中转部分商品,总体来说有仓库进行生产和消费的吞吐量高一些。反过来说,又因为仓库的引入,使得商品从生产者到消费者中间增加了额外的交易环节,单个商品的及时响应性能可能会降低,所以对单个消息的响应要求高的场景可以使用SynchronousQueue。声明一个SynchronousQueue有两种不同的方式:公平模式和非公平模式。公平模式的SynchronousQueue会采用公平锁,并配合一个FIFO队列来阻塞多余的生产者和消费者,从而体系整体的公平策略。非公平模式(默认情况)的SynchronousQueue采用非公平锁,同时配合一个LIFO堆栈(TransferStack内部实例)来管理多余的生产者和消费者。对于后一种模式,如果生产者和消费者的处理速度有差距,则很容易出现线程饥渴的情况,即可能出现某些生产者或者消费者的数据永远都得不到处理。

了解完阻塞队列的基本方法、主要类型之后,下面通过ArrayBlockingQueue队列实现一个生产者-消费者的案例。

具体的代码在前面的生产者和消费者实现基础上进行迭代——Consumer(消费者)和Producer(生产者)通过ArrayBlockingQueue队列获取和添加元素。其中,消费者调用了take()方法获取元素,当队列没有元素就阻塞;生产者调用put()方法添加元素,当队列满时就阻塞。通过这种方式便实现生产者-消费者模式,比直接使用等待唤醒机制或者Condition条件队列更加简单。基于ArrayBlockingQueue的生产者和消费者实现版本具体的UML类图如下:

出于“分离变与不变”的原则,此版本的Producer(生产者)、Consumer(消费者)等的逻辑不用变化,直接复用前面原的代码即可。此版本DataBuffer(共享数据区)需要变化,使用一个ArrayBlockingQueue用于缓存数据,具体的代码如下:

// Created by 尼恩@疯狂创客圈.   源码来自 《Java高并发核心编程 卷2 加强版》

public class ArrayBlockingQueuePetStore 

    public static final int MAX_AMOUNT = 10; //数据区长度


    //共享数据区,类定义
    static class DateBuffer<T> 
        //保存数据
        private ArrayBlockingQueue<T> dataList = new ArrayBlockingQueue<>(MAX_AMOUNT);


        // 向数据区增加一个元素
        public void add(T element) throws Exception 
            dataList.put(element);
        

        /**
         * 从数据区取出一个商品
         */
        public T fetch() throws Exception 
            return dataList.take();
        
    

运行程序,部分执行结果如下:

锁的代价

锁提供了互斥,并能够确保变化能够以一个确定的顺序让其它的线程看见。

锁其实是很昂贵的,因为他们在竞争的时候需要进行仲裁。这个仲裁会涉及到操作系统的上下文切换,操作系统会挂起所有在等待这把锁的线程,直到锁持有者释放该锁。

上下文切换期间,执行线程会丧失对操作系统的控制,导致执行线程的执行上下文丢失之前缓存的数据和指令集,这会给现代处理器带来严重的性能损耗。

当然效率更高的用户态锁是另一种选择,但用户锁只有在没有竞争的时候才真正会带来益处。

注:因为用户态的锁往往是通过自旋锁来实现(或者带休眠的自旋锁),而自旋在竞争激烈的时候开销是很大的(一直在消耗CPU资源)。

网上有小伙伴为了进行效果验证,写了一个很简单程序,就是调用一个循环5亿次递增操作的函数。

这个java函数在单线程,2.4G Intel Westmere EP的CPU上只需要300ms。

一旦引入锁,即使没有发生竞争,程序的执行时间也会发生显著的增加。

循环5亿次递增操作 实验结果如下:

Method Time (ms)
Single thread 300
Single thread with lock 10,000
Two threads with lock 224,000
Single thread with CAS 5,700
Two threads with CAS 30,000
Single thread with volatile write 4,700

CAS的代价

无锁编程 场景中, 线程之间的协调 主要使用 CAS的机制。 但是 从上面的实验看到, CAS也是有代价的。

为啥呢?

CAS依赖于处理器的支持,当然大部分现代处理器都支持。

CAS相对于锁是非常高效的,因为它不需要涉及内核上下文切换进行仲裁。

但cas并不是免费的,处理器需要对CPU指令pipeline加锁以确保原子性,

并且cas只保证原子性,不保证可见性,所以cas一般和 volatile内存屏障一起使用,以确保对其他线程的可见性。

尼恩备注:cas+ volatile内存屏障 的底层原理,非常重要

如果大家对 cas+ volatile内存屏障 的知识不清楚, 请细致阅读尼恩 《Java高并发核心编程 卷2 》, 这本书做了非常详细的介绍。

版本5:无锁实现生产者-消费者模式版本

回顾前面: 为了实现 线程安全的生产者-消费者模式实现,可以使用锁来实现。

  • 比较低级的办法是用内置锁,也就是synchronized+wait+notify方式解决
  • 更加高级一点的版本,使用显示锁 比如信号量(Semaphore)、Blockingqueue
  • 当然,终极版本是无锁编程 disruptor 的方式来解决,

咱们就挨个来看下无锁版本。

为了提升性能,需要使用 CAS实现生产者、消费者。

从实操的角度来说,CAS的一个问题就是太复杂了,本来用锁进行并发编程就已经很头疼了,用CAS来实现复杂逻辑就更头痛了。

但有一个好消息是,目前有一个现成的Disruptor框架,它已经帮助我们实现了这一个功能。

Disruptor框架的简单介绍

Disruptor框架有着1000W ops性能,非常复杂的底层原理,光介绍清楚这个框架,尼恩的 《Disruptor 红宝书》 PDF 电子书就有100多页。

具体的Disruptor框架底层原理,请参见尼恩的 《Disruptor 红宝书》 PDF ,和《100Wqps日志平台实操》视频。

这里仅仅对这个 框架进行简单介绍。

Disruptor框架是由LMAX公司开发的一款高效的无锁内存队列。

它使用无锁的方式实现了一个环形队列(RingBuffer),非常适合实现生产者-消费者模式,比如事件和消息的发布。

目前,包括Apache Storm、Camel、Log4j 2在内的很多知名项目都应用了Disruptor以获取高性能。

Disruptor框架别出心裁地使用了环形队列来代替普通线形队列,这个环形队列内部实现为一个普通的数组。

对于一般的队列,势必要提供队列同步head和尾部tail两个指针,用于出队和入队,这样无疑增加了线程协作的复杂度。但如果队列是环形的,则只需要对外提供一个当前位置cursor,利用这个指针既可以进行入队操作,也可以进行出队操作。由于环形队列的缘故,队列的总大小必须事先指定,不能动态扩展。为了能够快速从一个序列(sequence)对应到数组的实际位置(每次有元素入队,序列就加1),Disruptor框架要求我们必须将数组的大小设置为2的整数次方。这样通过sequence &(queueSize-1)就能立即定位到实际的元素位置index,这比取余(%)操作快得多。

如果queueSize是2的整数次幂,则这个数字的二进制表示必然是10、100、1000、10000等形式。因此,queueSize-1的二进制则是一个全1的数字。因此它可以将sequence限定在queueSize-1范围内,并且不会有任何一位是浪费的。

RingBuffer的结构如下:

其实质只是一个普通的数组,只是当放置数据填充满队列(即到达2^n-1位置)之后,再填充数据,就会从0开始,覆盖之前的数据,于是就相当于一个环。

RingBuffer的指针(Sequence)属于一个volatile变量,同时也是我们能够不用锁操作就能实现Disruptor的原因之一,而且通过缓存行补充,避免伪共享问题。 该所谓指针是通过一直自增的方式来获取下一个可写或者可读数据,该数据是Long类型,不用担心会爆掉。有人计算过: long的范围最大可以达到9223372036854775807,一年365 * 24 * 60 * 60 = 31536000秒,每秒产生1W条数据,也可以使用292年。

Disruptor 不像传统的队列,分为一个队头指针和一个队尾指针,而是只有一个角标(上面的seq),

在Disruptor中生产者分为单生产者和多生产者,在枚举类ProducerType中定义单生产(SINGLE)和多生产(MULTI)。而消费者并没有区分。

单生产者情况下,就是普通的生产者向RingBuffer中放置数据,消费者获取最大可消费的位置,并进行消费。单生产者线程写数据的流程比较简单,具体如下:

(1)申请写入m个元素;

(2)若是有m个元素可以入,则返回最大的序列号。这儿主要判断是否会覆盖未读的元素;

(3)若是返回的正确,则生产者开始写入元素。

采用多生产者时,会遇到“如何防止多个线程重复写同一个元素”的问题。Disruptor的解决方法是,每个线程获取不同的一段数组空间进行操作。这个通过CAS很容易达到。只需要在分配元素的时候,通过CAS判断一下这段空间是否已经分配出去即可。

但是又会碰到新问题:如何防止读取的时候,读到还未写的元素。那么Disruptor引入了一个跟RingBuffer同样大小的Buffer,称为AvailableBuffer。当某个位置写入成功的时候,便把availble Buffer相应的位置置位,标记为写入成功。读取的时候,会遍历available Buffer,来判断元素是否已经就绪。多生产者流程如下:

(1)申请写入m个元素;

(2)若是有m个元素可以写入,则返回最大的序列号。每个生产者会被分配一段独享的空间;

(3)生产者写入元素,写入元素的同时设置available Buffer里面相应的位置,以标记自己哪些位置是已经写入成功的。

那么生产者和消费者模式在RingBuffer上的情况如下

生产者向缓冲区中写入数据,而消费者从中读取数据。生产者写入数据时,使用CAS操作,消费者读取数据时,为了防止多个消费者处理同一个数据,也使用CAS操作进行数据保护。

ArrayBlockingQueue和Disruptor 的性能PK

参考文献中,有小伙伴选取了Doug Lea的ArrayBlockingQueue的实现作为参考目标进行测试,ArrayBlockingQueue是所有有界队列中性能最好的,测试是按照阻塞的方式进行的。

  

下表展示了总共处理5亿条消息时每秒吞吐量的性能测试结果,

测试环境为:没有HT的1.6.0_25 64-bit Sun JVM, Windows 7, Intel Core i7 860 @ 2.8 GHz ,以及Intel Core i7-2720QM, Ubuntu 11.04。

我们取了最好的前三条结果,这个结果使用于任何JVM运行环境,表中显示的结果并不是我们发现最好的结果。

Nehalem 2.8Ghz – Windows 7 SP1 64-bit Sandy Bridge 2.2Ghz – Linux 2.6.38 64-bit
ABQ Disruptor ABQ Disruptor
Unicast: 1P – 1C 5,339,256 25,998,336 4,057,453 22,381,378
Pipeline: 1P – 3C 2,128,918 16,806,157 2,006,903 15,857,913
Sequencer: 3P – 1C 5,539,531 13,403,268 2,056,118 14,540,519
Multicast: 1P – 3C 1,077,384 9,377,871 260,733 10,860,121
Diamond: 1P – 3C 2,113,941 16,143,613 2,082,725 15,295,197

无论在linux 环境在是在windows 环境, 无论 是多个生产者还是单个生产者, Disruptor 的性能稳稳的都在 1000W ops 以上。

基于Disruptor的实现100W ops+ 生产者和消费者设计

基于Disruptor的高性能生产者和消费者模式的类图如下:

MsgEven 是存放数据对象的载体,具体代码如下:

public class MsgEven 

    private IGoods goods;

    public IGoods getGoods() 
        return goods;
    

    public void setGoods(IGoods goods) 
        this.goods = goods;
    

消费者的作用是读取数据进行处理。

这里,数据的读取已经由Disruptor框架进行封装了,onEvent()方法为框架的回调方法。

因此,只需要简单地进行数据处理即可。

具体代码如下:

public class Consumer  implements EventHandler<MsgEven> 

    //消费的时间间隔,默认等待100毫秒
    public static final int CONSUME_GAP = 100;


    //消费者对象编号
    static final AtomicInteger CONSUMER_NO = new AtomicInteger(1);

    //消费者名称
    String name;


    public Consumer() 
        name = "消费者-" + CONSUMER_NO.incrementAndGet();
    


    @Override
    public void onEvent(MsgEven msgEven, long sequence, boolean endOfBatch)  
  
        Print.tcfo("消费者中:"+sequence+"商品信息:"+msgEven.getGoods());
    

需要一个产生MsgEven 对象的工厂类GoodsFactory。

它会在Disruptor框架系统初始化时,构造所有的缓冲区中的对象实例,具体代码如下:

public class GoodsFactory implements EventFactory<MsgEven> 
    @Override
    public MsgEven newInstance() 
        return new MsgEven();
    

生产者需要一个RingBuffer的引用,也就是环形缓冲区。

它有一个重要的方法add()将产生的数据推入缓冲区。方法add()接收一个IGood对象。

add()方法的功能就是将传入的IGood对象中的数据提取出来,并装载到环形缓冲区中。

具体代码如下:

public class Produer 

    //生产者对象编号
    static final AtomicInteger PRODUCER_NO = new AtomicInteger(1);

    //生产者名称
    String name = null;



    private  final RingBuffer<MsgEven> ringBuffer ;

    public Produer(RingBuffer<MsgEven> ringBuffer) 
        name = "生产者-" + PRODUCER_NO.incrementAndGet();
        this.ringBuffer = ringBuffer;
    

    public  void add(IGoods goods)
        // 1.ringBuffer 事件队列 下一个槽
        long sequence = ringBuffer.next();
        try 
            //2.取出空的事件队列
            MsgEven    msgEven= ringBuffer.get(sequence);
            msgEven.setGoods(goods);
            //3.获取事件队列传递的数据
            Print.cfo("生产者名称:"+name+",生产商品:"+goods.toString());
        finally 
            //4.发布事件
            ringBuffer.publish(sequence);
        
    

我们的生产者、消费者和数据都已经准备就绪,只差一个统筹规划的主函数将所有的内容整合起来。具体代码如下:

public class DisruptorPetStore 
    public static void main(String[] args) throws InterruptedException 
        // 1.创建工厂
        GoodsFactory dateBufferFactory= new GoodsFactory();
        //2.创建ringBuffer 大小,大小一定要是2的N次方
        int bufferSize=1024*1024;

        //3.创建Disruptor
        Disruptor<MsgEven>  disruptor = new Disruptor<MsgEven>(dateBufferFactory,bufferSize, Executors.defaultThreadFactory(),ProducerType.MULTI,new BlockingWaitStrategy());
        //4.设置事件处理器 即消费者
        disruptor.handleEventsWith(new Consumer());
        // 5.启动
        disruptor.start();

        // 6.创建RingBuffer容器
        RingBuffer<MsgEven> ringBuffer= disruptor.getRingBuffer();

        //7.创建生产者
        Produer produer = new Produer(ringBuffer);

        for (int l=0;true;l++)
            IGoods goods = Goods.produceOne();
            produer.add(goods);
            Thread.sleep(100);

        
    

部分执行结果如下:

学习生产者-消费者模式学习的思想, 消息队列、缓存中也有生产者-消费者模式的思想。

尼恩总结

生产者、消费者,是一道高频的面试题,非常高频,也非常考验水平。

如果按照上面的套路去作答, 无论是美团,还是华为,或者其他的大厂面试官,都会对你 献上膝盖。

如果面试过程中, 遇到什么问题,可以来 《技术自由圈》 社群交流。

作者介绍:

本文1作: 唐欢,资深架构师, 《Java 高并发核心编程 加强版 》作者之1 。

本文2作: 尼恩,40岁资深老架构师, 《Java 高并发核心编程 加强版 卷1、卷2、卷3》创世作者, 著名博主 。 《K8S学习圣经》《Docker学习圣经》等11个PDF 圣经的作者。

相关的面试题

最后,尼恩再给大家来几道相关的面试题

聊聊:如何写代码来解决生产者消费者问题?

在现实中你解决的许多线程问题都属于生产者消费者模型,就是一个线程生产任务供其它线程进行消费,你必须知道怎么进行线程间通信来解决这个问题。

比较低级的办法是用wait和notify来解决这个问题,比较赞的办法是用Semaphore 或者 BlockingQueue来实现生产者消费者模型。

聊聊:什么是竟态条件?

在大多数实际的多线程应用中,两个或两个以上的线程需要共享对同一数据的存取。

如果i线程存取相同的对象,并且每一个线程都调用了一个修改该对象状态的方法,将会发生什么呢?

可以想象,线程彼此踩了对方的脚。

根据线程访问数据的次序,可能会产生讹误的对象。这样的情况通常称为竞争条件。

聊聊:Java中Semaphore是什么?

Java中的Semaphore是一种新的同步类,它是一个计数信号。

从概念上讲,从概念上讲,信号量维护了一个许可集合。

如有必要,在许可可用前会阻塞每一个 acquire(),然后再获取该许可。

每个 release()添加一个许可,从而可能释放一个正在阻塞的获取者。

但是,不使用实际的许可对象,Semaphore只对可用许可的号码进行计数,并采取相应的行动。

信号量常常用于多线程的代码中,比如数据库连接池。

聊聊:java中wait()的核心原理是什么?

1)当线程调用了locko(某个同步锁对象)的wait()方法后,JVM会将当前线程加入locko

美团一面:两个有序的数组,如何高效合并成一个有序数组?

在说这个题目之前先来说说一个排序算法 “归并算法”

归并算法采取思想是分治思想,分治思想简单说就是分而治之,将一个大问题分解为小问题,将小问题解答后合并为大问题的答案。

乍一看跟递归思想很像,确实如此,分治思想一般就是使用递归来实现的。但是需要注意的是:递归是代码实现的方式,分治属于理论。

接下来看一副图理解下:

说完它的思想:我们再来分析下时间复杂度。归并算法采用的是完全二叉树的形式。所以可以由完全二叉树的深度可以得知,整个归并排序需要进行log2n次。

然后每一次需要消耗O{n}时间。所以总的时间复杂度为o{nlog2n}。归并排序是一种比较占用内存,但是效率高且稳定的算法

贴上代码:

static void Main(string[] args) {
    int[] arr = new int[] { 14,12,15,13,11,16 ,10};

    int[] newArr = Sort(arr, new int[7], 0, arr.Length - 1);
    for (int i = 0; i < newArr.Length - 1; i++)
    {
        Console.WriteLine(newArr[i]);
    }

    Console.ReadKey();
}

public static int[] Sort(int[] arr, int[] result, int start, int end)
{
    if (start >= end)
        return null;
    int len = end - start, mid = (len >> 1) + start;
    int start1 = start, end1 = mid;
    int start2 = mid + 1, end2 = end;
    Sort(arr, result, start1, end1);
    Sort(arr, result, start2, end2);
    int k = start;
    //进行比较。注意这里++是后执行的,先取出来数组中的值然后++
    while (start1 <= end1 && start2 <= end2)
        result[k++] = arr[start1] < arr[start2] ? arr[start1++] : arr[start2++];
    //将每个分组剩余的进行复制
    while (start1 <= end1) 
        result[k++] = arr[start1++];
    //将每个分组剩余的进行复制
    while (start2 <= end2)
        result[k++] = arr[start2++]; 
    for (k = start; k <= end; k++)
        arr[k] = result[k];
    return result;
}

说完了归并算法回到题目上来 首先分析下 题目给定的是两个已经排好序的数组合并,关键字“合并”,“两个”,正好符合我们的归并算法,并且已经分类好了,只需要去合并就可以了。

来看下几张图。

蓝色的箭头表示最终选择的位置,而红色的箭头表示两个数组当前要比较的元素,比如当前是2与1比较,1比2小,所以1放到蓝色的箭头中,蓝色的箭头后移,1的箭头后移。

然后2与4比较,2比4小那么2到蓝色的箭头中,蓝色箭头后移,2后移,继续比较.......

归并思路就是这样了,最后唯一需要注意的是那个先比较完的话,那么剩下的直接不需要比较,把后面的直接移上去就可以了,这个需要提前判定一下。

贴上代码:

static void Main(string[] args) {
    int[] arr1 = new int[] { 2, 3, 6, 8 };
    int[] arr2 = new int[] { 1, 4, 5, 7 };
    int[] newArr = Sort(arr1, arr2);
    for (int i = 0; i < newArr.Length - 1; i++)
    {
        Console.WriteLine(newArr[i]);
    }

    Console.ReadKey();
}

public static int[] Sort(int[] arr1,int[] arr2)
{
    int[] newArr = new int[arr1.Length + arr2.Length];
    int i = 0, j = 0, k = 0;
    while (i < arr1.Length && j < arr2.Length)
    {
        if (arr1[i] < arr2[j])
        {

            newArr[k] = arr1[i];
            i++;
            k++;
        }
        else
        {

            newArr[k] = arr2[j];
            j++;
            k++;
        }
    }

    while (i < arr1.Length)
        newArr[k++] = arr1[i++];
    while (j < arr2.Length)
        newArr[j++] = arr2[j++];
    return newArr;
}

最后感谢一下大佬提供的思路:https://blog.csdn.net/k_koris/article/details/80508543

原文链接:https://blog.csdn.net/weixin_40097554/article/details/108656165/

版权声明:本文为CSDN博主「貂蝉要睡觉」的原创文章,遵循CC 4.0 BY-SA版权协议,转载请附上原文出处链接及本声明。

近期热文推荐:

1.1,000+ 道 Java面试题及答案整理(2021最新版)

2.别在再满屏的 if/ else 了,试试策略模式,真香!!

3.卧槽!Java 中的 xx ≠ null 是什么新语法?

4.Spring Boot 2.5 重磅发布,黑暗模式太炸了!

5.《Java开发手册(嵩山版)》最新发布,速速下载!

觉得不错,别忘了随手点赞+转发哦!

以上是关于美团一面:如何实现一个100W ops 生产者消费者程序?的主要内容,如果未能解决你的问题,请参考以下文章

美团一面:两个有序的数组,如何高效合并成一个有序数组?

美团一面:如何高效地将两个有序的数组合并成一个有序数组

美团一面:Spring Cloud 如何构建动态线程池?

美团一面:InndoDB 单表最多 2000W,为什么?小伙伴竟然面挂

美团一面:InndoDB 单表最多 2000W,为什么?小伙伴竟然面挂

美团一面:说说前后端分离权限控制设计和实现思路?