Java&Go高性能队列之LinkedBlockingQueue性能测试#yyds干货盘点#

Posted FunTester

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了Java&Go高性能队列之LinkedBlockingQueue性能测试#yyds干货盘点#相关的知识,希望对你有一定的参考价值。

在写完高性能队列Disruptor在测试中应用千万级日志回放引擎设计稿之后,我就一直在准备Java & Go 语言几种高性能消息队列的性能测试,其中选取了几种基准测试场景以及在性能测试中的应用场景。

测试场景设计的思路参考的两个方面:

  • 消息体大小,我用的不同大小GET请求区分
  • 生产者和消费者线程数,Go语言中称协程goroutine

PS:后续的文章中,Go语言文章中如果出现线程,均指goroutine。

结论

总体来说,java.util.concurrent.LinkedBlockingQueue性能还是在50万QPS级别上,满足现在压测需求,唯一需要避免的就是队列较长时性能不稳定。总结起来三点比较通用的参考:

  • 消息体尽可能小
  • 线程数增益有限
  • 尽量避免消息积压

简介

首先介绍一下第一个被测试的对象java.util.concurrent.LinkedBlockingQueue,分解名字可以得到这是个由链表实现的阻塞单向的对象。官方给的定义是:

在我查到的几种JDK自带的队列实现类中,java.util.concurrent.LinkedBlockingQueue性能是最高的,还有一个候选的类java.util.concurrent.ArrayBlockingQueue,资料说java.util.concurrent.LinkedBlockingQueue性能大概是java.util.concurrent.ArrayBlockingQueue性能的2 ~ 3倍,差距过于明显,这个有机会再来测试。

测试结果

这里性能只记录每毫秒处理消息(对象)个数作为评价性能的唯一标准。

数据说明

这里我用了三种org.apache.http.client.methods.HttpGet,创建方法均使用原生API,为了区分大小的区别,我会响应增加一些header和URL长度。

小对象:

def get = new HttpGet()

中对象:

def get = new HttpGet(url)
get.addHeader("token", token)
get.addHeader(HttpClientConstant.USER_AGENT)
get.addHeader(HttpClientConstant.CONNECTION)

大对象:

def get = new HttpGet(url + token)
get.addHeader("token", token)
get.addHeader("token1", token)
get.addHeader("token5", token)
get.addHeader("token4", token)
get.addHeader("token3", token)
get.addHeader("token2", token)
get.addHeader(HttpClientConstant.USER_AGENT)
get.addHeader(HttpClientConstant.CONNECTION)

生产者

对象大小 队列长度 (百万) 线程数 速率(/ms)
1 1 838
1 5 837
1 10 823
5 1 483
10 1 450
1 1 301
1 5 322
1 10 320
1 20 271
5 1 失败
10 1 失败
0.5 1 351
0.5 5 375
1 1 214
1 5 240
1 10 241
0.5 1 209
0.5 5 250
0.5 10 246
0.2 1 217
0.2 5 309
0.2 10 321
0.2 20 243

中间两次测试失败,是因为等待时间太长了,进行到300万左右开始停滞,所以放弃了。

针对org.apache.http.client.methods.HttpRequestBase消息体结论如下:

  1. 长度保持在十万量级
  2. 生产者线程数5-10线程
  3. 消息体尽可能小

消费者

对象大小 队列长度 (百万) 线程数 速率(/ms)
1 1 1893
1 5 1706
1 10 1594
1 20 1672
2 1 2544
2 5 2024
5 1 3419
1 1 1897
1 5 1485
1 10 1345
1 20 1430
2 1 2971
2 5 1576
1 1 1980
1 5 1623
1 10 1689
0.5 1 1136
0.5 5 1096
0.5 10 1072

针对org.apache.http.client.methods.HttpRequestBase消息体结论如下:

  1. 数据上看长度越长越好
  2. 消费者线程越少越好
  3. 消息体尽可能小

这里跟生产者标准有点不一样,基本上就是锁的竞争越少越好,测试消息数越多越好(这个工作中暂时用不到)。

生产者 & 消费者

这里的线程数指的是生产者或者消费者的数量,总体线程数是此数值的2倍。

对象大小 次数 (百万) 线程数 队列长度 (百万) 速率(/ms)
1 1 0.1 1326
1 1 0.2 1050
1 1 0.5 1054
1 5 0.1 1091
1 10 0.1 1128
2 1 0.1 1798
2 1 0.2 1122
2 5 0.2 946
5 5 0.1 1079
5 10 0.1 1179
1 1 0.1 632
1 1 0.2 664
1 5 0.2 718
1 10 0.2 683
2 1 0.2 675
2 5 0.2 735
2 10 0.2 788
2 15 0.2 828
1 1 0.1 505
1 1 0.2 558
1 5 0.2 609
1 10 0.2 496
2 1 0.2 523
2 5 0.2 759
2 10 0.2 668

针对org.apache.http.client.methods.HttpRequestBase消息体结论如下:

  1. 消息队列积累消息越少,速率越快
  2. 消费速率随时间推移越来越快,不明显
  3. 消息体尽可能小

测试用例

测试用例使用Groovy语言编写,自从我自定义了异步关键字fun和复习了闭包的语法之后,感觉就像开了光一样,有点迷上了各类多线程的语法实现。所以这个用例对于Java同学来讲可能有点看着熟悉,仔细阅读起来有点费劲,我会尽量写一些注释。大家可以把终点放在测试结果上,这可以对以后大家使用java.util.concurrent.LinkedBlockingQueue类有个基本的参考。

测试用例会根据上述的测试场景进行微调,例如线程数、消息体对象的大小等等,这个我会着重进行三种用例场景的测试。当然在工作中使用场景肯定比我提到的三种复杂多,各位有兴趣可以自己亲自上手测试,这里我就不班门弄斧了。

生产者场景

package com.funtest.groovytest

import com.funtester.config.HttpClientConstant
import com.funtester.frame.SourceCode
import com.funtester.utils.CountUtil
import com.funtester.utils.Time
import org.apache.http.client.methods.HttpGet
import org.apache.http.client.methods.HttpRequestBase

import java.util.concurrent.CountDownLatch
import java.util.concurrent.LinkedBlockingQueue
import java.util.concurrent.atomic.AtomicInteger

class QueueT extends SourceCode 

    static AtomicInteger index = new AtomicInteger(0)

    static int total = 100_0000

    static int size = 10

    static int threadNum = 1

    static int piece = total / size

    static def url = "http://localhost:12345/funtester"

    static def token = "FunTesterFunTesterFunTesterFunTesterFunTesterFunTesterFunTester"

    public static void main(String[] args) 

        LinkedBlockingQueue<HttpRequestBase> linkedQ = new LinkedBlockingQueue<>()

        def start = Time.getTimeStamp()
        def latch = new CountDownLatch(threadNum)
        def ts = []
        def barrier = new CyclicBarrier(threadNum + 1)
        def funtester = //创建异步闭包的方法
            fun 
                barrier.await()
                while (true) 
                    if (index.getAndIncrement() % piece == 0) 
                        def l = Time.getTimeStamp() - start
                        ts << l
                        output("$formatLong(index.get())添加总消耗$formatLong(l)")
                        start = Time.getTimeStamp()
                    
                    if (index.get() > total) break

                    def get = new HttpGet(url)
                    get.addHeader("token",token)
                    get.addHeader(HttpClientConstant.USER_AGENT)
                    get.addHeader(HttpClientConstant.CONNECTION)
                    linkedQ.put(get)
                
                latch.countDown()
            
        
        threadNum.times funtester()
        def st = Time.getTimeStamp()
        barrier.await()
        latch.await()
        def et = Time.getTimeStamp()
        outRGB("每毫秒速率$total / (et - st)")
        outRGB(CountUtil.index(ts).toString())
    


消费者场景

package com.funtest.groovytest

import com.funtester.config.HttpClientConstant
import com.funtester.frame.SourceCode
import com.funtester.utils.CountUtil
import com.funtester.utils.Time
import org.apache.http.client.methods.HttpGet
import org.apache.http.client.methods.HttpRequestBase

import java.util.concurrent.CountDownLatch
import java.util.concurrent.CyclicBarrier
import java.util.concurrent.LinkedBlockingQueue
import java.util.concurrent.TimeUnit
import java.util.concurrent.atomic.AtomicInteger

class QueueTconsume extends SourceCode 

    static AtomicInteger index = new AtomicInteger(1)

    static int total = 100_0000

    static int size = 10

    static int threadNum = 5

    static int piece = total / size

    static def url = "http://localhost:12345/funtester"

    static def token = "FunTesterFunTesterFunTesterFunTesterFunTesterFunTesterFunTester"

    public static void main(String[] args) 

        LinkedBlockingQueue<HttpRequestBase> linkedQ = new LinkedBlockingQueue<>()
        def pwait = new CountDownLatch(10)
        def produces = 
            fun 
                while (true) 
                    if (linkedQ.size() > total) break
                    def get = new HttpGet(url)
                    get.addHeader("token", token)
                    get.addHeader(HttpClientConstant.USER_AGENT)
                    get.addHeader(HttpClientConstant.CONNECTION)
                    linkedQ.add(get)
                
                pwait.countDown()
            
        
        10.times produces()
        pwait.await()
        outRGB("数据构造完成!$linkedQ.size()")

        def start = Time.getTimeStamp()
        def barrier = new CyclicBarrier(threadNum + 1 )
        def latch = new CountDownLatch(threadNum)
        def ts = []
        def funtester = 
            fun 
                barrier.await()
                while (true) 
                    if (index.getAndIncrement() % piece == 0) 
                        def l = Time.getTimeStamp() - start
                        ts << l
                        output("$formatLong(index.get())消费总消耗$formatLong(l)")
                        start = Time.getTimeStamp()
                    
                    def poll = linkedQ.poll(100, TimeUnit.MILLISECONDS)
                    if (poll == null) break
                
                latch.countDown()
            
        
        threadNum.times funtester()
        def st = Time.getTimeStamp()
        barrier.await()
        latch.await()
        def et = Time.getTimeStamp()
        outRGB("每毫秒速率$total / (et - st)")
        outRGB(CountUtil.index(ts).toString())
    


生产者 & 消费者 场景

这里我引入了另外一个变量:初始队列长度length,用例运行之前将队列按照这个长度进行单线程填充。

package com.funtest.groovytest

import com.funtester.frame.SourceCode
import com.funtester.utils.Time
import org.apache.http.client.methods.HttpGet
import org.apache.http.client.methods.HttpRequestBase

import java.util.concurrent.CountDownLatch
import java.util.concurrent.CyclicBarrier
import java.util.concurrent.LinkedBlockingQueue
import java.util.concurrent.TimeUnit
import java.util.concurrent.atomic.AtomicInteger

class QueueBoth extends SourceCode 

    static AtomicInteger index = new AtomicInteger(1)

    static int total = 500_0000

    static int length = 50_0000

    static int threadNum = 5

    static def url = "http://localhost:12345/funtester"

    static def token = "FunTesterFunTesterFunTesterFunTesterFunTesterFunTesterFunTester"

    public static void main(String[] args) 
        LinkedBlockingQueue<HttpRequestBase> linkedQ = new LinkedBlockingQueue<>()

        def latch = new CountDownLatch(threadNum * 2)
        def barrier = new CyclicBarrier(threadNum * 2 + 1)
        def ts = []
        def funtester = f ->
            
                fun 
                    barrier.await()
                    while (true) 
                        if (index.getAndIncrement() > total) break
                        f()
                    
                    latch.countDown()
                
            
        
        def produces =  
            def get = new HttpGet(url)
            get.addHeader("token", token)
            get.addHeader(HttpClientConstant.USER_AGENT)
            get.addHeader(HttpClientConstant.CONNECTION)
            linkedQ.put(get)
        
        length.times produces()

        threadNum.times 
            funtester produces
            funtester linkedQ.poll(100, TimeUnit.MILLISECONDS)
        
        def st = Time.getTimeStamp()
        barrier.await()
        latch.await()
        def et = Time.getTimeStamp()
        outRGB("每毫秒速率$total / (et - st) / 2")
    


补充

性能非常不稳定

其中有两个问题需要补充说明,java.util.concurrent.LinkedBlockingQueue性能在测试过程中非常不稳定,我每次打印日志以1/10为节点打印时间戳,下面分享一些在队列长度100万时,生产者模式中的日志:

INFO-> 23.731 F-2  107,942添加总消耗523
INFO-> 23.897 F-10 200,061添加总消耗165
INFO-> 24.137 F-9  300,024添加总消耗239
INFO-> 24.320 F-2  400,037添加总消耗182
INFO-> 25.200 F-5  500,065添加总消耗879
INFO-> 25.411 F-2  600,094添加总消耗211
INFO-> 25.604 F-8  700,090添加总消耗193
INFO-> 26.868 F-1  800,047添加总消耗1,264
INFO-> 26.927 F-4  900,053添加总消耗57
INFO-> 28.454 F-3  1,000,009添加总消耗1,527
INFO-> 28.457 main 每毫秒速率190.0779319521
INFO-> 28.476 main 平均值:524.0 ,最大值1527.0 ,最小值:57.0 ,中位数:239.0 p99:1527.0 p95:1527.0

INFO-> 43.930 F-10 112,384添加总消耗385
INFO-> 44.072 F-9  200,159添加总消耗140
INFO-> 44.296 F-1  300,058添加总消耗223
INFO-> 44.445 F-7  400,075添加总消耗149
INFO-> 45.311 F-10 500,086添加总消耗866
INFO-> 45.498 F-8  600,080添加总消耗187
INFO-> 45.700 F-1  700,088添加总消耗202
INFO-> 45.760 F-9  800,057添加总消耗59
INFO-> 47.245 F-6  900,095添加总消耗1,485
INFO-> 47.303 F-6  1,000,009添加总消耗58
INFO-> 47.305 main 每毫秒速率262.7430373095
INFO-> 47.320 main 平均值:375.4 ,最大值1485.0 ,最小值:58.0 ,中位数:202.0 p99:1485.0 p95:1485.0

INFO-> 00.916 F-1  100,000添加总消耗568
INFO-> 01.269 F-1  200,000添加总消耗353
INFO-> 01.461 F-1  300,000添加总消耗192
INFO-> 01.635 F-1  400,000添加总消耗174
INFO-> 02.536 F-1  500,000添加总消耗899
INFO-> 02.777 F-1  600,000添加总消耗240
INFO-> 03.015 F-1  700,000添加总消耗237
INFO-> 03.107 F-1  800,000添加总消耗91
INFO-> 04.519 F-1  900,000添加总消耗1,412
INFO-> 05.940 F-1  1,000,000添加总消耗96
INFO-> 05.943 main 每毫秒速率184.5358922310
INFO-> 05.959 main 平均值:426.2 ,最大值1412.0 ,最小值:91.0 ,中位数:240.0 p99:1412.0 p95:1412.0

可以看出最大值最小值能相差十几倍,甚至二十几倍,这种情况随着消息队列总长度增长而增长,大多数发生在80万 ~ 100万阶段,如果将长度降低到50万,这种情况就会得到明显改善。所以还有一个附加观点:消息队列长度应当尽可能少一些。

基准测试

下面是我使用FunTester性能测试框架对三种消息对象的生产代码进行的测试结果。

测试对象 线程数 个数(百万) 速率(/ms)
1 1 5681
5 1 8010
5 5 15105
1 1 1287
5 1 2329
5 5 4176
1 1 807
5 1 2084
5 5 3185

测试用例如下:

package com.funtest.groovytest

import com.funtester.base.constaint.FixedThread
import com.funtester.config.HttpClientConstant
import com.funtester.frame.execute.Concurrent
import com.funtester.httpclient.FunLibrary
import org.apache.http.client.methods.HttpGet

class TTT extends FunLibrary 

    static int total = 100_0000

    static int thread = 1

    static int times = total / thread

    static def url = "http://localhost:12345/funtester"

    static def token = "FunTesterFunTesterFunTesterFunTesterFunTesterFunTesterFunTester"

    public static void main(String[] args) 
        RUNUP_TIME = 0
        def tasks = []
        thread.times tasks << new FunTester(times)
        new Concurrent(tasks,"测试生产者代码性能").start()

    

    private static class FunTester extends FixedThread 

        FunTester(int limit) 
            super(null, limit, true)
        

        @Override
        protected void doing() throws Exception 
//            def get = new HttpGet()

//            def get = new HttpGet(url)
//            get.addHeader("token", token)
//            get.addHeader(HttpClientConstant.USER_AGENT)
//            get.addHeader(HttpClientConstant.CONNECTION)

            def get = new HttpGet(url + token)
            get.addHeader("token", token)
            get.addHeader("token1", token)
            get.addHeader("token5", token)
            get.addHeader("token4", token)
            get.addHeader("token3", token)
            get.addHeader("token2", token)
            get.addHeader(HttpClientConstant.USER_AGENT)
            get.addHeader(HttpClientConstant.CONNECTION)

        

        @Override
        FixedThread clone() 
            return new FunTester(limit)
        
    


Have Fun ~ Tester !

以上是关于Java&Go高性能队列之LinkedBlockingQueue性能测试#yyds干货盘点#的主要内容,如果未能解决你的问题,请参考以下文章

高性能消息队列之nsq

高性能队列Disruptor在测试中应用#yyds干货盘点#

第五节 Go数据结构之队列

Go 容器之队列的几种实现方式

高性能无锁队列,代码注释

Java常用消息队列原理介绍及性能对比