⭐ 欢迎来到虫虫下载站! | 📦 资源下载 📁 资源专辑 ℹ️ 关于我们
⭐ 虫虫下载站

📄 msdtc_enlist.cpp

📁 postgresql-odbc,跨平台应用
💻 CPP
📖 第 1 页 / 共 2 页
字号:
				if (econn)				{					snprintf(cmd, sizeof(cmd), "COMMIT PREPARED '%s'", pgxid);					qres = CC_send_query(econn, cmd, NULL, 0, NULL);					if (!QR_command_maybe_successful(qres))						res = E_FAIL;					QR_Destructor(qres);					LEAVE_CONN_CS(econn);				}			}			SetDone(res);			ret = enlist->CommitRequestDone(res);			bReleaseEnlist = true;			break;		case AbortExec:			Wait_pThread(false);			if (prepared)			{				econn = getLockedXAConn();				if (econn)				{					snprintf(cmd, sizeof(cmd), "ROLLBACK PREPARED '%s'", pgxid);					qres = CC_send_query(econn, cmd, NULL, 0, NULL);					if (!QR_command_maybe_successful(qres))						res = E_FAIL;					QR_Destructor(qres);					LEAVE_CONN_CS(econn);				}			}			SetDone(res);			ret = enlist->AbortRequestDone(res);			bReleaseEnlist = true;			break;		default:			ret = -1;	}	if (bReleaseEnlist)	{		helper->ReleaseRMCookie(RMCookie, TRUE);		enlist->Release();	}	ELOCK_RELEASE();	mylog("%x->Done ret=%d\n", this, ret);	return ret;}////	Acquire/releses [MLOCK -> ] SLOCK//	 	or 	[ELOCK -> LIFELOCK -> ] SLOCK.//HRESULT IAsyncPG::ReleaseConnection(void){	mylog("%x->ReleaseConnection\n", this);	ConnectionClass	*iconn;	bool	done = false;	SLOCK_ACQUIRE();	if (iconn = conn)	{		Wait_pThread(true);		if (NULL != eThread[CommitExec] || NULL != eThread[AbortExec] || requestAccepted)		{			if (prepared)			{				Wait_cThread(true, true);				if (!prepared)					done = true;			}			else				done = true;			if (done)				Wait_cThread(true, false);		}		if (conn && CONN_CONNECTED == conn->status && !done)		{			generateXAConn(true);		}		else			SLOCK_RELEASE();	}	else		SLOCK_RELEASE();	mylog("%x->ReleaseConnection exit\n", this);	return SQL_SUCCESS;}////	Acquire/release [ELOCK -> ] [MLOCK -> ] SLOCK.//EXTERN_C static unsigned WINAPI DtcRequestExec(LPVOID para);HRESULT STDMETHODCALLTYPE IAsyncPG::PrepareRequest(BOOL fRetaining, DWORD grfRM,				BOOL fWantMoniker, BOOL fSinglePhase){	HRESULT	ret, res;	RequestPara	*reqp;	mylog("%x PrepareRequest called grhRM=%d enl=%x\n", this, grfRM, enlist);	SLOCK_ACQUIRE();	if (0 != CC_get_errornumber(conn))		res = ret = E_FAIL;	else	{		ret = S_OK;		if (fSinglePhase)		{			res = XACT_S_SINGLEPHASE;			mylog("XACT is singlePhase\n");		}		else			res = S_OK; 	}	SLOCK_RELEASE();	ELOCK_ACQUIRE();#ifdef	_SLEEP_FOR_TEST_	Sleep(2000);#endif	/* _SLEEP_FOR_TEST_ */	reqp = new RequestPara;	reqp->type = PrepareExec;	reqp->lpr = (LPVOID) this;	reqp->res = res;	AddRef();	HANDLE hThread = (HANDLE) _beginthreadex(NULL, 0, DtcRequestExec, reqp, 0, NULL);	if (NULL == hThread)	{		delete(reqp);		ret = E_FAIL;	}	else	{		AsyncThreads::insert(hThread, this, reqp->type);	}	ELOCK_RELEASE();	Release();	return ret;}////	Acquire/release [ELOCK -> ] [MLOCK -> ] SLOCK.//HRESULT STDMETHODCALLTYPE IAsyncPG::CommitRequest(DWORD grfRM, XACTUOW * pNewUOW){	HRESULT		res = S_OK, ret = S_OK;	RequestPara	*reqp;	mylog("%x CommitRequest called grfRM=%d enl=%x\n", this, grfRM, enlist);	SLOCK_ACQUIRE();	if (!prepared)		ret = E_UNEXPECTED;	else if (S_OK != prepare_result)		ret = E_UNEXPECTED;	SLOCK_RELEASE();	if (S_OK != ret)		return ret;	AddRef();	ELOCK_ACQUIRE();#ifdef	_SLEEP_FOR_TEST_	Sleep(1000);#endif	/* _SLEEP_FOR_TEST_ */	reqp = new RequestPara;	reqp->type = CommitExec;	reqp->lpr = (LPVOID) this;	reqp->res = res;	enlist->AddRef();	HANDLE hThread = (HANDLE) _beginthreadex(NULL, 0, DtcRequestExec, reqp, 0, NULL);	if (NULL == hThread)	{		delete(reqp);		enlist->Release();		ret = E_FAIL;	}	else	{		AsyncThreads::insert(hThread, this, reqp->type);	}	mylog("CommitRequest ret=%d\n", ret);	requestAccepted = true;	ELOCK_RELEASE();	Release();	return ret;}////	Acquire/release [ELOCK -> ] [MLOCK -> ] SLOCK.//HRESULT STDMETHODCALLTYPE IAsyncPG::AbortRequest(BOID * pboidReason, BOOL fRetaining,				XACTUOW * pNewUOW){	HRESULT		res = S_OK, ret = S_OK;	RequestPara	*reqp;	mylog("%x AbortRequest called\n", this);	AddRef();	ELOCK_ACQUIRE();	if (!prepared && conn)		CC_abort(conn);	reqp = new RequestPara;	reqp->type = AbortExec;	reqp->lpr = (LPVOID) this;	reqp->res = res;	enlist->AddRef();	HANDLE hThread = (HANDLE) _beginthreadex(NULL, 0, DtcRequestExec, reqp, 0, NULL);	if (NULL == hThread)	{		delete(reqp);		enlist->Release();		ret = E_FAIL;	}	else	{		AsyncThreads::insert(hThread, this, reqp->type);	}	mylog("AbortRequest ret=%d\n", ret);	requestAccepted = true;	ELOCK_RELEASE();	Release();	return	ret;}HRESULT STDMETHODCALLTYPE IAsyncPG::TMDown(void){forcelog("%x TMDown called\n", this);	return	S_OK;}////	Acquire/releases MLOCK -> SLOCK.//std::map<HANDLE, AsyncWait>	AsyncThreads::th_list;void	AsyncThreads::insert(HANDLE th, IAsyncPG *obj, DWORD type){	if (!obj)	return;	MLOCK_ACQUIRE;	th_list.insert(std::pair<HANDLE, AsyncWait>(th, AsyncWait(obj, type)));	obj->SLOCK_ACQUIRE();	obj->eThread[type] = th;	obj->SLOCK_RELEASE();	MLOCK_RELEASE;}////	Acquire/releases MLOCK -> SLOCK.//bool	AsyncThreads::WaitThread(IAsyncPG *obj, DWORD type, DWORD millisecond){	HANDLE	th = NULL;	DWORD	gtype;	bool	typematch;	int	wait_count;	MLOCK_ACQUIRE;	std::map<HANDLE, AsyncWait>::iterator p;	for (p = th_list.begin(); p != th_list.end(); p++)	{		gtype = p->second.GetType();		typematch = (gtype == type);		if (p->second.GetObj() == obj && typematch)		{			th = p->first;			break;		}	}	if (NULL == th)	{		MLOCK_RELEASE;		forcelog("WaitThread thread(%x, %d) not found\n", obj, type);		return false;	}	p->second.StartWaiting();	MLOCK_RELEASE;		DWORD ret = WaitForSingleObject(th, millisecond);	MLOCK_ACQUIRE;	wait_count = p->second.StopWaiting();	if (WAIT_OBJECT_0 == ret)	{		IAsyncPG *async = p->second.GetObj();		if (type >= 0 && type <= IAsyncPG::AbortExec)			async->Reset_eThread(type);		if (wait_count <= 0)		{			th_list.erase(th);			MLOCK_RELEASE;			CloseHandle(th);			if (type >= IAsyncPG::CommitExec)			{				async->Release();			}		}		else			MLOCK_RELEASE;	}	else		MLOCK_RELEASE;	return true;}void	AsyncThreads::CleanupThreads(DWORD millisecond){	int	msize;	DWORD	nCount;	MLOCK_ACQUIRE;	if (msize = th_list.size(), msize <= 0)	{		MLOCK_RELEASE;		return;	}	mylog("CleanupThreads size=%d\n", msize);	HANDLE	*hds = new HANDLE[msize];	std::map<HANDLE, AsyncWait>::iterator p;	for (p = th_list.begin(), nCount = 0; p != th_list.end(); p++)	{		hds[nCount++] = p->first;		p->second.StartWaiting();	}	MLOCK_RELEASE;	int	i;	while (nCount > 0)	{		DWORD ret = WaitForMultipleObjects(nCount, hds, 0, millisecond);		if (ret >= nCount)			break;		HANDLE	th = hds[ret];		MLOCK_ACQUIRE;		p = th_list.find(th);		if (p != th_list.end())		{			int wait_count = p->second.StopWaiting();			DWORD	type = p->second.GetType();			IAsyncPG * async = p->second.GetObj();			if (type >= IAsyncPG::PrepareExec && type <= IAsyncPG::AbortExec)				async->Reset_eThread(type);			if (wait_count <= 0)			{				th_list.erase(th);				MLOCK_RELEASE;				CloseHandle(th);				if (type >= IAsyncPG::CommitExec)				{					async->Release();				}			}			else				MLOCK_RELEASE;		}		else			MLOCK_RELEASE;		for (i = ret; i < (int) nCount - 1; i++)			hds[i] = hds[i + 1];		nCount--;	}	for (i = 0; i < (int) nCount; i++)	{		p = th_list.find(hds[i]);		if (p != th_list.end())			p->second.StopWaiting();	}	delete [] hds;}EXTERN_C static unsigned WINAPI DtcRequestExec(LPVOID para){	RequestPara	*reqp = (RequestPara *) para;	DWORD		type = reqp->type;	IAsyncPG *async = (IAsyncPG *) reqp->lpr;	HRESULT	res = reqp->res, ret;	mylog("DtcRequestExec type=%d", reqp->type);	delete(reqp);	ret = async->RequestExec(type, res);	mylog(" Done ret=%d\n", ret);	return ret;}CSTR	regKey = "SOFTWARE\\Microsoft\\MSDTC\\XADLL";RETCODE static EnlistInDtc_1pipe(ConnectionClass *conn, ITransaction *pTra, ITransactionDispenser *pDtc){	CSTR	func = "EnlistInDtc_1pipe";	static	IDtcToXaHelperSinglePipe	*pHelper = NULL;	ITransactionResourceAsync		*pRes = NULL;	IAsyncPG				*asdum;	HRESULT	res;	bool	retry, errset;	DWORD	dwRMCookie;	XID	xid;	if (!pHelper)	{		res = pDtc->QueryInterface(IID_IDtcToXaHelperSinglePipe, (void **) &pHelper);		if (res != S_OK || !pHelper)		{			forcelog("DtcToXaHelperSingelPipe get error %d\n", res);			pHelper = NULL;			return SQL_ERROR;		}	}	res = (NULL != (asdum = new IAsyncPG)) ? S_OK : E_FAIL;	if (S_OK != res)	{		mylog("CoCreateInstance error %d\n", res);		return SQL_ERROR;	}mylog("dllname=%s dsn=%s\n", GetXaLibName(), conn->connInfo.dsn); res = 0;	retry = false;	errset = false;	ConnInfo *ci = &(conn->connInfo);	char	dtcname[1024];	snprintf(dtcname, sizeof(dtcname), "DRIVER={%s};SERVER=%s;PORT=%s;DATABASE=%s;UID=%s;PWD=%s;" ABBR_SSLMODE "=%s", 		ci->drivername, ci->server, ci->port, ci->database, ci->username, ci->password, ci->sslmode);	do { 		res = pHelper->XARMCreate(dtcname, (char *) GetXaLibName(), &dwRMCookie);		if (S_OK == res)			break;		mylog("XARMCreate error code=%x\n", res);		if (XACT_E_XA_TX_DISABLED == res)		{			CC_set_error(conn, CONN_UNSUPPORTED_OPTION, "XARMcreate error:Please enable XA transaction in MSDTC security configuration", func);			errset = true;		}		else if (!retry)		{			LONG	ret;			HKEY	sKey;			DWORD	rSize;			ret = ::RegOpenKeyEx(HKEY_LOCAL_MACHINE, regKey, 0, KEY_QUERY_VALUE | KEY_SET_VALUE, &sKey);			if (ERROR_SUCCESS != ret)				ret = ::RegCreateKeyEx(HKEY_LOCAL_MACHINE, regKey, 0, NULL, REG_OPTION_NON_VOLATILE, KEY_ALL_ACCESS, NULL, &sKey, NULL);			if (ERROR_SUCCESS == ret)			{				switch (ret = ::RegQueryValueEx(sKey, "XADLL", NULL, NULL, NULL, &rSize))				{					case ERROR_SUCCESS:						if (rSize > 0)							break;					default:						ret = ::RegSetValueEx(sKey, GetXaLibName(), 0, REG_SZ,							(CONST BYTE *) GetXaLibPath(), strlen(GetXaLibPath()) + 1);						if (ERROR_SUCCESS == ret)						{							retry = true;							continue; // retry						}						CC_set_error(conn, CONN_UNSUPPORTED_OPTION, "XARMCreate error:Please register HKLM\\SOFTWARE\\Microsoft\\MSDTC\\XADLL", func);						break;				}				::RegCloseKey(sKey);						}		}		if (!errset)			CC_set_error(conn, CONN_UNSUPPORTED_OPTION, "MSDTC XARMCreate error", func);		return SQL_ERROR;	} while (1);	res = pHelper->ConvertTridToXID((DWORD *) pTra, dwRMCookie, &xid);	if (res != S_OK)	{		mylog("ConvertTridToXid error %d\n", res);		return SQL_ERROR;	}{char	pgxid[258];XidToText(xid, pgxid);mylog("ConvertTridToXID -> %s\n", pgxid);}	asdum->SetXid(&xid);	/* Create an IAsyncPG instance by myself */	/* DLLGetClassObject(GUID_IAsyncPG, IID_ITransactionResourceAsync, (void **) &asdum); */	asdum->SetHelper(pHelper, dwRMCookie);	res = pHelper->EnlistWithRM(dwRMCookie, pTra, asdum, &asdum->enlist);	if (res != S_OK)	{		mylog("EnlistWithRM error %d\n", res);		pHelper->ReleaseRMCookie(dwRMCookie, TRUE);		return SQL_ERROR;	}	mylog("asdum=%p start transaction\n", asdum);	CC_set_autocommit(conn, FALSE);	asdum->SetConnection(conn);	conn->asdum = asdum;	return 	SQL_SUCCESS;}EXTERN_C RETCODE EnlistInDtc(ConnectionClass *conn, void *pTra, int method){	static	ITransactionDispenser	*pDtc = NULL;	if (!pTra)	{		IAsyncPG *asdum = (IAsyncPG *) conn->asdum;		if (asdum)		{			/* asdum->Release(); */		}		else			SYNC_AUTOCOMMIT(conn);		return SQL_SUCCESS;	}	if (CC_is_in_trans(conn))	{ 		CC_abort(conn);	}	if (!pDtc)	{		HRESULT	res;		res = DtcGetTransactionManager(NULL, NULL, IID_ITransactionDispenser,			0, 0, NULL,  (void **) &pDtc);		if (res != S_OK || !pDtc)		{			forcelog("TransactionManager get error %d\n", res);			pDtc = NULL;		}	}	return EnlistInDtc_1pipe(conn, (ITransaction *) pTra, pDtc);}EXTERN_C RETCODE DtcOnDisconnect(ConnectionClass *conn){	mylog("DtcOnDisconnect\n");	LIFELOCK_ACQUIRE;	IAsyncPG *asdum = (IAsyncPG *) conn->asdum;	if (asdum)	{		asdum->AddRef();		LIFELOCK_RELEASE;		asdum->ReleaseConnection();		asdum->Release();	}	else			LIFELOCK_RELEASE;	return SQL_SUCCESS;}EXTERN_C RETCODE DtcOnRelease(void){	AsyncThreads::CleanupThreads(2000);	return SQL_SUCCESS;}#endif /* _HANDLE_ENLIST_IN_DTC_ */

⌨️ 快捷键说明

复制代码 Ctrl + C
搜索代码 Ctrl + F
全屏模式 F11
切换主题 Ctrl + Shift + D
显示快捷键 ?
增大字号 Ctrl + =
减小字号 Ctrl + -