数据库连接池

Posted 两片空白

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了数据库连接池相关的知识,希望对你有一定的参考价值。

前言

        mysql数据库是一个C/S模型,通过客户端来访问服务端。底层使用的是TCP协议。

        在连接数据库和断开数据库时,需要进行一下几个步骤:

  1. 建立通信连接的TCP三次握手。
  2. 数据库服务器的连接认证。
  3. 数据库服务器关闭连接时的资源回收。
  4. 断开通信连接的TCP四次挥手。

        当在我们在程序中需要频繁的访问数据库,也就是需要频繁的连接数据库和断开数据库时,会需要频繁的进行上面的操作。在高并发的情况下会消耗很多时间和资源。

        使用连接池可以减少这部分的消耗。指挥在创建连接池时,建立连接,在销毁连接池对象时,断开连接。需要连接数据库时,在连接池中取出连接,不需要使用数据库时,将连接放会连接池即可。

连接池的实现

        1. 技术点

  • 多线程编程
  • 线程同步与互斥。互斥锁和条件变量的使用。
  • 处理时间和日期的chrono库。
  • 智能指针
  • lambda表达式
  • MySql C语言库的使用。
  • 单例模式
  • 生产者消费者模型
  • Jsoncpp库的使用

        2. 连接池的设计

        在网络通信中,服务器可能收到多个客户端的请求。连接池部署在服务器上,并且服务端会部署各种组件,其中数据库组件可能部署在另外一台服务器上,需要操作数据库,就需要连接另外一台服务器。

        使用线程池和连接池可以大大的提高效率。

        多线程,可以用来处理不用的任务,比如:接受客户端的请求,处理客户端的请求,从连接池中取出连接等。

        对于连接池,首先在连接池中,与数据库建立若干个连接,需要使用连接数据库时,从连接池中取,不需要连接时,将连接放入连接池中。

 实现的功能点:

  • 连接池只需要一个实例,所以连接池是一个单例模式的类。
  • 所有数据库连接需要维护到一个安全的队列中
    • 方便数据库的连接的添加和删除
    • 由于连接池是只有一个实例,所以需要是线程安全的,需要使用互斥锁来保护队列数据的压入和弹出。
  • 在需要的时候可以从数据库中得到一个或者多个可用的数据库连接。
    • 如果有可用连接直接取出
    • 如果没有,阻塞等待一段时间,在重试
  • 如果队列中没有多余可用连接,需要动态创建。不能超过最大连接数。
  • 如果队列中空闲连接太多,需要动态销毁一部分。
  • 数据库操作完毕,需要将连接归还到连接池中。

细节分析:

  • 数据库连接的存储:使用生产者消费者模型,将连接保存到队列中。
    • 生产者:专门生产数据库连接的线程。
    • 消费者:需要访问数据库的线程。
    • 处理生产者消费者模型,需要通过条件变量来使用线程的同步。
  • 连接池连接的动态创建:交给一个单独的线程来处理。
  • 连接池连接的动态销毁:交给一个动态的线程处理。
  • 连接池的默认连接数量:连接池中提供了用连接的最小数量。如果不够,就动态创建,如果太多就动态销毁。
  • 连接池的最大连接数量:能够创建的最大有效数据库的连接数量。
  • 最大空闲时间:创建的数据库连接在指定的一定时间内,未被使用,需要进行销毁。
  • 连接超时:当连接池中没有可用连接,消费者线程无法获取连接,阻塞等待的时间。

        3. 封装MySql API的类

        释放数据库的资源,需要释放创建数据库的资源和结果集的资源。

        使用的C语言MySql接口:用C语言API(常用)操作MySql数据库_两片空白的博客-CSDN博客

#pragma once


#include <iostream>
#include <mysql.h>
#include <chrono>
using namespace std;
using namespace chrono;

class MysqlConn

public:
	//初始化数据库
	MysqlConn();
	//释放数据库资源
	~MysqlConn();
	//连接数据库
	bool connect(string user, string passwd, string dbName, string ip, unsigned short port = 3306);
	//更新数据库 update delete insert
	bool update(string sql);
	//查询数据库
	bool query(string sql);
	//遍历数据库,主要是查找结果集中的下一行数据
	bool next();
	//得到结果集中需要行的index列的字段
	string value(int index);
	//事务操作
	bool transaction();
	//提交事务
	bool commit();
	//事务回滚
	bool roolback();
	//刷新连接等待时间
	void refreshAliveTime();
	//获得连接等待时间
	long long getAliveTime();
private:
	//释放结果集的资源
	void releaseRes();

	MYSQL* m_conn = nullptr;
	MYSQL_RES* m_res = nullptr;
	MYSQL_ROW m_row = nullptr;
	steady_clock::time_point m_aliveTime;

;
#include "MysqlConn.h"

MysqlConn::MysqlConn()

	m_conn = mysql_init(nullptr);


MysqlConn::~MysqlConn()

	if (m_conn)
	
		mysql_close(m_conn);
	
	releaseRes();


bool MysqlConn::connect(string user, string passwd, string dbName, string ip, unsigned short port)

	MYSQL* ptr = mysql_real_connect(m_conn, ip.c_str(), user.c_str(), passwd.c_str(), dbName.c_str(), port, nullptr, 0);
	if (ptr)
	
		return true;
	
	return false;


bool MysqlConn::update(string sql)

	if (mysql_query(m_conn, sql.c_str()))
	
		return false;
	
	return true;


bool MysqlConn::query(string sql)

	//在查询结果集时,需要清空上一次结果集的内存
	releaseRes();
	if (mysql_query(m_conn, sql.c_str()) == 0)
	
		m_res = mysql_store_result(m_conn);
		return true;
	
	return false;


bool MysqlConn::next()

	if (m_res != nullptr)
	
		m_row = mysql_fetch_row(m_res);
		if (m_row)
		
			return true;
		
	
	return false;


//参数时这一行的第几列
string MysqlConn::value(int index)

	//得到当前行中,列的数量
	int resNum = mysql_num_fields(m_res);
	if (index < 0 || index >= resNum)
	
		return string();
	

	char* str = m_row[index];
	unsigned long length = mysql_fetch_lengths(m_res)[index];//获得index列字段的长度
	return string(str, length);


bool MysqlConn::transaction()

	return mysql_autocommit(m_conn, false);


bool MysqlConn::commit()

	return mysql_commit(m_conn);


bool MysqlConn::roolback()

	return mysql_rollback(m_conn);


void MysqlConn::refreshAliveTime()

	m_aliveTime = steady_clock::now();


long long MysqlConn::getAliveTime()

	nanoseconds res = steady_clock::now() - m_aliveTime;
	milliseconds millsec = duration_cast<milliseconds>(res);
	return millsec.count();


void MysqlConn::releaseRes()

	if (m_res)
	
		mysql_free_result(m_res);
	

  数据库连接池类

  • 使用队列来保存个连接,创建minSize个连接。
  • 使用两个单独的线程,一个用来创建连接,一个用来销毁连接,使队列中的线程个数维持在minSize个。
  • 队列是临界资源,pop和push时需要使用互斥锁加锁。
  • 队列为空,针对消费者和队列连接个数等于minSize,针对生产者都需要时用条件变量等待。当生产者生产连接到队列,消费者消费连接,需要唤醒消费者和生产者。
  • 当使用连接个数大于maxSize,也需要使用条件变量等待。
  • 销毁连接,当连接个数大于minSize,并且连接空闲时长大于maxIdleTime。
#pragma once

#include "MysqlConn.h"
#include <queue>
#include <mutex>
#include <condition_variable>

class ConnectionPool

public:
	ConnectionPool* getConnectionPool();
	ConnectionPool(const ConnectionPool& pool) = delete;
	ConnectionPool& operator=(const ConnectionPool& pool) = delete;
	shared_ptr<MysqlConn> getConnection();
private:
	ConnectionPool();
	~ConnectionPool();
	bool parseJsonFile();
	void productConnection();
	void recycleConnection();
	void addConnection();

	string m_ip;
	string m_userName;
	string m_passwd;
	string m_dbName;
	unsigned short m_port;

	int m_maxSize;//最大使用连接数
	int m_minSize;//连接池中最少连接数
	int m_timeout;
	int m_maxIdleTime;
	int m_useCount;//使用的连接数
	queue<MysqlConn*> m_pool;
	mutex m_lock;
	condition_variable m_cond;
;
#include "ConnectionPool.h"
#include <json/json.h>
#include <fstream>
#include <thread>

using namespace Json;
ConnectionPool::ConnectionPool()

	//读取配置
	if (!parseJsonFile())
	
		return;
	

	//生成连接
	for (int i = 0; i < m_minSize; ++i)
	
		addConnection();
	

	//判断连接个数,是否需要创建连接或者销毁连接
	//单独用两个线程处理,处理的是连接池中的连接数
	thread produtor(&ConnectionPool::productConnection, this);
	thread recyclor(&ConnectionPool::recycleConnection, this);
	produtor.detach();
	recyclor.detach();


ConnectionPool::~ConnectionPool()

	while (!m_pool.empty())
	
		MysqlConn* conn = m_pool.front();
		m_pool.pop();
		delete conn;
	


ConnectionPool* ConnectionPool::getConnectionPool()

	static ConnectionPool pool;
	return &pool;


bool ConnectionPool::parseJsonFile()

	ifstream ifs("dbConfig.json");
	Reader rd;
	Value root;
	rd.parse(ifs, root);
	if (root.isObject())
	
		m_ip = root["ip"].asString();
		m_port = root["port"].asInt();
		m_userName = root["userName"].asString();
		m_passwd = root["passwd"].asString();
		m_dbName = root["dbName"].asString();
		m_minSize = root["minSize"].asInt();
		m_maxSize = root["maxSize"].asInt();
		m_maxIdleTime = root["maxIdleTime"].asInt();
		m_timeout = root["timeout"].asInt();

		return true;
	
	return false;


void ConnectionPool::productConnection()

	while (true)
	
		//对整个局域加锁
		unique_lock<mutex> locker(m_lock);
		while (m_pool.size() >= m_minSize)
		
			m_cond.wait(locker);
		
		addConnection();
		//唤醒消费者,生产者只有一个线程,就是当前线程
		m_cond.notify_all();
	


//当连接池的连接个数大于minSize并且等待时长大于最大等待时间,需要销毁
void ConnectionPool::recycleConnection()

	while (true)
	
		this_thread::sleep_for(milliseconds(500));
		lock_guard<mutex> locker(m_lock);
		while (m_pool.size() > m_minSize)
		
			MysqlConn* conn = m_pool.front();
			if (conn->getAliveTime() >= m_maxIdleTime)
			
				//加锁
				m_pool.pop();
				delete conn;
				conn = nullptr;
			
			else
			
				break;
			
		
	


void ConnectionPool::addConnection()

	MysqlConn* conn = new MysqlConn;
	conn->connect(m_userName, m_passwd, m_dbName, m_ip, m_port);
	conn->refreshAliveTime();
	m_pool.push(conn);


shared_ptr<MysqlConn> ConnectionPool::getConnection()

	//加锁
	unique_lock<mutex> locker(m_lock);
	while (m_pool.empty() || m_useCount >= m_maxSize)
	
		//等待一段时间
		if (cv_status::no_timeout == m_cond.wait_for(locker, milliseconds(m_timeout)))
		
			//一段时间内没有被唤醒
			if (m_pool.empty() || m_useCount >= m_maxSize)
			
				continue;
			
		
	
	//从连接池中取连接
	shared_ptr<MysqlConn> conn(m_pool.front(), [this](MysqlConn* conn) 
		//删除器,将连接放回连接池
		lock_guard<mutex> lock(m_lock);
		conn->refreshAliveTime();
		m_pool.push(conn);
		m_useCount--;
		m_cond.notify_all();
		);

	//lock_guard<mutex> lock(m_lock);上面加了
	m_pool.pop();
	m_useCount++;
	//唤醒生产者,由于生产者和消费者使用的同一个环境变量,会将消费者也唤醒
	//但是不影响,会继续判断
	m_cond.notify_all();
	return conn;

测试

#include <iostream>
#include "MysqlConn.h"
#include "ConnectionPool.h"

//测试数据库API
void query()

	MysqlConn conn;
	conn.connect("root", "Root_123", "testdb", "114.55.92.82", 3306);
	string sql = "insert into person value(2, 20, \\"man\\", \\"jj\\")";
	conn.query(sql);

	sql = "select * from person";
	bool flag = conn.query(sql);
	cout << "flag value" << flag << endl;

	while (conn.next())
	
		cout << conn.value(0) << ","
			<< conn.value(1) << ","
			<< conn.value(2) << ","
			<< conn.value(3) << ","
			<< endl;
	


void op1(int begin, int end)

	for (int i = begin; i < end; ++i)
	
		MysqlConn conn;
		conn.connect("root", "Root_123", "testdb", "114.55.92.82", 3306);
		//string sql;
		char sql[1024] =  0 ;
		sprintf(sql, "insert into person value(%d, 20, \\"man\\", \\"jj\\")", i);
		conn.update(sql);
	


void op2(ConnectionPool* pool, int begin, int end)

	for (int i = begin; i < end; ++i)
	
		shared_ptr<MysqlConn> ptr = pool->getConnection();
		char sql[1024] = 0;
		sprintf(sql, "insert into person value(%d, 20, \\"man\\", \\"jj\\")", i);
		ptr->update(sql);
	


//单线程
void test1()

#if 1
	//非连接池 单线程 耗时108848850600纳秒 108848毫秒
	steady_clock::time_point begin = steady_clock::now();
	op1(0, 1000);
	steady_clock::time_point end = steady_clock::now();
	auto length = end - begin;
	cout << "非连接池 单线程 耗时" << length.count() << "纳秒 "
		<< length.count() / 1000000 << "毫秒" << endl;
#else 
	//连接池 单线程 耗时16893997100纳秒 16893毫秒
	ConnectionPool* pool = ConnectionPool::getConnectionPool();
	steady_clock::time_point begin = steady_clock::now();
	op2(pool, 0, 1000);
	steady_clock::time_point end = steady_clock::now();
	auto length = end - begin;
	cout << "连接池 单线程 耗时" << length.count() << "纳秒 "
		<< length.count() / 1000000 << "毫秒" << endl;
#endif 


void test2()

#if 0
	//非连接池 多线程 耗时20575826400纳秒 20575毫秒
	
	//如果同一时间多个线程连接数据库,会导致一些线程连接失败
	MysqlConn conn;
	conn.connect("root", "Root_123", "testdb", "114.55.92.82", 3306);

	steady_clock::time_point begin = steady_clock::now();
	thread t1(op1, 0, 200);
	thread t2(op1, 200, 400);
	thread t3(op1, 400, 600);
	thread t4(op1, 600, 800);
	thread t5(op1, 800, 1000);
	t1.join();
	t2.join();
	t3.join();
	t4.join();
	t5.join();
	steady_clock::time_point end = steady_clock::now();
	auto length = end - begin;
	cout << "非连接池 多线程 耗时" << length.count() << "纳秒 "
		<< length.count() / 1000000 << "毫秒" << endl;
#else
	//连接池 多线程 耗时4014272600纳秒 4014毫秒
	ConnectionPool* pool = ConnectionPool::getConnectionPool();
	steady_clock::time_point begin = steady_clock::now();
	thread t1(op2, pool, 0, 200);
	thread t2(op2, pool, 200, 400);
	thread t3(op2, pool, 400, 600);
	thread t4(op2, pool, 600, 800);
	thread t5(op2, pool, 800, 1000);
	t1.join();
	t2.join();
	t3.join();
	t4.join();
	t5.join();
	steady_clock::time_point end = steady_clock::now();
	auto length = end - begin;
	cout << "连接池 多线程 耗时" << length.count() << "纳秒 "
		<< length.count() / 1000000 << "毫秒" << endl;
#endif 



int main()

	test2();
	return 0;

在vs中引用第三方库

当前项目中会需要使用mysql和jsoncpp

  • 进入项目属性
  • 在下面两项中加入头文件路径和库文件路径

  •  在下面加上引用第三方库名

遇到的问题

  • 编译时报错,找不到libmysql.lib库。

找到对应动态库,将对应动态库加到项目对应exe文件下。

  • 随后报下面错误

        找到下载的mysql代码,在bin目录下,找到对应库文件,拷贝到项目的对应exe文件下

以上是关于数据库连接池的主要内容,如果未能解决你的问题,请参考以下文章

JDBC-连接池

JavaEE——数据连接池

编程马拉松021-数据库连接池

Proxool连接池配置

DBCP

理解数据库连接池底层原理之手写实现