千万级日志回放引擎设计稿#yyds干货盘点#

Posted FunTester

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了千万级日志回放引擎设计稿#yyds干货盘点#相关的知识,希望对你有一定的参考价值。

现在压测系统一直用的方案是goreplay进行二次开发完成的。因为整体是Java技术栈的,使用goreplay有存在两方面问题:一是兼容性,语言和开发框架上,增加了用例创建执行的复杂度;二是维护成本,goreplay二次开发方案已经无法满足现在的性能测试需求。如果维护两套压测引擎会带来更多工作量。

所以为了尽可能解决这两方面问题,接到了一个活儿,调研一下Java实现日志回放功能。主要就是读了goreplay的源码以及它设计思路,用Java重现实现一遍。

这里用到了前两天分享的Disruptor高性能队列常用API演示高性能队列Disruptor在测试中应用,有兴趣的可以再翻一翻。另视频版还在制作中,年后会和大家相见。

思路

总体设计思路如下:

PS:流量递增和动态增减尚未实现,还在研究goreplay的源码。

日志拉取和解析

日志的拉取和初步解析依旧采取原来项目中的逻辑,通过SQL语句网关日志中拉取日志,并对日志内容进行初步解析,放入云OSS中,并将链接存入数据库(此步骤放在录制流量成功之后)。

PS:目前日志解析保留的有用信息只有URL

日志格式如下:

/v1/level,funtester.com,-,token,-,1622611469,-
/v1/level,funtester.com,-,token,-,1622611469,-
/v1/level,funtester.com,-,token,-,1622611469,-
/v1/level,funtester.com,-,token,-,1622611469,-
/v1/level,funtester.com,-,token,-,1622611469,-
/v1/level,funtester.com,-,token,-,1622611469,-
/v1/level,funtester.com,-,token,-,1622611469,-

实现步骤

  • 首先将日志中有用信息(URL)以及token放到内存中
  • 通过配置host,读取URL,以及响应header(token,压测标识,常用header,模拟盘标识)组装HTTP请求。
  • 创建Disruptor对象,使用异步创建生产者
  • 通过消费者消费(发出请求)消息(HTTP请求对象),达到HTTP接口日志流量回复功能。

性能指标

  • 本机6C16G配置测试数据

  • 实测1千万URL读取速度约为9s ~ 13s,内存无压力,如果后续更大日志量需求,可以通过stream方式异步读取日志,实测日志读取速度在80万/s以上,满足目前需求。
  • 单生产者速度25万QPS
  • 单机测试QPS  8.8万,CPU跑满,触及物理极限,此数据与之前工具对比压测差异不大。

风险

  • 消费者异步对消息进行存储,超过一定数量将会丢弃消息。这个问题在消费者速度小于生产者速度时会触发。
  • 消费者数量需要在启动前设定,如果参数设置不合理,会导致消费者压力瓶颈,无法动态增加消费者。

PS:这些风险后续会逐个解决。

代码实现

生产者Demo:

def ft = 
    output("创建线程")
    fun 
        int i = 0
        while (key) 
            def url = logs.get(i % logs.size())
            def get = getHttpGet(HOST + url)
            get.addHeader("token", tokens.get(i % tokens.size()))
            get.addHeader(HttpClientConstant.USER_AGENT)
            ringBuffer.publishEvent e, s ->
                e.setRequest(get)
            
            i++
        
    

ft()

读取文件代码

/**
 * 通过闭包传入方法读取超大文件部分内容
 *
 * @param filePath
 * @param function
 * @return
 */
public static List<String> readByLine(String filePath, Function<String, String> function) 
    if (StringUtils.isEmpty(filePath) || !new File(filePath).exists() || new File(filePath).isDirectory())
        ParamException.fail("文件信息错误!" + filePath);
    logger.debug("读取文件名:", filePath);
    List<String> lines = new ArrayList<>();
    File file = new File(filePath);
    if (file.isFile() && file.exists())  // 判断文件是否存在
        try (FileInputStream fileInputStream = new FileInputStream(file);
             InputStreamReader read = new InputStreamReader(fileInputStream, DEFAULT_CHARSET);
             BufferedReader bufferedReader = new BufferedReader(read, 3 * 1024 * 1024);) 
            String line = null;
            while ((line = bufferedReader.readLine()) != null) 
                String apply = function.apply(line);
                if (StringUtils.isNotBlank(apply)) lines.add(apply);
            
         catch (Exception e) 
            logger.warn("读取文件内容出错", e);
        
     else 
        logger.warn("找不到指定的文件:", filePath);
    
    return lines;

演示Demo

package com.funtest.groovytest

import com.funtester.base.constaint.FixedThread
import com.funtester.config.HttpClientConstant
import com.funtester.frame.execute.Concurrent
import com.funtester.frame.execute.ThreadPoolUtil
import com.funtester.httpclient.ClientManage
import com.funtester.httpclient.FunLibrary
import com.funtester.utils.ArgsUtil
import com.funtester.utils.RWUtil
import com.lmax.disruptor.EventHandler
import com.lmax.disruptor.RingBuffer
import com.lmax.disruptor.WorkHandler
import com.lmax.disruptor.YieldingWaitStrategy
import com.lmax.disruptor.dsl.Disruptor
import com.lmax.disruptor.dsl.ProducerType
import org.apache.http.client.methods.HttpGet
import org.apache.http.client.methods.HttpRequestBase
import org.junit.platform.commons.util.StringUtils

import java.util.concurrent.LinkedBlockingDeque
import java.util.function.Function

class ReplayTest extends FunLibrary 

    static String url = "http://localhost:12345/test";

    static HttpGet httpGet = getHttpGet(url);

    //    static LinkedBlockingQueue<HttpRequestBase> requests = new LinkedBlockingQueue<>()

    static def HOST = "http://localhost:12345"

    static def key = true

    static Disruptor<RequestEvent> disruptor

    public static void main(String[] args) 
        def logfile = "/Users/oker/Desktop/log.csv"
        //        def logfile = "/Users/oker/Desktop/fun.csv"
        //1千万日志
        def tokenfile = "/Users/oker/Desktop/token.csv"
        //2万用户token
        List<String> logs = RWUtil.readByLine(logfile, new Function<String, String>() 

            @Override
            String apply(String s) 
                return StringUtils.isNotBlank(s) && s.startsWith("/") ? s.split(COMMA)[0] : null
            
        );
        List<String> tokens = RWUtil.readByLine(tokenfile, new Function<String, String>() 

            @Override
            String apply(String s) 
                return StringUtils.isNotBlank(s) ? s.split(COMMA)[4] : null
            
        );

        output("总计 $formatLong(logs.size()) 条日志")
        disruptor = new Disruptor<RequestEvent>(
                RequestEvent::new,
                512 * 512,
                ThreadPoolUtil.getFactory(),
                ProducerType.MULTI,
                new YieldingWaitStrategy()
        );
        RingBuffer<RequestEvent> ringBuffer = disruptor.getRingBuffer();

        def ft = 
            output("创建线程")
            fun 
                int i = 0
                while (key) 
                    def url = logs.get(i % logs.size())
                    def get = getHttpGet(HOST + url)
                    get.addHeader("token", tokens.get(i % tokens.size()))
                    get.addHeader(HttpClientConstant.USER_AGENT)
                    ringBuffer.publishEvent e, s ->
                        e.setRequest(get)
                    
                    i++
                
            
        
        ft()
        disruptor.handleEventsWith(new FunTester(10))
        //        5.times ft()

        //下面开始测试
        ClientManage.init(10, 5, 0, "", 0)
        def util = new ArgsUtil(args)
        def thread = util.getIntOrdefault(0, 20)
        def times = util.getIntOrdefault(1, 60000)
        RUNUP_TIME = util.getIntOrdefault(2, 0)
        def tasks = []
        thread.times 
            def tester = new FunTester(times)
            disruptor.handleEventsWith(tester);
            tasks << tester
        
        disruptor.start();
        new Concurrent(tasks, "这是千万级日志回放演示Demo").start()

    

    private static class FunTester extends FixedThread implements EventHandler<RequestEvent>, WorkHandler<RequestEvent> 

        LinkedBlockingDeque<HttpRequestBase> reqs = new LinkedBlockingDeque<HttpRequestBase>()

        FunTester(int limit) 
            super(null, limit, true)
        

        @Override
        protected void doing() throws Exception 
            FunLibrary.executeOnly(reqs.take())
        

        @Override
        FixedThread clone() 
            return new FunTester(limit)
        

        @Override
        protected void after() 
            super.after()
            key = false
            disruptor.shutdown()
        

        @Override
        void onEvent(RequestEvent event, long sequence, boolean endOfBatch) throws Exception 
            if (reqs.size() < 100000) reqs.add(event.getRequest())
        

        @Override
        void onEvent(RequestEvent event) throws Exception 
            if (reqs.size() < 100000) reqs.add(event.getRequest())
        
    

    private static class RequestEvent 

        HttpRequestBase request;

        public HttpRequestBase getRequest() 
            return request;
        

        public void setRequest(HttpRequestBase request) 
            this.request = request;
        

    

PS:这里用到了多个group,原因在设计稿中标记了。

Have Fun ~ Tester !

以上是关于千万级日志回放引擎设计稿#yyds干货盘点#的主要内容,如果未能解决你的问题,请参考以下文章

Java&Go高性能队列之Disruptor性能测试#yyds干货盘点#

高性能队列Disruptor在测试中应用#yyds干货盘点#

哈希算法篇 - 布隆过滤器#yyds干货盘点#

#yyds干货盘点#千万别再用了,这些加密算法

#yyds干货盘点# 设计模式之代理模式:静态代理

Redis | 第10章 二进制数组慢查询日志和监视器《Redis设计与实现》#yyds干货盘点#