How to use MySQL Embedded in a multithreaded environment?
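【Title】: How to use MySQL Embedded in a multithreaded environment? 【Posted】: 2011-12-04 19:57:20 【Question】: I am writing a program that uses mysqle (embedded MySQL) as its backend. The database library is owned by an object called Domain. This Domain object runs in the main thread.
The program starts another thread that runs an XML-RPC server (boost::thread and xmlrpc_c::serverAbyss). The server holds a pointer to the Domain object.
When the XML-RPC server makes the Domain object execute an SQL query, the program crashes: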
Program received signal: “EXC_BAD_ACCESS”.
[Switching to process 73191]
[Switching to process 73191]
Xcode could not locate source file: regex.cpp (line: 74)
When the main thread calls a Domain method that executes an SQL query, the program keeps running.
/*
* Ports listening
*
* - create a Rpc_Server object
* - create a dedicated thread
*/
Rpc_Server server(&domain, &conf_params, &router);
boost::thread server_thread(boost::bind(&Rpc_Server::run, &server)); // This thread makes the server crash
/*
* Domain routine
*
* - Check for ready jobs every minute
*/
while (1)
{
    v_jobs jobs = domain.get_ready_jobs(conf_params.get_param("node_name")); // This method does NOT make the server crash
    sleep(60);
}
The methods of the Domain object and those of the Database object each lock a mutex to prevent concurrent access.
bool Mysql::execute(const std::string* query)
{
    MYSQL_RES* res;
    MYSQL_ROW row;

    if ( query == NULL )
        return false;

    this->updates_mutex.lock();
    std::cout << query->c_str() << std::endl;

    if ( mysql_query(this->mysql, query->c_str()) != 0 )
    {
        // Print the query text (*query), not the pointer value
        std::cerr << *query << std::endl << mysql_error(this->mysql);
        this->updates_mutex.unlock();
        return false;
    }

    res = mysql_store_result(this->mysql);
    if (res)
    {
        while ( ( row = mysql_fetch_row(res) ) )
            for ( uint i=0 ; i < mysql_num_fields(res) ; i++ )
                // A SQL NULL comes back as a NULL pointer, so guard before printing
                std::cout << ( row[i] ? row[i] : "NULL" ) << std::endl;
    }
    else
    {
        // No result set: an error only if the query should have returned columns
        if ( mysql_field_count(this->mysql) != 0 )
        {
            std::cerr << "Erreur : " << mysql_error(this->mysql) << std::endl;
            mysql_free_result(res);
            this->updates_mutex.unlock();
            return false;
        }
    }

    mysql_free_result(res);
    this->updates_mutex.unlock();
    return true;
}
bool Domain::add_node(const std::string* running_node, const std::string* n, const int* w)
{
    std::string query;

    this->updates_mutex.lock();

    query = "START TRANSACTION;";
    if ( this->database.execute(&query) == false )
    {
        this->updates_mutex.unlock();
        return false;
    }

    query = "REPLACE INTO node (node_name,node_weight) VALUES ('";
    query += n->c_str();
    query += "','";
    query += boost::lexical_cast<std::string>(*w);
    query += "');";
    if ( this->database.execute(&query) == false )
    {
        // Undo the transaction before giving up
        query = "ROLLBACK;";
        this->database.execute(&query);
        this->updates_mutex.unlock();
        return false;
    }

    query = "COMMIT;";
    if ( this->database.execute(&query) == false )
    {
        this->updates_mutex.unlock();
        return false;
    }
    else
        this->updates_mutex.unlock();

    return true;
}
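Both methods above release the mutex by hand on every return path, which is easy to get wrong when a new early return is added. As a minimal sketch of scoped locking (not the poster's code), the block below uses std::mutex and std::lock_guard from C++11; boost::mutex with boost::lock_guard follows the same shape. FakeDatabase is a hypothetical stand-in that records queries instead of sending them to MySQL.

```cpp
#include <cstddef>
#include <mutex>
#include <string>
#include <vector>

// Hypothetical stand-in for the Mysql class: it records queries instead of
// sending them to a server, so the locking pattern can be shown in isolation.
class FakeDatabase {
public:
    // Returns false for an empty query, true otherwise.
    bool execute(const std::string& query) {
        // The guard locks here and unlocks automatically on every return path.
        std::lock_guard<std::mutex> guard(updates_mutex_);
        if (query.empty()) {
            return false;  // no manual unlock needed
        }
        log_.push_back(query);
        return true;
    }

    // Number of queries accepted so far.
    std::size_t executed_count() {
        std::lock_guard<std::mutex> guard(updates_mutex_);
        return log_.size();
    }

private:
    std::mutex updates_mutex_;
    std::vector<std::string> log_;
};
```

This only tidies the locking. Serializing access with a mutex does not, by itself, give a second thread the per-thread state that the MySQL client library expects.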
MySQLe is initialized here:
bool Mysql::prepare(const std::string* node_name, const std::string* db_skeleton)
{
    static char* server_args[] = {"this_program", "--datadir=."};
    static char* server_groups[] = {"embedded", "server", "this_program_SERVER", (char *)NULL};
    std::string query("CREATE DATABASE IF NOT EXISTS ");

    // DB init
    if ( mysql_library_init(sizeof(server_args) / sizeof(char *), server_args, server_groups) )
        std::cerr << "could not initialize MySQL library" << std::endl;

    std::cout << "mysql init..." << std::endl;
    if ( (this->mysql = mysql_init(NULL)) == NULL )
        std::cerr << mysql_error(this->mysql) << std::endl;

    if ( ! mysql_thread_safe() )
    {
        std::cerr << "MySQL is NOT threadsafe !" << std::endl;
        return false;
    }

    mysql_options(this->mysql, MYSQL_READ_DEFAULT_GROUP, "embedded");
    mysql_options(this->mysql, MYSQL_OPT_USE_EMBEDDED_CONNECTION, NULL);
    mysql_real_connect(this->mysql, NULL, NULL, NULL, NULL, 0, NULL, 0);

    // Creates the database
    query += this->translate_into_db(node_name);
    query += ";";
    if ( this->execute(&query) == false )
        return false;

    // Creates the schema
    query = "CREATE SCHEMA IF NOT EXISTS ";
    query += this->translate_into_db(node_name);
    query += " DEFAULT CHARACTER SET latin1;";
    this->execute(&query);

    // Uses it
    query = "USE " + this->translate_into_db(node_name) + ";";
    this->execute(&query);

    // Loads the skeleton from file
    return this->load_file(db_skeleton->c_str());
}
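Am I doing something wrong? Do you have an example you could show me?
【Comments】:
【Answer 1】: I found a solution to my problem. Each thread needs to initialize the MySQL environment, that is, it has to call some of the mysql_* functions itself.
Here are the modified and new methods: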
bool Mysql::atomic_execute(const std::string* query)
{
    MYSQL_RES* res;
    MYSQL_ROW row;
    boost::regex empty_string("^\\s+$", boost::regex::perl);

    if ( query == NULL )
        return false;

    // Reject empty or whitespace-only queries
    if ( query->empty() == true or boost::regex_match(*query, empty_string) == true )
    {
        std::cerr << "Error : query is empty !" << std::endl;
        return false;
    }

    this->updates_mutex.lock();

    if ( mysql_query(this->mysql, query->c_str()) != 0 )
    {
        // Print the query text (*query), not the pointer value
        std::cerr << *query << std::endl << mysql_error(this->mysql);
        this->updates_mutex.unlock();
        return false;
    }

    res = mysql_store_result(this->mysql);
    if (res)
    {
        while ( ( row = mysql_fetch_row(res) ) )
            for ( uint i=0 ; i < mysql_num_fields(res) ; i++ )
                // A SQL NULL comes back as a NULL pointer, so guard before printing
                std::cout << ( row[i] ? row[i] : "NULL" ) << std::endl;
    }
    else
    {
        if ( mysql_field_count(this->mysql) != 0 )
        {
            std::cerr << "Erreur : " << mysql_error(this->mysql) << std::endl;
            mysql_free_result(res);
            this->updates_mutex.unlock();
            return false;
        }
    }

    mysql_free_result(res);
    this->updates_mutex.unlock();
    return true;
}
bool Mysql::standalone_execute(const v_queries* queries)
{
    MYSQL* local_mysql = this->init();
    std::string query = "START TRANSACTION;";

    if ( this->atomic_execute(&query) == false )
    {
        mysql_close(local_mysql);
        return false;
    }

    BOOST_FOREACH(std::string q, *queries)
    {
        std::cout << q.c_str() << std::endl;
        if ( this->atomic_execute(&q) == false )
        {
            query = "ROLLBACK";
            this->atomic_execute(&query);
            mysql_close(local_mysql);
            return false;
        }
    }

    query = "COMMIT";
    if ( this->atomic_execute(&query) == false )
    {
        mysql_close(local_mysql);
        return false;
    }

    mysql_close(local_mysql);
    return true;
}
MYSQL* Mysql::init()
{
    MYSQL* local_mysql;

    // Note: given a non-NULL handle, mysql_init() initializes that handle and
    // returns its address, so local_mysql refers to this->mysql here.
    local_mysql = mysql_init(this->mysql);
    mysql_options(this->mysql, MYSQL_READ_DEFAULT_GROUP, "embedded");
    mysql_options(this->mysql, MYSQL_OPT_USE_EMBEDDED_CONNECTION, NULL);
    mysql_real_connect(local_mysql, NULL, NULL, NULL, NULL, 0, NULL, 0);

    return local_mysql;
}
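The atomic_execute method sends a single query to the server.
The standalone_execute method initializes a connection and opens a transaction, then uses atomic_execute to send each query of the batch to the server.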
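The control flow of standalone_execute (open a transaction, run each query, roll back on the first failure, otherwise commit) can be sketched without MySQL. In the block below, Executor and run_in_transaction are hypothetical names introduced for illustration; the executor stands in for atomic_execute and returns true when a query succeeds. Like the method above, the sketch does not issue a ROLLBACK when COMMIT itself fails.

```cpp
#include <functional>
#include <string>
#include <vector>

// Hypothetical executor type: returns true when the query succeeds.
using Executor = std::function<bool(const std::string&)>;

// Runs all queries inside one transaction.
// Returns true only if every query and the final COMMIT succeed.
bool run_in_transaction(const Executor& exec,
                        const std::vector<std::string>& queries) {
    if (!exec("START TRANSACTION")) {
        return false;
    }
    for (const std::string& q : queries) {
        if (!exec(q)) {
            // First failure: undo whatever ran so far and stop.
            exec("ROLLBACK");
            return false;
        }
    }
    return exec("COMMIT");
}
```

Passing the executor as a parameter keeps the transaction logic testable with a fake that records or rejects queries.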
I don't know whether a ROLLBACK is of any use when the COMMIT fails...
The code could probably use some improvement, but it works.
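The fix above depends on each thread calling into the MySQL client library before it runs queries. The MySQL C API also documents mysql_thread_init() and mysql_thread_end() for explicit per-thread setup and teardown. The block below is only a sketch of the RAII shape such a call pair could take: fake_thread_init and fake_thread_end are hypothetical stand-ins so the example builds without libmysqld, and ThreadScope, worker and run_workers are illustrative names. In a real program the stand-ins would presumably be replaced by the two MySQL calls.

```cpp
#include <atomic>
#include <thread>
#include <vector>

// Hypothetical stand-ins for mysql_thread_init() / mysql_thread_end().
// They only count calls; like mysql_thread_init(), the init returns
// zero (false) on success.
std::atomic<int> g_inits{0};
std::atomic<int> g_ends{0};
bool fake_thread_init() { ++g_inits; return false; }
void fake_thread_end() { ++g_ends; }

// Pairs per-thread initialization with cleanup for the current thread.
class ThreadScope {
public:
    ThreadScope() : ok_(!fake_thread_init()) {}
    ~ThreadScope() { if (ok_) fake_thread_end(); }
    ThreadScope(const ThreadScope&) = delete;
    ThreadScope& operator=(const ThreadScope&) = delete;
    bool ok() const { return ok_; }
private:
    bool ok_;
};

// Hypothetical thread body: initialize first, then do database work.
bool worker() {
    ThreadScope scope;
    if (!scope.ok()) {
        return false;
    }
    // ... run queries on a connection owned by this thread ...
    return true;
}

// Starts n worker threads and returns how many finished successfully.
int run_workers(int n) {
    std::atomic<int> succeeded{0};
    std::vector<std::thread> threads;
    for (int i = 0; i < n; ++i) {
        threads.emplace_back([&succeeded] { if (worker()) ++succeeded; });
    }
    for (std::thread& t : threads) {
        t.join();
    }
    return succeeded.load();
}
```

Tying the cleanup to a destructor means it also runs when the thread body returns early.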
【Comments】: