TiKV 源码阅读三部曲写流程
Posted TiDB_PingCAP
tags:
篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了TiKV 源码阅读三部曲写流程相关的知识,希望对你有一定的参考价值。
背景
TiKV 是一个支持事务的分布式 Key-Value 数据库,目前已经是 CNCF 基金会 的顶级项目。
作为一个新同学,需要一定的前期准备才能够有能力参与 TiKV 社区的代码开发,包括但不限于学习 Rust 语言,理解 TiKV 的原理和在前两者的基础上了解熟悉 TiKV 的源码。
笔者将结合TiKV 官方源码解析文档 系列文章,基于 6.1 版本的源码撰写三篇博客,分别介绍以下三个方面:
- TiKV 源码阅读三部曲(一)重要模块:TiKV 的基本概念,TiKV 读写路径上的三个重要模块(KVService,Storage,RaftStore)和断点调试 TiKV 学习源码的方案
- TiKV 源码阅读三部曲(二)读流程:TiKV 中一条读请求的全链路流程
- TiKV 源码阅读三部曲(三)写流程:TiKV 中一条写请求的全链路流程
希望此三篇博客能够帮助对 TiKV 开发感兴趣的新同学尽快了解 TiKV 的 codebase。
本文为第三篇博客,将主要介绍 TiKV 中一条写请求的全链路流程。
写流程
以下四篇博客由上到下分别介绍了 TiKV 3.x 版本 KVService,Storage 和 RaftStore 模块对于分布式事务请求的执行流程。
- TiKV 源码解析系列文章(九)Service 层处理流程解析
- TiKV 源码解析系列文章(十一)Storage - 事务控制层
- TiKV 源码解析系列文章(十二)分布式事务
- TiKV 源码解析系列文章(十八)Raft Propose 的 Commit 和 Apply 情景分析
本小节将在 TiKV 6.1 版本的基础上,以一条 PreWrite 请求为例,介绍当前版本的写请求全链路执行流程。
KVService
在 KVService 层,通过 handle_request 和 txn_command_future 宏,PreWrite 接口的请求会直接被路由到 Storage::sched_txn_command
函数中。
impl<T: RaftStoreRouter<E::Local> + 'static, E: Engine, L: LockManager, F: KvFormat> Tikv
for Service<T, E, L, F>
handle_request!(
kv_prewrite,
future_prewrite,
PrewriteRequest,
PrewriteResponse,
has_time_detail
);
txn_command_future!(future_prewrite, PrewriteRequest, PrewriteResponse, (v, resp, tracker)
if let Ok(v) = &v
resp.set_min_commit_ts(v.min_commit_ts.into_inner());
resp.set_one_pc_commit_ts(v.one_pc_commit_ts.into_inner());
GLOBAL_TRACKERS.with_tracker(tracker, |tracker|
tracker.write_scan_detail(resp.mut_exec_details_v2().mut_scan_detail_v2());
tracker.write_write_detail(resp.mut_exec_details_v2().mut_write_detail());
);
resp.set_errors(extract_key_errors(v.map(|v| v.locks)).into());
);
Storage
在 Storage 模块,其会将请求路由到 Scheduler::run_cmd
函数中,并进一步路由到 Scheduler::schedule_command
函数中。在 schedule_command
函数中,当前 command 连同 callback 等上下文会被保存到 task_slots 中,如果当前线程申请到了所有 latch 则会调用 execute 函数继续执行该 task,否则如前文所述,当前任务便会被阻塞在某些 latch 上等待其他线程去唤醒进而执行,当前线程会直接返回并执行其他的工作。
// The entry point of the storage scheduler. Not only transaction commands need
// to access keys serially.
pub fn sched_txn_command<T: StorageCallbackType>(
&self,
cmd: TypedCommand<T>,
callback: Callback<T>,
) -> Result<()>
...
self.sched.run_cmd(cmd, T::callback(callback));
Ok(())
pub(in crate::storage) fn run_cmd(&self, cmd: Command, callback: StorageCallback)
// write flow control
if cmd.need_flow_control() && self.inner.too_busy(cmd.ctx().region_id)
SCHED_TOO_BUSY_COUNTER_VEC.get(cmd.tag()).inc();
callback.execute(ProcessResult::Failed
err: StorageError::from(StorageErrorInner::SchedTooBusy),
);
return;
self.schedule_command(cmd, callback);
fn schedule_command(&self, cmd: Command, callback: StorageCallback)
let cid = self.inner.gen_id();
let tracker = get_tls_tracker_token();
debug!("received new command"; "cid" => cid, "cmd" => ?cmd, "tracker" => ?tracker);
let tag = cmd.tag();
let priority_tag = get_priority_tag(cmd.priority());
SCHED_STAGE_COUNTER_VEC.get(tag).new.inc();
SCHED_COMMANDS_PRI_COUNTER_VEC_STATIC
.get(priority_tag)
.inc();
let mut task_slot = self.inner.get_task_slot(cid);
let tctx = task_slot.entry(cid).or_insert_with(||
self.inner
.new_task_context(Task::new(cid, tracker, cmd), callback)
);
if self.inner.latches.acquire(&mut tctx.lock, cid)
fail_point!("txn_scheduler_acquire_success");
tctx.on_schedule();
let task = tctx.task.take().unwrap();
drop(task_slot);
self.execute(task);
return;
let task = tctx.task.as_ref().unwrap();
let deadline = task.cmd.deadline();
let cmd_ctx = task.cmd.ctx().clone();
self.fail_fast_or_check_deadline(cid, tag, cmd_ctx, deadline);
fail_point!("txn_scheduler_acquire_fail");
在 execute 函数中,当前线程会生成一个异步任务 spawn 到另一个 worker 线程池中去,该任务主要包含以下两个步骤:
- 使用
Self::with_tls_engine(|engine| Self::snapshot(engine, snap_ctx)).await
获取 snapshot。此步骤与上文读流程中获取 snapshot 的步骤相同,可能通过 ReadLocal 也可能通过 ReadIndex 来获取引擎的 snapshot,此小节不在赘述 - 使用
sched.process(snapshot, task).await
基于获取到的 snapshot 和对应 task 去调用scheduler::process
函数,进而被路由到scheduler::process_write
函数中
/// Executes the task in the sched pool.
fn execute(&self, mut task: Task)
set_tls_tracker_token(task.tracker);
let sched = self.clone();
self.get_sched_pool(task.cmd.priority())
.pool
.spawn(async move
...
// The program is currently in scheduler worker threads.
// Safety: `self.inner.worker_pool` should ensure that a TLS engine exists.
match unsafe with_tls_engine(|engine: &E| kv::snapshot(engine, snap_ctx)) .await
Ok(snapshot) =>
...
sched.process(snapshot, task).await;
Err(err) =>
...
)
.unwrap();
/// Process the task in the current thread.
async fn process(self, snapshot: E::Snap, task: Task)
if self.check_task_deadline_exceeded(&task)
return;
let resource_tag = self.inner.resource_tag_factory.new_tag(task.cmd.ctx());
async
...
if task.cmd.readonly()
self.process_read(snapshot, task, &mut statistics);
else
self.process_write(snapshot, task, &mut statistics).await;
;
...
.in_resource_metering_tag(resource_tag)
.await;
scheduler::process_write
函数是事务处理的关键函数,目前已经有近四百行,里面夹杂了很多新特性和新优化的复杂逻辑,其中最重要的逻辑有两个:
- 使用
task.cmd.process_write(snapshot, context).map_err(StorageError::from)
根据 snapshot 和 task 执行事务对应的语义:可以从Command::process_write
函数看到不同的请求都有不同的实现,每种请求都可能根据 snapshot 去底层获取一些数据并尝试写入一些数据。有关 PreWrite 和其他请求的具体操作可以参照 TiKV 源码解析系列文章(十二)分布式事务,此处不再赘述。需要注意的是,此时的写入仅仅缓存在了 WriteData 中,并没有对底层引擎进行实际修改。 - 使用
engine.async_write_ext(&ctx, to_be_write, engine_cb, proposed_cb, committed_cb)
将缓存的 WriteData 实际写入到 engine 层,对于 RaftKV 来说则是表示一次 propose,想要对这一批 WriteData commit 且 apply
async fn process_write(self, snapshot: E::Snap, task: Task, statistics: &mut Statistics)
...
let write_result =
let _guard = sample.observe_cpu();
let context = WriteContext
lock_mgr: &self.inner.lock_mgr,
concurrency_manager: self.inner.concurrency_manager.clone(),
extra_op: task.extra_op,
statistics,
async_apply_prewrite: self.inner.enable_async_apply_prewrite,
;
let begin_instant = Instant::now();
let res = unsafe
with_perf_context::<E, _, _>(tag, ||
task.cmd
.process_write(snapshot, context)
.map_err(StorageError::from)
)
;
SCHED_PROCESSING_READ_HISTOGRAM_STATIC
.get(tag)
.observe(begin_instant.saturating_elapsed_secs());
res
;
...
// Safety: `self.sched_pool` ensures a TLS engine exists.
unsafe
with_tls_engine(|engine: &E|
if let Err(e) =
engine.async_write_ext(&ctx, to_be_write, engine_cb, proposed_cb, committed_cb)
SCHED_STAGE_COUNTER_VEC.get(tag).async_write_err.inc();
info!("engine async_write failed"; "cid" => cid, "err" => ?e);
scheduler.finish_with_err(cid, e);
)
pub(crate) fn process_write<S: Snapshot, L: LockManager>(
self,
snapshot: S,
context: WriteContext<'_, L>,
) -> Result<WriteResult>
match self
Command::Prewrite(t) => t.process_write(snapshot, context),
Command::PrewritePessimistic(t) => t.process_write(snapshot, context),
Command::AcquirePessimisticLock(t) => t.process_write(snapshot, context),
Command::Commit(t) => t.process_write(snapshot, context),
Command::Cleanup(t) => t.process_write(snapshot, context),
Command::Rollback(t) => t.process_write(snapshot, context),
Command::PessimisticRollback(t) => t.process_write(snapshot, context),
Command::ResolveLock(t) => t.process_write(snapshot, context),
Command::ResolveLockLite(t) => t.process_write(snapshot, context),
Command::TxnHeartBeat(t) => t.process_write(snapshot, context),
Command::CheckTxnStatus(t) => t.process_write(snapshot, context),
Command::CheckSecondaryLocks(t) => t.process_write(snapshot, context),
Command::Pause(t) => t.process_write(snapshot, context),
Command::RawCompareAndSwap(t) => t.process_write(snapshot, context),
Command::RawAtomicStore(t) => t.process_write(snapshot, context),
_ => panic!("unsupported write command"),
fn async_write_ext(
&self,
ctx: &Context,
batch: WriteData,
write_cb: Callback<()>,
proposed_cb: Option<ExtCallback>,
committed_cb: Option<ExtCallback>,
) -> kv::Result<()>
fail_point!("raftkv_async_write");
if batch.modifies.is_empty()
return Err(KvError::from(KvErrorInner::EmptyRequest));
ASYNC_REQUESTS_COUNTER_VEC.write.all.inc();
let begin_instant = Instant::now_coarse();
self.exec_write_requests(
ctx,
batch,
Box::new(move |res| match res
...
),
proposed_cb,
committed_cb,
)
.map_err(|e|
let status_kind = get_status_kind_from_error(&e);
ASYNC_REQUESTS_COUNTER_VEC.write.get(status_kind).inc();
e.into()
)
进入 raftkv::async_write_ext
函数后,其进而通过 raftkv::exec_write_requests -> RaftStoreRouter::send_command
的调用栈将 task 连带 callback 发送给 RaftBatchSystem 交由 RaftStore 模块处理。
fn exec_write_requests(
&self,
ctx: &Context,
batch: WriteData,
write_cb: Callback<CmdRes<E::Snapshot>>,
proposed_cb: Option<ExtCallback>,
committed_cb: Option<ExtCallback>,
) -> Result<()>
...
let cb = StoreCallback::write_ext(
Box::new(move |resp|
write_cb(on_write_result(resp).map_err(Error::into));
),
proposed_cb,
committed_cb,
);
let extra_opts = RaftCmdExtraOpts
deadline: batch.deadline,
disk_full_opt: batch.disk_full_opt,
;
self.router.send_command(cmd, cb, extra_opts)?;
Ok(())
/// Sends RaftCmdRequest to local store.
fn send_command(
&self,
req: RaftCmdRequest,
cb: Callback<EK::Snapshot>,
extra_opts: RaftCmdExtraOpts,
) -> RaftStoreResult<()>
send_command_impl::<EK, _>(self, req, cb, extra_opts)
RaftStore
直接定位到 RaftPoller
的 handle_normal
函数。
与处理 ReadIndex 请求相似, RaftPoller
会首先尝试获取 messages_per_tick
次路由到该状态机的消息,接着调用 PeerFsmDelegate::handle_msgs
函数进行处理,
这里依然只列出了我们需要关注的几种消息类型:
- RaftMessage: 其他 Peer 发送过来 Raft 消息,包括心跳、日志、投票消息等。
- RaftCommand: 上层提出的 proposal,其中包含了需要通过 Raft 同步的操作,以及操作成功之后需要调用的 callback 函数。PreWrite 包装出的 RaftCommand 便是最正常的 proposal。
- ApplyRes: ApplyFsm 在将日志应用到状态机之后发送给 PeerFsm 的消息,用于在进行操作之后更新某些内存状态。
对于 PreWrite 请求,其会进入 PeerMsg::RaftCommand(cmd)
分支,进而以 PeerFsmDelegate::propose_raft_command -> PeerFsmDelegate::propose_raft_command_internal -> Peer::propose -> Peer::propose_normal
的调用链最终被 propose 到 raft-rs 的 RawNode 接口中,同时其 callback 会连带该请求的 logIndex 被 push 到该 Peer 的 proposals
中去。
impl<EK: KvEngine, ER: RaftEngine, T: Transport> PollHandler<PeerFsm<EK, ER>, StoreFsm<EK>>
for RaftPollerTiKV 源码阅读三部曲重要模块