本文介绍自己在工作中编写的一个Win32版本的MySQL数据库连接池。由于网络上Win32版本的MySQL连接池较少,这里将其开源,同时也方便自己后续使用。

  • 操作系统: 64位win7

  • 编译器: vs2013

1. 头文件PKV10_Mysql.h

#ifndef _PKV10_MYSQL_H_INCLUDED_
#define _PKV10_MYSQL_H_INCLUDED_

#include "stdafx.h"
#include <string>
#include <set>
#include <list>
#include "mysql.h"



// Values stored in MySQLConnection::conn_status.
#define MYSQL_STATUS_UNCONNECTED   0x0
#define MYSQL_STATUS_CONNECTED     0x1

class MySQLConnectionPool;


// One MySQL client handle plus the bookkeeping MySQLConnectionPool keeps for it.
class MySQLConnection{
public:
	// Builds an unconnected wrapper that belongs to `connection_pool`.
	// No MYSQL handle exists until Init() is called.
	MySQLConnection(MySQLConnectionPool *connection_pool)
		: conn(NULL),
		  conn_status(MYSQL_STATUS_UNCONNECTED),
		  pConnectionPool(connection_pool),
		  ownerQueue(NULL),
		  lastUseTick(0x0),
		  lastFixTick(0x0),
		  failFixCnt(0x0),
		  nextFixPeriod(0x0)
	{
	}

	// Closes the MYSQL handle if one was ever allocated.
	~MySQLConnection(){
		if (conn != NULL){
			mysql_close(conn);
		}
	}

	// Precondition: Init() has succeeded.
	bool Connect();

	// Allocates the MYSQL handle; returns false when mysql_init() yields NULL.
	bool Init(){
		conn = mysql_init(NULL);
		return conn != NULL;
	}

	// Pings the server. On success the last-use timestamp is refreshed;
	// on failure the connection is marked unconnected. Returns whether the ping succeeded.
	bool Ping(){
		const bool alive = (mysql_ping(conn) == 0);
		if (alive){
			lastUseTick = ::GetTickCount();    // refresh the lastUseTick
		}
		else{
			this->conn_status = MYSQL_STATUS_UNCONNECTED;
		}
		return alive;
	}

	// Lets a borrower flag the connection as broken before handing it back.
	void MarkUnconnected(){
		this->conn_status = MYSQL_STATUS_UNCONNECTED;
	}

	// True while the connection is marked as connected.
	bool IsGoodConnection(){
		return MYSQL_STATUS_CONNECTED == conn_status;
	}

public:
	MYSQL *conn;

private:
	int conn_status;        // connection state: 0 = unconnected, 1 = connected
	MySQLConnectionPool *pConnectionPool;
	void *ownerQueue;
	DWORD lastUseTick;      // tick of the most recent use; a connection left unused for a long time is kept
	                        // alive with ping, and only connections in good_connections are kept alive

	DWORD lastFixTick;      // tick at which the previous repair attempt was made
	DWORD failFixCnt;       // number of failed repair attempts
	DWORD nextFixPeriod;    // interval before the next repair attempt

	friend class MySQLConnectionPool;
};

#define CONNECTION_PING_PERIOD           300000           //every 5min, we will try ping the idle connections    
class MySQLConnectionPool{
private:
	MySQLConnectionPool(){
		databaseURL = "";
		databasePort = 0x0;
		usrName = "";
		passwd = "";
		schema = "";
		max_conn = 0x0;
		
	
		::InitializeCriticalSection(&connCs);
		::InitializeConditionVariable(&goodNotEmpty);
		::InitializeConditionVariable(&tmpBadBufNotEmpty);

		bStop = false;
		poolThread = NULL;
	}

	~MySQLConnectionPool(); 
	

public:
	bool Init(char databaseURL[], int databasePort, char usrName[], char passwd[], char schema[], int max_conn);

	/*
	 * 说明: 本函数通常用于内部线程使用,外部不应该进行调用。
	 * 
	 * 由于MySQLConnection::Connect()是一个阻塞函数,因此InitialConnect()通常会放在线程中来进行初次连接,以防止程序被阻塞
	*/
	void InitialConnect();

	//说明: 用于等待当前已经有处于"成功"状态的空闲连接
	bool WaitGoodNotEmpty(DWORD dwMilliseconds);

	//说明:此处我们另外提供IsAllBad()函数来判断是否有“好”的连接,因为“好”的连接有可能被Borrow()出去了,因此我们并不能通过
	// 直接查看good_connections是否为空来判断(注: 非线程安全,不能严格判断当前所有连接都处于bad状态,通常我们只会用此函数来
	// 进行一些非严格的健康检查与程序恢复)
	bool IsAllBad();

	MySQLConnection *Borrow();

	MySQLConnection *Borrow(DWORD dwMilliseconds);

	void Recycle(MySQLConnection *conn);


	/*
	 * 说明:非线程安全函数,当前仅被MySQLConnectionPool内部线程使用 
	*/
	void WaitNewBadConnections_unsafe(DWORD dwMilliseconds);

	/*
	* 说明: 返回bad_connections中下一次最小的修复时间间隔
	* (注: 本函数为非线程安全函数,当前仅被MySQLConnectionPool的内部线程使用)
	*/
	DWORD FixBadConnections_unsafe();
	
	

	DWORD PingIdle();

	const inline std::string &GetDatabaseURL(){
		return databaseURL;
	}
	int inline GetDatabasePort(){
		return databasePort;
	}

	const inline std::string &GetUsrName(){
		return usrName;
	}

	const inline std::string &GetPassword(){
		return passwd;
	}

	const inline std::string &GetSchema(){
		return schema;
	}

	int inline GetMaxConn(){
		return max_conn;
	}

	static MySQLConnectionPool *GetInstance();




private:
	std::string databaseURL;
	int databasePort;
	std::string usrName;
	std::string passwd;
	std::string schema;
	int max_conn;
	


	std::set<MySQLConnection *> all_connections;
	std::list<MySQLConnection *> good_connections;    //采用list,可以让每一条连接以round robin方式进行获取
	/*
	 * 说明: tmpbad_buf用于临时保存bad connections。由于MySQLConnection::Connect()函数是一个阻塞操作,因此我们真正的重连
	 * 会放到下面的unsafe_bad_connections中来进行。如若不采用此临时缓冲,则在重连过程中可能会长期占用connCs这一互斥锁,从而
	 * 导致不能快速的获取到good_connections中的空闲连接
	 */
	std::set<MySQLConnection *> tmpbad_buf;          
	CRITICAL_SECTION connCs;                         //用于保护good_connections以及tmpbad_buf
	CONDITION_VARIABLE goodNotEmpty;
	CONDITION_VARIABLE tmpBadBufNotEmpty;            

	HANDLE poolThread;
	std::set<MySQLConnection *> unsafe_bad_connections;      //unsafe_bad_connections为非线程安全队列,当前仅会被poolThread内部使用

	bool bStop;
};




#endif

2. 源文件PKV10_Mysql.cpp

#include "stdafx.h"
#include "log4z.h"
#include "PKV10_Mysql.h"


// Entry point of the pool's worker thread; defined later in this file and started by MySQLConnectionPool::Init().
static DWORD WINAPI PoolThreadProc(LPVOID lpParameter);


//先决条件: Init()成功
bool MySQLConnection::Connect(){
	const std::string & databaseURL = pConnectionPool->GetDatabaseURL();
	const std::string & usrName = pConnectionPool->GetUsrName();
	const std::string & password = pConnectionPool->GetPassword();
	const std::string & schema = pConnectionPool->GetSchema();
	int databasePort = pConnectionPool->GetDatabasePort();

	if (mysql_real_connect(conn, databaseURL.c_str(), usrName.c_str(), password.c_str(), schema.c_str(), databasePort, NULL, 0x0) == NULL){
		LOGFMTE("connect %s:%d(%s) failure: %s", databaseURL.c_str(), databasePort, schema.c_str(), mysql_error(conn));
		
		this->conn_status = MYSQL_STATUS_UNCONNECTED;
		return false;
	}

	this->conn_status = MYSQL_STATUS_CONNECTED;
	return true;
}


// Tears the pool down: wakes one waiter on each condition variable, releases the worker
// thread handle, destroys every connection object and deletes the critical section.
//
// NOTE(review): the worker thread itself is not stopped here. PoolThreadProc loops forever
// and bStop is never consulted in the visible code, so the thread may still be running
// while these members are destroyed. A proper shutdown needs a stop check in the thread loop.
MySQLConnectionPool::~MySQLConnectionPool(){
	::WakeConditionVariable(&goodNotEmpty);
	::WakeConditionVariable(&tmpBadBufNotEmpty);

	// Release the handle returned by CreateThread() in Init(); previously it was leaked.
	// Closing the handle does not terminate the thread.
	if (poolThread != NULL){
		::CloseHandle(poolThread);
		poolThread = NULL;
	}

	// The queues only hold non-owning pointers; all_connections owns the objects.
	good_connections.clear();
	tmpbad_buf.clear();
	unsafe_bad_connections.clear();

	std::set<MySQLConnection *>::iterator it;
	for (it = all_connections.begin(); it != all_connections.end(); ++it){
		delete *it;
	}
	all_connections.clear();

	DeleteCriticalSection(&connCs);
}

bool MySQLConnectionPool::Init(char databaseURL[], int databasePort, char usrName[], char passwd[], char schema[], int max_conn){
	this->databaseURL = std::string(databaseURL);
	this->databasePort = databasePort;
	this->usrName = std::string(usrName);
	this->passwd = std::string(passwd);
	this->schema = std::string(schema);

	this->max_conn = max_conn;
	if (this->max_conn <= 0)
		this->max_conn = 5;
	else if (max_conn > 50)
		this->max_conn = 50;

	for (int i = 0x0; i < this->max_conn; i++){
		MySQLConnection *conn = new MySQLConnection(this);
		if (conn->Init() == false){
			delete conn;
			return false;
		}
		all_connections.insert(conn);
	}


	poolThread = ::CreateThread(NULL, 0x0, PoolThreadProc, this, 0x0, NULL);
	if (poolThread == NULL){
		return false;
	}

	return true;
}

/*
* 说明: 本函数通常用于内部线程使用,外部不应该进行调用。
*
* 由于MySQLConnection::Connect()是一个阻塞函数,因此InitialConnect()通常会放在线程中来进行初次连接,以防止程序被阻塞
*/
void MySQLConnectionPool::InitialConnect(){
	std::set<MySQLConnection *>::iterator it;
	DWORD currentTick;

	EnterCriticalSection(&connCs);

	for (it = all_connections.begin(); it != all_connections.end(); it++){
		MySQLConnection *pConnection = *it;
		currentTick = ::GetTickCount();

		if (pConnection->Connect())
		{
			pConnection->lastUseTick = currentTick;
			pConnection->ownerQueue = &good_connections;
			good_connections.push_back(pConnection);
		}
		else{
			pConnection->ownerQueue = &unsafe_bad_connections;
			pConnection->lastFixTick = currentTick;
			pConnection->failFixCnt = 0;
			pConnection->nextFixPeriod = 0;
			unsafe_bad_connections.insert(pConnection);
		}
	}

	if (good_connections.empty() == false){
		::WakeConditionVariable(&goodNotEmpty);
	}

	LeaveCriticalSection(&connCs);
}

// Waits until at least one idle "good" connection is available.
//
// dwMilliseconds : maximum time to wait; INFINITE waits without limit.
// Returns true when good_connections is non-empty at the time of return.
//
// The wait is re-armed with the remaining time after every wakeup: condition variables
// can wake spuriously, and another thread may take the connection first. A single wait
// would then return false long before the timeout expired.
bool MySQLConnectionPool::WaitGoodNotEmpty(DWORD dwMilliseconds){
	const DWORD startTick = ::GetTickCount();
	DWORD remaining = dwMilliseconds;

	EnterCriticalSection(&connCs);
	while (good_connections.empty())
	{
		if (!::SleepConditionVariableCS(&goodNotEmpty, &connCs, remaining))
			break;      // timed out (or the wait failed)

		if (dwMilliseconds != INFINITE){
			// Unsigned subtraction stays correct across GetTickCount() wraparound.
			const DWORD elapsed = ::GetTickCount() - startTick;
			if (elapsed >= dwMilliseconds)
				break;
			remaining = dwMilliseconds - elapsed;
		}
	}

	const bool ret = !good_connections.empty();
	LeaveCriticalSection(&connCs);

	return ret;
}


// Reports whether every connection currently sits in the repair set. Good connections may be
// borrowed at any moment, so an empty good_connections list alone does not answer this.
// (Not thread-safe and not a strict check that all connections are bad; intended only for
// loose health checks and program recovery.)
bool MySQLConnectionPool::IsAllBad(){
	const int brokenCount = unsafe_bad_connections.size();
	return max_conn == brokenCount;
}


// Takes the connection at the front of good_connections, blocking without limit until one
// is available. The returned connection no longer belongs to any queue (ownerQueue is NULL)
// until it is handed back through Recycle().
MySQLConnection * MySQLConnectionPool::Borrow(){
	EnterCriticalSection(&connCs);

	for (;;){
		if (!good_connections.empty())
			break;
		SleepConditionVariableCS(&goodNotEmpty, &connCs, INFINITE);
	}

	MySQLConnection *borrowed = good_connections.front();
	good_connections.pop_front();
	borrowed->ownerQueue = NULL;

	LeaveCriticalSection(&connCs);
	return borrowed;
}

// Takes the connection at the front of good_connections, waiting at most dwMilliseconds
// (INFINITE waits without limit).
//
// Returns the borrowed connection with ownerQueue cleared, or NULL when no good connection
// became available within the timeout.
//
// The wait is re-armed with the remaining time after every wakeup: a spurious wakeup, or
// another borrower taking the connection first, must not make this call give up early.
MySQLConnection *MySQLConnectionPool::Borrow(DWORD dwMilliseconds){
	MySQLConnection *mysqlConn = NULL;
	const DWORD startTick = ::GetTickCount();
	DWORD remaining = dwMilliseconds;

	EnterCriticalSection(&connCs);

	while (good_connections.empty()){
		if (!SleepConditionVariableCS(&goodNotEmpty, &connCs, remaining))
			break;      // timed out (or the wait failed)

		if (dwMilliseconds != INFINITE){
			// Unsigned subtraction stays correct across GetTickCount() wraparound.
			const DWORD elapsed = ::GetTickCount() - startTick;
			if (elapsed >= dwMilliseconds)
				break;
			remaining = dwMilliseconds - elapsed;
		}
	}

	if (!good_connections.empty()){
		mysqlConn = good_connections.front();
		good_connections.pop_front();
		mysqlConn->ownerQueue = NULL;
	}

	LeaveCriticalSection(&connCs);
	return mysqlConn;
}

    
// Hands a borrowed connection back to the pool.
//
// conn : connection previously obtained from Borrow(). NULL, pointers that are not part of
//        this pool, and connections that already sit in a queue (ownerQueue != NULL, e.g. a
//        double Recycle) are ignored.
//
// A connection still marked good is appended to good_connections and one waiting borrower
// is woken. A connection marked bad is staged in tmpbad_buf with its repair state reset,
// and the pool thread is woken to reconnect it.
//
// The wakeup is issued on EVERY insertion. Waking only when the queue was empty loses
// wakeups when several borrowers are waiting: a second Recycle() arriving before the first
// woken borrower has run finds the list non-empty and would leave the other waiter asleep
// although a connection is available.
void MySQLConnectionPool::Recycle(MySQLConnection *conn){
	if (!conn)
		return;

	EnterCriticalSection(&connCs);

	const bool belongsToPool = (all_connections.find(conn) != all_connections.end());
	if (!belongsToPool || conn->ownerQueue != NULL){
		LeaveCriticalSection(&connCs);
		return;
	}

	if (conn->IsGoodConnection()){
		good_connections.push_back(conn);
		conn->ownerQueue = &good_connections;
		conn->lastUseTick = ::GetTickCount();     //Refresh the lastUseTick
		::WakeConditionVariable(&goodNotEmpty);
	}
	else{
		tmpbad_buf.insert(conn);
		conn->ownerQueue = &tmpbad_buf;
		// Zeroed repair state makes the pool thread retry this connection immediately.
		conn->failFixCnt = 0x0;
		conn->nextFixPeriod = 0x0;
		conn->lastFixTick = 0x0;
		::WakeConditionVariable(&tmpBadBufNotEmpty);
	}

	LeaveCriticalSection(&connCs);
	return;
}


// Tries to reconnect every connection in unsafe_bad_connections whose repair interval has
// elapsed, and moves the ones that reconnect into good_connections.
//
// Returns the smallest nextFixPeriod among the connections that remain bad, or INFINITE when
// none remain. The caller uses it as the delay before the next repair pass. This can postpone
// the repair of some connections; failed attempts back off exponentially (200 ms doubling up
// to 40 s), which avoids reconnecting too frequently after a disconnect.
//
// Not thread-safe: unsafe_bad_connections is accessed without a lock, so this must only be
// called from the pool's internal thread. connCs is taken only while publishing a repaired
// connection, never across the blocking Connect() call.
DWORD MySQLConnectionPool::FixBadConnections_unsafe(){
	DWORD nextFixTimeout = INFINITE;
	const DWORD currentTick = ::GetTickCount();

	std::set<MySQLConnection *>::iterator it = unsafe_bad_connections.begin();
	while (it != unsafe_bad_connections.end()){
		MySQLConnection *pConnection = *it;

		// Unsigned subtraction yields the correct elapsed time even when the tick counter has wrapped.
		const DWORD timeElapse = currentTick - pConnection->lastFixTick;

		if (pConnection->nextFixPeriod <= timeElapse){
			//should reconnect here
			pConnection->lastFixTick = currentTick;

			if (pConnection->Connect()){
				unsafe_bad_connections.erase(it++);

				EnterCriticalSection(&connCs);
				good_connections.push_back(pConnection);
				pConnection->ownerQueue = &good_connections;
				pConnection->conn_status = MYSQL_STATUS_CONNECTED;
				pConnection->lastUseTick = currentTick;           //Refresh the lastUseTick

				pConnection->lastFixTick = 0;
				pConnection->failFixCnt = 0;
				pConnection->nextFixPeriod = 0x0;

				// Always wake one waiter; waking only on the empty->non-empty transition can
				// strand a second waiting borrower.
				::WakeConditionVariable(&goodNotEmpty);
				LeaveCriticalSection(&connCs);
				continue;
			}

			// Reconnect failed: back off exponentially, capped at 40 s.
			pConnection->failFixCnt++;
			if (pConnection->nextFixPeriod == 0){
				pConnection->nextFixPeriod = 200;
			}
			else{
				pConnection->nextFixPeriod *= 2;
			}

			if (pConnection->nextFixPeriod >= 40000)       //40s
				pConnection->nextFixPeriod = 40000;
		}

		// Still bad (not yet due, or the attempt failed): it bounds the next pass.
		if (pConnection->nextFixPeriod < nextFixTimeout){
			nextFixTimeout = pConnection->nextFixPeriod;
		}
		++it;
	}

	return nextFixTimeout;
}

// Waits up to dwMilliseconds for bad connections to be staged in tmpbad_buf, then moves
// everything staged there into unsafe_bad_connections.
//
// A single wait is used on purpose: a timeout or early wakeup simply lets the caller run its
// next repair/ping pass.
//
// Not thread-safe with respect to unsafe_bad_connections. This function and
// FixBadConnections_unsafe() run on the same internal thread, so moving the entries here is safe.
void MySQLConnectionPool::WaitNewBadConnections_unsafe(DWORD dwMilliseconds){
	std::set<MySQLConnection *>::iterator it;

	EnterCriticalSection(&connCs);
	if (tmpbad_buf.empty()){
		::SleepConditionVariableCS(&tmpBadBufNotEmpty, &connCs, dwMilliseconds);
	}

	// Transfer first, clear afterwards. Erasing inside the loop while the loop header also
	// advanced the iterator skipped every other entry and stepped past end().
	for (it = tmpbad_buf.begin(); it != tmpbad_buf.end(); ++it){
		MySQLConnection *pConnection = *it;

		pConnection->ownerQueue = &unsafe_bad_connections;
		unsafe_bad_connections.insert(pConnection);
	}
	tmpbad_buf.clear();

	LeaveCriticalSection(&connCs);
}

// Pings the connections in good_connections that have been idle for at least
// CONNECTION_PING_PERIOD, walking from the front of the list. A connection whose ping fails
// is removed and staged in tmpbad_buf for repair. The walk stops at the first connection
// that is not yet due.
//
// Returns the delay in milliseconds before the next ping pass: the time left until the first
// not-yet-due connection becomes due, or CONNECTION_PING_PERIOD when the whole list was visited.
//
// NOTE(review): mysql_ping() is called while connCs is held, so Borrow()/Recycle() are
// blocked for the duration of each ping.
DWORD MySQLConnectionPool::PingIdle(){
	DWORD nextPingPeriod = CONNECTION_PING_PERIOD;

	EnterCriticalSection(&connCs);

	// Sample the tick under the lock so no lastUseTick can be newer than it. A connection
	// recycled just before the lock was taken would otherwise look like a wrapped (huge) idle time.
	const DWORD currentTick = ::GetTickCount();

	std::list<MySQLConnection *>::iterator it = good_connections.begin();
	while (it != good_connections.end()){
		MySQLConnection *pConnection = (*it);

		// Unsigned subtraction is correct across tick-counter wraparound.
		const DWORD timeElapse = currentTick - pConnection->lastUseTick;
		if (timeElapse < CONNECTION_PING_PERIOD){
			nextPingPeriod = CONNECTION_PING_PERIOD - timeElapse;
			break;
		}

		if (pConnection->Ping()){
			++it;
			continue;
		}

		//find an bad connection
		it = good_connections.erase(it);
		pConnection->failFixCnt = 0x0;
		pConnection->nextFixPeriod = 0;
		pConnection->lastFixTick = 0x0;      // same reset as Recycle(): repair immediately
		pConnection->ownerQueue = &tmpbad_buf;

		tmpbad_buf.insert(pConnection);
		::WakeConditionVariable(&tmpBadBufNotEmpty);
	}

	LeaveCriticalSection(&connCs);

	return nextPingPeriod;
}

// Returns the process-wide pool, a function-local static constructed on first use.
MySQLConnectionPool * MySQLConnectionPool::GetInstance(){
	static MySQLConnectionPool instance;
	MySQLConnectionPool *const self = &instance;
	return self;
}


   
// Worker thread of the pool. lpParameter is the MySQLConnectionPool to serve.
//
// After the initial connect it loops forever:
//   1. wait for newly reported bad connections (bounded to 100 ms .. CONNECTION_PING_PERIOD),
//   2. try to repair bad connections,
//   3. ping idle connections once the current ping period has elapsed.
// The next wait is the smaller of the repair delay and the time REMAINING until the next ping.
// Using the full ping period here could postpone a ping to almost twice the intended period.
//
// The loop has no exit condition, so the function never returns.
DWORD WINAPI PoolThreadProc(LPVOID lpParameter){
	MySQLConnectionPool *thePool = (MySQLConnectionPool *)lpParameter;
	DWORD nextWaitTime = INFINITE;
	DWORD lastPingTick = ::GetTickCount();
	DWORD nextPingPeriod = CONNECTION_PING_PERIOD;


	thePool->InitialConnect();
	nextWaitTime = thePool->FixBadConnections_unsafe();
	while (1){
		if (nextWaitTime > CONNECTION_PING_PERIOD){
			nextWaitTime = CONNECTION_PING_PERIOD;
		}
		else if (nextWaitTime < 100){
			nextWaitTime = 100;      //100ms
		}

		thePool->WaitNewBadConnections_unsafe(nextWaitTime);
		nextWaitTime = thePool->FixBadConnections_unsafe();

		//ping idle check
		const DWORD currentTick = ::GetTickCount();
		// Unsigned subtraction is correct across tick-counter wraparound.
		const DWORD timeElapse = currentTick - lastPingTick;

		DWORD untilNextPing;
		if (timeElapse >= nextPingPeriod){
			nextPingPeriod = thePool->PingIdle();
			lastPingTick = currentTick;
			untilNextPing = nextPingPeriod;
		}
		else{
			untilNextPing = nextPingPeriod - timeElapse;
		}

		if (nextWaitTime > untilNextPing)
			nextWaitTime = untilNextPing;
	}

}



[参看]: