一、 mysql中的动态插件
最初想到这个问题是在学习mysql半同步复制相关问题的时候,为何在mysql运行时install半同步插件并开启后就能起到作用,他是如何让事务停下来等待的。安装插件的时候加载的是一个.so动态库,这个库里是插件的实现。那么MySQL源码中应该需要对应的框架去以调用这些插件,这个框架是如何运作的呢?
二、从源码中寻找答案
首先,我们需要要知道插件从何处调用的。以半同步插件为例:众所周知,开启半同步插件后通过设置rpl_semi_sync_master_wait_point 的值可以决定mysql主库在何处等待从库的ack,为after_sync则在binlog刷新到磁盘后,为after_commit则是在事务提交以后。那么在mysql事务刷新到binlog之后和事务提交之后应该需要各有一个地方调用半同步插件的接口进行等待。
首先,进入mysql的事务提交函数,当开启binlog的时候mysql由MYSQL_BIN_LOG::ordered_commit进行提交操作。为了保证不丢数据,我们通常使用after_sync同步点,如果要寻找在这个同步点半同步插件的调用,那么就需要在这个函数中刷新binlog之后与事务正式提交之前寻找。事实上在刷新binlog后,可以找到这样一个函数调用:
int MYSQL_BIN_LOG::ordered_commit(THD *thd, bool all, bool skip_commit)
{
···
if (flush_error == 0 && sync_error == 0)
sync_error= call_after_sync_hook(commit_queue);
···
}
似乎看其函数名字就和半同步有关,那么进入这个函数中:
static inline int call_after_sync_hook(THD *queue_head)
{
const char *log_file= NULL;
my_off_t pos= 0;
if (NO_HOOK(binlog_storage))
return 0;
DBUG_ASSERT(queue_head != NULL);
for (THD *thd= queue_head; thd != NULL; thd= thd->next_to_commit)
if (likely(thd->commit_error == THD::CE_NONE))
thd->get_trans_fixed_pos(&log_file, &pos);
if (DBUG_EVALUATE_IF("simulate_after_sync_hook_error", 1, 0) ||
RUN_HOOK(binlog_storage, after_sync, (queue_head, log_file, pos)))
{
sql_print_error("Failed to run ‘after_sync‘ hooks");
return ER_ERROR_ON_WRITE;
}
return 0;
}
这个函数接受一队待提交的线程的队列作为参数,让这些线程等待从库的ack,那么具体是那个调用呢?就是这个RUN_HOOK:
RUN_HOOK(binlog_storage, after_sync, (queue_head, log_file, pos))
查看RUN_HOOK的源码发现这是一个宏:
#define RUN_HOOK(group, hook, args) (group ##_delegate->is_empty() ? 0 : group ##_delegate->hook args)
将上面的调用按照这个宏的定义进行文本替换后得到实际上的调用为:
binlog_storage_delegate->is_empty() ? 0 : binlog_storage_delegate->after_sync(queue_head, log_file, pos)
也就是说实际上是调用了binlog_storage_delegate指向的对象的函数来实现半同步的功能,如果is_empty()的返回值为真则不做任何操作,否则调用after_sync函数来进行等待。查找binlog_storage_delegate这个指针的定义,可以在rpl_handler.cc中找到它,这是一个全局指针变量,类型为Binlog_storage_delegate *:
//rpl_handler.cc
Binlog_storage_delegate *binlog_storage_delegate;
进一步查找这个变量在何处初始化可以找到delegates_init函数:
int delegates_init()
{
······
transaction_delegate= new (place_trans_mem) Trans_delegate;
if (!transaction_delegate->is_inited())
{
sql_print_error("Initialization of transaction delegates failed. "
"Please report a bug.");
return 1;
}
binlog_storage_delegate= new (place_storage_mem) Binlog_storage_delegate;
if (!binlog_storage_delegate->is_inited())
{
sql_print_error("Initialization binlog storage delegates failed. "
"Please report a bug.");
return 1;
}
server_state_delegate= new (place_state_mem) Server_state_delegate;
······
}
这个函数专门初始化各种xxx_delegate类型的指针,为他们分配对象,实际上继续追踪调用栈可以发现如下函数调用
mysqld_main()
|
|
init_server_components()
|
|
delegates_init()
所以可以看出,实际上在mysql启动的时候这个这些指针就已经得到了初始化。接下来,依然以半同步插件的同步等待点为例子来看看到底是如何实现调用的。
三、 插件中的观察者模式
依然以上面等待从库ack的调用为例,已经知道了实际上是以Binlog_storage_delegate类来实现的,那么接下来看Binlog_storage_delegate的代码:
class Binlog_storage_delegate
:public Delegate {
public:
Binlog_storage_delegate()
: Delegate(
#ifdef HAVE_PSI_INTERFACE
key_rwlock_Binlog_storage_delegate_lock
#endif
)
{}
typedef Binlog_storage_observer Observer;
int after_flush(THD *thd, const char *log_file,
my_off_t log_pos);
int after_sync(THD *thd, const char *log_file,
my_off_t log_pos);
};
这个类中有一个after_sync函数,也就是说我们上面的RUN_HOOK实际上调用的是这个函数。Binlog_storage_delegate继承了一个基类Delegate:
class Delegate {
public:
typedef List<Observer_info> Observer_info_list;
typedef List_iterator<Observer_info> Observer_info_iterator;
int add_observer(void *observer, st_plugin_int *plugin)
{
······
}
int remove_observer(void *observer, st_plugin_int *plugin)
{
···
}
inline Observer_info_iterator observer_info_iter()
{
return Observer_info_iterator(observer_info_list);
}
inline bool is_empty()
{
DBUG_PRINT("debug", ("is_empty: %d", observer_info_list.is_empty()));
return observer_info_list.is_empty();
}
inline int read_lock()
{
if (!inited)
return TRUE;
return mysql_rwlock_rdlock(&lock);
}
inline int write_lock()
{
if (!inited)
return TRUE;
return mysql_rwlock_wrlock(&lock);
}
inline int unlock()
{
if (!inited)
return TRUE;
return mysql_rwlock_unlock(&lock);
}
inline bool is_inited()
{
return inited;
}
Delegate(
#ifdef HAVE_PSI_INTERFACE
PSI_rwlock_key key
#endif
)
{
inited= FALSE;
#ifdef HAVE_PSI_INTERFACE
if (mysql_rwlock_init(key, &lock))
return;
#else
if (mysql_rwlock_init(0, &lock))
return;
#endif
init_sql_alloc(key_memory_delegate, &memroot, 1024, 0);
inited= TRUE;
}
~Delegate()
{
inited= FALSE;
mysql_rwlock_destroy(&lock);
free_root(&memroot, MYF(0));
}
private:
Observer_info_list observer_info_list;
mysql_rwlock_t lock;
MEM_ROOT memroot;
bool inited;
};
上面略去了一些函数的细节,先来看里面的主要成员。observer_info_list是一个链表,里面存储了Observer_info类型的成员,同时还有一把读写锁lock进行保护。其他成员函数为对这个链表的操作函数(例如add_observer为添加成员)以及加锁解锁的函数。注意到其中有个is_empty函数,这个函数用来判断链表是否为空。在RUN_HOOK中调用的is_empty就是这个函数。
接下来继续回到子类Binlog_storage_delegate中,我们已经知道after_sync的等待实际上就是靠Binlog_storage_delegate的after_sync函数实现的。接下来进入这个函数的源代码,来看这个函数如何实现的
int Binlog_storage_delegate::after_sync(THD *thd,
const char *log_file,
my_off_t log_pos)
{
DBUG_ENTER("Binlog_storage_delegate::after_sync");
DBUG_PRINT("enter", ("log_file: %s, log_pos: %llu",
log_file, (ulonglong) log_pos));
Binlog_storage_param param;
param.server_id= thd->server_id;
DBUG_ASSERT(log_pos != 0);
int ret= 0;
FOREACH_OBSERVER(ret, after_sync, thd, (¶m, log_file, log_pos));
DEBUG_SYNC(thd, "after_call_after_sync_observer");
DBUG_RETURN(ret);
}
除去各种调试信息外,这个函数实际上只是调用了FOREACH_OBSERVER这个宏:
FOREACH_OBSERVER(ret, after_sync, thd, (¶m, log_file, log_pos));
继续查看这个宏的定义:
#define FOREACH_OBSERVER(r, f, thd, args) /*
Use a struct to make sure that they are allocated adjacent, check
delete_dynamic().
*/ Prealloced_array<plugin_ref, 8> plugins(PSI_NOT_INSTRUMENTED); read_lock(); Observer_info_iterator iter= observer_info_iter(); Observer_info *info= iter++; for (; info; info= iter++) { plugin_ref plugin= my_plugin_lock(0, &info->plugin); if (!plugin) { /* plugin is not intialized or deleted, this is not an error */ r= 0; break; } plugins.push_back(plugin); if (((Observer *)info->observer)->f && ((Observer *)info->observer)->f args) { r= 1; sql_print_error("Run function ‘" #f "‘ in plugin ‘%s‘ failed", info->plugin_int->name.str); break; } } unlock(); /*
Unlock plugins should be done after we released the Delegate lock
to avoid possible deadlock when this is the last user of the
plugin, and when we unlock the plugin, it will try to
deinitialize the plugin, which will try to lock the Delegate in
order to remove the observers.
*/ if (!plugins.empty()) plugin_unlock_list(0, &plugins[0], plugins.size());
这个宏比较长,我们先忽略其他对于插件管理的代码,注意力放在遍历这个链表的for循环中。我们将这个循环进行简化:
Observer_info_iterator iter= observer_info_iter(); Observer_info *info= iter++; for (; info; info= iter++) {
······ if (((Observer *)info->observer)->f && ((Observer *)info->observer)->f args) { r= 1; sql_print_error("Run function ‘" #f "‘ in plugin ‘%s‘ failed", info->plugin_int->name.str); break; } } ······
从中可以看出,这个for循环遍历整个链表,取出每个元素的指针并转换为Observer 类型的指针,再以传入的args作为参数调用其名字为f成员函数。我们可以依然将宏进行文本替换,以Binlog_storage_delegate::after_sync中调用的FOREACH_OBSERVER为例,外部的调用为:
FOREACH_OBSERVER(ret, after_sync, thd, (¶m, log_file, log_pos));
则内部for循环中,使用每个元素的指针封装出的调用可以翻译为:
if (((Observer *)info->observer)->after_sync &&
((Observer *)info->observer)->after_sync ((¶m, log_file, log_pos)))
假设info指针指向的对象的after_sync函数成员存在,则以args作为这个函数的参数调用它。那么也就是说Binlog_storage_delegate::after_sync的作用实际上是用来挨个调用它链表中保存的各个Observer对象的after_sync的成员函数。而半同步插件中等待ack的动作实际上再进一步由observer对象实现的,在Binlog_storage_delegate类中可以看到它的定义:
typedef Binlog_storage_observer Observer;
这里这样定义的原因是为了FOREACH_OBSERVER这个宏统一使用Observer 这个名字。那么实际上就是调用的Binlog_storage_observer的成员函数了。继续查看Binlog_storage_observer的定义:
typedef struct Binlog_storage_observer {
uint32 len;
/**
This callback is called after binlog has been flushed
This callback is called after cached events have been flushed to
binary log file but not yet synced.
@param param Observer common parameter
@param log_file Binlog file name been updated
@param log_pos Binlog position after update
@retval 0 Sucess
@retval 1 Failure
*/
int (*after_flush)(Binlog_storage_param *param,
const char *log_file, my_off_t log_pos);
int (*after_sync)(Binlog_storage_param *param,
const char *log_file, my_off_t log_pos);
} Binlog_storage_observer;
/**
Binlog storage observer parameters
*/
typedef struct Binlog_storage_param {
uint32 server_id;
} Binlog_storage_param;
从上面可以看出,Binlog_storage_observer其实就是一个含有两个函数指针的class,最终实现等待从库ack的是这里面的函数指针。
那么问题接下来就变成了到底Binlog_storage_delegate中保存Binlog_storage_observer实例是从哪里来的?Binlog_storage_observer中的函数指针具体又是指向哪个函数呢?
回到Binlog_storage_delegate类中,这个类有一个向链表中添加对象的add_observer函数,要向其中添加则应该会调用这个函数。查找对它的调用可以找到添加Binlog_storage_observer到链表中的函数register_binlog_storage_observer:
//rpl_handler
int register_binlog_storage_observer(Binlog_storage_observer *observer, void *p)
{
DBUG_ENTER("register_binlog_storage_observer");
int result= binlog_storage_delegate->add_observer(observer, (st_plugin_int *)p);
DBUG_RETURN(result);
}
而向其中添加的observer则又是register_binlog_storage_observer的参数,那么顺着调用链继续向上找,最终可以在半同步插件的源码中找到semi_sync_master_plugin_init函数:
static int semi_sync_master_plugin_init(void *p)
{
#ifdef HAVE_PSI_INTERFACE
init_semisync_psi_keys();
#endif
my_create_thread_local_key(&THR_RPL_SEMI_SYNC_DUMP, NULL);
if (repl_semisync.initObject())
return 1;
if (ack_receiver.init())
return 1;
if (register_trans_observer(&trans_observer, p))
return 1;
if (register_binlog_storage_observer(&storage_observer, p))
return 1;
if (register_binlog_transmit_observer(&transmit_observer, p))
return 1;
return 0;
}
这个函数注册半同步插件的各个xxxx_obeserver,将其添加到xxxx_delegate的链表中以供调用。依然关注我们之前的Binlog_storage_observe,上文提到的register_binlog_storage_observer函数参数来自于一个叫storage_observer的变量,这个变量就是Binlog_storage_observe的一个实例。我们继续追踪最后可以发现这是一个在半同步复制插件源码中定义的全局变量:
//semisync_master_plugin.cc
Binlog_storage_observer storage_observer = {
sizeof(Binlog_storage_observer), // len
repl_semi_report_binlog_update, // report_update
repl_semi_report_binlog_sync, // after_sync
};
after_sync指针指向的repl_semi_report_binlog_sync就是半同步插件关于等待从库ack的具体实现了,这里不在具体深入半同步的实现。
跟着源码追了了一大圈,这里来做一个回顾。其实从插件被调用的过程(以及叫xxxx_observer的名字)就可以看出这是一个典型的观察者模式,RUN_HOOK调用的时候,如果对应的xxxx_delegate中有对应的xxxx_observer观察者,就挨个调用这些观察者的回调函数,如果没有就什么都不做。
最后,我们以半同步插件等待从库ack的功能总结一个插件调用过程中的时序图: