⭐ 欢迎来到虫虫下载站! | 📦 资源下载 📁 资源专辑 ℹ️ 关于我们
⭐ 虫虫下载站

📄 msdtc_enlist.cpp

📁 postgresql-odbc,跨平台应用
💻 CPP
📖 第 1 页 / 共 2 页
字号:
/*------
 * Module:			msdtc_enlist.cpp
 *
 * Description:
 *		This module contains routines related to
 *			the enlistment in MSDTC.
 *
 *-------
 */

#ifdef	_HANDLE_ENLIST_IN_DTC_

#undef	_MEMORY_DEBUG_
#ifndef	_WIN32_WINNT
#define	_WIN32_WINNT	0x0400
#endif	/* _WIN32_WINNT */

#define	WIN32_LEAN_AND_MEAN
#include <oleTx2xa.h>
#include <XOLEHLP.h>
/*#include <Txdtc.h>*/
#include "connection.h"

/*#define	_SLEEP_FOR_TEST_*/
#include <stdio.h>
#include <string.h>
#include <ctype.h>
#include <process.h>
#include <map>
#ifndef	WIN32
#include <errno.h>
#endif /* WIN32 */

#include "qresult.h"
#include "dlg_specific.h"

#include "pgapifunc.h"
#include "pgenlist.h"

EXTERN_C {
HINSTANCE s_hModule;               /* Saved module handle. */
}

/*
 *	DLL entry point.
 *	This is where the Driver Manager attaches to this Driver.
 *	On DLL_PROCESS_ATTACH the module handle is stored in s_hModule
 *	(saved for dialog boxes); every other notification is ignored.
 *	Always returns TRUE.
 */
BOOL    WINAPI
DllMain(HANDLE hInst, ULONG ul_reason_for_call, LPVOID lpReserved)
{
	switch (ul_reason_for_call)
	{
		case DLL_PROCESS_ATTACH:
			s_hModule = (HINSTANCE) hInst;  /* Save for dialog boxes */
			break;
	}
	return TRUE;
}

/*
 *	Holder of the two file-wide critical sections.
 *	The single static instance initializes both sections when it is
 *	constructed and deletes them when it is destroyed.
 *		life_cs	... used through LIFELOCK_ACQUIRE / LIFELOCK_RELEASE
 *		map_cs	... used through MLOCK_ACQUIRE / MLOCK_RELEASE
 */
static class INIT_CRIT
{
public:
	CRITICAL_SECTION	life_cs;
	CRITICAL_SECTION	map_cs;
	INIT_CRIT() {
		InitializeCriticalSection(&life_cs);
		InitializeCriticalSection(&map_cs);
		}
	~INIT_CRIT() {
			DeleteCriticalSection(&life_cs);
			DeleteCriticalSection(&map_cs);
			}
} init_crit;
#define	LIFELOCK_ACQUIRE EnterCriticalSection(&init_crit.life_cs)
#define	LIFELOCK_RELEASE LeaveCriticalSection(&init_crit.life_cs)
#define	MLOCK_ACQUIRE	EnterCriticalSection(&init_crit.map_cs)
#define	MLOCK_RELEASE	LeaveCriticalSection(&init_crit.map_cs)

/*
 *	Format an XID as "<gtrid in hex>-<bqual in hex>".
 *
 *	xid	the transaction identifier; the first gtrid_length bytes of
 *		xid.data are the global part, the next bqual_length bytes
 *		the branch qualifier.
 *	rtext	caller supplied output buffer. It must have room for
 *		2 * (gtrid_length + bqual_length) + 2 bytes
 *		(two hex digits per byte, the '-' and the terminating NUL).
 *
 *	Returns rtext.
 *
 *	The separator and the terminator are stored at the tracked offset
 *	rather than appended with strcat(): when gtrid_length is 0 nothing
 *	has been written to rtext yet, so strcat() would scan an
 *	uninitialized buffer.
 */
static const char *XidToText(const XID &xid, char *rtext)
{
	const int	glen = xid.gtrid_length, blen = xid.bqual_length;
	int	i, j;

	for (i = 0, j = 0; i < glen; i++, j += 2)
		sprintf(rtext + j, "%02x", (unsigned char) xid.data[i]);
	rtext[j++] = '-';
	rtext[j] = '\0';	/* keeps the result terminated when blen is 0 */
	for (; i < glen + blen; i++, j += 2)
		sprintf(rtext + j, "%02x", (unsigned char) xid.data[i]);
	return rtext;
}

static LONG	g_cComponents = 0;	/* incremented / decremented by IAsyncPG ctor / dtor */
static LONG	g_cServerLocks = 0;

//
//	The ITransactionResourceAsync object below is implemented so that it
//	can be accessed freely from any thread. The ITransactionEnlistmentAsync
//	interface used to return the result of each request appears to be
//	implemented in the same way (see below), so calls do not have to take
//	COM apartments into account (i.e. there is no need to use
//	CoMarshalInterThreadInterfaceInStream/CoGetInterfaceAndReleaseStream).
//	The ITransactionResourceAsync and ITransactionEnlistmentAsync interface
//	pointers used inside this DLL can be used directly from any thread.
//
// OLE Transactions Standard
//
// OLE Transactions is the Microsoft interface standard for transaction
// management. Applications use OLE Transactions-compliant interfaces to
// initiate, commit, abort, and inquire about transactions. Resource
// managers use OLE Transactions-compliant interfaces to enlist in
// transactions, to propagate transactions to other resource managers,
// to propagate transactions from process to process or from system to
// system, and to participate in the two-phase commit protocol.
//
// The Microsoft DTC system implements most OLE Transactions-compliant
// objects, interfaces, and methods. Resource managers that wish to use
// OLE Transactions must implement some OLE Transactions-compliant objects,
// interfaces, and methods.
//
// The OLE Transactions specification is based on COM but it differs in the
// following respects:
//
// OLE Transactions objects cannot be created using the COM CoCreate APIs.
// References to OLE Transactions objects are always direct. Therefore,
// no proxies or stubs are created for inter-apartment, inter-process,
// or inter-node calls and OLE Transactions references cannot be marshaled
// using standard COM marshaling.
// All references to OLE Transactions objects and their sinks are completely
// free threaded and cannot rely upon COM concurrency control models.
// For example, you cannot pass a reference to an IResourceManagerSink
// interface on a single-threaded apartment and expect the callback to occur
// only on the same single-threaded apartment.
/*#define	_LOCK_DEBUG_ */class	IAsyncPG : public ITransactionResourceAsync{	friend class	AsyncThreads;private:	IDtcToXaHelperSinglePipe	*helper;	DWORD				RMCookie;	ConnectionClass			*conn;	ConnectionClass			*xaconn;	LONG				refcnt;	CRITICAL_SECTION		as_spin; // to make this object Both	CRITICAL_SECTION		as_exec; // to make this object Both	XID				xid;	bool				prepared;	HANDLE				eThread[3];	HRESULT				prepare_result;	bool				requestAccepted;	HRESULT				commit_result;#ifdef	_LOCK_DEBUG_	int				spin_cnt;	int				cs_cnt;#endif /* _LOCK_DEBUG_ */public:	enum {		PrepareExec = 0		,CommitExec		,AbortExec		};	ITransactionEnlistmentAsync	*enlist;	HRESULT STDMETHODCALLTYPE QueryInterface(REFIID iid, void ** ppvObject);	ULONG	STDMETHODCALLTYPE AddRef(void); 	ULONG	STDMETHODCALLTYPE Release(void); 	HRESULT STDMETHODCALLTYPE PrepareRequest(BOOL fRetaining,				DWORD grfRM,				BOOL fWantMoniker,				BOOL fSinglePhase);	HRESULT STDMETHODCALLTYPE CommitRequest(DWORD grfRM, XACTUOW * pNewUOW);	HRESULT STDMETHODCALLTYPE AbortRequest(BOID * pboidReason,				BOOL fRetaining,				XACTUOW * pNewUOW);	HRESULT STDMETHODCALLTYPE TMDown(void);	IAsyncPG();	void SetHelper(IDtcToXaHelperSinglePipe *pHelper, DWORD dwRMCookie) {helper = pHelper; RMCookie = dwRMCookie;} 	 	HRESULT RequestExec(DWORD type, HRESULT res);	HRESULT ReleaseConnection(void);	void SetConnection(ConnectionClass *sconn) {SLOCK_ACQUIRE(); conn = sconn; SLOCK_RELEASE();}	void SetXid(const XID *ixid) {SLOCK_ACQUIRE(); xid = *ixid; SLOCK_RELEASE();}private:	~IAsyncPG();#ifdef	_LOCK_DEBUG_	void SLOCK_ACQUIRE() {forcelog("SLOCK_ACQUIRE %d\n", spin_cnt); EnterCriticalSection(&as_spin); spin_cnt++;}	void SLOCK_RELEASE() {forcelog("SLOCK_RELEASE=%d\n", spin_cnt); LeaveCriticalSection(&as_spin); spin_cnt--;}#else	void SLOCK_ACQUIRE() {EnterCriticalSection(&as_spin);}	void SLOCK_RELEASE() {LeaveCriticalSection(&as_spin);}#endif /* _LOCK_DEBUG_ */	void ELOCK_ACQUIRE() {EnterCriticalSection(&as_exec);}	void ELOCK_RELEASE() 
{LeaveCriticalSection(&as_exec);}	ConnectionClass	*getLockedXAConn(void);	ConnectionClass	*generateXAConn(bool spinAcquired);	void SetPrepareResult(HRESULT res) {SLOCK_ACQUIRE(); prepared = true; prepare_result = res; SLOCK_RELEASE();} 	void SetDone(HRESULT);	void Reset_eThread(int idx) {SLOCK_ACQUIRE(); eThread[idx] = NULL; SLOCK_RELEASE();}	void Wait_pThread(bool slock_hold);	void Wait_cThread(bool slock_hold, bool once);};////	For thread control.//class	AsyncWait {private:	IAsyncPG	*obj;	DWORD		type;	int		waiting_count;public:	AsyncWait(IAsyncPG *async, DWORD itype) : obj(async), type(itype), waiting_count(0) {}	AsyncWait(const AsyncWait &a_th) : obj(a_th.obj), type(a_th.type), waiting_count(a_th.waiting_count) {}	~AsyncWait()	{}	IAsyncPG *GetObj()  const {return obj;}	DWORD	GetType()  const {return type;}	int	WaitCount()  const {return waiting_count;}	int	StartWaiting() {return ++waiting_count;}	int	StopWaiting() {return --waiting_count;}};////	List of threads invoked from IAsyncPG objects.//class	AsyncThreads {private:	static std::map <HANDLE, AsyncWait>	th_list;public:	static void insert(HANDLE, IAsyncPG *, DWORD);	static void CleanupThreads(DWORD millisecond);	static bool WaitThread(IAsyncPG *, DWORD type, DWORD millisecond);};#define	SYNC_AUTOCOMMIT(conn)	(SQL_AUTOCOMMIT_OFF != conn->connInfo.autocommit_public ? 
(conn->transact_status |= CONN_IN_AUTOCOMMIT) : (conn->transact_status &= ~CONN_IN_AUTOCOMMIT))IAsyncPG::IAsyncPG(void) : helper(NULL), RMCookie(0), enlist(NULL), conn(NULL), xaconn(NULL), refcnt(1), prepared(false), requestAccepted(false){	InterlockedIncrement(&g_cComponents);	InitializeCriticalSection(&as_spin);	InitializeCriticalSection(&as_exec);	eThread[0] = eThread[1] = eThread[2] = NULL;	memset(&xid, 0, sizeof(xid));#ifdef	_LOCK_DEBUG_	spin_cnt = 0;	cs_cnt = 0;#endif /* _LOCK_DEBUG_ */}////	invoked from *delete*.//	When entered ELOCK -> LIFELOCK -> SLOCK are acquired//	and they are released.//	IAsyncPG::~IAsyncPG(void){	ConnectionClass *fconn = NULL;	if (conn)	{		conn->asdum = NULL;		conn = NULL;	}	if (xaconn)	{		fconn = xaconn;		xaconn->asdum = NULL;		xaconn = NULL;	}	SLOCK_RELEASE();	LIFELOCK_RELEASE;	if (fconn)		PGAPI_FreeConnect((HDBC) fconn);	DeleteCriticalSection(&as_spin);	ELOCK_RELEASE();	DeleteCriticalSection(&as_exec);	InterlockedDecrement(&g_cComponents);}HRESULT STDMETHODCALLTYPE IAsyncPG::QueryInterface(REFIID riid, void ** ppvObject){forcelog("%x QueryInterface called\n", this);	if (riid == IID_IUnknown || riid == IID_ITransactionResourceAsync)	{		*ppvObject = this;		AddRef();		return S_OK;	}	*ppvObject = NULL;	return E_NOINTERFACE;}////	acquire/releases SLOCK.//ULONG	STDMETHODCALLTYPE IAsyncPG::AddRef(void){	mylog("%x->AddRef called\n", this);	SLOCK_ACQUIRE();	refcnt++;	SLOCK_RELEASE();	return refcnt;}////	acquire/releases [ELOCK -> LIFELOCK -> ] SLOCK.//ULONG	STDMETHODCALLTYPE IAsyncPG::Release(void){	mylog("%x->Release called refcnt=%d\n", this, refcnt);	SLOCK_ACQUIRE();	refcnt--;	if (refcnt <= 0)	{		SLOCK_RELEASE();		ELOCK_ACQUIRE();		LIFELOCK_ACQUIRE;		SLOCK_ACQUIRE();		if (refcnt <=0)		{	mylog("delete %x\n", this);			delete this;		}		else		{			SLOCK_RELEASE();			LIFELOCK_RELEASE;			ELOCK_RELEASE();		}	}	else		SLOCK_RELEASE();	return refcnt;}////	Acquire/release [MLOCK -> ] SLOCK.//void IAsyncPG::Wait_pThread(bool slock_hold){	
mylog("Wait_pThread %d in\n", slock_hold);	HANDLE	wThread;	int	wait_idx = PrepareExec;	bool	th_found;	if (!slock_hold)		SLOCK_ACQUIRE();	while (NULL != eThread[wait_idx])	{		wThread = eThread[wait_idx];		SLOCK_RELEASE();		th_found = AsyncThreads::WaitThread(this, wait_idx, 2000);		SLOCK_ACQUIRE();		if (th_found)			break;	}	if (!slock_hold)		SLOCK_RELEASE();	mylog("Wait_pThread out\n");}////	Acquire/releases [MLOCK -> ] SLOCK.//void IAsyncPG::Wait_cThread(bool slock_hold, bool once){	HANDLE	wThread;	int	wait_idx;	bool	th_found;	mylog("Wait_cThread %d,%d in\n", slock_hold, once);	if (!slock_hold)		SLOCK_ACQUIRE();	if (NULL != eThread[CommitExec])		wait_idx = CommitExec;	else		wait_idx = AbortExec;	while (NULL != eThread[wait_idx])	{		wThread = eThread[wait_idx];		SLOCK_RELEASE();		th_found = AsyncThreads::WaitThread(this, wait_idx, 2000);		SLOCK_ACQUIRE();		if (once || th_found)			break;	}	if (!slock_hold)		SLOCK_RELEASE();	mylog("Wait_cThread out\n");}/* Processing Prepare/Commit Request */typedefstruct RequestPara {	DWORD	type;	LPVOID	lpr;	HRESULT	res;} RequestPara;////	Acquire/releases LIFELOCK -> SLOCK.//	may acquire/release ELOCK.//void	IAsyncPG::SetDone(HRESULT res){	LIFELOCK_ACQUIRE;	SLOCK_ACQUIRE();	prepared = false;	requestAccepted = true;	commit_result = res;	if (conn || xaconn)	{		if (conn)		{			conn->asdum = NULL;			SYNC_AUTOCOMMIT(conn);			conn = NULL;		}		SLOCK_RELEASE();		LIFELOCK_RELEASE;		ELOCK_ACQUIRE();		if (xaconn)		{			xaconn->asdum = NULL;			PGAPI_FreeConnect(xaconn);			xaconn = NULL;		}		ELOCK_RELEASE();	}	else	{		SLOCK_RELEASE();		LIFELOCK_RELEASE;	}}	////	Acquire/releases [ELOCK -> LIFELOCK -> ] SLOCK.//ConnectionClass	*IAsyncPG::generateXAConn(bool spinAcquired){	if (!spinAcquired)		SLOCK_ACQUIRE();	if (prepared && !xaconn)	{		SLOCK_RELEASE();		ELOCK_ACQUIRE();		LIFELOCK_ACQUIRE;		SLOCK_ACQUIRE();		if (prepared && !xaconn)		{			PGAPI_AllocConnect(conn->henv, (HDBC *) &xaconn);			memcpy(&xaconn->connInfo, &conn->connInfo, sizeof(ConnInfo));			
conn->asdum = NULL;			SYNC_AUTOCOMMIT(conn);			conn = NULL;			SLOCK_RELEASE();			LIFELOCK_RELEASE;			CC_connect(xaconn, AUTH_REQ_OK, NULL);		}		else		{			SLOCK_RELEASE();			LIFELOCK_RELEASE;		}		ELOCK_RELEASE();	}	else		SLOCK_RELEASE();	return xaconn;}////	[when entered]//	ELOCK is acquired.////	Acquire/releases SLOCK.//	Try to acquire CONNLOCK also.////	[on exit]//	ELOCK is kept acquired.//	If the return connection != NULL//		the CONNLOCK for the connection is acquired.//	ConnectionClass	*IAsyncPG::getLockedXAConn(){	SLOCK_ACQUIRE();	if (!xaconn && conn && !CC_is_in_trans(conn))	{		if (TRY_ENTER_CONN_CS(conn))		{			if (CC_is_in_trans(conn))			{				LEAVE_CONN_CS(conn);			}			else			{				SLOCK_RELEASE();				return conn;			}		}	}	generateXAConn(true);	if (xaconn)		ENTER_CONN_CS(xaconn);	return xaconn;}////	Acquire/release ELOCK [ -> MLOCK] -> SLOCK.//HRESULT IAsyncPG::RequestExec(DWORD type, HRESULT res){	HRESULT		ret;	bool		bReleaseEnlist = false;	ConnectionClass	*econn;	QResultClass	*qres;	char		pgxid[258], cmd[512];	mylog("%x->RequestExec type=%d\n", this, type);	XidToText(xid, pgxid);#ifdef	_SLEEP_FOR_TEST_	/*Sleep(2000);*/#endif	/* _SLEEP_FOR_TEST_ */	ELOCK_ACQUIRE();	switch (type)	{		case PrepareExec:			if (XACT_S_SINGLEPHASE == res)			{				if (!CC_commit(conn))					res = E_FAIL;					bReleaseEnlist = true;			}			else if (E_FAIL != res)			{				snprintf(cmd, sizeof(cmd), "PREPARE TRANSACTION '%s'", pgxid);				qres = CC_send_query(conn, cmd, NULL, 0, NULL);				if (!QR_command_maybe_successful(qres))					res = E_FAIL;				QR_Destructor(qres);			}			ret = enlist->PrepareRequestDone(res, NULL, NULL);			SetPrepareResult(res);			break;		case CommitExec:			Wait_pThread(false);			if (E_FAIL != res)			{				econn = getLockedXAConn();

⌨️ 快捷键说明

复制代码 Ctrl + C
搜索代码 Ctrl + F
全屏模式 F11
切换主题 Ctrl + Shift + D
显示快捷键 ?
增大字号 Ctrl + =
减小字号 Ctrl + -