Ceph PG创建流程源码分析

Posted

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了Ceph PG创建流程源码分析相关的知识,希望对你有一定的参考价值。

本章节主要介绍pg创建的流程,主要包括两部分,第一部分是创建pg在mon上跳转的流程。第二部分是osd端的pg创建流程。

以下源码分析以函数调用栈展开分析:

ps:以下代码以L版本为准;

一、mon节点处理

1.函数调用栈1

1.void OSDMonitor::update_from_paxos() -> PGMonitor::check_osd_map() -> void PGMapUpdater::register_new_pgs() ->void PGMapUpdater::register_pg()
void PGMonitor::check_osd_map(epoch_t epoch)
{
  if (mon->is_peon())
    return;  // whatever.
  PGMapUpdater::register_new_pgs(osdmap, pg_map, &pending_inc);//a---
  propose_pending();//b---
}

a.PG的注册创建

b.完成推行,形成一致的PGMap

void PGMapUpdater::register_new_pgs(
    const OSDMap &osd_map,
    const PGMap &pg_map,
    PGMap::Incremental *pending_inc)
{
    for (ps_t ps = 0; ps < pool.get_pg_num(); ps++) {//a--- 
      pg_t pgid(ps, poolid, -1);
      if (pg_map.pg_stat.count(pgid)) {
    dout(20) << "register_new_pgs  have " << pgid << dendl;
    continue;
      }
      created++;
      register_pg(osd_map, pgid, pool.get_last_change(), new_pool,pg_map, pending_inc);//b---
    }        
}

a.循环遍历当前这个pool中的所有pg。

b.pg_map中统计了所有的pg,如果发现当前的pg不再pg_map中,说明这个pg是需要被创建的。

void PGMapUpdater::register_pg(
    const OSDMap &osd_map,
    pg_t pgid, epoch_t epoch,
    bool new_pool,
    const PGMap &pg_map,
    PGMap::Incremental *pending_inc)
{
  pg_stat_t &stats = pending_inc->pg_stat_updates[pgid];//a---
  stats.state = PG_STATE_CREATING;//b---
  stats.created = epoch;
  stats.parent = parent;
  stats.parent_split_bits = split_bits;
  stats.mapping_epoch = epoch;
  osd_map.pg_to_up_acting_osds(//c---
    pgid,
    &stats.up,
    &stats.up_primary,
    &stats.acting,
    &stats.acting_primary);
}

a.将刚刚创建的pg_id统计到pending_inc.pg_stat_updates结构中

b.修改该pg的状态为PG_STATE_CREATING

c.将PG映射到OSD上,这时候需要osdmap,最终通过_pg_to_up_acting_osds()函数完成映射,这里使用了crush的计算方法。

2.函数调用栈2

2.void OSDMonitor::update_from_paxos() -> void PGMonitor::check_osd_map() -> PaxosService::propose_pending() -> Paxos::trigger_propose() -> Paxos::propose_pending() -> void Paxos::begin(bufferlist& v) 写到db,并发送到其他mon进行paxos的协议表决 ...->
void PaxosService::propose_pending()
{
  paxos->queue_pending_finisher(new C_Committed(this));
  paxos->trigger_propose();//a---
}

a.触发提议的表决

bool Paxos::trigger_propose()
{
  if (plugged) {
  ...
  } else if (is_active()) {
    dout(10) << __func__ << " active, proposing now" << dendl;
    propose_pending();//
    return true;
  } else {
  ...
  }
}

void Paxos::propose_pending()
{
  committing_finishers.swap(pending_finishers);
  state = STATE_UPDATING;//a---
  begin(bl);//b---
}

a.修改状态至STATE_UPDATING

b.开始推动决议表决

// leader
void Paxos::begin(bufferlist& v)
{
...
  // ask others to accept it too!
  for (set<int>::const_iterator p = mon->get_quorum().begin();
       p != mon->get_quorum().end();
       ++p) {
    if (*p == mon->rank) continue;

    dout(10) << " sending begin to mon." << *p << dendl;
    MMonPaxos *begin = new MMonPaxos(mon->get_epoch(), MMonPaxos::OP_BEGIN,
                     ceph_clock_now());
    begin->values[last_committed+1] = new_value;
    begin->last_committed = last_committed;
    begin->pn = accepted_pn;

    mon->messenger->send_message(begin, mon->monmap->get_inst(*p));//a---
  }
...
}

a. 发送到monmap的其他mon中进行决议

3.函数调用栈3

接下来接收到决议propose后,开始继续处理。

3.bool ms_dispatch()-> void Monitor::_ms_dispatch()-> void Monitor::dispatch_op()-> void Paxos::dispatch()-> void Paxos::handle_accept()-> Paxos::commit_start() -> get_store()->queue_transaction(t, new C_Committed(this)) -> void Paxos::commit_finish() -> bool Paxos::do_refresh() ->void Monitor::refresh_from_paxos()-> void PaxosService::refresh && void PaxosService::post_refresh()
bool ms_dispatch(Message *m) override {
    lock.Lock();
    _ms_dispatch(m);
    lock.Unlock();
    return true;
  }

void Monitor::_ms_dispatch(Message *m)
{
...
    dispatch_op(op);
...
}
void Monitor::dispatch_op(MonOpRequestRef op)
{
    switch (op->get_req()->get_type()) {
    // paxos
    case MSG_MON_PAXOS:
      ...
        paxos->dispatch(op);
      ...
      }
}

void Paxos::dispatch(MonOpRequestRef op)
{
  assert(mon->is_leader() || 
     (mon->is_peon() && m->get_source().num() == mon->get_leader()));
  switch (m->get_type()) {

  case MSG_MON_PAXOS:
    {
        case MMonPaxos::OP_ACCEPT:
        handle_accept(op);
    }    
}

// leader
void Paxos::handle_accept(MonOpRequestRef op)
{
  if (accepted == mon->get_quorum()) {
    // yay, commit!
    dout(10) << " got majority, committing, done with update" << dendl;
    op->mark_paxos_event("commit_start");
    commit_start();//go---
  }    
}

void Paxos::commit_start()
{
  get_store()->queue_transaction(t, new C_Committed(this));    
}

struct C_Committed : public Context {
  Paxos *paxos;
  explicit C_Committed(Paxos *p) : paxos(p) {}
  void finish(int r) override {
    assert(r >= 0);
    Mutex::Locker l(paxos->mon->lock);
    if (paxos->is_shutdown()) {
      paxos->abort_commit();
      return;
    }
    paxos->commit_finish();//go---
  }
};

void Paxos::commit_finish()
{
  // tell everyone 
  for (set<int>::const_iterator p = mon->get_quorum().begin();
       p != mon->get_quorum().end();
       ++p) {
    if (*p == mon->rank) continue;

    dout(10) << " sending commit to mon." << *p << dendl;
    MMonPaxos *commit = new MMonPaxos(mon->get_epoch(), MMonPaxos::OP_COMMIT,
                      ceph_clock_now());//a---
    commit->values[last_committed] = new_value;
    commit->pn = accepted_pn;
    commit->last_committed = last_committed;

    mon->messenger->send_message(commit, mon->monmap->get_inst(*p));//a---
  }
   state = STATE_REFRESH;
  if (do_refresh()) {//b---
  ...
  }

}
a.此时通知monmap中的其他mon,此op已经commit,MMonPaxos::OP_COMMIT

b.开始更新相关操作

bool Paxos::do_refresh()
{
  // make sure we have the latest state loaded up
  mon->refresh_from_paxos(&need_bootstrap);
}

void Monitor::refresh_from_paxos(bool *need_bootstrap)
{
  for (int i = 0; i < PAXOS_NUM; ++i) {
    paxos_service[i]->refresh(need_bootstrap);//a---
  }
  for (int i = 0; i < PAXOS_NUM; ++i) {
    paxos_service[i]->post_refresh();//b---
  }
}

a.遍历PAXOS_NUM,进行相关refresh操作。

b.遍历PAXOS_NUM,进行相关post_refresh操作。

4.函数调用栈4

4.Monitor::refresh_from_paxos() -> PaxosService::refresh(bool *need_bootstrap) -> void PGMonitor::update_from_paxos(bool *need_bootstrap) -> apply_pgmap_delta() 
void PaxosService::refresh(bool *need_bootstrap)
{
  update_from_paxos(need_bootstrap);
}
void PGMonitor::update_from_paxos(bool *need_bootstrap)
{
    apply_pgmap_delta(bl);//a---    
}

a. 进行相关pg信息应用更新

5.函数调用栈5

5.Monitor::refresh_from_paxos() -> PaxosService::post_refresh() -> PGMonitor::post_paxos_update() -> PGMonitor::check_subs() -> PGMonitor::check_sub() -> send_pg_creates() 
void PaxosService::post_refresh()
{
  post_paxos_update();
}
void PGMonitor::post_paxos_update()
{
  if (osdmap.get_epoch()) {
    if (osdmap.get_num_up_osds() > 0) {
      assert(osdmap.get_up_osd_features() & CEPH_FEATURE_MON_STATEFUL_SUB);
      check_subs();//go---
    }
  }
}

void PGMonitor::check_subs()
{
  mon->with_session_map([this, &type](const MonSessionMap& session_map) {
      if (mon->session_map.subs.count(type) == 0)
    return;

      auto p = mon->session_map.subs[type]->begin();
      while (!p.end()) {
    Subscription *sub = *p;
    ++p;
    dout(20) << __func__ << " .. " << sub->session->inst << dendl;
    check_sub(sub);//go---
      }
    });
}
bool PGMonitor::check_sub(Subscription *sub)
{
  OSDMap& osdmap = mon->osdmon()->osdmap;
  if (sub->type == "osd_pg_creates") {
    // only send these if the OSD is up.  we will check_subs() when they do
    // come up so they will get the creates then.
    if (sub->session->inst.name.is_osd() &&
        osdmap.is_up(sub->session->inst.name.num())) {
      sub->next = send_pg_creates(sub->session->inst.name.num(),
                                  sub->session->con.get(),
                                  sub->next);//go---
    }
  }
  return true;
}

epoch_t PGMonitor::send_pg_creates(int osd, Connection *con, epoch_t next)
{
  map<int, map<epoch_t, set<pg_t> > >::iterator p = pg_map.creating_pgs_by_osd_epoch.find(osd);
  for (map<epoch_t, set<pg_t> >::iterator q = p->second.lower_bound(next);
       q != p->second.end();
       ++q) {
    for (set<pg_t>::iterator r = q->second.begin(); r != q->second.end(); ++r) {
      pg_stat_t &st = pg_map.pg_stat[*r];
      if (!m)
    m = new MOSDPGCreate(pg_map.last_osdmap_epoch);
      m->mkpg[*r] = pg_create_t(st.created,
                                st.parent,
                                st.parent_split_bits);
      // Need the create time from the monitor using its clock to set
      // last_scrub_stamp upon pg creation.
      m->ctimes[*r] = pg_map.pg_stat[*r].last_scrub_stamp;
    }           
       }    
  con->send_message(m);//a---    
}

a. 开始发送创建pg消息到OSD端

二、OSD端处理

6.函数调用栈6

6.bool OSD::ms_dispatch(Message *m)-> void OSD::_dispatch(Message *m) ->void OSD::dispatch_op(OpRequestRef op) -> void OSD::handle_pg_create(OpRequestRef op) -> handle_pg_peering_evt() -> maybe_update_heartbeat_peers()
bool OSD::ms_dispatch(Message *m)
{
  _dispatch(m);
}
void OSD::_dispatch(Message *m)
{
  switch (m->get_type()) {
    case MSG_OSD_PG_CREATE:
         dispatch_op(op);
    }
}

void OSD::dispatch_op(OpRequestRef op)
{
  switch (op->get_req()->get_type()) {

  case MSG_OSD_PG_CREATE:
    handle_pg_create(op);
}

void OSD::handle_pg_create(OpRequestRef op)
{
  const MOSDPGCreate *m = static_cast<const MOSDPGCreate*>(op->get_req());//a---
  assert(m->get_type() == MSG_OSD_PG_CREATE);
  if (!require_mon_peer(op->get_req())) {//b---
    return;
  }

  if (!require_same_or_newer_map(op, m->epoch, false))//c---
    return;  
  for (map<pg_t,pg_create_t>::const_iterator p = m->mkpg.begin();p != m->mkpg.end();++p, ++ci) {//d---
    // is it still ours?
    vector<int> up, acting;
    int up_primary = -1;
    int acting_primary = -1;
    osdmap->pg_to_up_acting_osds(on, &up, &up_primary, &acting, &acting_primary);
    int role = osdmap->calc_pg_role(whoami, acting, acting.size());//e---

    if (acting_primary != whoami)//f--- {//这里只在主osd上创建pg
      dout(10) << "mkpg " << on << "  not acting_primary (" << acting_primary
           << "), my role=" << role << ", skipping" << dendl;
      continue;
    }

    spg_t pgid;
    bool mapped = osdmap->get_primary_shard(on, &pgid);//g--- 确认本osd为该PG的主osd
    assert(mapped);

    if (handle_pg_peering_evt(//h--- 处理Peering状态机事件的入口。该函数会查找相应的PG,如果该PG不存在,就创建该PG。该PG的状态机进入RecoveryMachine/Stray状态。
          pgid,
          history,
          pi,
          osdmap->get_epoch(),
          PG::CephpeeringEvtRef(
        new PG::CephPeeringEvt(
          osdmap->get_epoch(),
          osdmap->get_epoch(),
          PG::NullEvt()))
          ) == -EEXIST) {
      service.send_pg_created(pgid.pgid);//i-- pg 已经创建了,正常创建pg不走这里
    }
       }    
  maybe_update_heartbeat_peers();//更新该PG相关的OSD的心跳列表       
}

a.把op转换为MOSDPGCreate类型的消息。

b.确保是由Monitor发送的创建消息

c.检查epoch是否一致。如果对方的epoch比自己拥有的更新,就更新自己的epoch;否则就直接拒绝该请求。

d.遍历所有要创建的mkpg map

e.查看当前osd在pg中的角色

f.这里只在主osd上创建pg, 如果不是主,就跳出循环,不在该osd上创建。其实会由其他的osd来创建。

g.确认本osd为该PG的主osd

h.处理Peering状态机事件的入口。该函数会查找相应的PG,如果该PG不存在,就创建该PG。该PG的状态机进入RecoveryMachine/Stray状态。

i. pg 已经创建了,正常创建pg不走这里

7.函数调用栈7

7.handle_pg_peering_evt() -> void PG::_create() -> PG *OSD::_create_lock_pg() ->void PG::handle_create() 给新创建PG状态机投递事件 ->void PG::write_if_dirty()
  enum res_result {
    RES_PARENT,    // resurrected a parent
    RES_SELF,      // resurrected self
    RES_NONE       // nothing relevant deleting
  };

/*
 * look up a pg.  if we have it, great.  if not, consider creating it IF the pg mapping
 * hasn‘t changed since the given epoch and we are the primary.
 */
int OSD::handle_pg_peering_evt(
  spg_t pgid,
  const pg_history_t& orig_history,
  const PastIntervals& pi,
  epoch_t epoch,
  PG::CephPeeringEvtRef evt)
{
  PG *pg = _lookup_lock_pg(pgid);//a---
  if (!pg) //a---
  {
    res_result result = _try_resurrect_pg(//b---
      service.get_osdmap(),
      pgid,
      &resurrected,
      &old_pg_state);

    PG::RecoveryCtx rctx = create_context();
    switch (result) {
    case RES_NONE: {//c---
          PG::_create(*rctx.transaction, pgid, pgid.get_split_bits(pp->get_pg_num()));//d---真正的创建和初始化pg
          PG::_init(*rctx.transaction, pgid, pp);    
          pg = _create_lock_pg(//e---初始化pg
                get_map(epoch),
                pgid, false, false,
                role,
                up, up_primary,
                acting, acting_primary,
                history, pi,
                *rctx.transaction);
      pg->handle_create(&rctx);//f--- 给新创建PG状态机投递事件,PG的状态发生相应的改变
      pg->write_if_dirty(*rctx.transaction); //g--  dispatch_context(rctx, parent, osdmap);//g--
      pg->queue_peering_event(evt);//h---
      ...
      }
    case RES_SELF: {
        ...
    }

    case RES_PARENT: {
        ...
    }
  }
}

a. 查找pgid,如果没找到,说明需要创建,否则,说明已经创建了,这里主要分析没有找到的情况。

b._try_resurrect_pg 函数用于处理要操作的pg是否之前删除要恢复的,以及是否有父pg,即pg分裂相关。根据result返回值进行分类。

c.RES_NONE 类型应该是正常创建分支要走的分支,说明这个pg是第一次创建,这里也主要分析它。

d.创建和部分初始化pg

e.在该函数中,会调用pg->init(),主要初始化pg

f.给新创建PG状态机投递事件,PG的状态发生相应的改变

g.所有修改操作都打包在事务rctx.transaction中,调用函数dispatch_context将事务提交到本地对象存储中

h.把创建的pg事件加入queue,下一步进行peering。

总结:

以上讲述的是PG在主OSD上的创建流程。Monitor并不会给PG的从OSD发送消息来创建该PG, 而是由该主OSD上的PG在Peering过程中创建。主OSD给从OSD的PG状态机投递事件时,在函数handle_pg_peering_evt中,如果发现该PG不存在,才完成创建该PG。

函数handle_pg_peering_evt是处理Peering状态机事件的入口。该函数会查找相应的PG,如果该PG不存在,就创建该PG。该PG的状态机进入RecoveryMachine/Stray状态。

参考链接:
https://ivanzz1001.github.io/records/post/ceph/2018/12/11/ceph-data-consistency

以上是关于Ceph PG创建流程源码分析的主要内容,如果未能解决你的问题,请参考以下文章

ceph pg degraded#yyds干货盘点#

Ceph源码解析:PG peering

ceph scrub

ceph scrub

Android 逆向整体加固脱壳 ( DEX 优化流程分析 | DexPrepare.cpp 中 dvmOptimizeDexFile() 方法分析 | /bin/dexopt 源码分析 )(代码片段

Ceph Rados io流程处理