📄 flexasynch.cpp
字号:
* Perform update. * ****************************************************************/ failed = 0 ; START_TIMER; execute(stUpdate); STOP_TIMER; PRINT_TIMER("update", noOfTransacts, tNoOfOpsPerTrans) ; if (0 < failed) { i = retry_opt ; int cu = 1 ; while (0 < failed && 0 < i){ ndbout << failed << " of the transactions returned errors!"<<endl ; ndbout << endl; ndbout <<"Attempting to redo the failed transactions now..." << endl; ndbout << endl <<"Redo attempt " << cu <<" out of "; ndbout << retry_opt << endl << endl; failed = 0 ; START_TIMER; execute(stUpdate); STOP_TIMER; PRINT_TIMER("update", noOfTransacts, tNoOfOpsPerTrans); i-- ; cu++ ; }//while if(0 == failed ){ ndbout << endl <<"Redo attempt succeeded" << endl << endl; } else { ndbout << endl; ndbout <<"Redo attempt failed, moving on now..." << endl << endl; }//if }//if /**************************************************************** * Perform read. * ****************************************************************/ failed = 0 ; START_TIMER; execute(stRead); STOP_TIMER; PRINT_TIMER("read", noOfTransacts, tNoOfOpsPerTrans); if (0 < failed) { i = retry_opt ; int cr2 = 1 ; while (0 < failed && 0 < i){ ndbout << failed << " of the transactions returned errors!"<<endl ; ndbout << endl; ndbout <<"Attempting to redo the failed transactions now..." << endl; ndbout << endl <<"Redo attempt " << cr2 <<" out of "; ndbout << retry_opt << endl << endl; failed = 0 ; START_TIMER; execute(stRead); STOP_TIMER; PRINT_TIMER("read", noOfTransacts, tNoOfOpsPerTrans); i-- ; cr2++ ; }//while if(0 == failed ){ ndbout << endl <<"Redo attempt succeeded" << endl << endl; }else{ ndbout << endl; ndbout << "Redo attempt failed, moving on now..." << endl << endl; }//if }//if /**************************************************************** * Perform delete. * ****************************************************************/ failed = 0 ; START_TIMER; execute(stDelete); STOP_TIMER; PRINT_TIMER("delete", noOfTransacts, tNoOfOpsPerTrans); if (0 < failed) { i = retry_opt ; int cd = 1 ; while (0 < failed && 0 < i){ ndbout << failed << " of the transactions returned errors!"<< endl ; ndbout << endl; ndbout <<"Attempting to redo the failed transactions now:" << endl ; ndbout << endl <<"Redo attempt " << cd <<" out of "; ndbout << retry_opt << endl << endl; failed = 0 ; START_TIMER; execute(stDelete); STOP_TIMER; PRINT_TIMER("read", noOfTransacts, tNoOfOpsPerTrans); i-- ; cd++ ; }//while if(0 == failed ){ ndbout << endl <<"Redo attempt succeeded" << endl << endl ; }else{ ndbout << endl; ndbout << "Redo attempt failed, moving on now..." << endl << endl; }//if }//if tLoops++; ndbout << "--------------------------------------------------" << endl; if(tNoOfLoops != 0){ if(tNoOfLoops <= tLoops) break ; } }//for execute(stStop); void * tmp; for(i = 0; i<tNoOfThreads; i++){ NdbThread_WaitFor(threadLife[i], &tmp); NdbThread_Destroy(&threadLife[i]); } } delete [] pThreadData; delete pNdb; //printing errorCounters flexAsynchErrorData->printErrorCounters(ndbout); return NDBT_ProgramExit(returnValue);}//main()static void execute(StartType aType){ resetThreads(); tellThreads(aType); waitForThreads();}//execute()static void*threadLoop(void* ThreadData){ Ndb* localNdb; StartType tType; ThreadNdb* tabThread = (ThreadNdb*)ThreadData; int threadNo = tabThread->ThreadNo; localNdb = new Ndb(g_cluster_connection, "TEST_DB"); localNdb->init(1024); localNdb->waitUntilReady(10000); unsigned int threadBase = (threadNo << 16) + tNodeId ; for (;;){ while (ThreadStart[threadNo] == stIdle) { NdbSleep_MilliSleep(10); }//while // Check if signal to exit is received if (ThreadStart[threadNo] == stStop) { break; }//if tType = ThreadStart[threadNo]; ThreadStart[threadNo] = stIdle; if(!executeThread(tType, localNdb, threadBase)){ break; } ThreadReady[threadNo] = 1; }//for delete localNdb; ThreadReady[threadNo] = 1; return NULL;}//threadLoop()static boolexecuteThread(StartType aType, Ndb* aNdbObject, unsigned int threadBase) { int i, j, k; NdbConnection* tConArray[1024]; unsigned int tBase; unsigned int tBase2; for (i = 0; i < tNoOfTransactions; i++) { if (tLocal == false) { tBase = i * tNoOfParallelTrans * tNoOfOpsPerTrans; } else { tBase = i * tNoOfParallelTrans * MAX_SEEK; }//if START_REAL_TIME; for (j = 0; j < tNoOfParallelTrans; j++) { if (tLocal == false) { tBase2 = tBase + (j * tNoOfOpsPerTrans); } else { tBase2 = tBase + (j * MAX_SEEK); tBase2 = getKey(threadBase, tBase2); }//if if (startTransGuess == true) { Uint64 Tkey64; Uint32* Tkey32 = (Uint32*)&Tkey64; Tkey32[0] = threadBase; Tkey32[1] = tBase2; tConArray[j] = aNdbObject->startTransaction((Uint32)0, //Priority (const char*)&Tkey64, //Main PKey (Uint32)4); //Key Length } else { tConArray[j] = aNdbObject->startTransaction(); }//if if (tConArray[j] == NULL && !error_handler(aNdbObject->getNdbError()) ){ ndbout << endl << "Unable to recover! Quiting now" << endl ; return false; }//if for (k = 0; k < tNoOfOpsPerTrans; k++) { //------------------------------------------------------- // Define the operation, but do not execute it yet. //------------------------------------------------------- defineOperation(tConArray[j], aType, threadBase, (tBase2 + k)); }//for tConArray[j]->executeAsynchPrepare(Commit, &executeCallback, NULL); }//for STOP_REAL_TIME; //------------------------------------------------------- // Now we have defined a set of operations, it is now time // to execute all of them. //------------------------------------------------------- int Tcomp = aNdbObject->sendPollNdb(3000, 0, 0); while (Tcomp < tNoOfParallelTrans) { int TlocalComp = aNdbObject->pollNdb(3000, 0); Tcomp += TlocalComp; }//while for (j = 0 ; j < tNoOfParallelTrans ; j++) { aNdbObject->closeTransaction(tConArray[j]); }//for }//for return true;}//executeThread()static Uint32getKey(Uint32 aBase, Uint32 anIndex) { Uint32 Tfound = anIndex; Uint64 Tkey64; Uint32* Tkey32 = (Uint32*)&Tkey64; Tkey32[0] = aBase; Uint32 hash; for (Uint32 i = anIndex; i < (anIndex + MAX_SEEK); i++) { Tkey32[1] = (Uint32)i; hash = md5_hash((Uint64*)&Tkey64, (Uint32)2); hash = (hash >> 6) & (MAX_PARTS - 1); if (hash == tLocalPart) { Tfound = i; break; }//if }//for return Tfound;}//getKey()static voidexecuteCallback(int result, NdbConnection* NdbObject, void* aObject){ if (result == -1) { // Add complete error handling here int retCode = flexAsynchErrorData->handleErrorCommon(NdbObject->getNdbError()); if (retCode == 1) { if (NdbObject->getNdbError().code != 626 && NdbObject->getNdbError().code != 630){ ndbout_c("execute: %s", NdbObject->getNdbError().message); ndbout_c("Error code = %d", NdbObject->getNdbError().code);} } else if (retCode == 2) { ndbout << "4115 should not happen in flexAsynch" << endl; } else if (retCode == 3) { /* What can we do here? */ ndbout_c("execute: %s", NdbObject->getNdbError().message); }//if(retCode == 3) // ndbout << "Error occured in poll:" << endl; // ndbout << NdbObject->getNdbError() << endl; failed++ ; return; }//if return;}//executeCallback()static voiddefineOperation(NdbConnection* localNdbConnection, StartType aType, Uint32 threadBase, Uint32 aIndex){ NdbOperation* localNdbOperation; unsigned int loopCountAttributes = tNoOfAttributes; unsigned int countAttributes; Uint32 attrValue[MAXATTRSIZE]; //------------------------------------------------------- // Set-up the attribute values for this operation. //------------------------------------------------------- attrValue[0] = threadBase; attrValue[1] = aIndex; for (int k = 2; k < loopCountAttributes; k++) { attrValue[k] = aIndex; }//for localNdbOperation = localNdbConnection->getNdbOperation(tableName[0]); if (localNdbOperation == NULL) { error_handler(localNdbConnection->getNdbError()); }//if switch (aType) { case stInsert: { // Insert case if (theWriteFlag == 1 && theDirtyFlag == 1) { localNdbOperation->dirtyWrite(); } else if (theWriteFlag == 1) { localNdbOperation->writeTuple(); } else { localNdbOperation->insertTuple(); }//if break; }//case case stRead: { // Read Case if (theSimpleFlag == 1) { localNdbOperation->simpleRead(); } else if (theDirtyFlag == 1) { localNdbOperation->dirtyRead(); } else { localNdbOperation->readTuple(); }//if break;
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -