从0开始搭建编程框架——插件
Posted breaksoftware
tags:
篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了从0开始搭建编程框架——插件相关的知识,希望对你有一定的参考价值。
我将插件设计为两种类型。一种是框架自身携带的插件,用于增强其基础能力;另一种是用户自定义插件,用于完成业务。本文将分别介绍在peleus框架下,这两种插件该怎么写。(转载请指明出自breaksoftware的csdn博客)
本文谈下框架自身携带的插件怎么写。我们以操作mysql为例。
第一步把mysqlclient开发环境给准备好。
# Download, build and install the MySQL C connector from source.
wget https://downloads.mysql.com/archives/get/file/mysql-connector-c-6.1.11-src.tar.gz
tar -zxvf mysql-connector-c-6.1.11-src.tar.gz
cd mysql-connector-c-6.1.11-src
# Configure in-tree; the explicit "." names the source directory.
cmake -G "Unix Makefiles" .
# Re-run the configuration so that "make install" targets /usr.
cmake -G "Unix Makefiles" -DCMAKE_INSTALL_PREFIX=/usr .
make
sudo make install
由于实际安装到系统中的头文件不全,我们还要从https://github.com/mysql/mysql-server/blob/5.6/include/hash.h下载hash.h文件,并把它放到/usr/include下。
因为mysql-connector-c是C语言编码,为了统一peleus下语言栈,我们将其封装成一个类
#include "mysql_connection.h"
#include <butil/logging.h>
#include <errmsg.h>
#include "mysql_error.h"
namespace peleus
namespace plugins
namespace mysql_conns
using peleus::plugins::configure::mysql_connection;
/// Creates a connection object that holds no MySQL handle yet.
MysqlConnection::MysqlConnection() : _conn(NULL) {
}
/// Releases the MySQL handle by delegating to _disconnect().
MysqlConnection::~MysqlConnection() {
    this->_disconnect();
}
void MysqlConnection::init(const mysql_connection& conf)
_conf.CopyFrom(conf);
int MysqlConnection::mysql_real_query(const std::string& sql)
int ret = 0;
if (!_conn)
ret = _reconnct();
if (ret)
return ret;
LOG(INFO) << "query sql:" << sql.c_str();
const int max_retry = 2;
for (auto i = 0; i < max_retry; i++)
ret = ::mysql_real_query(_conn, sql.c_str(), sql.length());
if (0 == ret)
break;
else if (i < max_retry - 1)
LOG(INFO) << "mysql connection is lost.retry " << i;
int ret_re = _reconnct();
if (ret_re)
LOG(WARNING) << "mysql connection is lost.retry error" << ret_re;
return ret_re;
else if (ret != CR_SERVER_LOST && ret != CR_SERVER_GONE_ERROR)
LOG(ERROR) << "query error:" << ret << " sql:" << sql.c_str();
return ret;
return ret;
/// Fetches the result set of the last statement from the client library.
///
/// @return The library's result pointer, or NULL when there is no handle.
MYSQL_RES* MysqlConnection::mysql_store_result() {
    if (!_conn) {
        return NULL;
    }
    return ::mysql_store_result(_conn);
}
int MysqlConnection::_reconnct()
if (_conn)
_disconnect();
return _connect();
int MysqlConnection::_connect()
_conn = mysql_init(NULL);
if (!_conn)
return CR_OUT_OF_MEMORY;
unsigned long time_out = _conf.read_timeout();
unsigned long conn_out = _conf.conn_timeout();
mysql_options(_conn, MYSQL_OPT_READ_TIMEOUT, (char*)&time_out);
mysql_options(_conn, MYSQL_OPT_CONNECT_TIMEOUT, (char*)&conn_out);
if (!mysql_real_connect(_conn,
_conf.host().c_str(),
_conf.user().c_str(),
_conf.password().c_str(),
_conf.dbname().c_str(),
_conf.port(),
NULL, 0))
_disconnect();
return CR_SERVER_GONE_ERROR;
int ret = mysql_set_character_set(_conn, _conf.character_set().c_str());
if (ret)
_disconnect();
return ret;
void MysqlConnection::_disconnect()
if (_conn)
mysql_close(_conn);
_conn = NULL;
MysqlConnection类主要对外提供连接和查询数据库的功能,并支持断开重连功能。这是针对一个连接的,我们需要连接池来提高连接的使用率。
/// Constructs an empty pool object; all set-up happens in init().
// NOTE(review): nothing is initialised here, so this presumably relies on
// init() being called before query()/recovery() - confirm.
MysqlConnectionPool::MysqlConnectionPool() {
}
/// Destroys every connection currently parked in the idle list.
MysqlConnectionPool::~MysqlConnectionPool() {
    for (auto& idle : _unused) {
        // Deleting a null pointer is a no-op, so no separate check is needed.
        delete idle;
        idle = NULL;
    }
}
/// Copies the pool settings and, when a positive size is configured, creates
/// that many idle connections.
///
/// A size of zero or less disables pooling; query() then opens a throw-away
/// connection per call. A zero-sized pool is treated as disabled because it
/// could never hand out a connection.
///
/// @param conf Pool settings; `size()` and `conn_info()` are read here.
void MysqlConnectionPool::init(const peleus::plugins::
        configure::mysql_connection_pool& conf) {
    _conf.CopyFrom(conf);

    const int size = _conf.size();
    if (size <= 0) {
        _use_pool = false;
        return;
    }

    _use_pool = true;
    pthread_mutex_init(&_mutex, NULL);

    pthread_mutex_lock(&_mutex);
    for (int i = 0; i < size; i++) {
        // Connections are only configured here, not opened.
        MysqlConnection* conn = new MysqlConnection();
        conn->init(_conf.conn_info());
        _unused.push_back(conn);
    }
    pthread_mutex_unlock(&_mutex);
}
int MysqlConnectionPool::query(const std::string& sql, MysqlSession& session)
if (_use_pool)
return query_from_pool(sql, session);
return query_once(sql, session);
/// Puts a connection back on the idle list; ignored when pooling is disabled.
///
/// @param conn Connection being handed back.
void MysqlConnectionPool::recovery(MysqlConnection* conn) {
    if (!_use_pool) {
        return;
    }
    pthread_mutex_lock(&_mutex);
    _unused.push_back(conn);
    pthread_mutex_unlock(&_mutex);
}
int MysqlConnectionPool::query_once(const std::string& sql, MysqlSession& session)
MysqlConnection conn;
conn.init(_conf.conn_info());
int ret = conn.mysql_real_query(sql);
session._res = conn.mysql_store_result();
return ret;
int MysqlConnectionPool::query_from_pool(const std::string& sql, MysqlSession& session)
MysqlConnection* conn = NULL;
pthread_mutex_lock(&_mutex);
if (!_unused.empty())
conn = _unused.back();
_unused.pop_back();
pthread_mutex_unlock(&_mutex);
if (!conn)
LOG(ERROR) << "need more connection";
return LACK_CONNECTION;
int ret = conn->mysql_real_query(sql);
session._res = conn->mysql_store_result();
session._conn = conn;
session._conn_pool = this;
return ret;
MysqlConnectionPool将空闲连接放在_unused中。当需要做查询操作时,便从_unused中取出一个连接使用。但是连接池里的连接不能交由用户管理,于是需要暴露query_from_pool方法,将连接隐藏起来。
由于业务可能需要连接多个数据库,所以需要做一个针对多个数据库的连接管理器
/// Constructs an empty manager; pools are added by init().
MysqlConns::MysqlConns() {
}
/// Destroys every connection pool registered in the manager.
MysqlConns::~MysqlConns() {
    for (auto& entry : _conns) {
        // Deleting a null pointer is a no-op, so no separate check is needed.
        delete entry.second;
        entry.second = NULL;
    }
}
void MysqlConns::init(const char* conf_floder_path)
TraversalCallback traversal_floder = [this](const char* path) this->init(path);;
TraversalCallback traversal_file = [this](const char* path) this->init_from_file(path);;
TraversalFloder traversal;
traversal.set_callback(traversal_floder, traversal_file, __FILE__);
traversal.init(conf_floder_path);
void MysqlConns::init_from_file(const char* conf_path)
mysql_connection_pool config;
bool suc = peleus::utils::file2conf(conf_path, config);
if (!suc)
LOG(ERROR) << "init " << conf_path << " error";
return;
const std::string& conn_name = config.name();
MysqlConnectionPool* conn_pool = init_conn_pool(config);
if (!conn_pool)
LOG(ERROR) << "init mysql connection pool error:" << conn_name;
return;
_conns[conn_name] = conn_pool;
/// Allocates a pool and initialises it from `conf`.
///
/// @param conf Pool settings forwarded to MysqlConnectionPool::init().
/// @return The newly allocated pool; the caller takes ownership.
MysqlConnectionPool* MysqlConns::init_conn_pool(const mysql_connection_pool& conf) {
    MysqlConnectionPool* const pool = new MysqlConnectionPool();
    pool->init(conf);
    return pool;
}
int MysqlConns::query(const std::string& conn_name,
const std::string& sql,
MysqlSession& session)
auto it = _conns.find(conn_name);
if (it == _conns.end())
LOG(ERROR) << "cant find mysql connection:" << conn_name.c_str();
return NO_CONNECTION;
if (!it->second)
LOG(ERROR) << "mysql connection pool is null";
return NULL_CONNECTION;
return it->second->query(sql, session);
至此,我们将mysql最基础的操作部分给封装完毕。现在开始写插件
/// Plugin service that implements the `query` RPC of the generated
/// peleus::plugins::MysqlVisitorService interface and also acts as a
/// framework entrance.
class MysqlVisitorService
    : public peleus::plugins::MysqlVisitorService
    , public peleus::entrance::Entrance {
public:
    /// @param name Name forwarded to the Entrance base class.
    explicit MysqlVisitorService(const char* name);

    /// RPC handler: runs the requested statement and fills `response`.
    virtual void query(::google::protobuf::RpcController* controller,
                       const ::peleus::plugins::MysqlVisitorServiceRequest* request,
                       ::peleus::plugins::MysqlVisitorServiceResponse* response,
                       ::google::protobuf::Closure* done) override;

    // Entrance hooks; declared final so they cannot be overridden further.
    virtual void on_init(const char* conf_path) final;
    virtual void reset() final;

private:
    /// Appends one fetched row of `res` to `response`.
    void parse_res(
        MYSQL_RES* res,
        ::peleus::plugins::MysqlVisitorServiceResponse* response);

    /// Appends all rows held by `session` to `response`.
    void parse_session(
        peleus::plugins::mysql_conns::MysqlSession& session,
        ::peleus::plugins::MysqlVisitorServiceResponse* response);
};
MysqlVisitorService将作为内部服务的一个业务逻辑存在,同时它也是一个入口,所以继承于Entrance接口。它的另一个基类peleus::plugins::MysqlVisitorService是protobuf生成的接口类,其定义如下:
syntax="proto2";
option cc_generic_services = true;
package peleus.plugins;
// Request of the query RPC.
message MysqlVisitorServiceRequest {
    // Name of the connection the statement is sent to.
    required string conn_name = 1;
    // Statement text to execute.
    required string command = 2;
}
// One result row; each entry is one column value as a string.
message MysqlVisitorRow {
    repeated string column = 1;
}
// Response of the query RPC.
message MysqlVisitorServiceResponse {
    // True when the statement executed successfully.
    required bool suc = 1;
    // Rows of the result set, if any.
    repeated MysqlVisitorRow rows = 2;
}
// Generic service exposing a single query RPC.
service MysqlVisitorService {
    rpc query(MysqlVisitorServiceRequest) returns (MysqlVisitorServiceResponse);
}
MysqlVisitorService的业务代码就是query,其会调用MysqlConns类的方法去查询结果,然后解析和封装返回
/// Forwards the service name to the Entrance base class.
///
/// @param name Name passed to peleus::entrance::Entrance.
MysqlVisitorService::MysqlVisitorService(const char* name)
    : peleus::entrance::Entrance(name) {
}
/// RPC handler: executes the requested statement on the named connection and
/// copies the result rows and the success flag into `response`.
///
/// The request's command text is forwarded to the database unchanged.
///
/// @param controller Unused here.
/// @param request    Supplies the connection name and the statement.
/// @param response   Receives the rows and the `suc` flag.
/// @param done       Completion closure, wrapped in a ClosureGuard.
void MysqlVisitorService::query(::google::protobuf::RpcController* controller,
        const ::peleus::plugins::MysqlVisitorServiceRequest* request,
        ::peleus::plugins::MysqlVisitorServiceResponse* response,
        ::google::protobuf::Closure* done) {
    ClosureGuard done_guard(done);

    MysqlSession session;
    const int status = MysqlConns::get_mutable_instance().query(
        request->conn_name(), request->command(), session);

    parse_session(session, response);
    response->set_suc(0 == status);
}
/// Copies every row of the session's result set into `response`.
///
/// Does nothing when the session holds no result. The result set is not
/// released here.
///
/// @param session  Session whose `_res` is read.
/// @param response Receives one row message per result row.
void MysqlVisitorService::parse_session(
        MysqlSession& session,
        ::peleus::plugins::MysqlVisitorServiceResponse* response) {
    MYSQL_RES* res = session._res;
    if (!res) {
        return;
    }

    // mysql_num_rows() returns my_ulonglong; keep that type instead of
    // narrowing the count to int.
    const my_ulonglong row_size = mysql_num_rows(res);
    for (my_ulonglong i = 0; i < row_size; i++) {
        parse_res(res, response);
    }
}
/// Fetches the next row from `res` and appends it to `response`.
///
/// The row is fetched before a row message is added, so a failed fetch no
/// longer leaves an empty row behind. SQL NULL columns are reported as empty
/// strings, because the row message has no way to mark a column as NULL.
///
/// @param res      Result set to fetch from.
/// @param response Receives the new row message.
void MysqlVisitorService::parse_res(
        MYSQL_RES* res,
        ::peleus::plugins::MysqlVisitorServiceResponse* response) {
    MYSQL_ROW res_row = mysql_fetch_row(res);
    if (!res_row) {
        LOG(ERROR) << "mysql_fetch_row return null";
        return;
    }

    MysqlVisitorRow* row = response->add_rows();
    if (!row) {
        LOG(ERROR) << "add_rows return null";
        return;
    }

    const unsigned int field_size = mysql_num_fields(res);
    for (unsigned int i = 0; i < field_size; i++) {
        // A NULL column is a null pointer in MYSQL_ROW; constructing a
        // std::string from it would be undefined behaviour.
        const char* value = res_row[i];
        row->add_column(value ? std::string(value) : std::string());
    }
}
最后在插件启动代码中加入注册方法就行了
// Plugin entry point, exported with C linkage.
extern "C" bool plugins_main(const char*);
// Initialises singleton `y` from the plugin's conf_path when the plugin entry
// `item_conf` (taken from the expansion site) is named `x`.
#define USE(x, y) \
    do { \
        if (item_conf.name() == x) { \
            y::get_mutable_instance().init(item_conf.conf_path().c_str()); \
        } \
    } while (0)
// Returns from the enclosing function with the result of registering class
// `y` under `z` when the plugin entry `item_conf` (taken from the expansion
// site) is named `x`.
#define REG(x, y, z) \
    do { \
        if (item_conf.name() == x) { \
            return register_class<y>(z); \
        } \
    } while (0)
bool load_moudle(const plugin_conf& item_conf)
if (!item_conf.on())
return true;
REG("mysql_visitor_service", MysqlVisitorService, "MysqlVisitorService");
return true;
bool plugins_main(const char* conf_path)
plugins_conf config;
bool suc = file2conf(conf_path, config);
if (!suc)
LOG(ERROR) << "conv config error:" << conf_path;
return false;
size_t size = config.conf_size();
for (size_t i = 0; i < size; i++)
const plugin_conf& item_conf = config.conf(i);
if (!load_moudle(item_conf))
break;
return true;
以上是关于从0开始搭建编程框架——插件的主要内容,如果未能解决你的问题,请参考以下文章