源码链接: https://github.com/xingzhuz/MysqlLinkPool
前置知识:
相关的环境配置: https://xingzhu.top/archives/shu-ju-ku-lian-jie-chi-huan-jing-pei-zhi
MySQL API: https://subingwen.cn/mysql/mysql-api/
Jsoncpp API: https://xingzhu.top/archives/jsoncpp
C++多线程: https://xingzhu.top/archives/duo-xian-cheng-xian-cheng-chi
概述
- 数据库连接池的作用是管理和复用数据库连接,以提高应用程序的性能和资源利用率
- 通过预先创建一定数量的连接并将它们存储在池中,应用程序可以快速获取连接,而无需每次都进行昂贵的连接创建和销毁过程
- 这不仅减少了延迟,还能有效控制数据库连接的数量,防止资源耗尽,从而提升整体系统的稳定性和响应速度

具体实现
- 连接池只需要一个实例,所以连接池类应该是一个单例模式的类
1 2 3 4 5 6 7 8
| ConnectionPool *ConnectionPool::getConnectPool() { static ConnectionPool pool; return &pool; }
|
- 所有的数据库连接应该维护到一个队列中,使用队列的目的是方便连接的添加和删除
- 由于队列是连接池共享资源,需要使用互斥锁来保护队列数据的读写
- 由于数据库有连接上限,过多会压力过大,导致性能降低,所以需要设置一个最大连接上限;为了应对突然的高并发操作,需要设置一个最小连接数,这个最小连接数是维护队列的最小数量,保证有这么多连接供使用
- 客户端在满足条件的情况下,从连接池取连接,然后从进行使用,使用完毕后,归还这个连接,让这个连接重新加入队列中,不是销毁掉
- 因此队列存储的是一个数据库连接的对象,即
MYSQL*
,为了便于使用,先封装一个 MYSQL
类,用于连接
封装数据库头文件
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52
| #pragma once #include <iostream> #include <mysql.h> #include <chrono> using namespace std; using namespace chrono; class MysqlConn { public: MysqlConn();
~MysqlConn();
bool connect(string user, string passwd, string dbName, string ip, unsigned short port = 3306);
bool update(string sql);
bool query(string sql);
bool next();
string value(int index);
bool transaction();
bool commit();
bool rollback();
void refreshAliveTime();
long long getAliveTime();
private: void freeResult(); MYSQL *m_conn = nullptr; MYSQL_RES *m_result = nullptr; MYSQL_ROW m_row = nullptr; steady_clock::time_point m_alivetime; };
|
连接池头文件
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55
| #pragma once #include <queue> #include <mutex> #include <condition_variable> #include <thread> #include "MysqlConn.h" #include <atomic> using namespace std;
class ConnectionPool { public: static ConnectionPool *getConnectPool();
ConnectionPool(const ConnectionPool &obj) = delete; ConnectionPool &operator=(const ConnectionPool &obj) = delete;
shared_ptr<MysqlConn> getConnection(); ~ConnectionPool();
private: ConnectionPool();
bool parseJsonFile();
void produceConnection();
void recycleConnection();
void addConnection();
string m_ip; string m_user; string m_passwd; string m_dbName; unsigned short m_port; int m_minSize; int m_maxSize; atomic<int> m_busySize; int m_timeout; int m_maxIdleTime;
queue<MysqlConn *> m_connectionQ; mutex m_mutexQ; condition_variable m_cond; };
|
构造函数
- 首先解析 json 格式数据,设置端口号和 IP
- 然后以最小连接数添加连接到队列
- 创建两个线程进行管理添加连接和销毁连接的实现
添加连接
- 这个使用一个线程单独维护,一直运行,除非连接池发送一个关闭信号
- 持续判断能否添加连接
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23
| void ConnectionPool::produceConnection() { while (true) { if (isShutdown) return;
unique_lock<mutex> locker(m_mutexQ); while (m_connectionQ.size() >= m_minSize || m_connectionQ.size() + m_busySize >= m_maxSize) { m_cond.wait(locker);
if (isShutdown) return; }
addConnection(); m_cond.notify_all(); } }
|
- 如果队列的数量比最小连接数大,就不需要添加连接
- 如果队列的连接数量 + 忙的连接 (正在被使用的) 超过了连接上限,就不需要往队列里生产连接了,即使此时队列的数量比最小连接数小,仍不能添加连接,因为添加连接后,客户端就能取,如果都取了,就超过连接上限了,因此维护的是队列连接数 + 忙的连接数要小于最大连接上限
- 也就是只有队列的数量比最小连接数小,并且此时队列的连接数量 + 忙的连接 (正在被使用的) 小于连接上限,才添加连接到队列
销毁连接
- 也是使用一个线程隔一段时间就进行检测,销毁的连接是队列中的空闲连接
- 因此取头部连接,计算它的存活时间,如果大于我们设定的值,就销毁这个连接,弹出队列,销毁它是因为它一直没被使用,属于空闲连接,只需一直销毁到保持有最小连接数即可
- 注意销毁弹出之前,需要满足队列的数量大于最小连接数,如果小于,就不进行这个操作
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26
|
void ConnectionPool::recycleConnection() { while (true) { if (isShutdown) return;
this_thread::sleep_for(chrono::milliseconds(500)); unique_lock<mutex> locker(m_mutexQ); while (m_connectionQ.size() > m_minSize) { MysqlConn *conn = m_connectionQ.front();
if (conn->getAliveTime() >= m_maxIdleTime) { m_connectionQ.pop(); delete conn; } else break; } } }
|
取连接
- 首先需要检测这个队列是否为空,和现在忙的 (即正在使用的连接) 数量是否大于了连接上限,如果满足这二者之中的任何一个,都对其进行阻塞等待,不取连接
- 这个阻塞设置一个超时时间,如果满足上述二者只中一个,就阻塞,达到阻塞时长后,这个阻塞会解除,自己进行选择是退出还是继续进行阻塞
- 取连接后,由于这个连接不使用需要回收,重新加入队列中,因此需要保存这个连接,这个是一个指针,因此考虑共享智能指针维护,更安全
- 并且这个共享智能指针还有个功能,可以指定删除器函数,也就是这个指针生命周期结束后的指针处理,默认是清除智能指针指向的地址,由于这里不是进行销毁,我们需要重新加入队列,因此重定义删除器函数,使之为这个指针 (这个数据库连接)重新加入队列
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53
| shared_ptr<MysqlConn> ConnectionPool::getConnection() { unique_lock<mutex> locker(m_mutexQ); while (m_connectionQ.empty() || (m_busySize >= m_maxSize)) { if (cv_status::timeout == m_cond.wait_for(locker, chrono::milliseconds(m_timeout))) { if (m_connectionQ.empty() || (m_busySize >= m_maxSize)) { continue; } } }
shared_ptr<MysqlConn> connptr( m_connectionQ.front(), [this](MysqlConn *conn) { unique_lock<mutex> locker(m_mutexQ);
conn->refreshAliveTime(); m_connectionQ.push(conn);
m_busySize--; m_cond.notify_all(); });
m_connectionQ.pop();
m_busySize++;
m_cond.notify_all(); return connptr; }
|
解析 Json
- 为了不写死这个数据库连接的 IP 和端口号,这里使用 json 格式读取进去赋值
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23
| bool ConnectionPool::parseJsonFile() { ifstream ifs("dbconf.json"); Reader rd; Value root; rd.parse(ifs, root); if (root.isObject()) { m_ip = root["ip"].asString(); m_port = root["port"].asInt(); m_user = root["userName"].asString(); m_passwd = root["password"].asString(); m_dbName = root["dbName"].asString(); m_minSize = root["minSize"].asInt(); m_maxSize = root["maxSize"].asInt(); m_maxIdleTime = root["maxIdleTime"].asInt(); m_timeout = root["timeout"].asInt(); m_busySize = 0; return true; } return false; }
|
说明: 参考学习 https://www.bilibili.com/video/BV1Fr4y1s7w4/?spm_id_from=333.999.0.0


xingzhu
keep trying!keep doing!believe in yourself!
本博客所有文章除特别声明外,均采用 CC BY-NC-SA 4.0 许可协议。转载请注明来自 星竹!