TiKV 源码阅读三部曲写流程

Posted TiDB_PingCAP

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了TiKV 源码阅读三部曲写流程相关的知识,希望对你有一定的参考价值。

背景

TiKV 是一个支持事务的分布式 Key-Value 数据库,目前已经是 CNCF 基金会 的顶级项目。

作为一个新同学,需要一定的前期准备才能够有能力参与 TiKV 社区的代码开发,包括但不限于学习 Rust 语言,理解 TiKV 的原理和在前两者的基础上了解熟悉 TiKV 的源码。

笔者将结合TiKV 官方源码解析文档 系列文章,基于 6.1 版本的源码撰写三篇博客,分别介绍以下三个方面:

希望此三篇博客能够帮助对 TiKV 开发感兴趣的新同学尽快了解 TiKV 的 codebase。

本文为第三篇博客,将主要介绍 TiKV 中一条写请求的全链路流程。

写流程

以下四篇博客由上到下分别介绍了 TiKV 3.x 版本 KVService,Storage 和 RaftStore 模块对于分布式事务请求的执行流程。

本小节将在 TiKV 6.1 版本的基础上,以一条 PreWrite 请求为例,介绍当前版本的写请求全链路执行流程。

KVService

在 KVService 层,通过 handle_request 和 txn_command_future 宏,PreWrite 接口的请求会直接被路由到 Storage::sched_txn_command 函数中。

impl<T: RaftStoreRouter<E::Local> + 'static, E: Engine, L: LockManager, F: KvFormat> Tikv
    for Service<T, E, L, F>

    handle_request!(
        kv_prewrite,
        future_prewrite,
        PrewriteRequest,
        PrewriteResponse,
        has_time_detail
    );
 

txn_command_future!(future_prewrite, PrewriteRequest, PrewriteResponse, (v, resp, tracker) 
    if let Ok(v) = &v 
        resp.set_min_commit_ts(v.min_commit_ts.into_inner());
        resp.set_one_pc_commit_ts(v.one_pc_commit_ts.into_inner());
        GLOBAL_TRACKERS.with_tracker(tracker, |tracker| 
            tracker.write_scan_detail(resp.mut_exec_details_v2().mut_scan_detail_v2());
            tracker.write_write_detail(resp.mut_exec_details_v2().mut_write_detail());
        );
    
    resp.set_errors(extract_key_errors(v.map(|v| v.locks)).into());
);

Storage

在 Storage 模块,其会将请求路由到 Scheduler::run_cmd 函数中,并进一步路由到 Scheduler::schedule_command 函数中。在 schedule_command 函数中,当前 command 连同 callback 等上下文会被保存到 task_slots 中,如果当前线程申请到了所有 latch 则会调用 execute 函数继续执行该 task,否则如前文所述,当前任务便会被阻塞在某些 latch 上等待其他线程去唤醒进而执行,当前线程会直接返回并执行其他的工作。

// The entry point of the storage scheduler. Not only transaction commands need
// to access keys serially.
pub fn sched_txn_command<T: StorageCallbackType>(
    &self,
    cmd: TypedCommand<T>,
    callback: Callback<T>,
) -> Result<()> 

    ...
    
    self.sched.run_cmd(cmd, T::callback(callback));

    Ok(())


pub(in crate::storage) fn run_cmd(&self, cmd: Command, callback: StorageCallback) 
    // write flow control
    if cmd.need_flow_control() && self.inner.too_busy(cmd.ctx().region_id) 
        SCHED_TOO_BUSY_COUNTER_VEC.get(cmd.tag()).inc();
        callback.execute(ProcessResult::Failed 
            err: StorageError::from(StorageErrorInner::SchedTooBusy),
        );
        return;
    
    self.schedule_command(cmd, callback);


fn schedule_command(&self, cmd: Command, callback: StorageCallback) 
    let cid = self.inner.gen_id();
    let tracker = get_tls_tracker_token();
    debug!("received new command"; "cid" => cid, "cmd" => ?cmd, "tracker" => ?tracker);
    let tag = cmd.tag();
    let priority_tag = get_priority_tag(cmd.priority());
    SCHED_STAGE_COUNTER_VEC.get(tag).new.inc();
    SCHED_COMMANDS_PRI_COUNTER_VEC_STATIC
        .get(priority_tag)
        .inc();

    let mut task_slot = self.inner.get_task_slot(cid);
    let tctx = task_slot.entry(cid).or_insert_with(|| 
        self.inner
            .new_task_context(Task::new(cid, tracker, cmd), callback)
    );

    if self.inner.latches.acquire(&mut tctx.lock, cid) 
        fail_point!("txn_scheduler_acquire_success");
        tctx.on_schedule();
        let task = tctx.task.take().unwrap();
        drop(task_slot);
        self.execute(task);
        return;
    
    let task = tctx.task.as_ref().unwrap();
    let deadline = task.cmd.deadline();
    let cmd_ctx = task.cmd.ctx().clone();
    self.fail_fast_or_check_deadline(cid, tag, cmd_ctx, deadline);
    fail_point!("txn_scheduler_acquire_fail");

在 execute 函数中,当前线程会生成一个异步任务 spawn 到另一个 worker 线程池中去,该任务主要包含以下两个步骤:

  • 使用 Self::with_tls_engine(|engine| Self::snapshot(engine, snap_ctx)).await 获取 snapshot。此步骤与上文读流程中获取 snapshot 的步骤相同,可能通过 ReadLocal 也可能通过 ReadIndex 来获取引擎的 snapshot,此小节不在赘述
  • 使用 sched.process(snapshot, task).await 基于获取到的 snapshot 和对应 task 去调用 scheduler::process 函数,进而被路由到 scheduler::process_write 函数中
/// Executes the task in the sched pool.
fn execute(&self, mut task: Task) 
    set_tls_tracker_token(task.tracker);
    let sched = self.clone();
    self.get_sched_pool(task.cmd.priority())
        .pool
        .spawn(async move 
        
            ...

            // The program is currently in scheduler worker threads.
            // Safety: `self.inner.worker_pool` should ensure that a TLS engine exists.
            match unsafe  with_tls_engine(|engine: &E| kv::snapshot(engine, snap_ctx)) .await
            
                Ok(snapshot) => 
              
                    ...

                    sched.process(snapshot, task).await;
                
                Err(err) => 
                    ...
                
            
        )
        .unwrap();


 /// Process the task in the current thread.
async fn process(self, snapshot: E::Snap, task: Task) 
    if self.check_task_deadline_exceeded(&task) 
        return;
    

    let resource_tag = self.inner.resource_tag_factory.new_tag(task.cmd.ctx());
    async 
        
        ...

        if task.cmd.readonly() 
            self.process_read(snapshot, task, &mut statistics);
         else 
            self.process_write(snapshot, task, &mut statistics).await;
        ;
   
        ...
    
    .in_resource_metering_tag(resource_tag)
    .await;

scheduler::process_write 函数是事务处理的关键函数,目前已经有近四百行,里面夹杂了很多新特性和新优化的复杂逻辑,其中最重要的逻辑有两个:

  • 使用 task.cmd.process_write(snapshot, context).map_err(StorageError::from) 根据 snapshot 和 task 执行事务对应的语义:可以从 Command::process_write 函数看到不同的请求都有不同的实现,每种请求都可能根据 snapshot 去底层获取一些数据并尝试写入一些数据。有关 PreWrite 和其他请求的具体操作可以参照 TiKV 源码解析系列文章(十二)分布式事务,此处不再赘述。需要注意的是,此时的写入仅仅缓存在了 WriteData 中,并没有对底层引擎进行实际修改。
  • 使用 engine.async_write_ext(&ctx, to_be_write, engine_cb, proposed_cb, committed_cb) 将缓存的 WriteData 实际写入到 engine 层,对于 RaftKV 来说则是表示一次 propose,想要对这一批 WriteData commit 且 apply
async fn process_write(self, snapshot: E::Snap, task: Task, statistics: &mut Statistics) 
 
    ...

    let write_result = 
        let _guard = sample.observe_cpu();
        let context = WriteContext 
            lock_mgr: &self.inner.lock_mgr,
            concurrency_manager: self.inner.concurrency_manager.clone(),
            extra_op: task.extra_op,
            statistics,
            async_apply_prewrite: self.inner.enable_async_apply_prewrite,
        ;
        let begin_instant = Instant::now();
        let res = unsafe 
            with_perf_context::<E, _, _>(tag, || 
                task.cmd
                    .process_write(snapshot, context)
                    .map_err(StorageError::from)
            )
        ;
        SCHED_PROCESSING_READ_HISTOGRAM_STATIC
            .get(tag)
            .observe(begin_instant.saturating_elapsed_secs());
        res
    ;

    ...

    // Safety: `self.sched_pool` ensures a TLS engine exists.
    unsafe 
        with_tls_engine(|engine: &E| 
            if let Err(e) =
                engine.async_write_ext(&ctx, to_be_write, engine_cb, proposed_cb, committed_cb)
            
                SCHED_STAGE_COUNTER_VEC.get(tag).async_write_err.inc();

                info!("engine async_write failed"; "cid" => cid, "err" => ?e);
                scheduler.finish_with_err(cid, e);
            
        )
    


pub(crate) fn process_write<S: Snapshot, L: LockManager>(
    self,
    snapshot: S,
    context: WriteContext<'_, L>,
) -> Result<WriteResult> 
    match self 
        Command::Prewrite(t) => t.process_write(snapshot, context),
        Command::PrewritePessimistic(t) => t.process_write(snapshot, context),
        Command::AcquirePessimisticLock(t) => t.process_write(snapshot, context),
        Command::Commit(t) => t.process_write(snapshot, context),
        Command::Cleanup(t) => t.process_write(snapshot, context),
        Command::Rollback(t) => t.process_write(snapshot, context),
        Command::PessimisticRollback(t) => t.process_write(snapshot, context),
        Command::ResolveLock(t) => t.process_write(snapshot, context),
        Command::ResolveLockLite(t) => t.process_write(snapshot, context),
        Command::TxnHeartBeat(t) => t.process_write(snapshot, context),
        Command::CheckTxnStatus(t) => t.process_write(snapshot, context),
        Command::CheckSecondaryLocks(t) => t.process_write(snapshot, context),
        Command::Pause(t) => t.process_write(snapshot, context),
        Command::RawCompareAndSwap(t) => t.process_write(snapshot, context),
        Command::RawAtomicStore(t) => t.process_write(snapshot, context),
        _ => panic!("unsupported write command"),
    


fn async_write_ext(
    &self,
    ctx: &Context,
    batch: WriteData,
    write_cb: Callback<()>,
    proposed_cb: Option<ExtCallback>,
    committed_cb: Option<ExtCallback>,
) -> kv::Result<()> 
    fail_point!("raftkv_async_write");
    if batch.modifies.is_empty() 
        return Err(KvError::from(KvErrorInner::EmptyRequest));
    

    ASYNC_REQUESTS_COUNTER_VEC.write.all.inc();
    let begin_instant = Instant::now_coarse();

    self.exec_write_requests(
        ctx,
        batch,
        Box::new(move |res| match res 

            ...

        ),
        proposed_cb,
        committed_cb,
    )
    .map_err(|e| 
        let status_kind = get_status_kind_from_error(&e);
        ASYNC_REQUESTS_COUNTER_VEC.write.get(status_kind).inc();
        e.into()
    )

进入 raftkv::async_write_ext 函数后,其进而通过 raftkv::exec_write_requests -> RaftStoreRouter::send_command 的调用栈将 task 连带 callback 发送给 RaftBatchSystem 交由 RaftStore 模块处理。

fn exec_write_requests(
    &self,
    ctx: &Context,
    batch: WriteData,
    write_cb: Callback<CmdRes<E::Snapshot>>,
    proposed_cb: Option<ExtCallback>,
    committed_cb: Option<ExtCallback>,
) -> Result<()> 
    
    ...

    let cb = StoreCallback::write_ext(
        Box::new(move |resp| 
            write_cb(on_write_result(resp).map_err(Error::into));
        ),
        proposed_cb,
        committed_cb,
    );
    let extra_opts = RaftCmdExtraOpts 
        deadline: batch.deadline,
        disk_full_opt: batch.disk_full_opt,
    ;
    self.router.send_command(cmd, cb, extra_opts)?;

    Ok(())


    /// Sends RaftCmdRequest to local store.
fn send_command(
    &self,
    req: RaftCmdRequest,
    cb: Callback<EK::Snapshot>,
    extra_opts: RaftCmdExtraOpts,
) -> RaftStoreResult<()> 
    send_command_impl::<EK, _>(self, req, cb, extra_opts)

RaftStore

直接定位到 RaftPollerhandle_normal 函数。

与处理 ReadIndex 请求相似, RaftPoller 会首先尝试获取 messages_per_tick 次路由到该状态机的消息,接着调用 PeerFsmDelegate::handle_msgs 函数进行处理,

这里依然只列出了我们需要关注的几种消息类型:

  • RaftMessage: 其他 Peer 发送过来 Raft 消息,包括心跳、日志、投票消息等。
  • RaftCommand: 上层提出的 proposal,其中包含了需要通过 Raft 同步的操作,以及操作成功之后需要调用的 callback 函数。PreWrite 包装出的 RaftCommand 便是最正常的 proposal。
  • ApplyRes: ApplyFsm 在将日志应用到状态机之后发送给 PeerFsm 的消息,用于在进行操作之后更新某些内存状态。

对于 PreWrite 请求,其会进入 PeerMsg::RaftCommand(cmd) 分支,进而以 PeerFsmDelegate::propose_raft_command -> PeerFsmDelegate::propose_raft_command_internal -> Peer::propose -> Peer::propose_normal 的调用链最终被 propose 到 raft-rs 的 RawNode 接口中,同时其 callback 会连带该请求的 logIndex 被 push 到该 Peer 的 proposals 中去。

impl<EK: KvEngine, ER: RaftEngine, T: Transport> PollHandler<PeerFsm<EK, ER>, StoreFsm<EK>>
    for RaftPollerTiKV 源码阅读三部曲重要模块

TiKV 源码解析系列 - Raft 的优化

TiCDC 源码阅读TiKV CDC 模块介绍

TiCDC 源码阅读TiKV CDC 模块介绍

自定义spring boot starter三部曲之三:源码分析spring.factories加载过程

TiFlash 源码阅读 TiFlash 存储层概览