Ceph PG创建流程源码分析
Posted
tags:
篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了Ceph PG创建流程源码分析相关的知识,希望对你有一定的参考价值。
本章节主要介绍pg创建的流程,主要包括两部分,第一部分是创建pg在mon上跳转的流程。第二部分是osd端的pg创建流程。
以下源码分析以函数调用栈展开分析:
ps:以下代码以L版本为准;
一、mon节点处理
1.函数调用栈1
1.void OSDMonitor::update_from_paxos() -> PGMonitor::check_osd_map() -> void PGMapUpdater::register_new_pgs() ->void PGMapUpdater::register_pg()
void PGMonitor::check_osd_map(epoch_t epoch)
{
if (mon->is_peon())
return; // whatever.
PGMapUpdater::register_new_pgs(osdmap, pg_map, &pending_inc);//a---
propose_pending();//b---
}
a.PG的注册创建
b.完成推行,形成一致的PGMap
void PGMapUpdater::register_new_pgs(
const OSDMap &osd_map,
const PGMap &pg_map,
PGMap::Incremental *pending_inc)
{
for (ps_t ps = 0; ps < pool.get_pg_num(); ps++) {//a---
pg_t pgid(ps, poolid, -1);
if (pg_map.pg_stat.count(pgid)) {
dout(20) << "register_new_pgs have " << pgid << dendl;
continue;
}
created++;
register_pg(osd_map, pgid, pool.get_last_change(), new_pool,pg_map, pending_inc);//b---
}
}
a.循环遍历当前这个pool中的所有pg。
b.pg_map中统计了所有的pg,如果发现当前的pg不再pg_map中,说明这个pg是需要被创建的。
void PGMapUpdater::register_pg(
const OSDMap &osd_map,
pg_t pgid, epoch_t epoch,
bool new_pool,
const PGMap &pg_map,
PGMap::Incremental *pending_inc)
{
pg_stat_t &stats = pending_inc->pg_stat_updates[pgid];//a---
stats.state = PG_STATE_CREATING;//b---
stats.created = epoch;
stats.parent = parent;
stats.parent_split_bits = split_bits;
stats.mapping_epoch = epoch;
osd_map.pg_to_up_acting_osds(//c---
pgid,
&stats.up,
&stats.up_primary,
&stats.acting,
&stats.acting_primary);
}
a.将刚刚创建的pg_id统计到pending_inc.pg_stat_updates结构中
b.修改该pg的状态为PG_STATE_CREATING
c.将PG映射到OSD上,这时候需要osdmap,最终通过_pg_to_up_acting_osds()函数完成映射,这里使用了crush的计算方法。
2.函数调用栈2
2.void OSDMonitor::update_from_paxos() -> void PGMonitor::check_osd_map() -> PaxosService::propose_pending() -> Paxos::trigger_propose() -> Paxos::propose_pending() -> void Paxos::begin(bufferlist& v) 写到db,并发送到其他mon进行paxos的协议表决 ...->
void PaxosService::propose_pending()
{
paxos->queue_pending_finisher(new C_Committed(this));
paxos->trigger_propose();//a---
}
a.触发提议的表决
bool Paxos::trigger_propose()
{
if (plugged) {
...
} else if (is_active()) {
dout(10) << __func__ << " active, proposing now" << dendl;
propose_pending();//
return true;
} else {
...
}
}
void Paxos::propose_pending()
{
committing_finishers.swap(pending_finishers);
state = STATE_UPDATING;//a---
begin(bl);//b---
}
a.修改状态至STATE_UPDATING
b.开始推动决议表决
// leader
void Paxos::begin(bufferlist& v)
{
...
// ask others to accept it too!
for (set<int>::const_iterator p = mon->get_quorum().begin();
p != mon->get_quorum().end();
++p) {
if (*p == mon->rank) continue;
dout(10) << " sending begin to mon." << *p << dendl;
MMonPaxos *begin = new MMonPaxos(mon->get_epoch(), MMonPaxos::OP_BEGIN,
ceph_clock_now());
begin->values[last_committed+1] = new_value;
begin->last_committed = last_committed;
begin->pn = accepted_pn;
mon->messenger->send_message(begin, mon->monmap->get_inst(*p));//a---
}
...
}
a. 发送到monmap的其他mon中进行决议
3.函数调用栈3
接下来接收到决议propose后,开始继续处理。
3.bool ms_dispatch()-> void Monitor::_ms_dispatch()-> void Monitor::dispatch_op()-> void Paxos::dispatch()-> void Paxos::handle_accept()-> Paxos::commit_start() -> get_store()->queue_transaction(t, new C_Committed(this)) -> void Paxos::commit_finish() -> bool Paxos::do_refresh() ->void Monitor::refresh_from_paxos()-> void PaxosService::refresh && void PaxosService::post_refresh()
bool ms_dispatch(Message *m) override {
lock.Lock();
_ms_dispatch(m);
lock.Unlock();
return true;
}
void Monitor::_ms_dispatch(Message *m)
{
...
dispatch_op(op);
...
}
void Monitor::dispatch_op(MonOpRequestRef op)
{
switch (op->get_req()->get_type()) {
// paxos
case MSG_MON_PAXOS:
...
paxos->dispatch(op);
...
}
}
void Paxos::dispatch(MonOpRequestRef op)
{
assert(mon->is_leader() ||
(mon->is_peon() && m->get_source().num() == mon->get_leader()));
switch (m->get_type()) {
case MSG_MON_PAXOS:
{
case MMonPaxos::OP_ACCEPT:
handle_accept(op);
}
}
// leader
void Paxos::handle_accept(MonOpRequestRef op)
{
if (accepted == mon->get_quorum()) {
// yay, commit!
dout(10) << " got majority, committing, done with update" << dendl;
op->mark_paxos_event("commit_start");
commit_start();//go---
}
}
void Paxos::commit_start()
{
get_store()->queue_transaction(t, new C_Committed(this));
}
struct C_Committed : public Context {
Paxos *paxos;
explicit C_Committed(Paxos *p) : paxos(p) {}
void finish(int r) override {
assert(r >= 0);
Mutex::Locker l(paxos->mon->lock);
if (paxos->is_shutdown()) {
paxos->abort_commit();
return;
}
paxos->commit_finish();//go---
}
};
void Paxos::commit_finish()
{
// tell everyone
for (set<int>::const_iterator p = mon->get_quorum().begin();
p != mon->get_quorum().end();
++p) {
if (*p == mon->rank) continue;
dout(10) << " sending commit to mon." << *p << dendl;
MMonPaxos *commit = new MMonPaxos(mon->get_epoch(), MMonPaxos::OP_COMMIT,
ceph_clock_now());//a---
commit->values[last_committed] = new_value;
commit->pn = accepted_pn;
commit->last_committed = last_committed;
mon->messenger->send_message(commit, mon->monmap->get_inst(*p));//a---
}
state = STATE_REFRESH;
if (do_refresh()) {//b---
...
}
}
a.此时通知monmap中的其他mon,此op已经commit,MMonPaxos::OP_COMMIT
b.开始更新相关操作
bool Paxos::do_refresh()
{
// make sure we have the latest state loaded up
mon->refresh_from_paxos(&need_bootstrap);
}
void Monitor::refresh_from_paxos(bool *need_bootstrap)
{
for (int i = 0; i < PAXOS_NUM; ++i) {
paxos_service[i]->refresh(need_bootstrap);//a---
}
for (int i = 0; i < PAXOS_NUM; ++i) {
paxos_service[i]->post_refresh();//b---
}
}
a.遍历PAXOS_NUM,进行相关refresh操作。
b.遍历PAXOS_NUM,进行相关post_refresh操作。
4.函数调用栈4
4.Monitor::refresh_from_paxos() -> PaxosService::refresh(bool *need_bootstrap) -> void PGMonitor::update_from_paxos(bool *need_bootstrap) -> apply_pgmap_delta()
void PaxosService::refresh(bool *need_bootstrap)
{
update_from_paxos(need_bootstrap);
}
void PGMonitor::update_from_paxos(bool *need_bootstrap)
{
apply_pgmap_delta(bl);//a---
}
a. 进行相关pg信息应用更新
5.函数调用栈5
5.Monitor::refresh_from_paxos() -> PaxosService::post_refresh() -> PGMonitor::post_paxos_update() -> PGMonitor::check_subs() -> PGMonitor::check_sub() -> send_pg_creates()
void PaxosService::post_refresh()
{
post_paxos_update();
}
void PGMonitor::post_paxos_update()
{
if (osdmap.get_epoch()) {
if (osdmap.get_num_up_osds() > 0) {
assert(osdmap.get_up_osd_features() & CEPH_FEATURE_MON_STATEFUL_SUB);
check_subs();//go---
}
}
}
void PGMonitor::check_subs()
{
mon->with_session_map([this, &type](const MonSessionMap& session_map) {
if (mon->session_map.subs.count(type) == 0)
return;
auto p = mon->session_map.subs[type]->begin();
while (!p.end()) {
Subscription *sub = *p;
++p;
dout(20) << __func__ << " .. " << sub->session->inst << dendl;
check_sub(sub);//go---
}
});
}
bool PGMonitor::check_sub(Subscription *sub)
{
OSDMap& osdmap = mon->osdmon()->osdmap;
if (sub->type == "osd_pg_creates") {
// only send these if the OSD is up. we will check_subs() when they do
// come up so they will get the creates then.
if (sub->session->inst.name.is_osd() &&
osdmap.is_up(sub->session->inst.name.num())) {
sub->next = send_pg_creates(sub->session->inst.name.num(),
sub->session->con.get(),
sub->next);//go---
}
}
return true;
}
epoch_t PGMonitor::send_pg_creates(int osd, Connection *con, epoch_t next)
{
map<int, map<epoch_t, set<pg_t> > >::iterator p = pg_map.creating_pgs_by_osd_epoch.find(osd);
for (map<epoch_t, set<pg_t> >::iterator q = p->second.lower_bound(next);
q != p->second.end();
++q) {
for (set<pg_t>::iterator r = q->second.begin(); r != q->second.end(); ++r) {
pg_stat_t &st = pg_map.pg_stat[*r];
if (!m)
m = new MOSDPGCreate(pg_map.last_osdmap_epoch);
m->mkpg[*r] = pg_create_t(st.created,
st.parent,
st.parent_split_bits);
// Need the create time from the monitor using its clock to set
// last_scrub_stamp upon pg creation.
m->ctimes[*r] = pg_map.pg_stat[*r].last_scrub_stamp;
}
}
con->send_message(m);//a---
}
a. 开始发送创建pg消息到OSD端
二、OSD端处理
6.函数调用栈6
6.bool OSD::ms_dispatch(Message *m)-> void OSD::_dispatch(Message *m) ->void OSD::dispatch_op(OpRequestRef op) -> void OSD::handle_pg_create(OpRequestRef op) -> handle_pg_peering_evt() -> maybe_update_heartbeat_peers()
bool OSD::ms_dispatch(Message *m)
{
_dispatch(m);
}
void OSD::_dispatch(Message *m)
{
switch (m->get_type()) {
case MSG_OSD_PG_CREATE:
dispatch_op(op);
}
}
void OSD::dispatch_op(OpRequestRef op)
{
switch (op->get_req()->get_type()) {
case MSG_OSD_PG_CREATE:
handle_pg_create(op);
}
void OSD::handle_pg_create(OpRequestRef op)
{
const MOSDPGCreate *m = static_cast<const MOSDPGCreate*>(op->get_req());//a---
assert(m->get_type() == MSG_OSD_PG_CREATE);
if (!require_mon_peer(op->get_req())) {//b---
return;
}
if (!require_same_or_newer_map(op, m->epoch, false))//c---
return;
for (map<pg_t,pg_create_t>::const_iterator p = m->mkpg.begin();p != m->mkpg.end();++p, ++ci) {//d---
// is it still ours?
vector<int> up, acting;
int up_primary = -1;
int acting_primary = -1;
osdmap->pg_to_up_acting_osds(on, &up, &up_primary, &acting, &acting_primary);
int role = osdmap->calc_pg_role(whoami, acting, acting.size());//e---
if (acting_primary != whoami)//f--- {//这里只在主osd上创建pg
dout(10) << "mkpg " << on << " not acting_primary (" << acting_primary
<< "), my role=" << role << ", skipping" << dendl;
continue;
}
spg_t pgid;
bool mapped = osdmap->get_primary_shard(on, &pgid);//g--- 确认本osd为该PG的主osd
assert(mapped);
if (handle_pg_peering_evt(//h--- 处理Peering状态机事件的入口。该函数会查找相应的PG,如果该PG不存在,就创建该PG。该PG的状态机进入RecoveryMachine/Stray状态。
pgid,
history,
pi,
osdmap->get_epoch(),
PG::CephpeeringEvtRef(
new PG::CephPeeringEvt(
osdmap->get_epoch(),
osdmap->get_epoch(),
PG::NullEvt()))
) == -EEXIST) {
service.send_pg_created(pgid.pgid);//i-- pg 已经创建了,正常创建pg不走这里
}
}
maybe_update_heartbeat_peers();//更新该PG相关的OSD的心跳列表
}
a.把op转换为MOSDPGCreate类型的消息。
b.确保是由Monitor发送的创建消息
c.检查epoch是否一致。如果对方的epoch比自己拥有的更新,就更新自己的epoch;否则就直接拒绝该请求。
d.遍历所有要创建的mkpg map
e.查看当前osd在pg中的角色
f.这里只在主osd上创建pg, 如果不是主,就跳出循环,不在该osd上创建。其实会由其他的osd来创建。
g.确认本osd为该PG的主osd
h.处理Peering状态机事件的入口。该函数会查找相应的PG,如果该PG不存在,就创建该PG。该PG的状态机进入RecoveryMachine/Stray状态。
i. pg 已经创建了,正常创建pg不走这里
7.函数调用栈7
7.handle_pg_peering_evt() -> void PG::_create() -> PG *OSD::_create_lock_pg() ->void PG::handle_create() 给新创建PG状态机投递事件 ->void PG::write_if_dirty()
enum res_result {
RES_PARENT, // resurrected a parent
RES_SELF, // resurrected self
RES_NONE // nothing relevant deleting
};
/*
* look up a pg. if we have it, great. if not, consider creating it IF the pg mapping
* hasn‘t changed since the given epoch and we are the primary.
*/
int OSD::handle_pg_peering_evt(
spg_t pgid,
const pg_history_t& orig_history,
const PastIntervals& pi,
epoch_t epoch,
PG::CephPeeringEvtRef evt)
{
PG *pg = _lookup_lock_pg(pgid);//a---
if (!pg) //a---
{
res_result result = _try_resurrect_pg(//b---
service.get_osdmap(),
pgid,
&resurrected,
&old_pg_state);
PG::RecoveryCtx rctx = create_context();
switch (result) {
case RES_NONE: {//c---
PG::_create(*rctx.transaction, pgid, pgid.get_split_bits(pp->get_pg_num()));//d---真正的创建和初始化pg
PG::_init(*rctx.transaction, pgid, pp);
pg = _create_lock_pg(//e---初始化pg
get_map(epoch),
pgid, false, false,
role,
up, up_primary,
acting, acting_primary,
history, pi,
*rctx.transaction);
pg->handle_create(&rctx);//f--- 给新创建PG状态机投递事件,PG的状态发生相应的改变
pg->write_if_dirty(*rctx.transaction); //g-- dispatch_context(rctx, parent, osdmap);//g--
pg->queue_peering_event(evt);//h---
...
}
case RES_SELF: {
...
}
case RES_PARENT: {
...
}
}
}
a. 查找pgid,如果没找到,说明需要创建,否则,说明已经创建了,这里主要分析没有找到的情况。
b._try_resurrect_pg 函数用于处理要操作的pg是否之前删除要恢复的,以及是否有父pg,即pg分裂相关。根据result返回值进行分类。
c.RES_NONE 类型应该是正常创建分支要走的分支,说明这个pg是第一次创建,这里也主要分析它。
d.创建和部分初始化pg
e.在该函数中,会调用pg->init(),主要初始化pg
f.给新创建PG状态机投递事件,PG的状态发生相应的改变
g.所有修改操作都打包在事务rctx.transaction中,调用函数dispatch_context将事务提交到本地对象存储中
h.把创建的pg事件加入queue,下一步进行peering。
总结:
以上讲述的是PG在主OSD上的创建流程。Monitor并不会给PG的从OSD发送消息来创建该PG, 而是由该主OSD上的PG在Peering过程中创建。主OSD给从OSD的PG状态机投递事件时,在函数handle_pg_peering_evt中,如果发现该PG不存在,才完成创建该PG。
函数handle_pg_peering_evt是处理Peering状态机事件的入口。该函数会查找相应的PG,如果该PG不存在,就创建该PG。该PG的状态机进入RecoveryMachine/Stray状态。
参考链接:
https://ivanzz1001.github.io/records/post/ceph/2018/12/11/ceph-data-consistency
以上是关于Ceph PG创建流程源码分析的主要内容,如果未能解决你的问题,请参考以下文章
Android 逆向整体加固脱壳 ( DEX 优化流程分析 | DexPrepare.cpp 中 dvmOptimizeDexFile() 方法分析 | /bin/dexopt 源码分析 )(代码片段