📄 msdtc_enlist.cpp
字号:
if (econn) { snprintf(cmd, sizeof(cmd), "COMMIT PREPARED '%s'", pgxid); qres = CC_send_query(econn, cmd, NULL, 0, NULL); if (!QR_command_maybe_successful(qres)) res = E_FAIL; QR_Destructor(qres); LEAVE_CONN_CS(econn); } } SetDone(res); ret = enlist->CommitRequestDone(res); bReleaseEnlist = true; break; case AbortExec: Wait_pThread(false); if (prepared) { econn = getLockedXAConn(); if (econn) { snprintf(cmd, sizeof(cmd), "ROLLBACK PREPARED '%s'", pgxid); qres = CC_send_query(econn, cmd, NULL, 0, NULL); if (!QR_command_maybe_successful(qres)) res = E_FAIL; QR_Destructor(qres); LEAVE_CONN_CS(econn); } } SetDone(res); ret = enlist->AbortRequestDone(res); bReleaseEnlist = true; break; default: ret = -1; } if (bReleaseEnlist) { helper->ReleaseRMCookie(RMCookie, TRUE); enlist->Release(); } ELOCK_RELEASE(); mylog("%x->Done ret=%d\n", this, ret); return ret;}//// Acquire/releses [MLOCK -> ] SLOCK// or [ELOCK -> LIFELOCK -> ] SLOCK.//HRESULT IAsyncPG::ReleaseConnection(void){ mylog("%x->ReleaseConnection\n", this); ConnectionClass *iconn; bool done = false; SLOCK_ACQUIRE(); if (iconn = conn) { Wait_pThread(true); if (NULL != eThread[CommitExec] || NULL != eThread[AbortExec] || requestAccepted) { if (prepared) { Wait_cThread(true, true); if (!prepared) done = true; } else done = true; if (done) Wait_cThread(true, false); } if (conn && CONN_CONNECTED == conn->status && !done) { generateXAConn(true); } else SLOCK_RELEASE(); } else SLOCK_RELEASE(); mylog("%x->ReleaseConnection exit\n", this); return SQL_SUCCESS;}//// Acquire/release [ELOCK -> ] [MLOCK -> ] SLOCK.//EXTERN_C static unsigned WINAPI DtcRequestExec(LPVOID para);HRESULT STDMETHODCALLTYPE IAsyncPG::PrepareRequest(BOOL fRetaining, DWORD grfRM, BOOL fWantMoniker, BOOL fSinglePhase){ HRESULT ret, res; RequestPara *reqp; mylog("%x PrepareRequest called grhRM=%d enl=%x\n", this, grfRM, enlist); SLOCK_ACQUIRE(); if (0 != CC_get_errornumber(conn)) res = ret = E_FAIL; else { ret = S_OK; if (fSinglePhase) { res = XACT_S_SINGLEPHASE; mylog("XACT is singlePhase\n"); } else res = S_OK; } SLOCK_RELEASE(); ELOCK_ACQUIRE();#ifdef _SLEEP_FOR_TEST_ Sleep(2000);#endif /* _SLEEP_FOR_TEST_ */ reqp = new RequestPara; reqp->type = PrepareExec; reqp->lpr = (LPVOID) this; reqp->res = res; AddRef(); HANDLE hThread = (HANDLE) _beginthreadex(NULL, 0, DtcRequestExec, reqp, 0, NULL); if (NULL == hThread) { delete(reqp); ret = E_FAIL; } else { AsyncThreads::insert(hThread, this, reqp->type); } ELOCK_RELEASE(); Release(); return ret;}//// Acquire/release [ELOCK -> ] [MLOCK -> ] SLOCK.//HRESULT STDMETHODCALLTYPE IAsyncPG::CommitRequest(DWORD grfRM, XACTUOW * pNewUOW){ HRESULT res = S_OK, ret = S_OK; RequestPara *reqp; mylog("%x CommitRequest called grfRM=%d enl=%x\n", this, grfRM, enlist); SLOCK_ACQUIRE(); if (!prepared) ret = E_UNEXPECTED; else if (S_OK != prepare_result) ret = E_UNEXPECTED; SLOCK_RELEASE(); if (S_OK != ret) return ret; AddRef(); ELOCK_ACQUIRE();#ifdef _SLEEP_FOR_TEST_ Sleep(1000);#endif /* _SLEEP_FOR_TEST_ */ reqp = new RequestPara; reqp->type = CommitExec; reqp->lpr = (LPVOID) this; reqp->res = res; enlist->AddRef(); HANDLE hThread = (HANDLE) _beginthreadex(NULL, 0, DtcRequestExec, reqp, 0, NULL); if (NULL == hThread) { delete(reqp); enlist->Release(); ret = E_FAIL; } else { AsyncThreads::insert(hThread, this, reqp->type); } mylog("CommitRequest ret=%d\n", ret); requestAccepted = true; ELOCK_RELEASE(); Release(); return ret;}//// Acquire/release [ELOCK -> ] [MLOCK -> ] SLOCK.//HRESULT STDMETHODCALLTYPE IAsyncPG::AbortRequest(BOID * pboidReason, BOOL fRetaining, XACTUOW * pNewUOW){ HRESULT res = S_OK, ret = S_OK; RequestPara *reqp; mylog("%x AbortRequest called\n", this); AddRef(); ELOCK_ACQUIRE(); if (!prepared && conn) CC_abort(conn); reqp = new RequestPara; reqp->type = AbortExec; reqp->lpr = (LPVOID) this; reqp->res = res; enlist->AddRef(); HANDLE hThread = (HANDLE) _beginthreadex(NULL, 0, DtcRequestExec, reqp, 0, NULL); if (NULL == hThread) { delete(reqp); enlist->Release(); ret = E_FAIL; } else { AsyncThreads::insert(hThread, this, reqp->type); } mylog("AbortRequest ret=%d\n", ret); requestAccepted = true; ELOCK_RELEASE(); Release(); return ret;}HRESULT STDMETHODCALLTYPE IAsyncPG::TMDown(void){forcelog("%x TMDown called\n", this); return S_OK;}//// Acquire/releases MLOCK -> SLOCK.//std::map<HANDLE, AsyncWait> AsyncThreads::th_list;void AsyncThreads::insert(HANDLE th, IAsyncPG *obj, DWORD type){ if (!obj) return; MLOCK_ACQUIRE; th_list.insert(std::pair<HANDLE, AsyncWait>(th, AsyncWait(obj, type))); obj->SLOCK_ACQUIRE(); obj->eThread[type] = th; obj->SLOCK_RELEASE(); MLOCK_RELEASE;}//// Acquire/releases MLOCK -> SLOCK.//bool AsyncThreads::WaitThread(IAsyncPG *obj, DWORD type, DWORD millisecond){ HANDLE th = NULL; DWORD gtype; bool typematch; int wait_count; MLOCK_ACQUIRE; std::map<HANDLE, AsyncWait>::iterator p; for (p = th_list.begin(); p != th_list.end(); p++) { gtype = p->second.GetType(); typematch = (gtype == type); if (p->second.GetObj() == obj && typematch) { th = p->first; break; } } if (NULL == th) { MLOCK_RELEASE; forcelog("WaitThread thread(%x, %d) not found\n", obj, type); return false; } p->second.StartWaiting(); MLOCK_RELEASE; DWORD ret = WaitForSingleObject(th, millisecond); MLOCK_ACQUIRE; wait_count = p->second.StopWaiting(); if (WAIT_OBJECT_0 == ret) { IAsyncPG *async = p->second.GetObj(); if (type >= 0 && type <= IAsyncPG::AbortExec) async->Reset_eThread(type); if (wait_count <= 0) { th_list.erase(th); MLOCK_RELEASE; CloseHandle(th); if (type >= IAsyncPG::CommitExec) { async->Release(); } } else MLOCK_RELEASE; } else MLOCK_RELEASE; return true;}void AsyncThreads::CleanupThreads(DWORD millisecond){ int msize; DWORD nCount; MLOCK_ACQUIRE; if (msize = th_list.size(), msize <= 0) { MLOCK_RELEASE; return; } mylog("CleanupThreads size=%d\n", msize); HANDLE *hds = new HANDLE[msize]; std::map<HANDLE, AsyncWait>::iterator p; for (p = th_list.begin(), nCount = 0; p != th_list.end(); p++) { hds[nCount++] = p->first; p->second.StartWaiting(); } MLOCK_RELEASE; int i; while (nCount > 0) { DWORD ret = WaitForMultipleObjects(nCount, hds, 0, millisecond); if (ret >= nCount) break; HANDLE th = hds[ret]; MLOCK_ACQUIRE; p = th_list.find(th); if (p != th_list.end()) { int wait_count = p->second.StopWaiting(); DWORD type = p->second.GetType(); IAsyncPG * async = p->second.GetObj(); if (type >= IAsyncPG::PrepareExec && type <= IAsyncPG::AbortExec) async->Reset_eThread(type); if (wait_count <= 0) { th_list.erase(th); MLOCK_RELEASE; CloseHandle(th); if (type >= IAsyncPG::CommitExec) { async->Release(); } } else MLOCK_RELEASE; } else MLOCK_RELEASE; for (i = ret; i < (int) nCount - 1; i++) hds[i] = hds[i + 1]; nCount--; } for (i = 0; i < (int) nCount; i++) { p = th_list.find(hds[i]); if (p != th_list.end()) p->second.StopWaiting(); } delete [] hds;}EXTERN_C static unsigned WINAPI DtcRequestExec(LPVOID para){ RequestPara *reqp = (RequestPara *) para; DWORD type = reqp->type; IAsyncPG *async = (IAsyncPG *) reqp->lpr; HRESULT res = reqp->res, ret; mylog("DtcRequestExec type=%d", reqp->type); delete(reqp); ret = async->RequestExec(type, res); mylog(" Done ret=%d\n", ret); return ret;}CSTR regKey = "SOFTWARE\\Microsoft\\MSDTC\\XADLL";RETCODE static EnlistInDtc_1pipe(ConnectionClass *conn, ITransaction *pTra, ITransactionDispenser *pDtc){ CSTR func = "EnlistInDtc_1pipe"; static IDtcToXaHelperSinglePipe *pHelper = NULL; ITransactionResourceAsync *pRes = NULL; IAsyncPG *asdum; HRESULT res; bool retry, errset; DWORD dwRMCookie; XID xid; if (!pHelper) { res = pDtc->QueryInterface(IID_IDtcToXaHelperSinglePipe, (void **) &pHelper); if (res != S_OK || !pHelper) { forcelog("DtcToXaHelperSingelPipe get error %d\n", res); pHelper = NULL; return SQL_ERROR; } } res = (NULL != (asdum = new IAsyncPG)) ? S_OK : E_FAIL; if (S_OK != res) { mylog("CoCreateInstance error %d\n", res); return SQL_ERROR; }mylog("dllname=%s dsn=%s\n", GetXaLibName(), conn->connInfo.dsn); res = 0; retry = false; errset = false; ConnInfo *ci = &(conn->connInfo); char dtcname[1024]; snprintf(dtcname, sizeof(dtcname), "DRIVER={%s};SERVER=%s;PORT=%s;DATABASE=%s;UID=%s;PWD=%s;" ABBR_SSLMODE "=%s", ci->drivername, ci->server, ci->port, ci->database, ci->username, ci->password, ci->sslmode); do { res = pHelper->XARMCreate(dtcname, (char *) GetXaLibName(), &dwRMCookie); if (S_OK == res) break; mylog("XARMCreate error code=%x\n", res); if (XACT_E_XA_TX_DISABLED == res) { CC_set_error(conn, CONN_UNSUPPORTED_OPTION, "XARMcreate error:Please enable XA transaction in MSDTC security configuration", func); errset = true; } else if (!retry) { LONG ret; HKEY sKey; DWORD rSize; ret = ::RegOpenKeyEx(HKEY_LOCAL_MACHINE, regKey, 0, KEY_QUERY_VALUE | KEY_SET_VALUE, &sKey); if (ERROR_SUCCESS != ret) ret = ::RegCreateKeyEx(HKEY_LOCAL_MACHINE, regKey, 0, NULL, REG_OPTION_NON_VOLATILE, KEY_ALL_ACCESS, NULL, &sKey, NULL); if (ERROR_SUCCESS == ret) { switch (ret = ::RegQueryValueEx(sKey, "XADLL", NULL, NULL, NULL, &rSize)) { case ERROR_SUCCESS: if (rSize > 0) break; default: ret = ::RegSetValueEx(sKey, GetXaLibName(), 0, REG_SZ, (CONST BYTE *) GetXaLibPath(), strlen(GetXaLibPath()) + 1); if (ERROR_SUCCESS == ret) { retry = true; continue; // retry } CC_set_error(conn, CONN_UNSUPPORTED_OPTION, "XARMCreate error:Please register HKLM\\SOFTWARE\\Microsoft\\MSDTC\\XADLL", func); break; } ::RegCloseKey(sKey); } } if (!errset) CC_set_error(conn, CONN_UNSUPPORTED_OPTION, "MSDTC XARMCreate error", func); return SQL_ERROR; } while (1); res = pHelper->ConvertTridToXID((DWORD *) pTra, dwRMCookie, &xid); if (res != S_OK) { mylog("ConvertTridToXid error %d\n", res); return SQL_ERROR; }{char pgxid[258];XidToText(xid, pgxid);mylog("ConvertTridToXID -> %s\n", pgxid);} asdum->SetXid(&xid); /* Create an IAsyncPG instance by myself */ /* DLLGetClassObject(GUID_IAsyncPG, IID_ITransactionResourceAsync, (void **) &asdum); */ asdum->SetHelper(pHelper, dwRMCookie); res = pHelper->EnlistWithRM(dwRMCookie, pTra, asdum, &asdum->enlist); if (res != S_OK) { mylog("EnlistWithRM error %d\n", res); pHelper->ReleaseRMCookie(dwRMCookie, TRUE); return SQL_ERROR; } mylog("asdum=%p start transaction\n", asdum); CC_set_autocommit(conn, FALSE); asdum->SetConnection(conn); conn->asdum = asdum; return SQL_SUCCESS;}EXTERN_C RETCODE EnlistInDtc(ConnectionClass *conn, void *pTra, int method){ static ITransactionDispenser *pDtc = NULL; if (!pTra) { IAsyncPG *asdum = (IAsyncPG *) conn->asdum; if (asdum) { /* asdum->Release(); */ } else SYNC_AUTOCOMMIT(conn); return SQL_SUCCESS; } if (CC_is_in_trans(conn)) { CC_abort(conn); } if (!pDtc) { HRESULT res; res = DtcGetTransactionManager(NULL, NULL, IID_ITransactionDispenser, 0, 0, NULL, (void **) &pDtc); if (res != S_OK || !pDtc) { forcelog("TransactionManager get error %d\n", res); pDtc = NULL; } } return EnlistInDtc_1pipe(conn, (ITransaction *) pTra, pDtc);}EXTERN_C RETCODE DtcOnDisconnect(ConnectionClass *conn){ mylog("DtcOnDisconnect\n"); LIFELOCK_ACQUIRE; IAsyncPG *asdum = (IAsyncPG *) conn->asdum; if (asdum) { asdum->AddRef(); LIFELOCK_RELEASE; asdum->ReleaseConnection(); asdum->Release(); } else LIFELOCK_RELEASE; return SQL_SUCCESS;}EXTERN_C RETCODE DtcOnRelease(void){ AsyncThreads::CleanupThreads(2000); return SQL_SUCCESS;}#endif /* _HANDLE_ENLIST_IN_DTC_ */
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -