mysql数据库连接池(win32版) By ivan.L 发表于 2018-01-02 本文是自己工作中写的一个WIN32版本MySQL数据库连接池。因网络上Win32版本MySQL连接池较少,这里开源,同时也方便自己后续使用。 操作系统: 64位win7 编译器: vs2013 1. 头文件PKV10_Mysql.h #ifndef _PKV10_MYSQL_H_INCLUDED_ #define _PKV10_MYSQL_H_INCLUDED_ #include "stdafx.h" #include <string> #include <set> #include <list> #include "mysql.h" #define MYSQL_STATUS_UNCONNECTED 0x0 #define MYSQL_STATUS_CONNECTED 0x1 class MySQLConnectionPool; class MySQLConnection{ public: MySQLConnection(MySQLConnectionPool *connection_pool){ conn = NULL; conn_status = MYSQL_STATUS_UNCONNECTED; this->pConnectionPool = connection_pool; ownerQueue = NULL; lastUseTick = 0x0; lastFixTick = 0x0; failFixCnt = 0x0; nextFixPeriod = 0x0; } ~MySQLConnection(){ if (conn){ mysql_close(conn); } } //先决条件: Init()成功 bool Connect(); bool Init(){ if ((conn = mysql_init(NULL)) == NULL) return false; return true; } bool Ping(){ if (mysql_ping(conn)){ this->conn_status = MYSQL_STATUS_UNCONNECTED; return false; } lastUseTick = ::GetTickCount(); //Refresh the lastUseTick return true; } void MarkUnconnected(){ this->conn_status = MYSQL_STATUS_UNCONNECTED; } bool IsGoodConnection(){ return (conn_status == MYSQL_STATUS_CONNECTED); } public: MYSQL *conn; private: int conn_status; //连接状态 0----unconnect 1--connected MySQLConnectionPool *pConnectionPool; void *ownerQueue; DWORD lastUseTick; //本连接最近一次被使用的时间(如果连接长时间不使用,需要使用ping来保活,只会对good_connections中的连接进行保活) DWORD lastFixTick; //本连接在上一次进行修复时的时间戳 DWORD failFixCnt; //修复失败的次数 DWORD nextFixPeriod; //下一次修复的时间间隔 friend class MySQLConnectionPool; }; #define CONNECTION_PING_PERIOD 300000 //every 5min, we will try ping the idle connections class MySQLConnectionPool{ private: MySQLConnectionPool(){ databaseURL = ""; databasePort = 0x0; usrName = ""; passwd = ""; schema = ""; max_conn = 0x0; ::InitializeCriticalSection(&connCs); ::InitializeConditionVariable(&goodNotEmpty); ::InitializeConditionVariable(&tmpBadBufNotEmpty); bStop = false; poolThread = NULL; } ~MySQLConnectionPool(); public: bool Init(char databaseURL[], int databasePort, char usrName[], char passwd[], char schema[], int max_conn); /* * 说明: 本函数通常用于内部线程使用,外部不应该进行调用。 * * 由于MySQLConnection::Connect()是一个阻塞函数,因此InitialConnect()通常会放在线程中来进行初次连接,以防止程序被阻塞 */ void InitialConnect(); //说明: 用于等待当前已经有处于"成功"状态的空闲连接 bool WaitGoodNotEmpty(DWORD dwMilliseconds); //说明:此处我们另外提供IsAllBad()函数来判断是否有“好”的连接,因为“好”的连接有可能被Borrow()出去了,因此我们并不能通过 // 直接查看good_connections是否为空来判断(注: 非线程安全,不能严格判断当前所有连接都处于bad状态,通常我们只会用此函数来 // 进行一些非严格的健康检查与程序恢复) bool IsAllBad(); MySQLConnection *Borrow(); MySQLConnection *Borrow(DWORD dwMilliseconds); void Recycle(MySQLConnection *conn); /* * 说明:非线程安全函数,当前仅被MySQLConnectionPool内部线程使用 */ void WaitNewBadConnections_unsafe(DWORD dwMilliseconds); /* * 说明: 返回bad_connections中下一次最小的修复时间间隔 * (注: 本函数为非线程安全函数,当前仅被MySQLConnectionPool的内部线程使用) */ DWORD FixBadConnections_unsafe(); DWORD PingIdle(); const inline std::string &GetDatabaseURL(){ return databaseURL; } int inline GetDatabasePort(){ return databasePort; } const inline std::string &GetUsrName(){ return usrName; } const inline std::string &GetPassword(){ return passwd; } const inline std::string &GetSchema(){ return schema; } int inline GetMaxConn(){ return max_conn; } static MySQLConnectionPool *GetInstance(); private: std::string databaseURL; int databasePort; std::string usrName; std::string passwd; std::string schema; int max_conn; std::set<MySQLConnection *> all_connections; std::list<MySQLConnection *> good_connections; //采用list,可以让每一条连接以round robin方式进行获取 /* * 说明: tmpbad_buf用于临时保存bad connections。由于MySQLConnection::Connect()函数是一个阻塞操作,因此我们真正的重连 * 会放到下面的unsafe_bad_connections中来进行。如若不采用此临时缓冲,则在重连过程中可能会长期占用connCs这一互斥锁,从而 * 导致不能快速的获取到good_connections中的空闲连接 */ std::set<MySQLConnection *> tmpbad_buf; CRITICAL_SECTION connCs; //用于保护good_connections以及tmpbad_buf CONDITION_VARIABLE goodNotEmpty; CONDITION_VARIABLE tmpBadBufNotEmpty; HANDLE poolThread; std::set<MySQLConnection *> unsafe_bad_connections; //unsafe_bad_connections为非线程安全队列,当前仅会被poolThread内部使用 bool bStop; }; #endif 2. 源文件PKV10_Mysql.cpp #include "stdafx.h" #include "log4z.h" #include "PKV10_Mysql.h" static DWORD WINAPI PoolThreadProc(LPVOID lpParameter); //先决条件: Init()成功 bool MySQLConnection::Connect(){ const std::string & databaseURL = pConnectionPool->GetDatabaseURL(); const std::string & usrName = pConnectionPool->GetUsrName(); const std::string & password = pConnectionPool->GetPassword(); const std::string & schema = pConnectionPool->GetSchema(); int databasePort = pConnectionPool->GetDatabasePort(); if (mysql_real_connect(conn, databaseURL.c_str(), usrName.c_str(), password.c_str(), schema.c_str(), databasePort, NULL, 0x0) == NULL){ LOGFMTE("connect %s:%d(%s) failure: %s", databaseURL.c_str(), databasePort, schema.c_str(), mysql_error(conn)); this->conn_status = MYSQL_STATUS_UNCONNECTED; return false; } this->conn_status = MYSQL_STATUS_CONNECTED; return true; } MySQLConnectionPool::~MySQLConnectionPool(){ std::set<MySQLConnection *>::iterator it; ::WakeConditionVariable(&goodNotEmpty); ::WakeConditionVariable(&tmpBadBufNotEmpty); good_connections.clear(); tmpbad_buf.clear(); unsafe_bad_connections.clear(); for (it = all_connections.begin(); it != all_connections.end(); ){ MySQLConnection *conn = *it; delete conn; all_connections.erase(it++); } DeleteCriticalSection(&connCs); } bool MySQLConnectionPool::Init(char databaseURL[], int databasePort, char usrName[], char passwd[], char schema[], int max_conn){ this->databaseURL = std::string(databaseURL); this->databasePort = databasePort; this->usrName = std::string(usrName); this->passwd = std::string(passwd); this->schema = std::string(schema); this->max_conn = max_conn; if (this->max_conn <= 0) this->max_conn = 5; else if (max_conn > 50) this->max_conn = 50; for (int i = 0x0; i < this->max_conn; i++){ MySQLConnection *conn = new MySQLConnection(this); if (conn->Init() == false){ delete conn; return false; } all_connections.insert(conn); } poolThread = ::CreateThread(NULL, 0x0, PoolThreadProc, this, 0x0, NULL); if (poolThread == NULL){ return false; } return true; } /* * 说明: 本函数通常用于内部线程使用,外部不应该进行调用。 * * 由于MySQLConnection::Connect()是一个阻塞函数,因此InitialConnect()通常会放在线程中来进行初次连接,以防止程序被阻塞 */ void MySQLConnectionPool::InitialConnect(){ std::set<MySQLConnection *>::iterator it; DWORD currentTick; EnterCriticalSection(&connCs); for (it = all_connections.begin(); it != all_connections.end(); it++){ MySQLConnection *pConnection = *it; currentTick = ::GetTickCount(); if (pConnection->Connect()) { pConnection->lastUseTick = currentTick; pConnection->ownerQueue = &good_connections; good_connections.push_back(pConnection); } else{ pConnection->ownerQueue = &unsafe_bad_connections; pConnection->lastFixTick = currentTick; pConnection->failFixCnt = 0; pConnection->nextFixPeriod = 0; unsafe_bad_connections.insert(pConnection); } } if (good_connections.empty() == false){ ::WakeConditionVariable(&goodNotEmpty); } LeaveCriticalSection(&connCs); } //说明: 用于等待当前已经有处于"成功"状态的空闲连接 bool MySQLConnectionPool::WaitGoodNotEmpty(DWORD dwMilliseconds){ bool ret; EnterCriticalSection(&connCs); if (good_connections.empty()) { ::SleepConditionVariableCS(&goodNotEmpty, &connCs, dwMilliseconds); } if (!good_connections.empty()) ret = true; else ret = false; LeaveCriticalSection(&connCs); return ret; } //说明:此处我们另外提供IsAllBad()函数来判断是否有“好”的连接,因为“好”的连接有可能被Borrow()出去了,因此我们并不能通过 // 直接查看good_connections是否为空来判断(注: 非线程安全,不能严格判断当前所有连接都处于bad状态,通常我们只会用此函数来 // 进行一些非严格的健康检查与程序恢复) bool MySQLConnectionPool::IsAllBad(){ int badCnt = unsafe_bad_connections.size(); return badCnt == max_conn; } MySQLConnection * MySQLConnectionPool::Borrow(){ MySQLConnection *mysqlConn = NULL; EnterCriticalSection(&connCs); while (good_connections.empty()){ SleepConditionVariableCS(&goodNotEmpty, &connCs, INFINITE); } mysqlConn = good_connections.front(); mysqlConn->ownerQueue = NULL; good_connections.pop_front(); LeaveCriticalSection(&connCs); return mysqlConn; } MySQLConnection *MySQLConnectionPool::Borrow(DWORD dwMilliseconds){ MySQLConnection *mysqlConn = NULL; EnterCriticalSection(&connCs); if (good_connections.empty()){ SleepConditionVariableCS(&goodNotEmpty, &connCs, dwMilliseconds); } if (!good_connections.empty()){ mysqlConn = good_connections.front(); good_connections.pop_front(); mysqlConn->ownerQueue = NULL; } LeaveCriticalSection(&connCs); return mysqlConn; } void MySQLConnectionPool::Recycle(MySQLConnection *conn){ std::set<MySQLConnection *>::iterator it; if (!conn) return; EnterCriticalSection(&connCs); it = all_connections.find(conn); if (it == all_connections.end()) { LeaveCriticalSection(&connCs); return; } if (conn->ownerQueue != NULL){ LeaveCriticalSection(&connCs); return; } if (conn->IsGoodConnection()){ if (good_connections.empty()){ good_connections.push_back(conn); conn->ownerQueue = &good_connections; ::WakeConditionVariable(&goodNotEmpty); } else{ good_connections.push_back(conn); conn->ownerQueue = &good_connections; } conn->lastUseTick = ::GetTickCount(); //Refresh the lastUseTick } else{ if (tmpbad_buf.empty()){ tmpbad_buf.insert(conn); ::WakeConditionVariable(&tmpBadBufNotEmpty); } else{ tmpbad_buf.insert(conn); } conn->ownerQueue = &tmpbad_buf; conn->failFixCnt = 0x0; conn->nextFixPeriod = 0x0; conn->lastFixTick = 0x0; } LeaveCriticalSection(&connCs); return; } //说明: 本函数返回的是bad_connections中当前最小的修复时间间隔,我们会以该时间间隔来启动下一次修复。这可能使得 //对于有一些连接的修复时间被推迟(这里我们采用指数退避的方式进行重连,这个推迟时间在实际环境中应该不会有太大的 //问题,并且可以避免出现在短时间内因断联而导致的太过频繁的重连发生) DWORD MySQLConnectionPool::FixBadConnections_unsafe(){ std::set<MySQLConnection *>::iterator it; DWORD nextFixTimeout = INFINITE; DWORD currentTick = ::GetTickCount(); DWORD timeElapse = 0x0; badcnt = 0; for (it = unsafe_bad_connections.begin(); it != unsafe_bad_connections.end();){ MySQLConnection *pConnection = *it; if (currentTick < pConnection->lastFixTick){ //tickcnt has overflow timeElapse = 0xFFFFFFFF - pConnection->lastFixTick + currentTick; } else{ timeElapse = currentTick - pConnection->lastFixTick; } if (pConnection->nextFixPeriod <= timeElapse){ //should reconnect here pConnection->lastFixTick = currentTick; if (pConnection->Connect()){ unsafe_bad_connections.erase(it++); EnterCriticalSection(&connCs); if (good_connections.empty()){ good_connections.push_back(pConnection); ::WakeConditionVariable(&goodNotEmpty); } else{ good_connections.push_back(pConnection); } pConnection->ownerQueue = &good_connections; pConnection->conn_status = MYSQL_STATUS_CONNECTED; pConnection->lastUseTick = currentTick; //Refresh the lastUseTick pConnection->lastFixTick = 0; pConnection->failFixCnt = 0; pConnection->nextFixPeriod = 0x0; LeaveCriticalSection(&connCs); } else{ //才用指数退避的方式进行重连 pConnection->failFixCnt++; if (pConnection->nextFixPeriod == 0){ pConnection->nextFixPeriod = 200; } else{ pConnection->nextFixPeriod *= 2; } if (pConnection->nextFixPeriod >= 40000) //40s pConnection->nextFixPeriod = 40000; if (pConnection->nextFixPeriod < nextFixTimeout){ nextFixTimeout = pConnection->nextFixPeriod; } it++; } } else{ if (pConnection->nextFixPeriod < nextFixTimeout){ nextFixTimeout = pConnection->nextFixPeriod; } it++; } } return nextFixTimeout; } void MySQLConnectionPool::WaitNewBadConnections_unsafe(DWORD dwMilliseconds){ std::set<MySQLConnection *>::iterator it; EnterCriticalSection(&connCs); if (tmpbad_buf.empty()){ ::SleepConditionVariableCS(&tmpBadBufNotEmpty, &connCs, dwMilliseconds); } /* * 说明:WaitNewBadConnections_unsafe与FixBadConnections_unsafe()工作于同一内部线程,此处将tmpbad_buf中的连接添加到 * unsafe_bad_connections队列中是不会出问题的 */ for (it = tmpbad_buf.begin(); it != tmpbad_buf.end(); it++){ MySQLConnection *pConnection = *it; tmpbad_buf.erase(it++); pConnection->ownerQueue = &unsafe_bad_connections; unsafe_bad_connections.insert(pConnection); } LeaveCriticalSection(&connCs); } DWORD MySQLConnectionPool::PingIdle(){ std::list<MySQLConnection *>::iterator it; bool ret; DWORD nextPingPeriod = CONNECTION_PING_PERIOD; DWORD currentTick = ::GetTickCount(); DWORD timeElapse; EnterCriticalSection(&connCs); for (it = good_connections.begin(); it != good_connections.end(); ){ MySQLConnection *pConnection = (*it); if (currentTick < pConnection->lastUseTick){ //tick overflow timeElapse = 0xFFFFFFFF - pConnection->lastUseTick + currentTick; } else{ timeElapse = currentTick - pConnection->lastUseTick; } if (timeElapse < CONNECTION_PING_PERIOD){ nextPingPeriod = CONNECTION_PING_PERIOD - timeElapse; break; } ret = pConnection->Ping(); if (ret == false){ //find an bad connection good_connections.erase(it++); pConnection->failFixCnt = 0x0; pConnection->nextFixPeriod = 0; pConnection->ownerQueue = &tmpbad_buf; if (tmpbad_buf.empty()){ tmpbad_buf.insert(pConnection); ::WakeConditionVariable(&tmpBadBufNotEmpty); } else{ tmpbad_buf.insert(pConnection); } } else{ it++; } } LeaveCriticalSection(&connCs); return nextPingPeriod; } MySQLConnectionPool * MySQLConnectionPool::GetInstance(){ static MySQLConnectionPool thePool; return &thePool; } DWORD WINAPI PoolThreadProc(LPVOID lpParameter){ MySQLConnectionPool *thePool = (MySQLConnectionPool *)lpParameter; DWORD nextWaitTime = INFINITE; DWORD currentTick; DWORD lastPingTick = ::GetTickCount(); DWORD nextPingPeriod = CONNECTION_PING_PERIOD; DWORD timeElapse; thePool->InitialConnect(); nextWaitTime = thePool->FixBadConnections_unsafe(); while (1){ if (nextWaitTime > CONNECTION_PING_PERIOD){ nextWaitTime = CONNECTION_PING_PERIOD; } else if (nextWaitTime < 100){ nextWaitTime = 100; //100ms } thePool->WaitNewBadConnections_unsafe(nextWaitTime); nextWaitTime = thePool->FixBadConnections_unsafe(); //ping idle check currentTick = ::GetTickCount(); if (currentTick < lastPingTick){ timeElapse = INFINITE - lastPingTick + currentTick; } else{ timeElapse = currentTick - lastPingTick; } if (timeElapse >= nextPingPeriod){ nextPingPeriod = thePool->PingIdle(); lastPingTick = currentTick; } if (nextWaitTime > nextPingPeriod) nextWaitTime = nextPingPeriod; } } [参看]: