redis源码阅读四-我把redis6里的io多线程执行流程梳理明白了

Posted 5ycode

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了redis源码阅读四-我把redis6里的io多线程执行流程梳理明白了相关的知识,希望对你有一定的参考价值。

之前分析了通过redis源码阅读二-终于把redis的启动流程搞明白了分析了redis的启动流程,通过redis源码阅读三-终于把主线任务执行搞明白了分析了redis的主线任务,这次从redis6.2分支上分析redis6的io多线程。

我注释的代码主要在redis5.0分支上。6.2上注释的比较少。

https://github.com/yxkong/redis/tree/6.2

https://github.com/yxkong/redis/tree/5.0

acceptTcpHandler处理的差异

虽然最终都是添加了一个处理器是readQueryFromClient的FileEvent事件,但是6.2分支就有点绕,可能是6以后想向面向对象靠拢了。

在5.0中createClient的时候直接创建了一个FileEvent

aeCreateFileEvent(server.el,fd,AE_READABLE, readQueryFromClient, c)

在6.2分支中在createClient的时候,通过connSetReadHandler间接的创建FileEvent

在networking.c中
//注册readQueryFromClient,当connection读的时候回调时调用
connSetReadHandler(conn, readQueryFromClient);

这块就有点绕,对于熟悉java的同学来说
在connection.h中定义了结构体ConnectionType,且connection有个ConnectionType的属性,这里是一堆接口

typedef struct ConnectionType 
    void (*ae_handler)(struct aeEventLoop *el, int fd, void *clientData, int mask);
    int (*connect)(struct connection *conn, const char *addr, int port, const char *source_addr, ConnectionCallbackFunc connect_handler);
    int (*write)(struct connection *conn, const void *data, size_t data_len);
    int (*read)(struct connection *conn, void *buf, size_t buf_len);
    void (*close)(struct connection *conn);
    int (*accept)(struct connection *conn, ConnectionCallbackFunc accept_handler);
    int (*set_write_handler)(struct connection *conn, ConnectionCallbackFunc handler, int barrier);
    int (*set_read_handler)(struct connection *conn, ConnectionCallbackFunc handler);
    const char *(*get_last_error)(struct connection *conn);
    int (*blocking_connect)(struct connection *conn, const char *addr, int port, long long timeout);
    ssize_t (*sync_write)(struct connection *conn, char *ptr, ssize_t size, long long timeout);
    ssize_t (*sync_read)(struct connection *conn, char *ptr, ssize_t size, long long timeout);
    ssize_t (*sync_readline)(struct connection *conn, char *ptr, ssize_t size, long long timeout);
    int (*get_type)(struct connection *conn);
 ConnectionType;

struct connection 
    ConnectionType *type;
    ConnectionState state;
    short int flags;
    short int refs;
    int last_errno;
    void *private_data;
    ConnectionCallbackFunc conn_handler;
    ConnectionCallbackFunc write_handler;
    ConnectionCallbackFunc read_handler;
    int fd;
;

在connection.c中,对ConnectionType有个默认实现(我不知道这么理解对不对)

ConnectionType CT_Socket = 
    .ae_handler = connSocketEventHandler,
    .close = connSocketClose,
    .write = connSocketWrite,
    .read = connSocketRead,
    .accept = connSocketAccept,
    .connect = connSocketConnect,
    .set_write_handler = connSocketSetWriteHandler,
    .set_read_handler = connSocketSetReadHandler,
    .get_last_error = connSocketGetLastError,
    .blocking_connect = connSocketBlockingConnect,
    .sync_write = connSocketSyncWrite,
    .sync_read = connSocketSyncRead,
    .sync_readline = connSocketSyncReadLine,
    .get_type = connSocketGetType
;
static int connSocketSetReadHandler(connection *conn, ConnectionCallbackFunc func) 
    if (func == conn->read_handler) return C_OK;

    conn->read_handler = func;
    if (!conn->read_handler)
        aeDeleteFileEvent(server.el,conn->fd,AE_READABLE);
    else
        //真正的目标是这里,创建一个fileEvent事件,回调函数是readQueryFromClient
        if (aeCreateFileEvent(server.el,conn->fd,
                    AE_READABLE,conn->type->ae_handler,conn) == AE_ERR) return C_ERR;
    return C_OK;



static inline int connSetReadHandler(connection *conn, ConnectionCallbackFunc func) 
    //所以这里调用的是connection.c中的connSocketSetReadHandler
    return conn->type->set_read_handler(conn, func);




fileEvent创建好了以后,怎么触发就不说了,前面都讲过。

InitServerLast 中的差异

在redis6.2分支里有个initThreadedIO(),在这里

  • 初始化了io线程池数组

  • 主线程是io线程池中的第一个线程

  • 同时设置了其他线程的执行方法,轮训等待处理分配给自己的io_threads_list(io读写任务链表)

  • 真正的处理IO读,IO写

/**
 * @brief 初始化IO线程
 */
void initThreadedIO(void) 
    server.io_threads_active = 0; /* We start with threads not active. */
    //io_threads_num=1退出,因为主线程承担了
    if (server.io_threads_num == 1) return;

    //大于128个线程就退出启动流程
    if (server.io_threads_num > IO_THREADS_MAX_NUM) 
        serverLog(LL_WARNING,"Fatal: too many I/O threads configured. "
                             "The maximum number is %d.", IO_THREADS_MAX_NUM);
        exit(1);
    

    /* Spawn and initialize the I/O threads. */
    /**
     * @brief 创建和初始化io线程池
     */
    for (int i = 0; i < server.io_threads_num; i++) 
        /* Things we do for all the threads including the main thread. */
        //i=0 是主线程,主线程也处理io
        io_threads_list[i] = listCreate();
        if (i == 0) continue; /* Thread 0 is the main thread. */

        /* Things we do only for the additional threads. */
        //初始化互斥锁,并加锁
        pthread_t tid;
        pthread_mutex_init(&io_threads_mutex[i],NULL);
        //io等待数量,设置io_threads_pending[i]为0
        setIOPendingCount(i, 0);
        pthread_mutex_lock(&io_threads_mutex[i]); /* Thread will be stopped. */
        if (pthread_create(&tid,NULL,IOThreadMain,(void*)(long)i) != 0) 
            serverLog(LL_WARNING,"Fatal: Can't initialize IO thread.");
            exit(1);
        
        io_threads[i] = tid;
    


我们再看下IOThreadMain里

  • 这里主要空轮训,等到任务了就处理

  • 每轮训100万次,跑空后继续轮训100万次,如果主线程

**
 * @brief io线程执行的内容,类似于java线程的run
 * @param myid io数组的索引
 * @return void* 
 */
void *IOThreadMain(void *myid) 
    /* The ID is the thread number (from 0 to server.iothreads_num-1), and is
     * used by the thread to just manipulate a single sub-array of clients. */
    long id = (unsigned long)myid;
    char thdname[16];
    //io_thd_+线程索引  就是对应的线程名称
    snprintf(thdname, sizeof(thdname), "io_thd_%ld", id);
    redis_set_thread_title(thdname);
    //将后io线程绑定到指定的cpu列表上
    redisSetCpuAffinity(server.server_cpulist);
    //设置线程可取消
    makeThreadKillable();

    while(1) 
        /* Wait for start */
        /**
         * @brief 空轮训,直到对应线程编号的io_threads_pending填充了数据或者这100万次执行完,会重新进来进行轮训
         */
        for (int j = 0; j < 1000000; j++) 
            if (getIOPendingCount(id) != 0) break;
        

        /* Give the main thread a chance to stop this thread. */
        //这里让主线程去停止,后续再看下TODO
        if (getIOPendingCount(id) == 0) 
            pthread_mutex_lock(&io_threads_mutex[id]);
            pthread_mutex_unlock(&io_threads_mutex[id]);
            continue;
        

        serverAssert(getIOPendingCount(id) != 0);

        /* Process: note that the main thread will never touch our list
         * before we drop the pending count to 0. */
        listIter li;
        listNode *ln;
        listRewind(io_threads_list[id],&li);
        //遍历每个io线程里的任务
        while((ln = listNext(&li))) 
            client *c = listNodeValue(ln);
            //处理对应的写事件
            if (io_threads_op == IO_THREADS_OP_WRITE) 
                writeToClient(c,0);
             else if (io_threads_op == IO_THREADS_OP_READ) 
                //处理对应的读事件
                readQueryFromClient(c->conn);
             else 
                serverPanic("io_threads_op value is unknown");
            
        
        listEmpty(io_threads_list[id]);
        //设置为0
        setIOPendingCount(id, 0);
    


readQueryFromClient 处理的差异

在6.2分支中有个postponeClientRead©函数。在这里如果开启了io多线程,就把client扔入到server.clients_pending_read中

int postponeClientRead(client *c) 
    if (server.io_threads_active &&
        server.io_threads_do_reads &&
        !ProcessingEventsWhileBlocked &&
        !(c->flags & (CLIENT_MASTER|CLIENT_SLAVE|CLIENT_PENDING_READ|CLIENT_BLOCKED))) 
    
        c->flags |= CLIENT_PENDING_READ;
        listAddNodeHead(server.clients_pending_read,c);
        return 1;
     else 
        return 0;
    


aeProcessEvents

在redis6.2分支里,beforesleep的执行放在了aeApiPoll之前

int aeProcessEvents(aeEventLoop *eventLoop, int flags)
       //6.0之前是在外面,现在调整到了里面
    if (eventLoop->beforesleep != NULL && flags & AE_CALL_BEFORE_SLEEP)
        eventLoop->beforesleep(eventLoop);

    /* Call the multiplexing API, will return only on timeout or when
     * some event fires. */
    numevents = aeApiPoll(eventLoop, tvp);

    /* After sleep callback. */
    if (eventLoop->aftersleep != NULL && flags & AE_CALL_AFTER_SLEEP)
        eventLoop->aftersleep(eventLoop); 


同时beforeSleep和afterSleep的设置挪到了initServer

void initServer(void) 
    aeSetBeforeSleepProc(server.el,beforeSleep);
    aeSetAfterSleepProc(server.el,afterSleep);



在redis6.2分支里的beforeSleep 中我们重点关注以下两个方法,这两个方法都在networking.c中

//使用io线程池处理客户端等待读
handleClientsWithPendingReadsUsingThreads()
//使用io线程池处理客户端等待写
handleClientsWithPendingWritesUsingThreads();

  1. 先从server.clients_pending_read拿到任务,从0开始取模分发给所有的io线程

  2. 主线程处理分配给自己的任务io读取任务

  3. 主线程等待所有的io线程执行完分配的任务

  4. 处理clients_pending_read中的任务(因为在 aeApiPoll后才会触发添加到这里,所以这里的任务不会变)

  5. 这个时候才会把client从clients_pending_read中删除

  6. 主线程执行对应的命令

  7. 把执行完的任务加入到server.clients_pending_write

/**
 * @brief 使用io线程读
 * @return int 
 */
int handleClientsWithPendingReadsUsingThreads(void) 
    if (!server.io_threads_active || !server.io_threads_do_reads) return 0;
    int processed = listLength(server.clients_pending_read);
    if (processed == 0) return 0;

    /* Distribute the clients across N different lists. */
    listIter li;
    listNode *ln;
    //从等待读队列里拿到任务
    listRewind(server.clients_pending_read,&li);
    int item_id = 0;
    while((ln = listNext(&li))) 
        client *c = listNodeValue(ln);
        //通过取模放入到了对应的io_threads_list队尾,这就是里面吹的Round Robin
        int target_id = item_id % server.io_threads_num;
        listAddNodeTail(io_threads_list[target_id],c);
        item_id++;
    

    io_threads_op = IO_THREADS_OP_READ;
    //设置对应的io线程中的任务的数量
    for (int j = 1; j < server.io_threads_num; j++) 
        int count = listLength(io_threads_list[j]);
        setIOPendingCount(j, count);
    

    //主线程处理分配给自己的io读
    listRewind(io_threads_list[0],&li);
    while((ln = listNext(&li))) 
        client *c = listNodeValue(ln);
        readQueryFromClient(c->conn);
    
    listEmpty(io_threads_list[0]);

    //主线程等所有的io线程都处理完了才会继续往下走
    while(1) 
        unsigned long pending = 0;
        for (int j = 1; j < server.io_threads_num; j++)
            pending += getIOPendingCount(j);
        if (pending == 0) break;
    

    //server.clients_pending_read长度不为0,就一直执行
    while(listLength(server.clients_pending_read)) 
        ln = listFirst(server.clients_pending_read);
        client *c = listNodeValue(ln);
        c->flags &= ~CLIENT_PENDING_READ;
        //从clients_pending_read删除client
        listDelNode(server.clients_pending_read,ln);

        serverAssert(!(c->flags & CLIENT_BLOCKED));
        //执行命令
        if (processPendingCommandsAndResetClient(c) == C_ERR) 
            continue;
        
        //读取
        processInputBuffer(c);

        /* We may have pending replies if a thread readQueryFromClient() produced
         * replies and did not install a write handler (it can't).
         */

        if (!(c->flags & CLIENT_PENDING_WRITE) && clientHasPendingReplies(c))
            //写到server.clients_pending_write
            clientInstallWriteHandler(c);
    

    /* Update processed count on server */
    server.stat_io_reads_processed += processed;

    return processed;


handleClientsWithPendingWritesUsingThreads的代码处理逻辑和上面的差不多,就不再写了。

最后总结下(开启多线程io的情况下)

  • 按tcp接入的顺序,创建fd

  • 然后通过epoll使用readQueryFromClient读入到server.clients_pending_read

  • 主线程从server.clients_pending_read读取client并分发任务给各IO线程(主线程是io线程池中索引为0的线程)

  • 主线程执行分配到自己的io任务,其他io线程通过空轮训的方式等待分配到的任务并执行

  • 主线程等待所有io线程执行完(通过各个io线程的任务量总和为0来判断)

  • 再次从server.clients_pending_read里读取client,然后从server.clients_pending_read删除

  • 顺序执行client上绑定的任务,执行完写入到server.clients_pending_write(这些都是顺序性的)

redis源码分析系列

**redis源码阅读-入门篇
**

**redis源码阅读二-终于把redis的启动流程搞明白了
**

**redis源码阅读三-终于把主线任务执行搞明白了
**

**redis的key过期了还能取出来?
**

以上是关于redis源码阅读四-我把redis6里的io多线程执行流程梳理明白了的主要内容,如果未能解决你的问题,请参考以下文章

redis6.0安装与使用

Redis6 源码调式

源码阅读:从三个重量级的开源框架中看线程池的应用(redisnginxskynet)

CentOS 7 装 Redis 6,重新焕发活力

史上最强Redis6.0,给分布式源码跪了!

❤ 超强超详细 | Redis入门详解