C++实现数据库连接池

Posted 伍树明

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了C++实现数据库连接池相关的知识,希望对你有一定的参考价值。

一、关键技术点

使用C++语言级别的开发。

mysql数据库编程、线程安全的懒汉单例模式、queue、C++11多线程编程、线程互斥、线程同步通信、生产者-消费者模型、基于CAS的原子整形、智能指针shared_ptr、lambda表达式等。

二、项目背景

为了提高MySQL数据库(基于C/S设计)的访问瓶颈,除了在服务端添加缓存服务器缓存常见的数据之外(例如redis),还可以增加连接池来提高SQL的访问效率,在高并发情况下,大量的TCP 三次握手、MySQL Server连接认证、MySQL Server关闭连接回收资源和TCP四次挥手所带来的消耗十分明显,连接池的主要功能就是优化这些性能损耗。

三、连接池功能点介绍

连接池一般包含了数据库连接所用的ip地址、port端口号、用户名和密码以及其它的性能参数,例如初
始连接量,最大连接量,最大空闲时间、连接超时时间等。
初始连接量:初始创建这么多数量的连接,当应用发起MySQL连接请求时直接从池中获取一个可用的连接,使用完后不断开连接,而是将connection再归还给连接池。
最大连接量:当并发访问多时,初始池可能不够用,如果总数没有达到最大连接量就创建新的连接。当连接使用完后放回池中。
最大空闲时间:连接队列的长度可能超出初始连接量,在指定的最大空闲时间内没有被使用的连接将被回收。
连接超时时间:当MySQL的并发请求量过大,连接池中的数量已经达到了最大值,没有空闲的连接可供使用。再超过“连接超时时间”仍无法获取连接的话,获取连接失败。

四、功能实现设计

ConnectionPool.cppConnectionPool.h:连接池代码实现
Connection.cppConnection.h:数据库操作代码、增删改查代码实现

ConnectionPool

4.1 线程安全的懒汉单例模式

将构造函数私有化
将获取池的方法定义为静态

static ConnectionPool* getConnectionPool();

实现方式如下

//线程安全的懒汉单例函数接口
ConnectionPool* ConnectionPool::getConnectionPool()

	static ConnectionPool pool; //lock和unlock
	return &pool;

构造函数一定一定要实现!!!

4.2 连接池的初始化配置

  1. 创建初始连接
  2. 启动一个新的线程, 作为连接的生产者
  3. 启动一个新的线程, 作为连接的生产者
//连接池的构造
ConnectionPool::ConnectionPool()

	//加载配置项
	if (!loadConfigFile())
	
		return;
	

	//创建初始数量的连接
	for (int i = 0; i < _initSize; i++)
	
		Connection* p = new Connection();
		p->connect(_ip, _port, _username, _passward, _dbname);
		p->refreshAliveTime(); //刷新下开始空闲的起始时间
		_connectionQue.push(p);
		_connectionCnt++;
	

	//启动一个新的线程, 作为连接的生产者 linux thread => pthread_create
	thread produce(std::bind(&ConnectionPool::produceConnectionTask, this));
	//守护线程
	produce.detach();

	//启动一个新的线程, 扫描回收
	thread scanner(std::bind(&ConnectionPool::scannerConnectionTask, this));
	scanner.detach();

4.3 生产者线程

生产者消费者模型

生产者线程的工作如下:

  1. 先抢到锁,如果队列不为空,就说明不需要我去生产,然后我将锁释放掉,接口会将线程放在某个等待队列上并阻塞该线程直到某个事件到来,当内核监测到notify_*发布的事件后便会去唤醒相应等待队列上的等待线程,此时cv.wait内部会将mutex重新lock
  2. 如果连接数量没有达到上限,继续创建新的连接
  3. 通知消费者可以消费了,唤醒获取对象锁的线程

注意点如下:

  1. wait()既释放锁又加锁
  2. notify_* 并没有释放锁,出右括号释放锁

生产者代码如下:

//运行在独立的线程中,专门负责生产新连接
void ConnectionPool::produceConnectionTask()

	for (;;)
	
		unique_lock<mutex> lock(_queueMutex);
		while (!_connectionQue.empty())
		
			cv.wait(lock);  //队列不空, 此处生产线程进入等待状态
			//当调用对象的wait()方法时会释放获取的该对象的锁。
			//消费者消费完了告知生产者没有连接了,去生产
			//生产者发现确实没有了, 进行下一步  死循环里面套wait():防止假唤醒
			/*
			wait接口会将线程放在某个等待队列上并阻塞该线程直到某个事件到来,
			而notify_*接口则是发布事件,当内核监测到相应的事件后便会去唤醒
			相应等待队列上的等待线程,此时cv.wait内部会将mutex重新lock。
			*/
		

		//连接数量没有到达上限, 继续创建新的连接
		if (_connectionCnt < _maxSize)
		
			Connection* p = new Connection();
			p->connect(_ip, _port, _username, _passward, _dbname);
			p->refreshAliveTime();
			_connectionQue.push(p);
			_connectionCnt++;
		

		//通知消费者线程, 可以消费连接了
		cv.notify_all();
		//唤醒其他等待获取对象锁的线程
		//出右括号释放锁
	

4.4 消费者(给外部提供接口, 从连接池中获取一个可用的空闲连接)

  1. 抢到锁
  2. 只要队列为空,就等待_connectionTimeOut时间,如果超时了队列仍为空,说明获取空闲连接超时了,返回nullptr。将队列判空写为循环是有可能没超时被唤醒但又没抢到资源。
  3. 最后一个消费完的通知生产者生产。

注意点:
使用shared_ptr智能指针需要自定义析构函数。因为shared_ptr智能指针析构时会把connection资源直接delete掉,相当于关闭了连接。所以要将connection直接归还给queue。

代码如下:

//给外部提供接口, 从连接池中获取一个可用的空闲连接
shared_ptr<Connection> ConnectionPool::getConnection()

	unique_lock<mutex> lock(_queueMutex);
	while (_connectionQue.empty())
	
		//不要写sleep, sleep是直接睡这么长时间
		//wait_for: 时间内收到通知 或 超时
		if(cv_status::timeout == cv.wait_for(lock, chrono::milliseconds(_connectionTimeOut)))
		
			if (_connectionQue.empty())
			
				LOG("获取空闲连接超时了...获取连接失败!");
				return nullptr;
			
		
	

	/*
	shared_ptr智能指针析构时会把connection资源直接delete掉, 相当于
	调用connection的析构函数,connection就被close掉了
	这里需要自定义shared_ptr的释放资源方式, 把connection直接归还到queue中
	*/
	shared_ptr<Connection> sp(_connectionQue.front(),
		[&](Connection *pcon) 
			//要考虑队列的线程安全  这里的抢锁发生在出作用域析构,不是现在
			unique_lock<mutex> lock(_queueMutex);
			pcon->refreshAliveTime();
			_connectionQue.push(pcon);
		
	);
	_connectionQue.pop();
	//谁消费了队列的最后一个connection, 谁负责通知一下生产者
	if (_connectionQue.empty())
	
		cv.notify_all(); 
	
	return sp;

4.5 最大空闲时间 —— 扫描回收线程

因为是queue,先进先出,所以如果队头没有超时,其他的也没有。

代码如下:

//扫描超过maxIdleTime时间的空闲连接, 进行对应的连接回收
void ConnectionPool::scannerConnectionTask()

	for (;;)
	
		//通过sleep模拟定时效果
		this_thread::sleep_for(chrono::seconds(_maxIdleTime));

		//扫描整个队列, 释放多余的连接
		unique_lock<mutex> lock(_queueMutex);
		while (_connectionCnt > _initSize)
		
			Connection* p = _connectionQue.front();
			if (p->getAliveTime() >= _maxIdleTime * 1000)
			
				_connectionQue.pop();
				_connectionCnt--;
				delete p;  //释放连接 调用~Connection()
			
			else
			
				break; //队头的连接没有超过_maxIdleTime, 其他的肯定没有
			
		
	

Connection

单条连接有以下的要求:
ip、端口号、用户名、登录密码、数据库名称、更新操作、查询操作、刷新连接的起始点、返回存活的时间

数据库的操作直接用MySQL官方封装好的库即可。

五、压力测试

数据量未使用连接池花费时间使用连接池花费时间
1000单线程:5694ms 四线程:2018ms单线程:2781ms 四线程:1235ms
5000单线程:28692ms 四线程:8740ms单线程:13355ms 四线程:5692ms
10000单线程:57895ms 四线程:17330ms单线程:26741ms 四线程:10840ms

六、整体代码

可以访问我的 github 数据库连接池

七、C++调用MySQL

MySQL数据库编程直接采用oracle公司提供的MySQL C/C++客户端开发包,在VS上需要进行相
应的头文件和库文件的配置,如下:
1.右键项目 - C/C++ - 常规 - 附加包含目录,填写mysql.h头文件的路径
2.右键项目 - 链接器 - 常规 - 附加库目录,填写libmysql.lib的路径
3.右键项目 - 链接器 - 输入 - 附加依赖项,填写libmysql.lib库的名字
4.把libmysql.dll动态链接库(Linux下后缀名是.so库)放在工程目录下

MySQL数据库C++代码封装如下:

#include <mysql.h>
#include <string>
using namespace std;
#include "public.h"
// 数据库操作类
class MySQL

public:
	// 初始化数据库连接
	MySQL()
	
		_conn = mysql_init(nullptr);
	
	// 释放数据库连接资源
	~MySQL()
	
		if (_conn != nullptr)
			mysql_close(_conn);
	
	// 连接数据库
	bool connect(string ip, unsigned short port, string user, string password,
		string dbname)
	
		MYSQL* p = mysql_real_connect(_conn, ip.c_str(), user.c_str(),
			password.c_str(), dbname.c_str(), port, nullptr, 0);
		return p != nullptr;
	
	// 更新操作 insert、delete、update
	bool update(string sql)
	
		if (mysql_query(_conn, sql.c_str()))
		
			LOG("更新失败:" + sql);
			return false;
		
		return true;
	
	// 查询操作 select
	MYSQL_RES* query(string sql)
	
		if (mysql_query(_conn, sql.c_str()))
		
			LOG("查询失败:" + sql);
			return nullptr;
		
		return mysql_use_result(_conn);
	
private:
	MYSQL* _conn; // 表示和MySQL Server的一条连接
;

C++基础知识线程池内存池连接池的实现

//=============threadpool.c==========
#include "threadpool.h"
 
#include <stdio.h>
pthread_mutex_t ThreadPool::mutex = PTHREAD_MUTEX_INITIALIZER;
pthread_cond_t ThreadPool::cond = PTHREAD_COND_INITIALIZER;
std::vector<Task*> ThreadPool::tasks;
 
ThreadPool::ThreadPool(int threads) 
    this->threads = threads;
    this->shutdown = false;
    this->counter = 0;
    this->idle = 0;

 
void* ThreadPool::process(void *arg) 
    struct timespec abstime;
    int timeout;
    ThreadPool *pool = static_cast<ThreadPool*>(arg);
    printf("thread 0x%0x is starting\n", (int)pthread_self());
    while (true) 
        pthread_mutex_lock(&mutex);
        pool->idle++;
        timeout = 0;
        while (tasks.empty() && !pool->shutdown) 
            clock_gettime(CLOCK_REALTIME, &abstime);
            abstime.tv_sec += 2;
            int status = pthread_cond_timedwait(&cond, &mutex, &abstime);
            if (status == ETIMEDOUT) 
                printf("thread 0x%0x is waiting time out\n", (int)pthread_self());
                timeout = -1;
                break;
            
        
        pool->idle--;
        if (!tasks.empty()) 
            Task *task = tasks.back();
            tasks.pop_back();  // 执行任务
            pthread_mutex_unlock(&mutex);
            task->run(task->arg);
            delete task;
            pthread_mutex_lock(&mutex);
        
        if (pool->shutdown && tasks.empty()) 
            pool->counter--;
            if (pool->counter == 0) 
                pthread_cond_signal(&cond);
            
            pthread_mutex_unlock(&mutex);
            break;
        
        if (timeout == -1 && tasks.empty()) 
        // if it is timeout,the thread will end(why not keep it? because it means no more works are added now)
            pool->counter--;
            pthread_mutex_unlock(&mutex);
            break; 
        
        pthread_mutex_unlock(&mutex);
    
    printf("thread 0x%0x is exiting\n", (int)pthread_self());
    return NULL;

 
void ThreadPool::add_task(void* (*run)(void *arg), void *arg) 
    pthread_mutex_lock(&mutex);  // 生成新的任务
    Task *task = new Task;
    task->run = run;
    task->arg = arg;
    tasks.push_back(task);  // 如果有等待线程,则唤醒一个
    if (idle > 0) 
        pthread_cond_signal(&cond);
     else if (counter < threads) 
        pthread_t tid;
        pthread_create(&tid, NULL, ThreadPool::process, (void*)this);
        this->counter++;
    
    pthread_mutex_unlock(&mutex);

 
void ThreadPool::threadpool_destroy() 
    if (shutdown) 
        return;
    
    pthread_mutex_lock(&mutex);
    shutdown = true;
    if (counter > 0)   // 唤醒空闲线程退出
        if (idle > 0) 
            pthread_cond_broadcast(&cond);
          // 如果线程处于执行阶段,等待任务执行完毕之后退出
        while (counter > 0) 
            pthread_cond_wait(&cond, &mutex);
        
    
    pthread_mutex_unlock(&mutex);
    pthread_mutex_destroy(&mutex);


//=============threadpool.h==========

#ifndef _THREADPOOL_H
#define _THREADPOOL_H
 
#include <vector>
#include <pthread.h>
#include <time.h>
#include <errno.h>
#include <iostream>
 
struct Task 
    void* (*run)(void *arg);
    void *arg;
;
 
class ThreadPool 
private:
    int threads;
    static std::vector<Task*> tasks;
    static pthread_mutex_t mutex;
    static pthread_cond_t cond;
    static void *process(void *arg);
    bool shutdown;
    int counter;
    int idle;
public:
    ThreadPool(int threads_);
    ~ThreadPool() ;
    void add_task(void* (*task)(void *arg), void *arg);
    void threadpool_destroy();
;
 
#endif /*_THREADPOOL_H*/
//=============main.c==========
#include "threadpool.h"
#include <iostream>
#include <vector>
#include <stdio.h>
#include <unistd.h>

void* func1(void* arg) 
    int cnt = 0;
    while (1) 
        int tid = pthread_self();
        printf("[func1]0x%0x thread, cnt = %d\n", tid, cnt);
        cnt += 1;
        sleep(2);
    


void* func2(void* arg) 
    int cnt = 1;
    while (1) 
        int tid = pthread_self();
        printf("[func2]0x%0x thread, cnt = %d\n", tid, cnt);
        cnt *= 3;
        sleep(3);
    


int main() 
    ThreadPool pool(3);
    //Task task = func1, NULL;
    for (int i = 0; i < 10; i++) 
        pool.add_task(&func1, NULL);
    
    pool.add_task(&func2, NULL);
        while (1) 
        
    return 0;

 

以上是关于C++实现数据库连接池的主要内容,如果未能解决你的问题,请参考以下文章

Linux + C + Epoll实现高并发服务器(线程池 + 数据库连接池)

C++ LinuxWebServer项目数据库连接池与注册登录

C++线程池

C++线程池

一个Linux下C线程池的实现

C++线程池ThreadPool实现解析