FastAPI 源码阅读 (五) 其余主体内容

Posted

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了FastAPI 源码阅读 (五) 其余主体内容相关的知识,希望对你有一定的参考价值。

参考技术A

还有一部分 security 安全相关将单独开一章,本章将 FastAPI 剩余值得读的部分过一遍

这里定义了例如Depends(),Body()等类,内容比较简单。在此将它们列出,具体用法可直接参考官方文档。

上述类的工厂函数模式,返回实例。例如:

工具类,其中大部分函数,都已经在前几篇中被提及

一个JSON解码的工具

相比 Starlette 添加了一个线程池上下文管理器

Starlette 的 UploadFile 进行了封装

加了两种异常处理

新加了四种异常

添加了ORJSONResponse, orjson 是一种现在python性能最高的json库。有需求可以指定该种response。

redis源码阅读五-为什么大量过期key会阻塞redis?

在之前的 redis的key过期了还能取出来?,在这里对key的过期策略进行了简单的分析,惰性过期和定期过期,对定期清理这块讲的并不透彻,这次来根据源码分析下。

先上下逻辑,此逻辑图根据redis官方5.0分支梳理。

定期过期-慢循环

redis源码阅读二-终于把redis的启动流程搞明白了这一文中。

我们知道redis启动会注册一个Time Event 事件,里面的执行逻辑是:serverCron 在server.c中,其中databaseCron()函数中会调用慢循环进行过期key清理。

int serverCron(struct aeEventLoop *eventLoop, long long id, void *clientData) 
     updateCachedTime(1);

    //hz的默认值是10
    server.hz = server.config_hz;
    //开启动态调整
    if (server.dynamic_hz) 
        /**
         * 默认10000 clients 算出来 server.hz= 80hz,hz最大500
         */
        while (listLength(server.clients) / server.hz >MAX_CLIENTS_PER_CLOCK_TICK)
            server.hz *= 2;
            if (server.hz > CONFIG_MAX_HZ) 
                server.hz = CONFIG_MAX_HZ;
                break;
            
        
    
    /**
     * 1,处理超时key,发送过期事件并删除
     * 2,内存碎片整理
     * 3,rehash
     */
    databasesCron();


/**
 * 1,处理超时key,发送过期事件并删除
 * 2,内存碎片整理
 * 3,rehash
 */
void databasesCron(void) 
    //随机抽样处理过期key,只处理master,从库由master同步
    if (server.active_expire_enabled) 
        if (server.masterhost == NULL) 
            //慢循环
            activeExpireCycle(ACTIVE_EXPIRE_CYCLE_SLOW);
         else 
            expireSlaveKeys();
        
    

    /* Defrag keys gradually. */
    //碎片处理
    if (server.active_defrag_enabled)
        activeDefragCycle();

   
    //调整hash表大小,并进行rehash
    if (server.rdb_child_pid == -1 && server.aof_child_pid == -1) 

    



定期过期-快循环

在每次循环前,都会有一次beforesleep的调用,在beforeSleep()函数里触发了快循环,具体代码如下:

void aeMain(aeEventLoop *eventLoop) 
    eventLoop->stop = 0;
     //只要没有停止,就循环执行,这个是主线程
    while (!eventLoop->stop) 
        if (eventLoop->beforesleep != NULL)
            eventLoop->beforesleep(eventLoop);
        aeProcessEvents(eventLoop, AE_ALL_EVENTS|AE_CALL_AFTER_SLEEP);
    


/**
 * @brief 循环处理前执行
 * 
 * @param eventLoop 
 */
void beforeSleep(struct aeEventLoop *eventLoop) 
        //激活快循环
    if (server.active_expire_enabled && server.masterhost == NULL)
        //执行快循环
        activeExpireCycle(ACTIVE_EXPIRE_CYCLE_FAST);


使用自适应算法的过期处理activeExpireCycle

最核心的逻辑在这里,从5.0分支来看,快循环是执行不了的。但是代码从3.0到6.0都有这个逻辑。

/**
 * @brief 使用自适应算法让一些超时key 过期
 * 如果key少,使用较少的cpu时间片处理
 * 如果key多,会使用较多的cpu时间片
 * type为1,占用cpu时间片只有一毫秒,
 * type为0,占用cpu的时间片根据默认server.hz大约占用25毫秒
 *   开启server.dynamic_hz时,根据链接数算出来的刷新频率来,一万链接大概3.1毫秒,链接数越多占用时间越多
 * @param type  type有两个值,0和1
 *  ACTIVE_EXPIRE_CYCLE_SLOW 0
    ACTIVE_EXPIRE_CYCLE_FAST 1
 */
void activeExpireCycle(int type) 
    
    /**
     * @brief static 静态变量作用域是本文件,在本文件内是共享,反复进入都是用的同一个值
     */
    static unsigned int current_db = 0; /* Last DB tested. */
    /**
     * 超过时间限制会把timelimit_exit设置为1,然后退出,
     * 快循环进来就不会执行
     * 慢循环进来会把timelimit_exit重新置为0
     */
    static int timelimit_exit = 0;      /* Time limit hit in previous call? */
    static long long last_fast_cycle = 0; /* When last fast cycle ran. */

    int j, iteration = 0;
    int dbs_per_call = CRON_DBS_PER_CALL;
    /**
     * start 开始执行时间
     * timelimit 限制执行时间
     * elapsed now()-start 已经使用的时间
     */
    long long start = ustime(), timelimit, elapsed;
    if (clientsArePaused()) return;

    //快循环
    if (type == ACTIVE_EXPIRE_CYCLE_FAST) 
        /* Don't start a fast cycle if the previous cycle did not exit
         * for time limit. Also don't repeat a fast cycle for the same period
         * as the fast cycle total duration itself. */
        /*
         *如果是0,就直接返回,看注释的意思,如果是timelimit_exit=1,快循环不再进来,但是代码却将timelimit_exit=0给拦截了
         * 这代码从3.0就有了,到6.0加了一个判断条件,后续再研究下别的版本分析下
         */
        if (!timelimit_exit) return;
        // 如果和上次执行时间差2000微秒以内,也不执行
        if (start < last_fast_cycle + ACTIVE_EXPIRE_CYCLE_FAST_DURATION*2) return;
        last_fast_cycle = start;
    

    if (dbs_per_call > server.dbnum || timelimit_exit)
        dbs_per_call = server.dbnum;

    /**
     * @brief 最多占用微秒数,这里限制过期处理的时间
     * 未开启server.dynamic_hz的时候,server.hz=10,timelimit=25000微秒,25毫秒
     * 开启server.dynamic_hz的时候
     * 10000 clients server.hz是80,timelimit=3125微秒 约3.1毫秒 
     */
    timelimit = 1000000*ACTIVE_EXPIRE_CYCLE_SLOW_TIME_PERC/server.hz/100;
     //慢循环会重新置为0
    timelimit_exit = 0;
    //=0就给默认值1微秒
    if (timelimit <= 0) timelimit = 1;

    //快速模式的情况下只占用1毫秒
    if (type == ACTIVE_EXPIRE_CYCLE_FAST)
        timelimit = ACTIVE_EXPIRE_CYCLE_FAST_DURATION; /* in microseconds. */

    /* Accumulate some global stats as we expire keys, to have some idea
     * about the number of keys that are already logically expired, but still
     * existing inside the database. */

    /**
     * @brief 采样
     */
    long total_sampled = 0;
    long total_expired = 0;

    //遍历db
    for (j = 0; j < dbs_per_call && timelimit_exit == 0; j++) 
        //默认值为0
        int expired;
        redisDb *db = server.db+(current_db % server.dbnum);

        /* Increment the DB now so we are sure if we run out of time
         * in the current DB we'll restart from the next. This allows to
         * distribute the time evenly across DBs. */
        current_db++;

        /* Continue to expire if at the end of the cycle more than 25%
         * of the keys were expired. */
        
        /**
         * @brief 过期比例超过25%会循环直到时间限制为止
         */
        do 
            unsigned long num, slots;
            long long now, ttl_sum;
            int ttl_samples;
            //迭代次数,是总的迭代次数,和db无关
            iteration++;
            //当前db.expires的总大小为0,跳出当前循环,直接执行下个db
            if ((num = dictSize(db->expires)) == 0) 
                db->avg_ttl = 0;
                break;
            
            //当前db.expires使用总数
            slots = dictSlots(db->expires);
            now = mstime();

            //如果使用占比<1%,不处理
            if (num && slots > DICT_HT_INITIAL_SIZE &&
                (num*100/slots < 1)) break;
    
            //每次都重新计算过期数
            expired = 0;
            ttl_sum = 0;
            ttl_samples = 0;

            //每次执行只处理20个
            if (num > ACTIVE_EXPIRE_CYCLE_LOOKUPS_PER_LOOP)
                num = ACTIVE_EXPIRE_CYCLE_LOOKUPS_PER_LOOP;

            while (num--) 
                dictEntry *de;
                long long ttl;
                //随机取一个,没取到就跳出循环
                if ((de = dictGetRandomKey(db->expires)) == NULL) break;
                ttl = dictGetSignedIntegerVal(de)-now;
                /**
                 * @brief 触发删除并发送过期事件,成功,expired+1
                 * 采样到的可能没有到期
                 */
                if (activeExpireCycleTryExpire(db,de,now)) expired++;
                if (ttl > 0) 
                    ttl_sum += ttl;
                    ttl_samples++;
                
                //总采样数
                total_sampled++;
            
            //总过期数
            total_expired += expired;

            /* Update the average TTL stats for this database. */
            if (ttl_samples) 
                long long avg_ttl = ttl_sum/ttl_samples;

                /* Do a simple running average with a few samples.
                 * We just use the current estimate with a weight of 2%
                 * and the previous estimate with a weight of 98%. */
                if (db->avg_ttl == 0) db->avg_ttl = avg_ttl;
                db->avg_ttl = (db->avg_ttl/50)*49 + (avg_ttl/50);
            

            //迭代16次,就有一次逻辑处理,为了不阻塞正常的业务处理
            if ((iteration & 0xf) == 0)  /* check once every 16 iterations. */
                elapsed = ustime()-start;
                //执行时间超过了timelimit直接中断,如果没有直到
                if (elapsed > timelimit) 
                    //超时设置timelimit_exit=1
                    timelimit_exit = 1;
                    server.stat_expired_time_cap_reached_count++;
                    break;
                
            
        //如果过期key超过25%,就会一直执行,直到超过了时间限制
         while (expired > ACTIVE_EXPIRE_CYCLE_LOOKUPS_PER_LOOP/4);
    
    //计算持续时间
    elapsed = ustime()-start;
    latencyAddSampleIfNeeded("expire-cycle",elapsed/1000);

    /* Update our estimate of keys existing but yet to be expired.
     * Running average with this sample accounting for 5%. */
    double current_perc;
    if (total_sampled) 
        //当前的过期比例
        current_perc = (double)total_expired/total_sampled;
     else
        current_perc = 0;
    //设置可能过期的比例
    server.stat_expired_stale_perc = (current_perc*0.05)+
                                     (server.stat_expired_stale_perc*0.95);



  • 快慢循环都会改变timelimit_exit的值,按理说快循环会快但是从梳理代码来看,快循环在timelimit_exit为0 的时候是不会进来的,代码 if (!timelimit_exit) return 这块进行了拦截

  • 一旦redis中有过多的key过期,redis会循环删除,以16次为一个周期,代码(iteration & 0xf) == 0,去判断执行时间有没有超过timelimit,有就跳出,执行别的业务

  • 所以说呢,redis大量的过期key,在redis采样删除时,默认情况超过25%不会完全阻塞redis,只是周期性的阻塞25毫秒,但是如果25毫秒阻塞,5毫秒执行任务(5/6的时间都在阻塞了),这时候的redis对外的性能会急剧下降

以上是关于FastAPI 源码阅读 (五) 其余主体内容的主要内容,如果未能解决你的问题,请参考以下文章

FastAPI 源码阅读 (四) Endpoint解析

openjdk源码阅读导航

干货:使用Fastapi开发自己的Mock server(附源码)

redis源码阅读五-为什么大量过期key会阻塞redis?

源码阅读结构特征提取ReFex

chromium源码阅读--Browser进程初始化