TiKV 源码阅读三部曲重要模块
Posted TiDB_PingCAP
tags:
篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了TiKV 源码阅读三部曲重要模块相关的知识,希望对你有一定的参考价值。
作者简介:谭新宇,清华大学软件学院研三在读,Apache IoTDB committer,Talent Plan Community mentor。
TiKV 是一个支持事务的分布式 Key-Value 数据库,目前已经是 CNCF 基金会 的顶级项目。
作为一个新同学,需要一定的前期准备才能够有能力参与 TiKV 社区的代码开发,包括但不限于学习 Rust 语言,理解 TiKV 的原理和在前两者的基础上了解熟悉 TiKV 的源码。
TiKV 官方源码解析文档 详细地介绍了 TiKV 3.x 版本重要模块的设计要点,主要流程和相应代码片段,是学习 TiKV 源码必读的学习资料。当前 TiKV 已经迭代到了 6.x 版本,不仅引入了很多新的功能和优化,而且对源码也进行了多次重构,因而一些官方源码解析文档中的代码片段已经不复存在,这使得读者在阅读源码解析文档时无法对照最新源码加深理解;此外尽管 TiKV 官方源码解析文档系统地介绍了若干重要模块的工作,但并没有将读写流程全链路串起来去介绍经过的模块和对应的代码片段,实际上尽快地熟悉读写流程全链路会更利于新同学从全局角度理解代码。
基于以上存在的问题,笔者将基于 6.1 版本的源码撰写三篇博客,分别介绍以下三个方面:
- TiKV 源码阅读三部曲(一)重要模块:TiKV 的基本概念,TiKV 读写路径上的三个重要模块(KVService,Storage,RaftStore)和断点调试 TiKV 学习源码的方案
- TiKV 源码阅读三部曲(二)读流程:TiKV 中一条读请求的全链路流程
- TiKV 源码阅读三部曲(三)写流程:TiKV 中一条写请求的全链路流程
希望此三篇博客能够帮助对 TiKV 开发感兴趣的新同学尽快了解 TiKV 的 codebase。
本文为第一篇博客,将主要介绍 TiKV 的基本概念,TiKV 读写路径上的三个重要模块(KVService,Storage,RaftStore)和断点调试 TiKV 学习源码的方案。
基本概念
TiKV 的架构简介可以查看 官方文档。总体来看,TiKV 是一个通过 Multi-Raft 实现的分布式 KV 数据库。
TiKV 的每个进程拥有一个 store,store 中拥有若干 region。每个 region 是一个 raft 组,会存在于副本数个 store 上管理一段 KV 区间的数据。
重要模块
KVService
TiKV 的 Service 层代码位于 src/server 文件夹下,其职责包括提供 RPC 服务、将 store id 解析成地址、TiKV 之间的相互通信等。有关 Service 层的概念解析可以查看阅读 TiKV 源码解析系列文章(九)Service 层处理流程解析。
TiKV 包含多个 gRPC service。其中最重要的一个是 KVService,位于 src/server/service/kv.rs 文件中。
KVService 定义了 TiKV 的 kv_get,kv_scan,kv_prewrite,kv_commit 等事务操作 API,用于执行 TiDB 下推下来的复杂查询和计算的 coprocessor API,以及 raw_get,raw_put 等 Raw KV API。batch_commands 接口则是用于将上述的接口 batch 起来,以优化高吞吐量的场景。另外,TiKV 的 Raft group 各成员之间通信用到的 raft 和 batch_raft 接口也是在这里提供的。
本小节将简单介绍 KVService 及其启动流程,并顺带介绍 TiKV 若干重要结构的初始化流程。
cmd/tikv-server/main.rs 是 TiKV 进程启动的入口,其主要做了以下两个工作:
- 解析配置参数
- 使用
server::server::run_tikv(config)
启动 tikv 进程
fn main()
let build_timestamp = option_env!("TIKV_BUILD_TIME");
let version_info = tikv::tikv_version_info(build_timestamp);
// config parsing
// ...
// config parsing
server::server::run_tikv(config);
对于 components/server/src/server.rs 的 run-tikv 函数,其会调用 run_impl 函数并根据配置参数来启动对应的 KV 引擎。
在 run_impl 函数中,首先会调用 TikvServer::<CER>::init::<F>(config)
函数做若干重要结构的初始化,包含但不限于 batch_system, concurrency_manager, background_worker, quota_limiter 等等,接着在 tikv.init_servers::<F>()
里将 RPC handler 与 KVService 绑定起来,最后在 tikv.run_server(server_config)
中便会使用 TiKV 源码解析系列文章(七)gRPC Server 的初始化和启动流程 中介绍的 grpc server 绑定对应的端口并开始监听连接了。
/// Run a TiKV server. Returns when the server is shutdown by the user, in which
/// case the server will be properly stopped.
pub fn run_tikv(config: TikvConfig)
...
// Do some prepare works before start.
pre_start();
let _m = Monitor::default();
dispatch_api_version!(config.storage.api_version(),
if !config.raft_engine.enable
run_impl::<RocksEngine, API>(config)
else
run_impl::<RaftLogEngine, API>(config)
)
#[inline]
fn run_impl<CER: ConfiguredRaftEngine, F: KvFormat>(config: TikvConfig)
let mut tikv = TikvServer::<CER>::init::<F>(config);
...
let server_config = tikv.init_servers::<F>();
...
tikv.run_server(server_config);
signal_handler::wait_for_signal(Some(tikv.engines.take().unwrap().engines));
tikv.stop();
fn run_server(&mut self, server_config: Arc<VersionTrack<ServerConfig>>)
let server = self.servers.as_mut().unwrap();
server
.server
.build_and_bind()
.unwrap_or_else(|e| fatal!("failed to build server: ", e));
server
.server
.start(server_config, self.security_mgr.clone())
.unwrap_or_else(|e| fatal!("failed to start server: ", e));
KVService 服务启动后,所有发往监听端口的请求便会路由到 KVService 对应的 handler 上。有关 KVService 目前支持的接口,可以直接查看 kvproto 对应的 service Tikv
,目前的 RPC 接口已经接近 60 个,每个接口在代码中都会对应一个 handler。
// Key/value store API for TiKV.
service Tikv
// Commands using a transactional interface.
rpc KvGet(kvrpcpb.GetRequest) returns (kvrpcpb.GetResponse)
rpc KvScan(kvrpcpb.ScanRequest) returns (kvrpcpb.ScanResponse)
rpc KvPrewrite(kvrpcpb.PrewriteRequest) returns (kvrpcpb.PrewriteResponse)
rpc KvPessimisticLock(kvrpcpb.PessimisticLockRequest) returns (kvrpcpb.PessimisticLockResponse)
rpc KVPessimisticRollback(kvrpcpb.PessimisticRollbackRequest) returns (kvrpcpb.PessimisticRollbackResponse)
...
当 KVService 收到请求之后,会根据请求的类型把这些请求转发到不同的模块进行处理。对于从 TiDB 下推的读请求,比如 sum,avg 操作,会转发到 Coprocessor 模块进行处理,对于 KV 请求会直接转发到 Storage 模块进行处理。
KV 操作根据功能可以被划分为 Raw KV 操作以及 Txn KV 操作两大类。Raw KV 操作包括 raw put、raw get、raw delete、raw batch get、raw batch put、raw batch delete、raw scan 等普通 KV 操作。 Txn KV 操作是为了实现事务机制而设计的一系列操作,如 prewrite 和 commit 分别对应于 2PC 中的 prepare 和 commit 阶段的操作。
与 TiKV 源码解析系列文章(七)gRPC Server 的初始化和启动流程 中介绍的 handler example 不同,当前 KVService 对事务 API 例如 kv_prewrite, kv_commit 和 Raw API 例如 raw_get, raw_scan 进行了封装,由于他们都会被路由到 Storage 模块,所以接口无关的逻辑都被封装到了 handle_request
宏中,接口相关的逻辑则被封装到了 future_prewirte, future_commit 等 future_xxx 函数中。需要注意的是,对于 coprocessor API,raft API 等相关接口依然采用了原生对接 grpc-rs 的方式。
macro_rules! handle_request
($fn_name: ident, $future_name: ident, $req_ty: ident, $resp_ty: ident) =>
handle_request!($fn_name, $future_name, $req_ty, $resp_ty, no_time_detail);
;
($fn_name: ident, $future_name: ident, $req_ty: ident, $resp_ty: ident, $time_detail: tt) =>
fn $fn_name(&mut self, ctx: RpcContext<'_>, mut req: $req_ty, sink: UnarySink<$resp_ty>)
forward_unary!(self.proxy, $fn_name, ctx, req, sink);
let begin_instant = Instant::now();
let source = req.mut_context().take_request_source();
let resp = $future_name(&self.storage, req);
let task = async move
let resp = resp.await?;
let elapsed = begin_instant.saturating_elapsed();
set_total_time!(resp, elapsed, $time_detail);
sink.success(resp).await?;
GRPC_MSG_HISTOGRAM_STATIC
.$fn_name
.observe(elapsed.as_secs_f64());
record_request_source_metrics(source, elapsed);
ServerResult::Ok(())
.map_err(|e|
log_net_error!(e, "kv rpc failed";
"request" => stringify!($fn_name)
);
GRPC_MSG_FAIL_COUNTER.$fn_name.inc();
)
.map(|_|());
ctx.spawn(task);
impl<T: RaftStoreRouter<E::Local> + 'static, E: Engine, L: LockManager, F: KvFormat> Tikv
for Service<T, E, L, F>
handle_request!(kv_get, future_get, GetRequest, GetResponse, has_time_detail);
handle_request!(kv_scan, future_scan, ScanRequest, ScanResponse);
handle_request!(
kv_prewrite,
future_prewrite,
PrewriteRequest,
PrewriteResponse,
has_time_detail
);
...
handle_request!(raw_get, future_raw_get, RawGetRequest, RawGetResponse);
handle_request!(
raw_batch_get,
future_raw_batch_get,
RawBatchGetRequest,
RawBatchGetResponse
);
handle_request!(raw_scan, future_raw_scan, RawScanRequest, RawScanResponse);
...
fn coprocessor(&mut self, ctx: RpcContext<'_>, mut req: Request, sink: UnarySink<Response>)
forward_unary!(self.proxy, coprocessor, ctx, req, sink);
let source = req.mut_context().take_request_source();
let begin_instant = Instant::now();
let future = future_copr(&self.copr, Some(ctx.peer()), req);
let task = async move
let resp = future.await?.consume();
sink.success(resp).await?;
let elapsed = begin_instant.saturating_elapsed();
GRPC_MSG_HISTOGRAM_STATIC
.coprocessor
.observe(elapsed.as_secs_f64());
record_request_source_metrics(source, elapsed);
ServerResult::Ok(())
.map_err(|e|
log_net_error!(e, "kv rpc failed";
"request" => "coprocessor"
);
GRPC_MSG_FAIL_COUNTER.coprocessor.inc();
)
.map(|_| ());
ctx.spawn(task);
...
在事务相关 API 的 future_xxx 函数实现中,对于带有写语义的 future_prewrite, future_commit 等函数,由于它们会被统一调度到 Storage 模块的 sched_txn_command 函数中,当前又抽象出了 txn_command_future
宏来减少冗余代码;对于带有读语义的 future_get, future_scan 等函数,由于他们会分别调用 Storage 模块的 get/scan 等函数,因而目前并没有进行进一步抽象。
macro_rules! txn_command_future
($fn_name: ident, $req_ty: ident, $resp_ty: ident, ($req: ident) $prelude: stmt; ($v: ident, $resp: ident, $tracker: ident) $else_branch: expr ) =>
fn $fn_name<E: Engine, L: LockManager, F: KvFormat>(
storage: &Storage<E, L, F>,
$req: $req_ty,
) -> impl Future<Output = ServerResult<$resp_ty>>
$prelude
let $tracker = GLOBAL_TRACKERS.insert(Tracker::new(RequestInfo::new(
$req.get_context(),
RequestType::Unknown,
0,
)));
set_tls_tracker_token($tracker);
let (cb, f) = paired_future_callback();
let res = storage.sched_txn_command($req.into(), cb);
async move
defer!
GLOBAL_TRACKERS.remove($tracker);
;
let $v = match res
Err(e) => Err(e),
Ok(_) => f.await?,
;
let mut $resp = $resp_ty::default();
if let Some(err) = extract_region_error(&$v)
$resp.set_region_error(err);
else
$else_branch;
Ok($resp)
;
($fn_name: ident, $req_ty: ident, $resp_ty: ident, ($v: ident, $resp: ident, $tracker: ident) $else_branch: expr ) =>
txn_command_future!($fn_name, $req_ty, $resp_ty, (req) ; ($v, $resp, $tracker) $else_branch );
;
($fn_name: ident, $req_ty: ident, $resp_ty: ident, ($v: ident, $resp: ident) $else_branch: expr ) =>
txn_command_future!($fn_name, $req_ty, $resp_ty, (req) ; ($v, $resp, tracker) $else_branch );
;
txn_command_future!(future_prewrite, PrewriteRequest, PrewriteResponse, (v, resp, tracker)
if let Ok(v) = &v
resp.set_min_commit_ts(v.min_commit_ts.into_inner());
resp.set_one_pc_commit_ts(v.one_pc_commit_ts.into_inner());
GLOBAL_TRACKERS.with_tracker(tracker, |tracker|
tracker.write_scan_detail(resp.mut_exec_details_v2().mut_scan_detail_v2());
tracker.write_write_detail(resp.mut_exec_details_v2().mut_write_detail());
);
resp.set_errors(extract_key_errors(v.map(|v| v.locks)).into());
);
fn future_get<E: Engine, L: LockManager, F: KvFormat>(
storage: &Storage<E, L, F>,
mut req: GetRequest,
) -> impl Future<Output = ServerResult<GetResponse>>
let tracker = GLOBAL_TRACKERS.insert(Tracker::new(RequestInfo::new(
req.get_context(),
RequestType::KvGet,
req.get_version(),
)));
set_tls_tracker_token(tracker);
let start = Instant::now();
let v = storage.get(
req.take_context(),
Key::from_raw(req.get_key()),
req.get_version().into(),
);
async move
let v = v.await;
let duration_ms = duration_to_ms(start.saturating_elapsed());
let mut resp = GetResponse::default();
if let Some(err) = extract_region_error(&v)
resp.set_region_error(err);
else
match v
Ok((val, stats)) =>
...
Err(e) => resp.set_error(extract_key_error(&e)),
GLOBAL_TRACKERS.remove(tracker);
Ok(resp)
自 3.x 版本以来,KVService 利用了多个宏显著减少了不同 RPC handler 间的冗余代码,然而这些宏目前还不能被 Clion 等调试工具的函数调用关系链捕捉到,这可能会困惑刚开始查看函数调用链却无法找到对应 handler 的新同学。
通过本小节,希望您能够了解 KVService 的作用和 TiKV 的启动流程,不仅具备寻找全局重要结构体初始化代码片段的能力,还能够迅速找到 KVService 中需要的 RPC handler 开始从上到下追踪 RPC 请求的调用路径。
Storage
Storage 模块位于 Service 与底层 KV 存储引擎之间,主要负责事务的并发控制。TiKV 端事务相关的实现都在 Storage 模块中。有关 3.x 版本的 Storage 模块可以参照 TiKV 源码解析系列文章(十一)Storage - 事务控制层。
经过三个大版本的迭代,Storage 和 Scheduler 结构体已经发生了一些变化,本小节将基于之前的源码解析文档做一些更新和补充。
Storage 结构体:
- engine:代表的是底层的 KV 存储引擎,利用 Trait Bound 来约束接口,拥有多种实现。实际 TiKV 使用的是 RaftKV 引擎,当调用 RaftKV 的 async_write 进行写入操作时,如果 async_write 通过回调方式成功返回了,说明写入操作已经通过 raft 复制给了大多数副本,并且在 leader 节点(调用者所在 TiKV)完成写入,后续 leader 节点上的读就能够看到之前写入的内容
- sched:事务调度器,负责并发事务请求的调度工作
- readPool:读取线程池,所有只读 KV 请求,包括事务的和非事务的,如 raw get、txn kv get 等最终都会在这个线程池内执行。由于只读请求不需要获取 latches,所以为其分配一个独立的线程池直接执行,而不是与非只读事务共用事务调度器。值得注意的是,当前版本的 readPool 已经支持根据读请求中的 priority 字段来差别调度读请求,而不是全部看做相同优先级的任务来公平调度
pub struct Storage<E: Engine, L: LockManager, F: KvFormat>
// TODO: Too many Arcs, would be slow when clone.
engine: E,
sched: TxnScheduler<E, L>,
/// The thread pool used to run most read operations.
read_pool: ReadPoolHandle,
...
#[derive(Clone)]
pub enum ReadPoolHandle
FuturePools
read_pool_high: FuturePool,
read_pool_normal: FuturePool,
read_pool_low: FuturePool,
,
Yatp
remote: Remote<TiKV 源码阅读三部曲写流程