📄 flexbench.cpp
字号:
ndbout << endl << "Loop # " << loopCount << endl << endl; /**************************************************************** * Perform inserts. * ****************************************************************/ // Reset and start timer START_TIMER; // Give insert-command to all threads resetThreads(pThreadsData); tellThreads(pThreadsData, stInsert); waitForThreads(pThreadsData); if (checkThreadResults(pThreadsData) != 0){ ndbout << "Error: Threads failed in performing insert" << endl; returnValue = NDBT_FAILED; break; } // stop timer and print results. STOP_TIMER; PRINT_TIMER("insert", tNoOfOperations*tNoOfThreads, tNoOfTables); /**************************************************************** * Verify inserts. * ****************************************************************/ if (VerifyFlag) { resetThreads(pThreadsData); ndbout << "Verifying inserts...\t" ; tellThreads(pThreadsData, stVerify); waitForThreads(pThreadsData); if (checkThreadResults(pThreadsData) != 0){ ndbout << "Error: Threads failed while verifying inserts" << endl; returnValue = NDBT_FAILED; break; }else{ ndbout << "\t\tOK" << endl << endl ; } } /**************************************************************** * Perform read. * ****************************************************************/ // Reset and start timer START_TIMER; // Give read-command to all threads resetThreads(pThreadsData); tellThreads(pThreadsData, stRead); waitForThreads(pThreadsData); if (checkThreadResults(pThreadsData) != 0){ ndbout << "Error: Threads failed in performing read" << endl; returnValue = NDBT_FAILED; break; } // stop timer and print results. STOP_TIMER; PRINT_TIMER("read", tNoOfOperations*tNoOfThreads, tNoOfTables); /**************************************************************** * Perform update. * ****************************************************************/ // Reset and start timer START_TIMER; // Give insert-command to all threads resetThreads(pThreadsData); tellThreads(pThreadsData, stUpdate); waitForThreads(pThreadsData); if (checkThreadResults(pThreadsData) != 0){ ndbout << "Error: Threads failed in performing update" << endl; returnValue = NDBT_FAILED; break; } // stop timer and print results. STOP_TIMER; PRINT_TIMER("update", tNoOfOperations*tNoOfThreads, tNoOfTables); /**************************************************************** * Verify updates. * ****************************************************************/ if (VerifyFlag) { resetThreads(pThreadsData); ndbout << "Verifying updates...\t" ; tellThreads(pThreadsData, stVerify); waitForThreads(pThreadsData); if (checkThreadResults(pThreadsData) != 0){ ndbout << "Error: Threads failed while verifying updates" << endl; returnValue = NDBT_FAILED; break; }else{ ndbout << "\t\tOK" << endl << endl ; } } /**************************************************************** * Perform read. * ****************************************************************/ // Reset and start timer START_TIMER; // Give insert-command to all threads resetThreads(pThreadsData); tellThreads(pThreadsData, stRead); waitForThreads(pThreadsData); if (checkThreadResults(pThreadsData) != 0){ ndbout << "Error: Threads failed in performing read" << endl; returnValue = NDBT_FAILED; break; } // stop timer and print results. STOP_TIMER; PRINT_TIMER("read", tNoOfOperations*tNoOfThreads, tNoOfTables); /**************************************************************** * Perform delete. * ****************************************************************/ // Reset and start timer START_TIMER; // Give insert-command to all threads resetThreads(pThreadsData); tellThreads(pThreadsData, stDelete); waitForThreads(pThreadsData); if (checkThreadResults(pThreadsData) != 0){ ndbout << "Error: Threads failed in performing delete" << endl; returnValue = NDBT_FAILED; break; } // stop timer and print results. STOP_TIMER; PRINT_TIMER("delete", tNoOfOperations*tNoOfThreads, tNoOfTables); /**************************************************************** * Verify deletes. * ****************************************************************/ if (VerifyFlag) { resetThreads(pThreadsData); ndbout << "Verifying tuple deletion..." ; tellThreads(pThreadsData, stVerifyDelete); waitForThreads(pThreadsData); if (checkThreadResults(pThreadsData) != 0){ ndbout << "Error: Threads failed in verifying deletes" << endl; returnValue = NDBT_FAILED; break; }else{ ndbout << "\t\tOK" << endl << endl ; } } ndbout << "--------------------------------------------------" << endl; tLoops++; if ( 0 != tNoOfLoops && tNoOfLoops <= tLoops ) break; theErrorData.printErrorCounters(); } resetThreads(pThreadsData); tellThreads(pThreadsData, stStop); waitForThreads(pThreadsData); void * tmp; for(i = 0; i<tNoOfThreads; i++){ NdbThread_WaitFor(pThreadsData[i].threadLife, &tmp); NdbThread_Destroy(&pThreadsData[i].threadLife); } } if (useLongKeys == true) { // Only free these areas if they have been allocated // Otherwise cores will happen for (i = 0; i < tNoOfLongPK; i++) free(longKeyAttrName[i]); free(longKeyAttrName); } // if delete [] pThreadsData; delete pNdb; theErrorData.printErrorCounters(); return NDBT_ProgramExit(returnValue);}////////////////////////////////////////unsigned long get_hash(unsigned long * hash_key, int len){ unsigned long hash_value = 147; unsigned h_key; int i; for (i = 0; i < len; i++) { h_key = hash_key[i]; hash_value = (hash_value << 5) + hash_value + (h_key & 255); hash_value = (hash_value << 5) + hash_value + ((h_key >> 8) & 255); hash_value = (hash_value << 5) + hash_value + ((h_key >> 16) & 255); hash_value = (hash_value << 5) + hash_value + ((h_key >> 24) & 255); } return hash_value;}// End of warming up phasestatic void* flexBenchThread(void* pArg){ ThreadData* pThreadData = (ThreadData*)pArg; unsigned int threadNo, threadBase; Ndb* pNdb = NULL ; NdbConnection *pTrans = NULL ; NdbOperation** pOps = NULL ; StartType tType ; StartType tSaveType ; NdbRecAttr* tTmp = NULL ; int* attrValue = NULL ; int* attrRefValue = NULL ; int check = 0 ; int loopCountOps, loopCountTables, loopCountAttributes; int tAttemptNo = 0; int tRetryAttempts = 20; int tResult = 0; int tSpecialTrans = 0; int nRefLocalOpOffset = 0 ; int nReadBuffSize = tNoOfTables * tNoOfAttributes * sizeof(int) * tAttributeSize ; int nRefBuffSize = tNoOfOperations * tNoOfAttributes * sizeof(int) * tAttributeSize ; unsigned*** longKeyAttrValue; threadNo = pThreadData->threadNo ; attrValue = (int*)malloc(nReadBuffSize) ; attrRefValue = (int*)malloc(nRefBuffSize) ; pOps = (NdbOperation**)malloc(tNoOfTables*sizeof(NdbOperation*)) ; pNdb = new Ndb(g_cluster_connection, "TEST_DB" ); if(!attrValue || !attrRefValue || !pOps || !pNdb){ // Check allocations to make sure we got all the memory we asked for ndbout << "One or more memory allocations failed when starting thread #"; ndbout << threadNo << endl ; ndbout << "Thread #" << threadNo << " will now exit" << endl ; tResult = 13 ; free(attrValue) ; free(attrRefValue) ; free(pOps) ; delete pNdb ; return 0; // thread exits } pNdb->init(); pNdb->waitUntilReady(); // To make sure that two different threads doesn't operate on the same record // Calculate an "unique" number to use as primary key threadBase = (threadNo * 2000000) + (tNodeId * 260000000); if(useLongKeys){ // Allocate and populate the longkey array. longKeyAttrValue = (unsigned ***) malloc(sizeof(unsigned**) * tNoOfOperations ); Uint32 n; for (n = 0; n < tNoOfOperations; n++) longKeyAttrValue[n] = (unsigned **) malloc(sizeof(unsigned*) * tNoOfLongPK ); for (n = 0; n < tNoOfOperations; n++){ for (Uint32 i = 0; i < tNoOfLongPK ; i++) { longKeyAttrValue[n][i] = (unsigned *) malloc(sizeof(unsigned) * tSizeOfLongPK); memset(longKeyAttrValue[n][i], 0, sizeof(unsigned) * tSizeOfLongPK); for(Uint32 j = 0; j < tSizeOfLongPK; j++) { // Repeat the unique value to fill up the long key. longKeyAttrValue[n][i][j] = threadBase + n; } } } } int nRefOpOffset = 0 ; //Assign reference attribute values to memory for(Uint32 ops = 1 ; ops < tNoOfOperations ; ops++){ // Calculate offset value before going into the next loop nRefOpOffset = tAttributeSize*tNoOfAttributes*(ops-1) ; for(Uint32 a = 0 ; a < tNoOfAttributes ; a++){ *(int*)&attrRefValue[nRefOpOffset + tAttributeSize*a] = (int)(threadBase + ops + a) ; } }#ifdef CEBIT_STAT // ops not yet reported int statOps = 0;#endif for (;;) { pThreadData->threadResult = tResult; // Report error to main thread, // normally tResult is set to 0 pThreadData->threadReady = 1; while (pThreadData->threadStart == stIdle){ NdbSleep_MilliSleep(100); }//while // Check if signal to exit is received if (pThreadData->threadStart == stStop){ pThreadData->threadReady = 1; // ndbout_c("Thread%d is stopping", threadNo); // In order to stop this thread, the main thread has signaled // stStop, break out of the for loop so that destructors // and the proper exit functions are called break; }//if tType = pThreadData->threadStart; tSaveType = tType; pThreadData->threadStart = stIdle; // Start transaction, type of transaction // is received in the array ThreadStart loopCountOps = tNoOfOperations; loopCountTables = tNoOfTables; loopCountAttributes = tNoOfAttributes; for (int count = 1; count < loopCountOps && tResult == 0;){ pTrans = pNdb->startTransaction(); if (pTrans == NULL) { // This is a fatal error, abort program ndbout << "Could not start transaction in thread" << threadNo; ndbout << endl; ndbout << pNdb->getNdbError() << endl; tResult = 1; // Indicate fatal error break; // Break out of for loop } // Calculate the current operation offset in the reference array nRefLocalOpOffset = tAttributeSize*tNoOfAttributes*(count - 1) ; for (int countTables = 0; countTables < loopCountTables && tResult == 0; countTables++) { pOps[countTables] = pTrans->getNdbOperation(tableName[countTables]); if (pOps[countTables] == NULL) { // This is a fatal error, abort program ndbout << "getNdbOperation: " << pTrans->getNdbError(); tResult = 2; // Indicate fatal error break; }//if switch (tType) { case stInsert: // Insert case if (theWriteFlag == 1 && theDirtyFlag == 1) pOps[countTables]->dirtyWrite(); else if (theWriteFlag == 1) pOps[countTables]->writeTuple(); else pOps[countTables]->insertTuple(); break; case stRead: // Read Case if (theSimpleFlag == 1) pOps[countTables]->simpleRead(); else if (theDirtyFlag == 1) pOps[countTables]->dirtyRead(); else pOps[countTables]->readTuple(); break; case stUpdate: // Update Case if (theWriteFlag == 1 && theDirtyFlag == 1) pOps[countTables]->dirtyWrite(); else if (theWriteFlag == 1) pOps[countTables]->writeTuple(); else if (theDirtyFlag == 1) pOps[countTables]->dirtyUpdate(); else pOps[countTables]->updateTuple(); break; case stDelete: // Delete Case pOps[countTables]->deleteTuple(); break; case stVerify: pOps[countTables]->readTuple(); break; case stVerifyDelete: pOps[countTables]->readTuple(); break; default: assert(false); }//switch if(useLongKeys){ // Loop the equal call so the complete key is send to the kernel. for(Uint32 i = 0; i < tNoOfLongPK; i++) pOps[countTables]->equal(longKeyAttrName[i], (char *)longKeyAttrValue[count - 1][i], tSizeOfLongPK*4); } else pOps[countTables]->equal((char*)attrName[0],
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -