📄 msdtc_enlist.cpp
字号:
/*------ * Module: msdtc_enlist.cpp * * Description: * This module contains routines related to * the enlistment in MSDTC. * *------- */#ifdef _HANDLE_ENLIST_IN_DTC_#undef _MEMORY_DEBUG_#ifndef _WIN32_WINNT#define _WIN32_WINNT 0x0400#endif /* _WIN32_WINNT */#define WIN32_LEAN_AND_MEAN#include <oleTx2xa.h>#include <XOLEHLP.h>/*#include <Txdtc.h>*/#include "connection.h"/*#define _SLEEP_FOR_TEST_*/#include <stdio.h>#include <string.h>#include <ctype.h>#include <process.h>#include <map>#ifndef WIN32#include <errno.h>#endif /* WIN32 */#include "qresult.h"#include "dlg_specific.h"#include "pgapifunc.h"#include "pgenlist.h"EXTERN_C {HINSTANCE s_hModule; /* Saved module handle. */}/* This is where the Driver Manager attaches to this Driver */BOOL WINAPIDllMain(HANDLE hInst, ULONG ul_reason_for_call, LPVOID lpReserved){ switch (ul_reason_for_call) { case DLL_PROCESS_ATTACH: s_hModule = (HINSTANCE) hInst; /* Save for dialog boxes */ break; } return TRUE;}static class INIT_CRIT{public: CRITICAL_SECTION life_cs; CRITICAL_SECTION map_cs; INIT_CRIT() { InitializeCriticalSection(&life_cs); InitializeCriticalSection(&map_cs); } ~INIT_CRIT() { DeleteCriticalSection(&life_cs); DeleteCriticalSection(&map_cs); }} init_crit;#define LIFELOCK_ACQUIRE EnterCriticalSection(&init_crit.life_cs)#define LIFELOCK_RELEASE LeaveCriticalSection(&init_crit.life_cs)#define MLOCK_ACQUIRE EnterCriticalSection(&init_crit.map_cs)#define MLOCK_RELEASE LeaveCriticalSection(&init_crit.map_cs)static const char *XidToText(const XID &xid, char *rtext){ int glen = xid.gtrid_length, blen = xid.bqual_length; int i, j; for (i = 0, j = 0; i < glen; i++, j += 2) sprintf(rtext + j, "%02x", (unsigned char) xid.data[i]); strcat(rtext, "-"); j++; for (; i < glen + blen; i++, j += 2) sprintf(rtext + j, "%02x", (unsigned char) xid.data[i]); return rtext;}static LONG g_cComponents = 0;static LONG g_cServerLocks = 0;//// 埲壓偺ITransactionResourceAsync僆僽僕僃僋僩偼擟堄偺僗儗僢僪偐傜// 帺桼偵傾僋僙僗壜擻側傛偆偵幚憰偡傞丅奺Request偺寢壥傪曉偡偨傔偵// 巊梡偡傞ITransactionEnlistmentAsync僀儞僞乕僼僃僀僗傕偦偺傛偆偵// 幚憰偝傟偰偄傞乮偲巚傢傟傞丄壓婰嶲徠乯偺偱屇傃弌偟偵COM偺傾僷乕// 僩儊儞僩傪堄幆偡傞(CoMarshalInterThreadInterfaceInStream/CoGetIn// terfaceAndReleaseStream傪巊梡偡傞乯昁梫偼側偄丅// 偙偺DLL撪偱巊梡偡傞ITransactionResourceAsync偲ITransactionEnlist// mentAsync偺僀儞僞乕僼僃僀僗億僀儞僞乕偼擟堄偺僗儗僢僪偐傜捈愙巊梡// 偡傞偙偲偑偱偒傞丅//// OLE Transactions Standard//// OLE Transactions is the Microsoft interface standard for transaction// management. Applications use OLE Transactions-compliant interfaces to// initiate, commit, abort, and inquire about transactions. Resource// managers use OLE Transactions-compliant interfaces to enlist in// transactions, to propagate transactions to other resource managers,// to propagate transactions from process to process or from system to// system, and to participate in the two-phase commit protocol.//// The Microsoft DTC system implements most OLE Transactions-compliant// objects, interfaces, and methods. Resource managers that wish to use// OLE Transactions must implement some OLE Transactions-compliant objects,// interfaces, and methods.//// The OLE Transactions specification is based on COM but it differs in the// following respects: //// OLE Transactions objects cannot be created using the COM CoCreate APIs. // References to OLE Transactions objects are always direct. Therefore,// no proxies or stubs are created for inter-apartment, inter-process,// or inter-node calls and OLE Transactions references cannot be marshaled// using standard COM marshaling. // All references to OLE Transactions objects and their sinks are completely// free threaded and cannot rely upon COM concurrency control models.// For example, you cannot pass a reference to an IResourceManagerSink// interface on a single-threaded apartment and expect the callback to occur// only on the same single-threaded apartment. /*#define _LOCK_DEBUG_ */class IAsyncPG : public ITransactionResourceAsync{ friend class AsyncThreads;private: IDtcToXaHelperSinglePipe *helper; DWORD RMCookie; ConnectionClass *conn; ConnectionClass *xaconn; LONG refcnt; CRITICAL_SECTION as_spin; // to make this object Both CRITICAL_SECTION as_exec; // to make this object Both XID xid; bool prepared; HANDLE eThread[3]; HRESULT prepare_result; bool requestAccepted; HRESULT commit_result;#ifdef _LOCK_DEBUG_ int spin_cnt; int cs_cnt;#endif /* _LOCK_DEBUG_ */public: enum { PrepareExec = 0 ,CommitExec ,AbortExec }; ITransactionEnlistmentAsync *enlist; HRESULT STDMETHODCALLTYPE QueryInterface(REFIID iid, void ** ppvObject); ULONG STDMETHODCALLTYPE AddRef(void); ULONG STDMETHODCALLTYPE Release(void); HRESULT STDMETHODCALLTYPE PrepareRequest(BOOL fRetaining, DWORD grfRM, BOOL fWantMoniker, BOOL fSinglePhase); HRESULT STDMETHODCALLTYPE CommitRequest(DWORD grfRM, XACTUOW * pNewUOW); HRESULT STDMETHODCALLTYPE AbortRequest(BOID * pboidReason, BOOL fRetaining, XACTUOW * pNewUOW); HRESULT STDMETHODCALLTYPE TMDown(void); IAsyncPG(); void SetHelper(IDtcToXaHelperSinglePipe *pHelper, DWORD dwRMCookie) {helper = pHelper; RMCookie = dwRMCookie;} HRESULT RequestExec(DWORD type, HRESULT res); HRESULT ReleaseConnection(void); void SetConnection(ConnectionClass *sconn) {SLOCK_ACQUIRE(); conn = sconn; SLOCK_RELEASE();} void SetXid(const XID *ixid) {SLOCK_ACQUIRE(); xid = *ixid; SLOCK_RELEASE();}private: ~IAsyncPG();#ifdef _LOCK_DEBUG_ void SLOCK_ACQUIRE() {forcelog("SLOCK_ACQUIRE %d\n", spin_cnt); EnterCriticalSection(&as_spin); spin_cnt++;} void SLOCK_RELEASE() {forcelog("SLOCK_RELEASE=%d\n", spin_cnt); LeaveCriticalSection(&as_spin); spin_cnt--;}#else void SLOCK_ACQUIRE() {EnterCriticalSection(&as_spin);} void SLOCK_RELEASE() {LeaveCriticalSection(&as_spin);}#endif /* _LOCK_DEBUG_ */ void ELOCK_ACQUIRE() {EnterCriticalSection(&as_exec);} void ELOCK_RELEASE() {LeaveCriticalSection(&as_exec);} ConnectionClass *getLockedXAConn(void); ConnectionClass *generateXAConn(bool spinAcquired); void SetPrepareResult(HRESULT res) {SLOCK_ACQUIRE(); prepared = true; prepare_result = res; SLOCK_RELEASE();} void SetDone(HRESULT); void Reset_eThread(int idx) {SLOCK_ACQUIRE(); eThread[idx] = NULL; SLOCK_RELEASE();} void Wait_pThread(bool slock_hold); void Wait_cThread(bool slock_hold, bool once);};//// For thread control.//class AsyncWait {private: IAsyncPG *obj; DWORD type; int waiting_count;public: AsyncWait(IAsyncPG *async, DWORD itype) : obj(async), type(itype), waiting_count(0) {} AsyncWait(const AsyncWait &a_th) : obj(a_th.obj), type(a_th.type), waiting_count(a_th.waiting_count) {} ~AsyncWait() {} IAsyncPG *GetObj() const {return obj;} DWORD GetType() const {return type;} int WaitCount() const {return waiting_count;} int StartWaiting() {return ++waiting_count;} int StopWaiting() {return --waiting_count;}};//// List of threads invoked from IAsyncPG objects.//class AsyncThreads {private: static std::map <HANDLE, AsyncWait> th_list;public: static void insert(HANDLE, IAsyncPG *, DWORD); static void CleanupThreads(DWORD millisecond); static bool WaitThread(IAsyncPG *, DWORD type, DWORD millisecond);};#define SYNC_AUTOCOMMIT(conn) (SQL_AUTOCOMMIT_OFF != conn->connInfo.autocommit_public ? (conn->transact_status |= CONN_IN_AUTOCOMMIT) : (conn->transact_status &= ~CONN_IN_AUTOCOMMIT))IAsyncPG::IAsyncPG(void) : helper(NULL), RMCookie(0), enlist(NULL), conn(NULL), xaconn(NULL), refcnt(1), prepared(false), requestAccepted(false){ InterlockedIncrement(&g_cComponents); InitializeCriticalSection(&as_spin); InitializeCriticalSection(&as_exec); eThread[0] = eThread[1] = eThread[2] = NULL; memset(&xid, 0, sizeof(xid));#ifdef _LOCK_DEBUG_ spin_cnt = 0; cs_cnt = 0;#endif /* _LOCK_DEBUG_ */}//// invoked from *delete*.// When entered ELOCK -> LIFELOCK -> SLOCK are acquired// and they are released.// IAsyncPG::~IAsyncPG(void){ ConnectionClass *fconn = NULL; if (conn) { conn->asdum = NULL; conn = NULL; } if (xaconn) { fconn = xaconn; xaconn->asdum = NULL; xaconn = NULL; } SLOCK_RELEASE(); LIFELOCK_RELEASE; if (fconn) PGAPI_FreeConnect((HDBC) fconn); DeleteCriticalSection(&as_spin); ELOCK_RELEASE(); DeleteCriticalSection(&as_exec); InterlockedDecrement(&g_cComponents);}HRESULT STDMETHODCALLTYPE IAsyncPG::QueryInterface(REFIID riid, void ** ppvObject){forcelog("%x QueryInterface called\n", this); if (riid == IID_IUnknown || riid == IID_ITransactionResourceAsync) { *ppvObject = this; AddRef(); return S_OK; } *ppvObject = NULL; return E_NOINTERFACE;}//// acquire/releases SLOCK.//ULONG STDMETHODCALLTYPE IAsyncPG::AddRef(void){ mylog("%x->AddRef called\n", this); SLOCK_ACQUIRE(); refcnt++; SLOCK_RELEASE(); return refcnt;}//// acquire/releases [ELOCK -> LIFELOCK -> ] SLOCK.//ULONG STDMETHODCALLTYPE IAsyncPG::Release(void){ mylog("%x->Release called refcnt=%d\n", this, refcnt); SLOCK_ACQUIRE(); refcnt--; if (refcnt <= 0) { SLOCK_RELEASE(); ELOCK_ACQUIRE(); LIFELOCK_ACQUIRE; SLOCK_ACQUIRE(); if (refcnt <=0) { mylog("delete %x\n", this); delete this; } else { SLOCK_RELEASE(); LIFELOCK_RELEASE; ELOCK_RELEASE(); } } else SLOCK_RELEASE(); return refcnt;}//// Acquire/release [MLOCK -> ] SLOCK.//void IAsyncPG::Wait_pThread(bool slock_hold){ mylog("Wait_pThread %d in\n", slock_hold); HANDLE wThread; int wait_idx = PrepareExec; bool th_found; if (!slock_hold) SLOCK_ACQUIRE(); while (NULL != eThread[wait_idx]) { wThread = eThread[wait_idx]; SLOCK_RELEASE(); th_found = AsyncThreads::WaitThread(this, wait_idx, 2000); SLOCK_ACQUIRE(); if (th_found) break; } if (!slock_hold) SLOCK_RELEASE(); mylog("Wait_pThread out\n");}//// Acquire/releases [MLOCK -> ] SLOCK.//void IAsyncPG::Wait_cThread(bool slock_hold, bool once){ HANDLE wThread; int wait_idx; bool th_found; mylog("Wait_cThread %d,%d in\n", slock_hold, once); if (!slock_hold) SLOCK_ACQUIRE(); if (NULL != eThread[CommitExec]) wait_idx = CommitExec; else wait_idx = AbortExec; while (NULL != eThread[wait_idx]) { wThread = eThread[wait_idx]; SLOCK_RELEASE(); th_found = AsyncThreads::WaitThread(this, wait_idx, 2000); SLOCK_ACQUIRE(); if (once || th_found) break; } if (!slock_hold) SLOCK_RELEASE(); mylog("Wait_cThread out\n");}/* Processing Prepare/Commit Request */typedefstruct RequestPara { DWORD type; LPVOID lpr; HRESULT res;} RequestPara;//// Acquire/releases LIFELOCK -> SLOCK.// may acquire/release ELOCK.//void IAsyncPG::SetDone(HRESULT res){ LIFELOCK_ACQUIRE; SLOCK_ACQUIRE(); prepared = false; requestAccepted = true; commit_result = res; if (conn || xaconn) { if (conn) { conn->asdum = NULL; SYNC_AUTOCOMMIT(conn); conn = NULL; } SLOCK_RELEASE(); LIFELOCK_RELEASE; ELOCK_ACQUIRE(); if (xaconn) { xaconn->asdum = NULL; PGAPI_FreeConnect(xaconn); xaconn = NULL; } ELOCK_RELEASE(); } else { SLOCK_RELEASE(); LIFELOCK_RELEASE; }} //// Acquire/releases [ELOCK -> LIFELOCK -> ] SLOCK.//ConnectionClass *IAsyncPG::generateXAConn(bool spinAcquired){ if (!spinAcquired) SLOCK_ACQUIRE(); if (prepared && !xaconn) { SLOCK_RELEASE(); ELOCK_ACQUIRE(); LIFELOCK_ACQUIRE; SLOCK_ACQUIRE(); if (prepared && !xaconn) { PGAPI_AllocConnect(conn->henv, (HDBC *) &xaconn); memcpy(&xaconn->connInfo, &conn->connInfo, sizeof(ConnInfo)); conn->asdum = NULL; SYNC_AUTOCOMMIT(conn); conn = NULL; SLOCK_RELEASE(); LIFELOCK_RELEASE; CC_connect(xaconn, AUTH_REQ_OK, NULL); } else { SLOCK_RELEASE(); LIFELOCK_RELEASE; } ELOCK_RELEASE(); } else SLOCK_RELEASE(); return xaconn;}//// [when entered]// ELOCK is acquired.//// Acquire/releases SLOCK.// Try to acquire CONNLOCK also.//// [on exit]// ELOCK is kept acquired.// If the return connection != NULL// the CONNLOCK for the connection is acquired.// ConnectionClass *IAsyncPG::getLockedXAConn(){ SLOCK_ACQUIRE(); if (!xaconn && conn && !CC_is_in_trans(conn)) { if (TRY_ENTER_CONN_CS(conn)) { if (CC_is_in_trans(conn)) { LEAVE_CONN_CS(conn); } else { SLOCK_RELEASE(); return conn; } } } generateXAConn(true); if (xaconn) ENTER_CONN_CS(xaconn); return xaconn;}//// Acquire/release ELOCK [ -> MLOCK] -> SLOCK.//HRESULT IAsyncPG::RequestExec(DWORD type, HRESULT res){ HRESULT ret; bool bReleaseEnlist = false; ConnectionClass *econn; QResultClass *qres; char pgxid[258], cmd[512]; mylog("%x->RequestExec type=%d\n", this, type); XidToText(xid, pgxid);#ifdef _SLEEP_FOR_TEST_ /*Sleep(2000);*/#endif /* _SLEEP_FOR_TEST_ */ ELOCK_ACQUIRE(); switch (type) { case PrepareExec: if (XACT_S_SINGLEPHASE == res) { if (!CC_commit(conn)) res = E_FAIL; bReleaseEnlist = true; } else if (E_FAIL != res) { snprintf(cmd, sizeof(cmd), "PREPARE TRANSACTION '%s'", pgxid); qres = CC_send_query(conn, cmd, NULL, 0, NULL); if (!QR_command_maybe_successful(qres)) res = E_FAIL; QR_Destructor(qres); } ret = enlist->PrepareRequestDone(res, NULL, NULL); SetPrepareResult(res); break; case CommitExec: Wait_pThread(false); if (E_FAIL != res) { econn = getLockedXAConn();
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -