Redis:命令集构建及关键属性解析
Posted 等你归去来
tags:
篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了Redis:命令集构建及关键属性解析相关的知识,希望对你有一定的参考价值。
上一篇文章,我们从框架层面,主要介绍了redis的启动过程,以及主要的命令处理流程逻辑。这些更多的都是些差不多的道理,而要细了解redis,则需要更细节的东西。
今天我们从稍微内部一些的角度,来看看命令执行过程中的几个重要方法,深入理解下 redis 的魅力所在。
首先,我们通过上一章知道,processCommand 是其业务主要入口,我们再来回顾下:
// server.c /* If this function gets called we already read a whole * command, arguments are in the client argv/argc fields. * processCommand() execute the command or prepare the * server for a bulk read from the client. * * If C_OK is returned the client is still alive and valid and * other operations can be performed by the caller. Otherwise * if C_ERR is returned the client was destroyed (i.e. after QUIT). */ int processCommand(client *c) { /* The QUIT command is handled separately. Normal command procs will * go through checking for replication and QUIT will cause trouble * when FORCE_REPLICATION is enabled and would be implemented in * a regular command proc. */ // 如果是 quit 命令,直接回复ok即可,由客户端主动关闭请求 if (!strcasecmp(c->argv[0]->ptr,"quit")) { addReply(c,shared.ok); c->flags |= CLIENT_CLOSE_AFTER_REPLY; return C_ERR; } /* Now lookup the command and check ASAP about trivial error conditions * such as wrong arity, bad command name and so forth. */ // 查找命令信息,根据第一个字符串进行查找,这个我们主要看看 c->cmd = c->lastcmd = lookupCommand(c->argv[0]->ptr); // 验证命令查找是否找到,以及参数个数是否匹配,否则直接响应返回 if (!c->cmd) { flagTransaction(c); addReplyErrorFormat(c,"unknown command ‘%s‘", (char*)c->argv[0]->ptr); return C_OK; } else if ((c->cmd->arity > 0 && c->cmd->arity != c->argc) || (c->argc < -c->cmd->arity)) { flagTransaction(c); addReplyErrorFormat(c,"wrong number of arguments for ‘%s‘ command", c->cmd->name); return C_OK; } /* Check if the user is authenticated */ // 权限判定,只有先授权后,才能执行后续命令(auth 除外) if (server.requirepass && !c->authenticated && c->cmd->proc != authCommand) { flagTransaction(c); addReply(c,shared.noautherr); return C_OK; } /* If cluster is enabled perform the cluster redirection here. * However we don‘t perform the redirection if: * 1) The sender of this command is our master. * 2) The command has no key arguments. 
*/ // 集群非master写请求转移 // 此处可见 flags 设计的重要性,代表了n多属性 if (server.cluster_enabled && !(c->flags & CLIENT_MASTER) && !(c->flags & CLIENT_LUA && server.lua_caller->flags & CLIENT_MASTER) && !(c->cmd->getkeys_proc == NULL && c->cmd->firstkey == 0)) { int hashslot; if (server.cluster->state != CLUSTER_OK) { flagTransaction(c); clusterRedirectClient(c,NULL,0,CLUSTER_REDIR_DOWN_STATE); return C_OK; } else { int error_code; clusterNode *n = getNodeByQuery(c,c->cmd,c->argv,c->argc,&hashslot,&error_code); if (n == NULL || n != server.cluster->myself) { flagTransaction(c); clusterRedirectClient(c,n,hashslot,error_code); return C_OK; } } } /* Handle the maxmemory directive. * * First we try to free some memory if possible (if there are volatile * keys in the dataset). If there are not the only thing we can do * is returning an error. */ // 最大内存检查,释放 if (server.maxmemory) { int retval = freeMemoryIfNeeded(); /* freeMemoryIfNeeded may flush slave output buffers. This may result * into a slave, that may be the active client, to be freed. */ if (server.current_client == NULL) return C_ERR; /* It was impossible to free enough memory, and the command the client * is trying to execute is denied during OOM conditions? Error. */ if ((c->cmd->flags & CMD_DENYOOM) && retval == C_ERR) { flagTransaction(c); addReply(c, shared.oomerr); return C_OK; } } /* Don‘t accept write commands if there are problems persisting on disk * and if this is a master instance. 
*/ // 持久化异常检测,不接受写操作,不接受 ping if (((server.stop_writes_on_bgsave_err && server.saveparamslen > 0 && server.lastbgsave_status == C_ERR) || server.aof_last_write_status == C_ERR) && server.masterhost == NULL && (c->cmd->flags & CMD_WRITE || c->cmd->proc == pingCommand)) { flagTransaction(c); if (server.aof_last_write_status == C_OK) addReply(c, shared.bgsaveerr); else addReplySds(c, sdscatprintf(sdsempty(), "-MISCONF Errors writing to the AOF file: %s ", strerror(server.aof_last_write_errno))); return C_OK; } /* Don‘t accept write commands if there are not enough good slaves and * user configured the min-slaves-to-write option. */ // slave 数量不够时,不接受写操作 if (server.masterhost == NULL && server.repl_min_slaves_to_write && server.repl_min_slaves_max_lag && c->cmd->flags & CMD_WRITE && server.repl_good_slaves_count < server.repl_min_slaves_to_write) { flagTransaction(c); addReply(c, shared.noreplicaserr); return C_OK; } /* Don‘t accept write commands if this is a read only slave. But * accept write commands if this is our master. */ // 只读 slave 不接受写操作 if (server.masterhost && server.repl_slave_ro && !(c->flags & CLIENT_MASTER) && c->cmd->flags & CMD_WRITE) { addReply(c, shared.roslaveerr); return C_OK; } /* Only allow SUBSCRIBE and UNSUBSCRIBE in the context of Pub/Sub */ // pub/sub 模式仅接受极少数命令 if (c->flags & CLIENT_PUBSUB && c->cmd->proc != pingCommand && c->cmd->proc != subscribeCommand && c->cmd->proc != unsubscribeCommand && c->cmd->proc != psubscribeCommand && c->cmd->proc != punsubscribeCommand) { addReplyError(c,"only (P)SUBSCRIBE / (P)UNSUBSCRIBE / PING / QUIT allowed in this context"); return C_OK; } /* Only allow INFO and SLAVEOF when slave-serve-stale-data is no and * we are a slave with a broken link with master. 
*/ // 复制连接超时,slave-serve-stale-data: on 时可以处理请求 if (server.masterhost && server.repl_state != REPL_STATE_CONNECTED && server.repl_serve_stale_data == 0 && !(c->cmd->flags & CMD_STALE)) { flagTransaction(c); addReply(c, shared.masterdownerr); return C_OK; } /* Loading DB? Return an error if the command has not the * CMD_LOADING flag. */ // db 还在加载中,不接受任何请求 if (server.loading && !(c->cmd->flags & CMD_LOADING)) { addReply(c, shared.loadingerr); return C_OK; } /* Lua script too slow? Only allow a limited number of commands. */ // lua 脚本执行缓慢时,仅接受少数命令 if (server.lua_timedout && c->cmd->proc != authCommand && c->cmd->proc != replconfCommand && !(c->cmd->proc == shutdownCommand && c->argc == 2 && tolower(((char*)c->argv[1]->ptr)[0]) == ‘n‘) && !(c->cmd->proc == scriptCommand && c->argc == 2 && tolower(((char*)c->argv[1]->ptr)[0]) == ‘k‘)) { flagTransaction(c); addReply(c, shared.slowscripterr); return C_OK; } /* Exec the command */ if (c->flags & CLIENT_MULTI && c->cmd->proc != execCommand && c->cmd->proc != discardCommand && c->cmd->proc != multiCommand && c->cmd->proc != watchCommand) { // 事务型命令,将命令入队 queueMultiCommand(c); addReply(c,shared.queued); } else { // 非事务性命令,直接处理请求 call(c,CMD_CALL_FULL); c->woff = server.master_repl_offset; // 如果有待处理事件,继续处理 if (listLength(server.ready_keys)) handleClientsBlockedOnLists(); } return C_OK; }
零、redis中的几个关键数据结构
1. redisServer
redisServer 是 redis 中最大的全局变量,负责保存客户端、配置信息、db 等各种重要信息。各函数之间的通信,也是隐式地通过 redisServer 完成的。它的定义在 server.h 中,而实例化则在 server.c 中。
struct redisServer { /* General */ pid_t pid; /* Main process pid. */ char *configfile; /* Absolute config file path, or NULL */ char *executable; /* Absolute executable file path. */ char **exec_argv; /* Executable argv vector (copy). */ int hz; /* serverCron() calls frequency in hertz */ redisDb *db; dict *commands; /* Command table */ dict *orig_commands; /* Command table before command renaming. */ aeEventLoop *el; unsigned lruclock:LRU_BITS; /* Clock for LRU eviction */ int shutdown_asap; /* SHUTDOWN needed ASAP */ int activerehashing; /* Incremental rehash in serverCron() */ char *requirepass; /* Pass for AUTH command, or NULL */ char *pidfile; /* PID file path */ int arch_bits; /* 32 or 64 depending on sizeof(long) */ int cronloops; /* Number of times the cron function run */ char runid[CONFIG_RUN_ID_SIZE+1]; /* ID always different at every exec. */ int sentinel_mode; /* True if this instance is a Sentinel. */ /* Networking */ int port; /* TCP listening port */ int tcp_backlog; /* TCP listen() backlog */ char *bindaddr[CONFIG_BINDADDR_MAX]; /* Addresses we should bind to */ int bindaddr_count; /* Number of addresses in server.bindaddr[] */ char *unixsocket; /* UNIX socket path */ mode_t unixsocketperm; /* UNIX socket permission */ int ipfd[CONFIG_BINDADDR_MAX]; /* TCP socket file descriptors */ int ipfd_count; /* Used slots in ipfd[] */ int sofd; /* Unix socket file descriptor */ int cfd[CONFIG_BINDADDR_MAX];/* Cluster bus listening socket */ int cfd_count; /* Used slots in cfd[] */ list *clients; /* List of active clients */ list *clients_to_close; /* Clients to close asynchronously */ list *clients_pending_write; /* There is to write or install handler. 
*/ list *slaves, *monitors; /* List of slaves and MONITORs */ client *current_client; /* Current client, only used on crash report */ int clients_paused; /* True if clients are currently paused */ mstime_t clients_pause_end_time; /* Time when we undo clients_paused */ char neterr[ANET_ERR_LEN]; /* Error buffer for anet.c */ dict *migrate_cached_sockets;/* MIGRATE cached sockets */ uint64_t next_client_id; /* Next client unique ID. Incremental. */ int protected_mode; /* Don‘t accept external connections. */ /* RDB / AOF loading information */ int loading; /* We are loading data from disk if true */ off_t loading_total_bytes; off_t loading_loaded_bytes; time_t loading_start_time; off_t loading_process_events_interval_bytes; /* Fast pointers to often looked up command */ struct redisCommand *delCommand, *multiCommand, *lpushCommand, *lpopCommand, *rpopCommand, *sremCommand, *execCommand; /* Fields used only for stats */ time_t stat_starttime; /* Server start time */ long long stat_numcommands; /* Number of processed commands */ long long stat_numconnections; /* Number of connections received */ long long stat_expiredkeys; /* Number of expired keys */ long long stat_evictedkeys; /* Number of evicted keys (maxmemory) */ long long stat_keyspace_hits; /* Number of successful lookups of keys */ long long stat_keyspace_misses; /* Number of failed lookups of keys */ size_t stat_peak_memory; /* Max used memory record */ long long stat_fork_time; /* Time needed to perform latest fork() */ double stat_fork_rate; /* Fork rate in GB/sec. */ long long stat_rejected_conn; /* Clients rejected because of maxclients */ long long stat_sync_full; /* Number of full resyncs with slaves. */ long long stat_sync_partial_ok; /* Number of accepted PSYNC requests. */ long long stat_sync_partial_err;/* Number of unaccepted PSYNC requests. 
*/ list *slowlog; /* SLOWLOG list of commands */ long long slowlog_entry_id; /* SLOWLOG current entry ID */ long long slowlog_log_slower_than; /* SLOWLOG time limit (to get logged) */ unsigned long slowlog_max_len; /* SLOWLOG max number of items logged */ size_t resident_set_size; /* RSS sampled in serverCron(). */ long long stat_net_input_bytes; /* Bytes read from network. */ long long stat_net_output_bytes; /* Bytes written to network. */ /* The following two are used to track instantaneous metrics, like * number of operations per second, network traffic. */ struct { long long last_sample_time; /* Timestamp of last sample in ms */ long long last_sample_count;/* Count in last sample */ long long samples[STATS_METRIC_SAMPLES]; int idx; } inst_metric[STATS_METRIC_COUNT]; /* Configuration */ int verbosity; /* Loglevel in redis.conf */ int maxidletime; /* Client timeout in seconds */ int tcpkeepalive; /* Set SO_KEEPALIVE if non-zero. */ int active_expire_enabled; /* Can be disabled for testing purposes. */ size_t client_max_querybuf_len; /* Limit for client query buffer length */ int dbnum; /* Total number of configured DBs */ int supervised; /* 1 if supervised, 0 otherwise. */ int supervised_mode; /* See SUPERVISED_* */ int daemonize; /* True if running as a daemon */ clientBufferLimitsConfig client_obuf_limits[CLIENT_TYPE_OBUF_COUNT]; /* AOF persistence */ int aof_state; /* AOF_(ON|OFF|WAIT_REWRITE) */ int aof_fsync; /* Kind of fsync() policy */ char *aof_filename; /* Name of the AOF file */ int aof_no_fsync_on_rewrite; /* Don‘t fsync if a rewrite is in prog. */ int aof_rewrite_perc; /* Rewrite AOF if % growth is > M and... */ off_t aof_rewrite_min_size; /* the AOF file is at least N bytes. */ off_t aof_rewrite_base_size; /* AOF size on latest startup or rewrite. */ off_t aof_current_size; /* AOF current size. */ int aof_rewrite_scheduled; /* Rewrite once BGSAVE terminates. 
*/ pid_t aof_child_pid; /* PID if rewriting process */ list *aof_rewrite_buf_blocks; /* Hold changes during an AOF rewrite. */ sds aof_buf; /* AOF buffer, written before entering the event loop */ int aof_fd; /* File descriptor of currently selected AOF file */ int aof_selected_db; /* Currently selected DB in AOF */ time_t aof_flush_postponed_start; /* UNIX time of postponed AOF flush */ time_t aof_last_fsync; /* UNIX time of last fsync() */ time_t aof_rewrite_time_last; /* Time used by last AOF rewrite run. */ time_t aof_rewrite_time_start; /* Current AOF rewrite start time. */ int aof_lastbgrewrite_status; /* C_OK or C_ERR */ unsigned long aof_delayed_fsync; /* delayed AOF fsync() counter */ int aof_rewrite_incremental_fsync;/* fsync incrementally while rewriting? */ int aof_last_write_status; /* C_OK or C_ERR */ int aof_last_write_errno; /* Valid if aof_last_write_status is ERR */ int aof_load_truncated; /* Don‘t stop on unexpected AOF EOF. */ /* AOF pipes used to communicate between parent and child during rewrite. */ int aof_pipe_write_data_to_child; int aof_pipe_read_data_from_parent; int aof_pipe_write_ack_to_parent; int aof_pipe_read_ack_from_child; int aof_pipe_write_ack_to_child; int aof_pipe_read_ack_from_parent; int aof_stop_sending_diff; /* If true stop sending accumulated diffs to child process. */ sds aof_child_diff; /* AOF diff accumulator child side. */ /* RDB persistence */ long long dirty; /* Changes to DB from the last save */ long long dirty_before_bgsave; /* Used to restore dirty on failed BGSAVE */ pid_t rdb_child_pid; /* PID of RDB saving child */ struct saveparam *saveparams; /* Save points array for RDB */ int saveparamslen; /* Number of saving points */ char *rdb_filename; /* Name of RDB file */ int rdb_compression; /* Use compression in RDB? */ int rdb_checksum; /* Use RDB checksum? 
*/ time_t lastsave; /* Unix time of last successful save */ time_t lastbgsave_try; /* Unix time of last attempted bgsave */ time_t rdb_save_time_last; /* Time used by last RDB save run. */ time_t rdb_save_time_start; /* Current RDB save start time. */ int rdb_child_type; /* Type of save by active child. */ int lastbgsave_status; /* C_OK or C_ERR */ int stop_writes_on_bgsave_err; /* Don‘t allow writes if can‘t BGSAVE */ int rdb_pipe_write_result_to_parent; /* RDB pipes used to return the state */ int rdb_pipe_read_result_from_child; /* of each slave in diskless SYNC. */ /* Propagation of commands in AOF / replication */ redisOpArray also_propagate; /* Additional command to propagate. */ /* Logging */ char *logfile; /* Path of log file */ int syslog_enabled; /* Is syslog enabled? */ char *syslog_ident; /* Syslog ident */ int syslog_facility; /* Syslog facility */ /* Replication (master) */ int slaveseldb; /* Last SELECTed DB in replication output */ long long master_repl_offset; /* Global replication offset */ int repl_ping_slave_period; /* Master pings the slave every N seconds */ char *repl_backlog; /* Replication backlog for partial syncs */ long long repl_backlog_size; /* Backlog circular buffer size */ long long repl_backlog_histlen; /* Backlog actual data length */ long long repl_backlog_idx; /* Backlog circular buffer current offset */ long long repl_backlog_off; /* Replication offset of first byte in the backlog buffer. */ time_t repl_backlog_time_limit; /* Time without slaves after the backlog gets released. */ time_t repl_no_slaves_since; /* We have no slaves since that time. Only valid if server.slaves len is 0. */ int repl_min_slaves_to_write; /* Min number of slaves to write. */ int repl_min_slaves_max_lag; /* Max lag of <count> slaves to write. */ int repl_good_slaves_count; /* Number of slaves with lag <= max_lag. */ int repl_diskless_sync; /* Send RDB to slaves sockets directly. 
*/ int repl_diskless_sync_delay; /* Delay to start a diskless repl BGSAVE. */ /* Replication (slave) */ char *masterauth; /* AUTH with this password with master */ char *masterhost; /* Hostname of master */ int masterport; /* Port of master */ int repl_timeout; /* Timeout after N seconds of master idle */ client *master; /* Client that is master for this slave */ client *cached_master; /* Cached master to be reused for PSYNC. */ int repl_syncio_timeout; /* Timeout for synchronous I/O calls */ int repl_state; /* Replication status if the instance is a slave */ off_t repl_transfer_size; /* Size of RDB to read from master during sync. */ off_t repl_transfer_read; /* Amount of RDB read from master during sync. */ off_t repl_transfer_last_fsync_off; /* Offset when we fsync-ed last time. */ int repl_transfer_s; /* Slave -> Master SYNC socket */ int repl_transfer_fd; /* Slave -> Master SYNC temp file descriptor */ char *repl_transfer_tmpfile; /* Slave-> master SYNC temp file name */ time_t repl_transfer_lastio; /* Unix time of the latest read, for timeout */ int repl_serve_stale_data; /* Serve stale data when link is down? */ int repl_slave_ro; /* Slave is read only? */ time_t repl_down_since; /* Unix time at which link with master went down */ int repl_disable_tcp_nodelay; /* Disable TCP_NODELAY after SYNC? */ int slave_priority; /* Reported in INFO and used by Sentinel. */ char repl_master_runid[CONFIG_RUN_ID_SIZE+1]; /* Master run id for PSYNC. */ long long repl_master_initial_offset; /* Master PSYNC offset. */ int repl_slave_lazy_flush; /* Lazy FLUSHALL before loading DB? */ /* Replication script cache. */ dict *repl_scriptcache_dict; /* SHA1 all slaves are aware of. */ list *repl_scriptcache_fifo; /* First in, first out LRU eviction. */ unsigned int repl_scriptcache_size; /* Max number of elements. */ /* Synchronous replication. */ list *clients_waiting_acks; /* Clients waiting in WAIT command. */ int get_ack_from_slaves; /* If true we send REPLCONF GETACK. 
*/ /* Limits */ unsigned int maxclients; /* Max number of simultaneous clients */ unsigned long long maxmemory; /* Max number of memory bytes to use */ int maxmemory_policy; /* Policy for key eviction */ int maxmemory_samples; /* Pricision of random sampling */ /* Blocked clients */ unsigned int bpop_blocked_clients; /* Number of clients blocked by lists */ list *unblocked_clients; /* list of clients to unblock before next loop */ list *ready_keys; /* List of readyList structures for BLPOP & co */ /* Sort parameters - qsort_r() is only available under BSD so we * have to take this state global, in order to pass it to sortCompare() */ int sort_desc; int sort_alpha; int sort_bypattern; int sort_store; /* Zip structure config, see redis.conf for more information */ size_t hash_max_ziplist_entries; size_t hash_max_ziplist_value; size_t set_max_intset_entries; size_t zset_max_ziplist_entries; size_t zset_max_ziplist_value; size_t hll_sparse_max_bytes; /* List parameters */ int list_max_ziplist_size; int list_compress_depth; /* time cache */ time_t unixtime; /* Unix time sampled every cron cycle. */ long long mstime; /* Like ‘unixtime‘ but with milliseconds resolution. */ /* Pubsub */ dict *pubsub_channels; /* Map channels to list of subscribed clients */ list *pubsub_patterns; /* A list of pubsub_patterns */ int notify_keyspace_events; /* Events to propagate via Pub/Sub. This is an xor of NOTIFY_... flags. */ /* Cluster */ int cluster_enabled; /* Is cluster enabled? */ mstime_t cluster_node_timeout; /* Cluster node timeout. */ char *cluster_configfile; /* Cluster auto-generated config file name. */ struct clusterState *cluster; /* State of the cluster */ int cluster_migration_barrier; /* Cluster replicas migration barrier. */ int cluster_slave_validity_factor; /* Slave max data age for failover. 
*/ int cluster_require_full_coverage; /* If true, put the cluster down if there is at least an uncovered slot.*/ char *cluster_announce_ip; /* IP address to announce on cluster bus. */ int cluster_announce_port; /* base port to announce on cluster bus. */ int cluster_announce_bus_port; /* bus port to announce on cluster bus. */ /* Scripting */ lua_State *lua; /* The Lua interpreter. We use just one for all clients */ client *lua_client; /* The "fake client" to query Redis from Lua */ client *lua_caller; /* The client running EVAL right now, or NULL */ dict *lua_scripts; /* A dictionary of SHA1 -> Lua scripts */ mstime_t lua_time_limit; /* Script timeout in milliseconds */ mstime_t lua_time_start; /* Start time of script, milliseconds time */ int lua_write_dirty; /* True if a write command was called during the execution of the current script. */ int lua_random_dirty; /* True if a random command was called during the execution of the current script. */ int lua_replicate_commands; /* True if we are doing single commands repl. */ int lua_multi_emitted;/* True if we already proagated MULTI. */ int lua_repl; /* Script replication flags for redis.set_repl(). */ int lua_timedout; /* True if we reached the time limit for script execution. */ int lua_kill; /* Kill the script if true. */ int lua_always_replicate_commands; /* Default replication type. */ /* Lazy free */ int lazyfree_lazy_eviction; int lazyfree_lazy_expire; int lazyfree_lazy_server_del; /* Latency monitor */ long long latency_monitor_threshold; dict *latency_events; /* Assert & bug reporting */ char *assert_failed; char *assert_file; int assert_line; int bug_report_start; /* True if bug report header was already logged. */ int watchdog_period; /* Software watchdog period in ms. 0 = off */ /* System hardware info */ size_t system_memory_size; /* Total memory in system as reported by OS */ }; // server.c, 实例化 server struct redisServer server; /* server global state */
2. redisObject
java有万事万物皆对象的说法,而redis中也可以用 一切皆 redisObject 来描述,可以说是最通用的redis数据结构。
typedef struct redisObject { // 类型, 4个字节 unsigned type:4; // 编码, 4个字节 unsigned encoding:4; // lru 时间, 24字节 unsigned lru:LRU_BITS; /* lru time (relative to server.lruclock) */ // 引用计数, 当引用为0时,意味着无用了 int refcount; // 数据指针,存储任意数据 void *ptr; } robj;
3. redisDb
redisDb 是redis作为数据库的主要存储模型,承载了所有的业务数据存储。单从这点来说,要实现一个数据库貌似很简单,但实际却是很难。
typedef struct redisDb { // 一个数据库,就是一个kv字典,查找出 k 后,才能确定其数据类型 如 string, hash, list, set, zset dict *dict; /* The keyspace for this DB */ // 过期数据队列 dict *expires; /* Timeout of keys with a timeout set */ dict *blocking_keys; /* Keys with clients waiting for data (BLPOP) */ dict *ready_keys; /* Blocked keys that received a PUSH */ dict *watched_keys; /* WATCHED keys for MULTI/EXEC CAS */ struct evictionPoolEntry *eviction_pool; /* Eviction pool of keys */ // 数据库号, 默认是 16, 如果想支持更多数据库号,改外部db数组大小,增大这个值就可以了 int id; /* Database ID */ long long avg_ttl; /* Average TTL, just for stats */ } redisDb;
4. client
每一个客户端连接,就是一个 client 实例,其中包含许多全局引用信息。比如解析完客户端请求之后,会把参数,数据库指针都放到 client 中。
typedef struct client { uint64_t id; /* Client incremental unique ID. */ // socket fd int fd; /* Client socket. */ // 用户目前使用的db,所有的操作都是针对这个db的操作 redisDb *db; /* Pointer to currently SELECTed DB. */ int dictid; /* ID of the currently SELECTed DB. */ robj *name; /* As set by CLIENT SETNAME. */ // 用户请求相关参数放置 sds querybuf; /* Buffer we use to accumulate client queries. */ size_t querybuf_peak; /* Recent (100ms or more) peak of querybuf size. */ int argc; /* Num of arguments of current command. */ robj **argv; /* Arguments of current command. */ // 当前命令和上一个命令指针 struct redisCommand *cmd, *lastcmd; /* Last command executed. */ int reqtype; /* Request protocol type: PROTO_REQ_* */ int multibulklen; /* Number of multi bulk arguments left to read. */ long bulklen; /* Length of bulk argument in multi bulk request. */ list *reply; /* List of reply objects to send to the client. */ unsigned long long reply_bytes; /* Tot bytes of objects in reply list. */ size_t sentlen; /* Amount of bytes already sent in the current buffer or object being sent. */ time_t ctime; /* Client creation time. */ time_t lastinteraction; /* Time of the last interaction, used for timeout */ time_t obuf_soft_limit_reached_time; int flags; /* Client flags: CLIENT_* macros. */ // 是否已授权 int authenticated; /* When requirepass is non-NULL. */ // 复制相关 int replstate; /* Replication state if this is a slave. */ int repl_put_online_on_ack; /* Install slave write handler on ACK. */ int repldbfd; /* Replication DB file descriptor. */ off_t repldboff; /* Replication DB file offset. */ off_t repldbsize; /* Replication DB file size. */ sds replpreamble; /* Replication DB preamble. */ long long reploff; /* Replication offset if this is our master. */ long long repl_ack_off; /* Replication ack offset, if this is a slave. */ long long repl_ack_time;/* Replication ack time, if this is a slave. */ long long psync_initial_offset; /* FULLRESYNC reply offset other slaves copying this slave output buffer should use. 
*/ char replrunid[CONFIG_RUN_ID_SIZE+1]; /* Master run id if is a master. */ int slave_listening_port; /* As configured with: SLAVECONF listening-port */ int slave_capa; /* Slave capabilities: SLAVE_CAPA_* bitwise OR. */ multiState mstate; /* MULTI/EXEC state */ int btype; /* Type of blocking op if CLIENT_BLOCKED. */ blockingState bpop; /* blocking state */ long long woff; /* Last write global replication offset. */ list *watched_keys; /* Keys WATCHED for MULTI/EXEC CAS */ dict *pubsub_channels; /* channels a client is interested in (SUBSCRIBE) */ list *pubsub_patterns; /* patterns a client is interested in (SUBSCRIBE) */ sds peerid; /* Cached peer ID. */ /* Response buffer */ int bufpos; char buf[PROTO_REPLY_CHUNK_BYTES]; } client;
一、命令查找 dict
当一个请求发到redis服务器后,我们将其数据解析出来,自然先要明白命令是哪个,然后才知道如何处理它。我们看一下,redis是如何查找具体的处理命令的?
// server.c, 其实无非就是一个 map 形式的查找而已 struct redisCommand *lookupCommand(sds name) { // 直接基于 server.commands 查询, server.commands 是在启动的时候初始化好的 return dictFetchValue(server.commands, name); } // dict.c , 查找字典, 返回任意类型的地址 void *dictFetchValue(dict *d, const void *key) { dictEntry *he; // 找到 dict 后,直接取其 value 即可,否则返回 NULL he = dictFind(d,key); return he ? dictGetVal(he) : NULL; } // dict.c, 查找字典 entry, 也就是 hashmap 那一套东西了 dictEntry *dictFind(dict *d, const void *key) { dictEntry *he; unsigned int h, idx, table; if (d->ht[0].size == 0) return NULL; /* We don‘t have a table at all */ // 如果正在进行 rehash 缩扩容, 则进行一次增量式rehash数据迁移,这是redis独有的玩意 if (dictIsRehashing(d)) _dictRehashStep(d); h = dictHashKey(d, key); // 最大查找 ht的2个表,如果进行 rehash 的话,否则只遍历一次 for (table = 0; table <= 1; table++) { idx = h & d->ht[table].sizemask; he = d->ht[table].table[idx]; while(he) { // 找到对应元素,则返回,否则链表查询 if (dictCompareKeys(d, key, he->key)) return he; he = he->next; } // 如果没有进行rehash, 那么应当是一遍历就可以拿到结果或者拿不到 if (!dictIsRehashing(d)) return NULL; } return NULL; } // dict.h, 只有 rehashidx=-1 才表示在进行 rehash, rehashidx 表示正在进行的rehash 的元素数 #define dictIsRehashing(d) ((d)->rehashidx != -1) // dict.c, 内部rehash /* This function performs just a step of rehashing, and only if there are * no safe iterators bound to our hash table. When we have iterators in the * middle of a rehashing we can‘t mess with the two hash tables otherwise * some element can be missed or duplicated. * * This function is called by common lookup or update operations in the * dictionary so that the hash table automatically migrates from H1 to H2 * while it is actively used. */ static void _dictRehashStep(dict *d) { // 只会进行一次rehash 操作,不会用时很久 if (d->iterators == 0) dictRehash(d,1); } // dict.c /* Performs N steps of incremental rehashing. Returns 1 if there are still * keys to move from the old to the new hash table, otherwise 0 is returned. 
* * Note that a rehashing step consists in moving a bucket (that may have more * than one key as we use chaining) from the old to the new hash table, however * since part of the hash table may be composed of empty spaces, it is not * guaranteed that this function will rehash even a single bucket, since it * will visit at max N*10 empty buckets in total, otherwise the amount of * work it does would be unbound and the function may block for a long time. */ int dictRehash(dict *d, int n) { // 最大只会访问 n*10 个元素,避免长时间hash导致暂停 int empty_visits = n*10; /* Max number of empty buckets to visit. */ if (!dictIsRehashing(d)) return 0; // rehash 并不是一次性完成的,而是遇到 used=0, 则退出了 while(n-- && d->ht[0].used != 0) { dictEntry *de, *nextde; /* Note that rehashidx can‘t overflow as we are sure there are more * elements because ht[0].used != 0 */ assert(d->ht[0].size > (unsigned long)d->rehashidx); // 从上次 rehash 的地方开始进行, 如果中间的值都是空的,则本次不再进行深度rehash了 while(d->ht[0].table[d->rehashidx] == NULL) { d->rehashidx++; if (--empty_visits == 0) return 1; } de = d->ht[0].table[d->rehashidx]; /* Move all the keys in this bucket from the old to the new hash HT */ // 针对找到的需要 rehash 的元素,做转移 while(de) { unsigned int h; nextde = de->next; /* Get the index in the new hash table */ h = dictHashKey(d, de->key) & d->ht[1].sizemask; // 解决冲突问题,与原有slot元素链接 de->next = d->ht[1].table[h]; d->ht[1].table[h] = de; d->ht[0].used--; d->ht[1].used++; de = nextde; } d->ht[0].table[d->rehashidx] = NULL; // rehashidx++, 表示正在进行的rehash d->rehashidx++; } /* Check if we already rehashed the whole table... */ if (d->ht[0].used == 0) { zfree(d->ht[0].table); // rehash 完所有元素后,直接交换 ht 0/1 d->ht[0] = d->ht[1]; _dictReset(&d->ht[1]); d->rehashidx = -1; return 0; } /* More to rehash... */ // 返回1代表还需要进行rehash return 1; }
综上,可以看出一个命令的查找过程,其实就是一个 hash 字典的查找过程。比较特别的优化是:进行 rehash 时,每次只做一部分(渐进式 rehash),尽可能减小停顿时间;并使用两个 hash 表交替,来保证迁移期间数据的完整性。
hash 表是个很有用的数据结构,上面是把它用于命令的查找,而后面 kv 的查找同样使用 dict 这种结构,所以后续解析其他命令时,只需注意其特有的处理方式即可。
二、溯源: 命令的添加
要想实现如上的查找,我们有必要了解下其是在何时添加的。下面,我们来看看,这些命令是如何写入到 server.commands 中的。
// server.c, 在进行配置初始化 initServerConfig() 时,添加命令集到 server.commands /* Populates the Redis Command Table starting from the hard coded list * we have on top of redis.c file. */ void populateCommandTable(void) { int j; int numcommands = sizeof(redisCommandTable)/sizeof(struct redisCommand); for (j = 0; j < numcommands; j++) { // 通过在文件开头定义的一个数组,将命令集加入 struct redisCommand *c = redisCommandTable+j; char *f = c->sflags; int retval1, retval2; // 转换 flags 到 command 中,用一个位表示一个 flag 标识 while(*f != ‘