A Study of the RBD Journal Feature

Posted powerrailgun


1. Pull requests related to journaling

librbd: integrate journaling support for IO operations #6541 (https://github.com/ceph/ceph/pull/6541)
librbd: integrate journaling for maintenance operations #6625 (https://github.com/ceph/ceph/pull/6625) (records maintenance operations such as resize and snapshots in the journal)

2. The write path writes the write op into the journal

https://github.com/ceph/ceph/pull/6625/commits/cd3d056cb36235e9766c8822ab2168affedd56ef

3. Workflows

3.1 The flow for enabling journaling

#rbd feature enable Foo/Bar journaling

This issues the request to enable journaling.

  if (enabled) {
    operation::EnableFeaturesRequest<I> *req =
      new operation::EnableFeaturesRequest<I>(
        m_image_ctx, on_finish, journal_op_tid, features);
    req->send();
  }

The class to look at is EnableFeaturesRequest.
A comment in this class lays out the flow clearly:

  * @verbatim
   *
   * <start>
   *    |
   *    v
   * STATE_PREPARE_LOCK  (send_prepare_lock(): lock the image)        《1》
   *    |
   *    v
   * STATE_BLOCK_WRITES  (block writes)                                《2》
   *    |
   *    v
   * STATE_GET_MIRROR_MODE (send_get_mirror_mode(): fetch the mirror   《3》
   *    |                   mode, i.e. pool or image; then
   *    |                   handle_get_mirror_mode() runs the required
   *    |                   checks: prerequisite features such as
   *    |                   exclusive-lock, object-map and fast-diff
   *    |                   must already be enabled)
   *    |
   *    v
   * STATE_CREATE_JOURNAL (skip if not                        《4》
   *    |                  required)
   *    v
   * STATE_APPEND_OP_EVENT (skip if journaling                 《5》  
   *    |                   disabled)
   *    v
   * STATE_UPDATE_FLAGS
   *    |
   *    v
   * STATE_SET_FEATURES
   *    |
   *    v
   * STATE_CREATE_OBJECT_MAP (skip if not
   *    |                     required)
   *    v
   * STATE_ENABLE_MIRROR_IMAGE
   *    |
   *    V
   * STATE_NOTIFY_UPDATE
   *    |
   *    | (unblock writes)
   *    v
   * <finish>
   * @endverbatim
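
To make the skip conditions in the diagram easier to follow, here is a minimal sketch that lists the states visited for a given request. It is a toy model, not librbd code: the `EnableOptions` struct and its three flags are assumptions introduced only for illustration, standing in for the "skip if ..." notes in the diagram.

```cpp
#include <cassert>
#include <string>
#include <vector>

// Hypothetical flags for this sketch only; they are not librbd types.
struct EnableOptions {
  bool create_journal;     // a journal has to be created
  bool journaling_active;  // journaling is already enabled on the image
  bool create_object_map;  // an object map has to be created
};

// Returns the states visited, in order, following the diagram above.
std::vector<std::string> enable_features_states(const EnableOptions &opts) {
  std::vector<std::string> states = {
    "PREPARE_LOCK", "BLOCK_WRITES", "GET_MIRROR_MODE"};
  if (opts.create_journal) {
    states.push_back("CREATE_JOURNAL");     // skipped if not required
  }
  if (opts.journaling_active) {
    states.push_back("APPEND_OP_EVENT");    // skipped if journaling disabled
  }
  states.push_back("UPDATE_FLAGS");
  states.push_back("SET_FEATURES");
  if (opts.create_object_map) {
    states.push_back("CREATE_OBJECT_MAP");  // skipped if not required
  }
  states.push_back("ENABLE_MIRROR_IMAGE");
  states.push_back("NOTIFY_UPDATE");        // writes are unblocked afterwards
  return states;
}
```

For the `rbd feature enable Foo/Bar journaling` case one would expect `create_journal` to be true and the other two flags to be false, so CREATE_JOURNAL is visited while APPEND_OP_EVENT and CREATE_OBJECT_MAP are skipped.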

《4》:journal/CreateRequest.cc

  1. Get the pool's IoCtx. The journal objects are stored in a specific pool.
  2. The create_journal function:
 void CreateRequest<I>::create_journal() {
  ldout(m_cct, 20) << this << " " << __func__ << dendl;

  ImageCtx::get_timer_instance(m_cct, &m_timer, &m_timer_lock);
  m_journaler = new Journaler(m_op_work_queue, m_timer, m_timer_lock,
                              m_ioctx, m_image_id, m_image_client_id, {});

  using klass = CreateRequest<I>;
  Context *ctx = create_context_callback<klass, &klass::handle_create_journal>(this);

  m_journaler->create(m_order, m_splay_width, m_pool_id, ctx);
}

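Create the journal metadata object in the pool.
Allocate a tag. A tag is something like a synchronization point: one is allocated whenever a client opens the image, and tags ensure that the image cannot be used for writes before it has been promoted.
Register a client, using the tag allocated in the previous step.
What remains is cleanup once these steps have finished.
Summary: once the feature enable operation completes, the pool containing the image holds an additional object named journal.<image_id>, which stores the journal metadata.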

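3.2 How journaling affects the write op path

For an image with the journal feature enabled, a write operation differs from the non-journaled case as follows.
The key function is void AbstractImageWriteRequest::send_request(), which checks whether journaling is enabled on the image.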

void AbstractImageWriteRequest<I>::send_request(){
  bool journaling = false;
  journaling = (image_ctx.journal != nullptr &&
                image_ctx.journal->is_journal_appending());
  // If journaling is enabled, the object requests are collected in `requests`
  // and held back; otherwise they are sent out right away.
  send_object_requests(object_extents, snapc,
                         (journaling ? &requests : nullptr));

  if (journaling) {
      // in-flight ops are flushed prior to closing the journal
      assert(image_ctx.journal != NULL);
      journal_tid = append_journal_event(requests, m_synchronous);
  }
}

For the implementation, look at the append_journal_event() function.

append_journal_event
  ->append_write_event
    ->append_io_events

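The call eventually reaches Journal<I>::append_io_events, from which we can see that:
1. Every write is recorded as an event.
2. Every write enters the journal as a Future.
3. Once the journal entry has been persisted, C_IOEventSafe is invoked to perform the actual data write.
Journal data is produced not only for writes but for any IO that modifies the image.
The C_IOEventSafe callback, void Journal::handle_io_event_safe(int r, uint64_t tid), then sends the original requests on, completing the original write to disk.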
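
The ordering described above (journal first, image data second) can be sketched with a toy model. This is not the librbd implementation: `ToyJournal` and its members are hypothetical names, the "journal" is just an in-memory vector, and a failed journal write simply drops the parked requests, which is a simplification.

```cpp
#include <cassert>
#include <cstdint>
#include <functional>
#include <map>
#include <string>
#include <utility>
#include <vector>

// Toy model, not librbd: every name here is hypothetical.
class ToyJournal {
public:
  using Request = std::function<void()>;

  // Record the event and park the object requests; returns the event's tid.
  uint64_t append_io_event(const std::string &event,
                           std::vector<Request> requests) {
    uint64_t tid = ++m_next_tid;
    m_log.push_back(event);
    m_pending[tid] = std::move(requests);
    return tid;
  }

  // Called once the journal entry for `tid` is durable: only now are the
  // parked requests sent on to the image.
  void handle_io_event_safe(int r, uint64_t tid) {
    auto it = m_pending.find(tid);
    if (it == m_pending.end()) {
      return;
    }
    std::vector<Request> requests = std::move(it->second);
    m_pending.erase(it);
    if (r < 0) {
      return;  // simplification: a failed journal write drops the requests
    }
    for (auto &request : requests) {
      request();
    }
  }

  const std::vector<std::string> &log() const { return m_log; }

private:
  uint64_t m_next_tid = 0;
  std::vector<std::string> m_log;
  std::map<uint64_t, std::vector<Request>> m_pending;
};
```

In this model a caller appends an event together with its requests, and nothing touches the image until `handle_io_event_safe(0, tid)` is called for the returned tid.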

3.3 Journal replay

In rbd-mirror:

m_replay_handler = new ReplayHandler<I>(this);
m_remote_journaler->start_live_replay(m_replay_handler, poll_seconds);

void Journaler::start_live_replay(ReplayHandler *replay_handler,
                                  double interval) {
  create_player(replay_handler);
  m_player->prefetch_and_watch(interval);
}

void JournalPlayer::fetch(uint64_t object_num) {
  ceph_assert(ceph_mutex_is_locked(m_lock));

  auto object_player = ceph::make_ref<ObjectPlayer>(
    m_ioctx, m_object_oid_prefix, object_num, m_journal_metadata->get_timer(),
    m_journal_metadata->get_timer_lock(), m_journal_metadata->get_order(),
    m_max_fetch_bytes);

  auto splay_width = m_journal_metadata->get_splay_width();
  m_object_players[object_num % splay_width] = object_player;
  fetch(object_player);
}

void JournalPlayer::fetch(const ceph::ref_t<ObjectPlayer> &object_player) {
  ceph_assert(ceph_mutex_is_locked(m_lock));

  uint64_t object_num = object_player->get_object_number();
  std::string oid = utils::get_object_name(m_object_oid_prefix, object_num);
  ceph_assert(m_fetch_object_numbers.count(object_num) == 0);
  m_fetch_object_numbers.insert(object_num);

  ldout(m_cct, 10) << __func__ << ": " << oid << dendl;
  C_Fetch *fetch_ctx = new C_Fetch(this, object_num);

  object_player->fetch(fetch_ctx);
}

void ObjectPlayer::fetch(Context *on_finish) {
  ldout(m_cct, 10) << __func__ << ": " << m_oid << dendl;

  if (m_bps_throttle != nullptr) {
    if (m_bps_throttle->get<ObjectPlayer, Context,
      &ObjectPlayer::handle_bps_throttle_ready>(m_max_fetch_bytes, this, on_finish)) {
      return;
    }
  }

  handle_bps_throttle_ready(0, on_finish);
}

void ObjectPlayer::handle_bps_throttle_ready(int r,  Context *on_finish) {
  ldout(m_cct, 10) << __func__ << ": r=" << r << dendl;

  Mutex::Locker locker(m_lock);
  assert(!m_fetch_in_progress);
  m_fetch_in_progress = true;

  C_Fetch *context = new C_Fetch(this, on_finish);
  librados::ObjectReadOperation op;
  op.read(m_read_off, m_max_fetch_bytes, &context->read_bl, NULL);
  op.set_op_flags2(CEPH_OSD_OP_FLAG_FADVISE_DONTNEED);

  librados::AioCompletion *rados_completion =
    librados::Rados::aio_create_completion(context, utils::rados_ctx_callback,
                                           NULL);
  r = m_ioctx.aio_operate(m_oid, rados_completion, &op, 0, NULL);
  assert(r == 0);
  rados_completion->release();
}

ObjectPlayer::fetch reads the object, and the finish() of C_Fetch then calls back into handle_fetched(), shown below:
void JournalPlayer::handle_fetched(uint64_t object_num, int r) {
  ldout(m_cct, 10) << __func__ << ": "
                   << utils::get_object_name(m_object_oid_prefix, object_num)
                   << ": r=" << r << dendl;

  Mutex::Locker locker(m_lock);
  assert(m_fetch_object_numbers.count(object_num) == 1);
  m_fetch_object_numbers.erase(object_num);

  if (m_shut_down) {
    return;
  }

  if (r == 0) {
    ObjectPlayerPtr object_player = get_object_player(object_num);
    remove_empty_object_player(object_player);
  }
  process_state(object_num, r);
}

The call chain from here is:

  process_prefetch
    ->notify_entries_available
      ->replay_handler->handle_entries_available();
        ->journal->handle_replay_ready();
          ->m_journal_replay->process(event_entry, on_ready, on_commit); // void Replay<I>::process(const EventEntry &event_entry)
          At this point the EventVisitor is invoked; depending on the event type, the event is handled through replay->handle_event.
          That is:
          boost::apply_visitor(EventVisitor(this, on_ready, on_safe),event_entry.event);
          A different handle_event overload is selected depending on the type of the event.
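
The type-based dispatch can be illustrated with a small self-contained sketch. Note the substitutions: it uses `std::variant` and `std::visit` from C++17 in place of `boost::variant` and `boost::apply_visitor`, and the event structs, `ToyReplay`, `ToyEventVisitor` and `process` are simplified stand-ins invented for this example, not the librbd definitions.

```cpp
#include <cassert>
#include <cstdint>
#include <string>
#include <variant>

// Simplified stand-ins for journal event types; the fields are illustrative.
struct AioWriteEvent   { uint64_t offset; uint64_t length; };
struct AioDiscardEvent { uint64_t offset; uint64_t length; };
struct AioFlushEvent   {};

using Event = std::variant<AioWriteEvent, AioDiscardEvent, AioFlushEvent>;

// One overload per event type, mirroring the handle_event overloads.
struct ToyReplay {
  std::string handle_event(const AioWriteEvent &e) {
    return "write " + std::to_string(e.offset) + "~" +
           std::to_string(e.length);
  }
  std::string handle_event(const AioDiscardEvent &e) {
    return "discard " + std::to_string(e.offset) + "~" +
           std::to_string(e.length);
  }
  std::string handle_event(const AioFlushEvent &) { return "flush"; }
};

// The visitor forwards whichever alternative the variant holds to the
// matching overload; overload resolution picks the handler.
struct ToyEventVisitor {
  ToyReplay *replay;

  template <typename E>
  std::string operator()(const E &event) const {
    return replay->handle_event(event);
  }
};

std::string process(ToyReplay &replay, const Event &event) {
  return std::visit(ToyEventVisitor{&replay}, event);
}
```

Adding a new event type in this style means adding one alternative to the variant and one `handle_event` overload; the visitor itself does not change.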

For example, for a write event:

template <typename I>
void Replay<I>::handle_event(const journal::AioWriteEvent &event,
                             Context *on_ready, Context *on_safe) {
  CephContext *cct = m_image_ctx.cct;
  ldout(cct, 20) << ": AIO write event" << dendl;

  bufferlist data = event.data;
  bool flush_required;
  auto aio_comp = create_aio_modify_completion(on_ready, on_safe,
                                               io::AIO_TYPE_WRITE,
                                               &flush_required,
                                               {});
  if (aio_comp == nullptr) {
    return;
  }

  io::ImageRequest<I>::aio_write(&m_image_ctx, aio_comp,
                                 {{event.offset, event.length}},
                                 std::move(data), 0, {});
  if (flush_required) {
    m_lock.Lock();
    auto flush_comp = create_aio_flush_completion(nullptr);
    m_lock.Unlock();

    if (flush_comp != nullptr) {
      io::ImageRequest<I>::aio_flush(&m_image_ctx, flush_comp, {});
    }
  }
}

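Replay is then complete.

Deleting journal data

(This is based on someone else's explanation; I have not checked the details myself.)
While reading the code, one question kept nagging me: there is a Journaler when writing to rbd locally, and another Journaler when rbd mirror synchronizes. So how does journal data get deleted? Is it deleted by the local Journaler, or by rbd mirror?
After a closer look at the code in JournalTrimmer, I found that a Listener is also involved. At the very beginning, multiple clients are registered with the journal; whenever the journal metadata changes, all clients are notified, so each client learns of the change and calls the corresponding update function. For details, see the core logic in JournalMetadata::handle_refresh_complete.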
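
The listener idea can be sketched as a toy model. This is not the JournalMetadata or JournalTrimmer code, and every name in it is hypothetical. It also rests on an assumption the text above does not spell out: that journal data can only be trimmed up to the position that every registered client has already committed.

```cpp
#include <algorithm>
#include <cassert>
#include <cstdint>
#include <functional>
#include <limits>
#include <map>
#include <string>
#include <utility>
#include <vector>

// Toy model with hypothetical names; not the JournalMetadata implementation.
class ToyMetadata {
public:
  using Listener = std::function<void()>;

  void add_listener(Listener listener) {
    m_listeners.push_back(std::move(listener));
  }

  // A newly registered client has committed nothing yet.
  void register_client(const std::string &client) { m_committed[client] = 0; }

  // A client reports how far it has committed; every listener is notified.
  void update_commit_position(const std::string &client, uint64_t object_set) {
    m_committed[client] = object_set;
    for (auto &listener : m_listeners) {
      listener();
    }
  }

  // Oldest object set that some registered client still needs (0 if none).
  uint64_t minimum_commit_set() const {
    if (m_committed.empty()) {
      return 0;
    }
    uint64_t min_set = std::numeric_limits<uint64_t>::max();
    for (const auto &entry : m_committed) {
      min_set = std::min(min_set, entry.second);
    }
    return min_set;
  }

private:
  std::map<std::string, uint64_t> m_committed;
  std::vector<Listener> m_listeners;
};

// Listener that "trims" every object set all clients have moved past.
// ASSUMPTION: trimming is bounded by the slowest client's commit position.
class ToyTrimmer {
public:
  explicit ToyTrimmer(ToyMetadata &metadata) : m_metadata(metadata) {
    m_metadata.add_listener(
      [this] { m_trimmed_up_to = m_metadata.minimum_commit_set(); });
  }

  uint64_t trimmed_up_to() const { return m_trimmed_up_to; }

private:
  ToyMetadata &m_metadata;
  uint64_t m_trimmed_up_to = 0;
};
```

Under that assumption, it does not matter whether the local client or the rbd-mirror client triggers the trim: whichever one observes the metadata change can only remove data that both have already committed.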