📄 msa.cpp
字号:
int InsertTransaction(Ndb* pNdb, long iContextID, long iVersion, long iLockFlag, long iLockTime, long iLockTimeUSec, const char* pchContextData, NdbError& err){ int iRes = -1; NdbConnection* pNdbConnection = pNdb->startTransaction(0, (const char*)&iContextID, 4); if(pNdbConnection) { NdbOperation* pNdbOperation = pNdbConnection->getNdbOperation(g_szTableName); if(pNdbOperation) { if(!(g_bWriteTuple ? pNdbOperation->writeTuple() : pNdbOperation->insertTuple()) && !pNdbOperation->equal(c_szContextId, (Int32)iContextID) && !pNdbOperation->setValue(c_szVersion, (Int32)iVersion) && !pNdbOperation->setValue(c_szLockFlag, (Int32)iLockFlag) && !pNdbOperation->setValue(c_szLockTime, (Int32)iLockTime) && !pNdbOperation->setValue(c_szLockTimeUSec, (Int32)iLockTimeUSec) && !pNdbOperation->setValue(c_szContextData, pchContextData, g_nStatusDataSize)) { if(!pNdbConnection->execute(Commit)) iRes = 0; else err = pNdbConnection->getNdbError(); } else err = pNdbOperation->getNdbError(); } else err = pNdbConnection->getNdbError(); pNdb->closeTransaction(pNdbConnection); } else err = pNdb->getNdbError(); return iRes;}int RetryInsertTransaction(Ndb* pNdb, long iContextId, long iVersion, long iLockFlag, long iLockTime, long iLockTimeUSec, const char* pchContextData, NdbError& err, int& nRetry){ int iRes = -1; nRetry = 0; bool bRetry = true; bool bUnknown = false; while(bRetry && nRetry<g_nMaxRetry) { if(!InsertTransaction(pNdb, iContextId, iVersion, iLockFlag, iLockTime, iLockTimeUSec, pchContextData, err)) { iRes = 0; bRetry = false; } else { switch(err.status) { case NdbError::UnknownResult: bUnknown = true; ++nRetry; break; case NdbError::TemporaryError: bUnknown = false; SleepOneCall(); ++nRetry; break; case NdbError::PermanentError: if(err.code==630 && bUnknown) iRes = 0; bRetry = false; break; default: bRetry = false; break; } } } return iRes;}int UpdateTransaction(Ndb* pNdb, long iContextId, NdbError& err){ int iRes = -1; NdbConnection* pNdbConnection = pNdb->startTransaction(0, (const char*)&iContextId, 4); if(pNdbConnection) { NdbOperation* pNdbOperation = pNdbConnection->getNdbOperation(g_szTableName); if(pNdbOperation) { if(!pNdbOperation->updateTuple() && !pNdbOperation->equal(c_szContextId, (Int32)iContextId) && !pNdbOperation->setValue(c_szContextData, STATUS_DATA, g_nStatusDataSize)) { if(!pNdbConnection->execute(Commit)) iRes = 0; else err = pNdbConnection->getNdbError(); } else err = pNdbOperation->getNdbError(); } else err = pNdbConnection->getNdbError(); pNdb->closeTransaction(pNdbConnection); } else err = pNdb->getNdbError(); return iRes;}int RetryUpdateTransaction(Ndb* pNdb, long iContextId, NdbError& err, int& nRetry){ int iRes = -1; nRetry = 0; bool bRetry = true; while(bRetry && nRetry<g_nMaxRetry) { if(!UpdateTransaction(pNdb, iContextId, err)) { iRes = 0; bRetry = false; } else { switch(err.status) { case NdbError::TemporaryError: case NdbError::UnknownResult: SleepOneCall(); ++nRetry; break; case NdbError::PermanentError: default: bRetry = false; break; } } } return iRes;}int InsertInitialRecords(Ndb* pNdb, long nInsert, long nSeed){ int iRes = -1; char szMsg[100]; for(long i=0; i<nInsert; ++i) { int iContextID = i+nSeed; int nRetry = 0; NdbError err; memset(&err, 0, sizeof(err)); NDB_TICKS tStartTrans = NdbTick_CurrentMillisecond(); iRes = RetryInsertTransaction(pNdb, iContextID, nSeed, iContextID, (long)(tStartTrans/1000), (long)((tStartTrans%1000)*1000), STATUS_DATA, err, nRetry); NDB_TICKS tEndTrans = NdbTick_CurrentMillisecond(); long lMillisecForThisTrans = (long)(tEndTrans-tStartTrans); if(nRetry>0) { sprintf(szMsg, "insert retried %d times, time %ld msec.", nRetry, lMillisecForThisTrans); ReportNdbError(szMsg, err); } if(iRes) { ReportNdbError("Insert initial record failed", err); return iRes; } InterlockedIncrement(&g_nNumberOfInitialInsert); } return iRes;}int VerifyInitialRecords(Ndb* pNdb, long nVerify, long nSeed){ int iRes = -1; char* pchContextData = new char[g_nStatusDataSize]; char szMsg[100]; long iPrevLockTime = -1; long iPrevLockTimeUSec = -1; for(long i=0; i<nVerify; ++i) { int iContextID = i+nSeed; long iVersion = 0; long iLockFlag = 0; long iLockTime = 0; long iLockTimeUSec = 0; int nRetry = 0; NdbError err; memset(&err, 0, sizeof(err)); NDB_TICKS tStartTrans = NdbTick_CurrentMillisecond(); iRes = RetryQueryTransaction(pNdb, iContextID, &iVersion, &iLockFlag, &iLockTime, &iLockTimeUSec, pchContextData, err, nRetry); NDB_TICKS tEndTrans = NdbTick_CurrentMillisecond(); long lMillisecForThisTrans = (long)(tEndTrans-tStartTrans); if(nRetry>0) { sprintf(szMsg, "verify retried %d times, time %ld msec.", nRetry, lMillisecForThisTrans); ReportNdbError(szMsg, err); } if(iRes) { ReportNdbError("Read initial record failed", err); delete[] pchContextData; return iRes; } if(memcmp(pchContextData, STATUS_DATA, g_nStatusDataSize)) { sprintf(szMsg, "wrong context data in tuple %d", iContextID); ReportNdbError(szMsg, err); delete[] pchContextData; return -1; } if(iVersion!=nSeed || iLockFlag!=iContextID || iLockTime<iPrevLockTime || (iLockTime==iPrevLockTime && iLockTimeUSec<iPrevLockTimeUSec)) { sprintf(szMsg, "wrong call data in tuple %d", iContextID); ReportNdbError(szMsg, err); delete[] pchContextData; return -1; } iPrevLockTime = iLockTime; iPrevLockTimeUSec = iLockTimeUSec; InterlockedIncrement(&g_nNumberOfInitialVerify); } delete[] pchContextData; return iRes;}void* RuntimeCallContext(void* lpParam){ long nNumCallsProcessed = 0; int nStartingRecordID = *(int*)lpParam; Ndb* pNdb; char* pchContextData = new char[g_nStatusDataSize]; char szMsg[100]; int iRes; const char* szOp; long iVersion; long iLockFlag; long iLockTime; long iLockTimeUSec; pNdb = new Ndb("TEST_DB"); if(!pNdb) { NdbMutex_Lock(g_pNdbMutexPrintf); printf("new Ndb failed\n"); NdbMutex_Unlock(g_pNdbMutexPrintf); delete[] pchContextData; return 0; } if(pNdb->init(1) || pNdb->waitUntilReady()) { ReportNdbError("init of Ndb failed", pNdb->getNdbError()); delete pNdb; delete[] pchContextData; return 0; } if(g_bInsertInitial) { if(InsertInitialRecords(pNdb, g_nMaxContextIdPerThread, -nStartingRecordID-g_nMaxContextIdPerThread)) { delete pNdb; delete[] pchContextData; return 0; } } if(g_bVerifyInitial) { NdbError err; memset(&err, 0, sizeof(err)); if(VerifyInitialRecords(pNdb, g_nMaxContextIdPerThread, -nStartingRecordID-g_nMaxContextIdPerThread)) { delete pNdb; delete[] pchContextData; return 0; } } if(g_bInsertInitial || g_bVerifyInitial) { delete[] pchContextData; return 0; } long nContextID = nStartingRecordID;#ifdef NDB_WIN32 while(WaitForSingleObject(hShutdownEvent,0) != WAIT_OBJECT_0)#else while(!bShutdownEvent)#endif { ++nContextID; nContextID %= g_nMaxContextIdPerThread; nContextID += nStartingRecordID; bool bTimeLatency = (nContextID==100); NDB_TICKS tStartCall = NdbTick_CurrentMillisecond(); for (int i=0; i < 20; i++) { int nRetry = 0; NdbError err; memset(&err, 0, sizeof(err)); NDB_TICKS tStartTrans = NdbTick_CurrentMillisecond(); switch(i) { case 3: case 6: case 9: case 11: case 12: case 15: case 18: // Query Record szOp = "Read"; iRes = RetryQueryTransaction(pNdb, nContextID, &iVersion, &iLockFlag, &iLockTime, &iLockTimeUSec, pchContextData, err, nRetry); break; case 19: // Delete Record szOp = "Delete"; iRes = RetryDeleteTransaction(pNdb, nContextID, err, nRetry); break; case 0: // Insert Record szOp = "Insert"; iRes = RetryInsertTransaction(pNdb, nContextID, 1, 1, 1, 1, STATUS_DATA, err, nRetry); break; default: // Update Record szOp = "Update"; iRes = RetryUpdateTransaction(pNdb, nContextID, err, nRetry); break; } NDB_TICKS tEndTrans = NdbTick_CurrentMillisecond(); long lMillisecForThisTrans = (long)(tEndTrans-tStartTrans); if(g_bReport) { assert(lMillisecForThisTrans>=0 && lMillisecForThisTrans<c_nMaxMillisecForAllTrans); InterlockedIncrement(g_plCountMillisecForTrans+lMillisecForThisTrans); } if(nRetry>0) { sprintf(szMsg, "%s retried %d times, time %ld msec.", szOp, nRetry, lMillisecForThisTrans); ReportNdbError(szMsg, err); } else if(bTimeLatency) { NdbMutex_Lock(g_pNdbMutexPrintf); printf("%s = %ld msec.\n", szOp, lMillisecForThisTrans); NdbMutex_Unlock(g_pNdbMutexPrintf); } if(iRes) { sprintf(szMsg, "%s failed after %ld calls, terminating thread", szOp, nNumCallsProcessed); ReportNdbError(szMsg, err); delete pNdb; delete[] pchContextData; return 0; }
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -